src/pqc_bootloader/audit.py
| 1 | """Append-only log of boot attempts.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | from dataclasses import asdict, dataclass |
| 7 | from datetime import datetime, timezone |
| 8 | from typing import Any |
| 9 | |
| 10 | |
@dataclass
class BootAttemptEntry:
    """A single recorded boot attempt together with the decision taken.

    Every field is a plain string. The two trailing fields are optional and
    default to the empty string when the caller has nothing to report.
    """

    timestamp: str
    firmware_name: str
    firmware_version: str
    firmware_hash: str
    decision: str  # either "accept" or "reject"
    reason: str
    device_id: str = ""  # appliance identifier
    pcr_value_after: str = ""  # final PCR after measurements, when captured

    def to_dict(self) -> dict[str, Any]:
        """Return this entry's fields as a new dictionary keyed by field name."""
        snapshot: dict[str, Any] = asdict(self)
        return snapshot
| 24 | |
| 25 | |
class BootAttestationLog:
    """Bounded, in-memory, append-only record of firmware boot decisions.

    Entries are stored in insertion order (oldest first). Once the log holds
    ``max_entries`` entries, recording another one discards the oldest.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained; must be >= 1.

        Raises:
            ValueError: If ``max_entries`` is less than 1. A non-positive cap
                previously surfaced as an ``IndexError`` on the first log call.
        """
        if max_entries < 1:
            raise ValueError(f"max_entries must be at least 1, got {max_entries}")
        self._entries: list[BootAttemptEntry] = []
        self._max = max_entries

    def log_accept(
        self,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        reason: str = "",
        device_id: str = "",
        pcr_value_after: str = "",
    ) -> BootAttemptEntry:
        """Record an accepted boot attempt and return the stored entry.

        An empty ``reason`` is replaced by ``"all checks passed"``.
        """
        return self._record(
            firmware_name=firmware_name,
            firmware_version=firmware_version,
            firmware_hash=firmware_hash,
            decision="accept",
            reason=reason or "all checks passed",
            device_id=device_id,
            pcr_value_after=pcr_value_after,
        )

    def log_reject(
        self,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        reason: str,
        device_id: str = "",
        pcr_value_after: str = "",
    ) -> BootAttemptEntry:
        """Record a rejected boot attempt and return the stored entry.

        ``pcr_value_after`` is optional and mirrors the parameter of
        ``log_accept``; it defaults to the empty string.
        """
        return self._record(
            firmware_name=firmware_name,
            firmware_version=firmware_version,
            firmware_hash=firmware_hash,
            decision="reject",
            reason=reason,
            device_id=device_id,
            pcr_value_after=pcr_value_after,
        )

    def _record(
        self,
        *,
        firmware_name: str,
        firmware_version: str,
        firmware_hash: str,
        decision: str,
        reason: str,
        device_id: str,
        pcr_value_after: str,
    ) -> BootAttemptEntry:
        """Build an entry stamped with the current UTC time, store and return it."""
        entry = BootAttemptEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            firmware_name=firmware_name,
            firmware_version=firmware_version,
            firmware_hash=firmware_hash,
            decision=decision,
            reason=reason,
            device_id=device_id,
            pcr_value_after=pcr_value_after,
        )
        self._append(entry)
        return entry

    def _append(self, entry: BootAttemptEntry) -> None:
        """Store ``entry`` and drop the oldest entries beyond the size cap."""
        self._entries.append(entry)
        overflow = len(self._entries) - self._max
        if overflow > 0:
            # Trim by the actual excess rather than assuming exactly one, so
            # the cap holds even if the list ever grew past it.
            del self._entries[:overflow]

    def entries(
        self,
        limit: int = 100,
        decision: str | None = None,
    ) -> list[BootAttemptEntry]:
        """Return up to ``limit`` entries, newest first.

        Args:
            limit: Maximum number of entries to return. A value of zero or
                less yields an empty list.
            decision: When given (and non-empty), only entries whose
                ``decision`` equals this value are considered.

        Returns:
            A new list; the most recently recorded matching entry comes first.
        """
        # ``seq[-0:]`` is the whole sequence, so a non-positive limit must be
        # handled explicitly instead of being fed into the slice below.
        if limit <= 0:
            return []
        selected = self._entries
        if decision:
            selected = [e for e in selected if e.decision == decision]
        return selected[-limit:][::-1]

    def export_json(self) -> str:
        """Serialize all stored entries, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def __len__(self) -> int:
        """Return the number of entries currently stored."""
        return len(self._entries)
| 93 | |