src/pqc_mcp_transport/audit.py
| 1 | """Audit logging for PQC MCP Transport operations.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | from dataclasses import dataclass, field, asdict |
| 7 | from datetime import datetime, timezone |
| 8 | |
| 9 | |
| 10 | @dataclass |
| 11 | class AuditEntry: |
| 12 | """A single entry in the PQC audit log.""" |
| 13 | |
| 14 | timestamp: str |
| 15 | session_id: str |
| 16 | operation: str # 'handshake', 'tool_call', 'tool_response', 'verify' |
| 17 | method: str | None |
| 18 | signer_did: str |
| 19 | peer_did: str | None |
| 20 | algorithm: str |
| 21 | signature_truncated: str # first 32 chars of hex signature |
| 22 | verified: bool |
| 23 | details: str | None = None |
| 24 | |
| 25 | |
| 26 | class AuditLog: |
| 27 | """Thread-safe audit log for PQC operations.""" |
| 28 | |
| 29 | def __init__(self, max_entries: int = 10000) -> None: |
| 30 | self._entries: list[AuditEntry] = [] |
| 31 | self._max = max_entries |
| 32 | |
| 33 | def log(self, entry: AuditEntry) -> None: |
| 34 | """Append an audit entry, evicting oldest if at capacity.""" |
| 35 | if len(self._entries) >= self._max: |
| 36 | self._entries.pop(0) |
| 37 | self._entries.append(entry) |
| 38 | |
| 39 | def get_entries( |
| 40 | self, |
| 41 | limit: int = 100, |
| 42 | signer_did: str | None = None, |
| 43 | ) -> list[AuditEntry]: |
| 44 | """Return recent audit entries, optionally filtered by signer DID.""" |
| 45 | entries = self._entries |
| 46 | if signer_did is not None: |
| 47 | entries = [e for e in entries if e.signer_did == signer_did] |
| 48 | return entries[-limit:] |
| 49 | |
| 50 | def export_json(self) -> str: |
| 51 | """Export the full audit log as a JSON string.""" |
| 52 | return json.dumps([asdict(e) for e in self._entries], indent=2) |
| 53 | |
| 54 | def clear(self) -> None: |
| 55 | """Remove all entries from the log.""" |
| 56 | self._entries.clear() |
| 57 | |