src/pqc_ebpf_attestation/audit.py
2.1 KB · 76 lines · python Raw
1 """Append-only audit log for eBPF load attempts."""
2
3 from __future__ import annotations
4
5 import json
6 from dataclasses import asdict, dataclass
7 from datetime import datetime, timezone
8 from typing import Any
9
10 from pqc_ebpf_attestation.policy import PolicyDecision
11 from pqc_ebpf_attestation.signer import SignedBPFProgram
12
13
14 @dataclass
15 class AttestationLogEntry:
16 timestamp: str
17 program_name: str
18 program_type: str
19 bytecode_hash: str
20 signer_did: str
21 decision: str # "allow" | "deny"
22 reason: str
23 actor: str = "" # who initiated the load (user/service)
24
25 def to_dict(self) -> dict[str, Any]:
26 return asdict(self)
27
28
29 class AttestationLog:
30 """Append-only in-memory log of eBPF load decisions."""
31
32 def __init__(self, max_entries: int = 100_000) -> None:
33 self._entries: list[AttestationLogEntry] = []
34 self._max = max_entries
35
36 def log(
37 self,
38 signed: SignedBPFProgram,
39 decision: PolicyDecision,
40 reason: str,
41 actor: str = "",
42 ) -> AttestationLogEntry:
43 entry = AttestationLogEntry(
44 timestamp=datetime.now(timezone.utc).isoformat(),
45 program_name=signed.program.metadata.name,
46 program_type=signed.program.metadata.program_type.value,
47 bytecode_hash=signed.program.bytecode_hash,
48 signer_did=signed.signer_did,
49 decision=decision.value,
50 reason=reason,
51 actor=actor,
52 )
53 if len(self._entries) >= self._max:
54 self._entries.pop(0)
55 self._entries.append(entry)
56 return entry
57
58 def entries(
59 self,
60 limit: int = 100,
61 decision: str | None = None,
62 signer_did: str | None = None,
63 ) -> list[AttestationLogEntry]:
64 out = self._entries
65 if decision:
66 out = [e for e in out if e.decision == decision]
67 if signer_did:
68 out = [e for e in out if e.signer_did == signer_did]
69 return out[-limit:][::-1]
70
71 def export_json(self) -> str:
72 return json.dumps([e.to_dict() for e in self._entries], indent=2)
73
74 def __len__(self) -> int:
75 return len(self._entries)
76