src/pqc_bootloader/audit.py
2.6 KB · 93 lines · python Raw
1 """Append-only log of boot attempts."""
2
3 from __future__ import annotations
4
5 import json
6 from dataclasses import asdict, dataclass
7 from datetime import datetime, timezone
8 from typing import Any
9
10
@dataclass
class BootAttemptEntry:
    """A single recorded boot attempt and the decision taken for it.

    All fields are plain strings and are stored exactly as supplied; this
    class performs no validation of its own.
    """

    timestamp: str
    firmware_name: str
    firmware_version: str
    firmware_hash: str
    decision: str  # "accept" | "reject"
    reason: str
    device_id: str = ""  # identifier of the appliance
    pcr_value_after: str = ""  # final PCR after measurements (if captured)

    def to_dict(self) -> dict[str, Any]:
        """Return the entry's fields as a new dict keyed by field name."""
        field_map: dict[str, Any] = asdict(self)
        return field_map
24
25
class BootAttestationLog:
    """Bounded, in-memory record of boot attempts, kept oldest first.

    New entries are only ever added at the end. Once the log holds
    ``max_entries`` items, the oldest entry is discarded to make room for
    each new one, so the log is append-only only up to that capacity.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained; must be >= 1.

        Raises:
            ValueError: If ``max_entries`` is less than 1.
        """
        if max_entries < 1:
            # A non-positive capacity previously surfaced only later, as an
            # IndexError from evicting out of an empty list on first append.
            raise ValueError(f"max_entries must be >= 1, got {max_entries!r}")
        self._entries: list[BootAttemptEntry] = []
        self._max = max_entries

    def log_accept(
        self,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        reason: str = "",
        device_id: str = "",
        pcr_value_after: str = "",
    ) -> BootAttemptEntry:
        """Record an accepted boot attempt and return the stored entry.

        An empty ``reason`` is replaced with ``"all checks passed"``.
        """
        return self._record(
            "accept",
            firmware_name,
            firmware_version,
            firmware_hash,
            reason or "all checks passed",
            device_id,
            pcr_value_after,
        )

    def log_reject(
        self,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        reason: str,
        device_id: str = "",
        pcr_value_after: str = "",
    ) -> BootAttemptEntry:
        """Record a rejected boot attempt and return the stored entry.

        ``reason`` is required and stored as given. ``pcr_value_after`` is
        optional and defaults to the empty string, mirroring ``log_accept``.
        """
        return self._record(
            "reject",
            firmware_name,
            firmware_version,
            firmware_hash,
            reason,
            device_id,
            pcr_value_after,
        )

    def _record(
        self,
        decision: str,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        reason: str,
        device_id: str,
        pcr_value_after: str,
    ) -> BootAttemptEntry:
        """Build an entry stamped with the current UTC time and store it."""
        entry = BootAttemptEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            firmware_name=firmware_name,
            firmware_version=firmware_version,
            firmware_hash=firmware_hash,
            decision=decision,
            reason=reason,
            device_id=device_id,
            pcr_value_after=pcr_value_after,
        )
        self._append(entry)
        return entry

    def _append(self, entry: BootAttemptEntry) -> None:
        """Add ``entry`` at the end, evicting the oldest entries if full."""
        # Number of entries that must go so the log holds at most
        # ``self._max`` items after the append below.
        overflow = len(self._entries) - self._max + 1
        if overflow > 0:
            del self._entries[:overflow]
        self._entries.append(entry)

    def entries(
        self,
        limit: int = 100,
        decision: str | None = None,
    ) -> list[BootAttemptEntry]:
        """Return up to ``limit`` of the most recent entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            decision: When set to a non-empty string, only entries whose
                ``decision`` equals it are considered.

        Returns:
            A new list; the log itself is not modified.
        """
        if limit <= 0:
            # Slicing with ``[-0:]`` would return the whole list, so handle
            # non-positive limits explicitly.
            return []
        selected: list[BootAttemptEntry] = []
        # Walk newest-first so the scan can stop once ``limit`` is reached.
        for entry in reversed(self._entries):
            if decision and entry.decision != decision:
                continue
            selected.append(entry)
            if len(selected) == limit:
                break
        return selected

    def export_json(self) -> str:
        """Serialize every retained entry, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
93