src/pqc_ebpf_attestation/audit.py
| 1 | """Append-only audit log for eBPF load attempts.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any

from pqc_ebpf_attestation.policy import PolicyDecision
from pqc_ebpf_attestation.signer import SignedBPFProgram
| 12 | |
| 13 | |
@dataclass
class AttestationLogEntry:
    """One recorded eBPF load decision.

    All fields are annotated as strings; ``actor`` is optional and
    defaults to the empty string.
    """

    timestamp: str
    program_name: str
    program_type: str
    bytecode_hash: str
    signer_did: str
    decision: str  # "allow" | "deny"
    reason: str
    actor: str = ""  # who initiated the load (user/service)

    def to_dict(self) -> dict[str, Any]:
        """Return the entry's fields as a new dict keyed by field name."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping
| 27 | |
| 28 | |
class AttestationLog:
    """Bounded in-memory log of eBPF load decisions.

    Entries are only ever appended. Once ``max_entries`` records are held,
    each new record displaces the oldest one.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained; must be >= 1.

        Raises:
            ValueError: If ``max_entries`` is less than 1.
        """
        if max_entries < 1:
            raise ValueError(f"max_entries must be >= 1, got {max_entries}")
        self._max = max_entries
        # A deque with maxlen drops the oldest entry in O(1) on append,
        # whereas list.pop(0) shifts every remaining element.
        self._entries: deque[AttestationLogEntry] = deque(maxlen=max_entries)

    def log(
        self,
        signed: SignedBPFProgram,
        decision: PolicyDecision,
        reason: str,
        actor: str = "",
    ) -> AttestationLogEntry:
        """Record a load decision for ``signed`` and return the new entry.

        The entry is stamped with the current UTC time in ISO 8601 form.

        Args:
            signed: The signed program the decision applies to.
            decision: The policy outcome; its ``.value`` is stored.
            reason: Explanation stored alongside the decision.
            actor: Who initiated the load (user/service), if known.

        Returns:
            The entry that was appended to the log.
        """
        entry = AttestationLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            program_name=signed.program.metadata.name,
            program_type=signed.program.metadata.program_type.value,
            bytecode_hash=signed.program.bytecode_hash,
            signer_did=signed.signer_did,
            decision=decision.value,
            reason=reason,
            actor=actor,
        )
        # The deque's maxlen evicts the oldest entry when the log is full.
        self._entries.append(entry)
        return entry

    def entries(
        self,
        limit: int = 100,
        decision: str | None = None,
        signer_did: str | None = None,
    ) -> list[AttestationLogEntry]:
        """Return up to ``limit`` matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            decision: If given (and non-empty), keep only entries whose
                ``decision`` equals this value.
            signer_did: If given (and non-empty), keep only entries whose
                ``signer_did`` equals this value.

        Returns:
            A new list of the most recent matching entries, newest first.
        """
        # Guard explicitly: a slice such as seq[-0:] would select everything.
        if limit <= 0:
            return []
        matches: list[AttestationLogEntry] = []
        # Walk newest-first so the scan can stop once enough entries match.
        for entry in reversed(self._entries):
            if decision and entry.decision != decision:
                continue
            if signer_did and entry.signer_did != signer_did:
                continue
            matches.append(entry)
            if len(matches) >= limit:
                break
        return matches

    def export_json(self) -> str:
        """Serialize every retained entry, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
| 76 | |