src/pqc_mcp_transport/audit.py
1.6 KB · 57 lines · python Raw
1 """Audit logging for PQC MCP Transport operations."""
2
from __future__ import annotations

import json
import threading
from collections import deque
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
8
9
@dataclass
class AuditEntry:
    """One record in the PQC audit log.

    A plain data holder: nothing is validated or normalised on construction.

    Attributes:
        timestamp: Time of the operation, stored as a string.
        session_id: Identifier of the session involved.
        operation: Kind of operation, e.g. 'handshake', 'tool_call',
            'tool_response' or 'verify'.
        method: Method name associated with the operation, if any.
        signer_did: DID of the signing party.
        peer_did: DID of the other party, if any.
        algorithm: Algorithm identifier (presumably the signature scheme).
        signature_truncated: First 32 characters of the hex signature.
        verified: Verification outcome recorded for the operation.
        details: Optional free-form note; ``None`` when omitted.
    """

    timestamp: str
    session_id: str
    operation: str
    method: str | None
    signer_did: str
    peer_did: str | None
    algorithm: str
    signature_truncated: str
    verified: bool
    details: str | None = field(default=None)
24
25
class AuditLog:
    """Bounded, thread-safe in-memory audit log for PQC operations.

    Entries are kept in insertion order. Once ``max_entries`` entries are
    held, logging another one discards the oldest. Every method takes an
    internal lock, so one instance may be shared between threads.
    """

    def __init__(self, max_entries: int = 10000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained. A value below
                one produces a log that retains nothing.
        """
        self._max = max_entries
        # deque(maxlen=...) drops the oldest item on append in O(1), unlike
        # list.pop(0). Clamp at zero because deque rejects a negative maxlen.
        self._entries: deque[AuditEntry] = deque(maxlen=max(max_entries, 0))
        self._lock = threading.Lock()

    def log(self, entry: AuditEntry) -> None:
        """Append an audit entry, evicting the oldest one if at capacity."""
        with self._lock:
            self._entries.append(entry)

    def get_entries(
        self,
        limit: int = 100,
        signer_did: str | None = None,
    ) -> list[AuditEntry]:
        """Return the most recent entries, oldest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value returns an empty list.
            signer_did: When given, only entries whose ``signer_did`` equals
                this value are considered.

        Returns:
            A new list holding at most ``limit`` matching entries.
        """
        # Guard explicitly: ``seq[-0:]`` is the whole sequence, so a limit of
        # zero would otherwise return everything.
        if limit <= 0:
            return []
        with self._lock:
            snapshot = list(self._entries)
        # Filter outside the lock; the snapshot is private to this call.
        if signer_did is not None:
            snapshot = [e for e in snapshot if e.signer_did == signer_did]
        return snapshot[-limit:]

    def export_json(self) -> str:
        """Export the full audit log as a JSON string (2-space indent)."""
        with self._lock:
            snapshot = list(self._entries)
        return json.dumps([asdict(e) for e in snapshot], indent=2)

    def clear(self) -> None:
        """Remove all entries from the log."""
        with self._lock:
            self._entries.clear()
57