src/pqc_kv_cache/audit.py
3.5 KB · 126 lines · python Raw
1 """Append-only audit log for KV cache operations."""
2
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any
9
10
@dataclass
class KVAuditEntry:
    """A single record in the KV cache audit trail.

    Only ``timestamp``, ``operation``, ``tenant_id`` and ``session_id`` are
    required. The three integer fields default to ``-1`` (presumably meaning
    "not applicable" -- confirm with callers), ``success`` defaults to
    ``True`` and ``details`` to the empty string.
    """

    # Time of the event as a string supplied by the caller.
    timestamp: str
    # Event kind: 'encrypt' | 'decrypt' | 'rotate' | 'isolation-violation'
    operation: str
    # Tenant the event is attributed to.
    tenant_id: str
    # Session the event belongs to.
    session_id: str
    layer_idx: int = -1
    position: int = -1
    sequence_number: int = -1
    # Whether the operation succeeded.
    success: bool = True
    # Free-form extra context for the event.
    details: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the entry's fields as a plain dict keyed by field name."""
        field_map = asdict(self)
        return field_map
25
26
class KVAuditLog:
    """Bounded, in-memory audit trail of KV cache operations.

    Entries are kept in insertion order. Once ``max_entries`` entries are
    held, recording a new entry discards the oldest one.
    """

    def __init__(self, max_entries: int = 1_000_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained. ``0`` means
                nothing is retained.

        Raises:
            ValueError: If ``max_entries`` is negative.
        """
        if max_entries < 0:
            raise ValueError(
                f"max_entries must be non-negative, got {max_entries}"
            )
        # A bounded deque drops the oldest entry in O(1) when full;
        # list.pop(0) would shift every remaining element on each eviction.
        self._entries: deque[KVAuditEntry] = deque(maxlen=max_entries)
        self._max = max_entries

    @staticmethod
    def _utc_now() -> str:
        """Return the current UTC time as an ISO 8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def log(self, entry: KVAuditEntry) -> None:
        """Record ``entry``, evicting the oldest entry if the log is full."""
        self._entries.append(entry)

    def log_encrypt(
        self,
        tenant_id: str,
        session_id: str,
        layer_idx: int,
        position: int,
        seq: int,
    ) -> None:
        """Record a successful ``'encrypt'`` operation stamped with UTC now."""
        self.log(
            KVAuditEntry(
                timestamp=self._utc_now(),
                operation="encrypt",
                tenant_id=tenant_id,
                session_id=session_id,
                layer_idx=layer_idx,
                position=position,
                sequence_number=seq,
                success=True,
            )
        )

    def log_decrypt(
        self,
        tenant_id: str,
        session_id: str,
        layer_idx: int,
        position: int,
        seq: int,
        success: bool,
        details: str = "",
    ) -> None:
        """Record a ``'decrypt'`` operation stamped with UTC now.

        Args:
            success: Whether the decryption succeeded.
            details: Optional free-form context (e.g. a failure reason).
        """
        self.log(
            KVAuditEntry(
                timestamp=self._utc_now(),
                operation="decrypt",
                tenant_id=tenant_id,
                session_id=session_id,
                layer_idx=layer_idx,
                position=position,
                sequence_number=seq,
                success=success,
                details=details,
            )
        )

    def log_rotate(self, tenant_id: str, session_id: str, trigger: str) -> None:
        """Record a successful ``'rotate'`` operation.

        ``trigger`` is stored in the entry's details as ``trigger=<trigger>``;
        layer, position and sequence number are left at the entry defaults.
        """
        self.log(
            KVAuditEntry(
                timestamp=self._utc_now(),
                operation="rotate",
                tenant_id=tenant_id,
                session_id=session_id,
                success=True,
                details=f"trigger={trigger}",
            )
        )

    def log_isolation_violation(
        self, attacker_tenant: str, target_tenant: str, details: str = ""
    ) -> None:
        """Record a failed ``'isolation-violation'`` event.

        The entry is attributed to ``attacker_tenant`` with an empty session
        id; ``target_tenant`` and ``details`` are stored in the details text.
        """
        self.log(
            KVAuditEntry(
                timestamp=self._utc_now(),
                operation="isolation-violation",
                tenant_id=attacker_tenant,
                session_id="",
                success=False,
                details=f"target={target_tenant}; {details}",
            )
        )

    def entries(
        self,
        limit: int = 100,
        tenant_id: str | None = None,
        operation: str | None = None,
    ) -> list[KVAuditEntry]:
        """Return up to ``limit`` matching entries, newest first.

        Args:
            limit: Maximum number of entries returned. Zero or a negative
                value yields an empty list.
            tenant_id: If not ``None``, keep only entries with exactly this
                tenant id. An empty string is a real filter value, not
                "no filter".
            operation: If not ``None``, keep only entries with exactly this
                operation.

        Returns:
            A new list holding the most recent matching entries, most recent
            first.
        """
        # ``seq[-0:]`` is the whole sequence, so a zero limit must be handled
        # explicitly instead of by slicing.
        if limit <= 0:
            return []
        newest_first: list[KVAuditEntry] = []
        # Walk from the newest entry so the scan can stop once ``limit``
        # matches have been collected.
        for entry in reversed(self._entries):
            if tenant_id is not None and entry.tenant_id != tenant_id:
                continue
            if operation is not None and entry.operation != operation:
                continue
            newest_first.append(entry)
            if len(newest_first) == limit:
                break
        return newest_first

    def export_json(self) -> str:
        """Return all retained entries, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
126