src/pqc_kv_cache/audit.py
| 1 | """Append-only audit log for KV cache operations.""" |
| 2 | |
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from itertools import islice
from typing import Any
| 9 | |
| 10 | |
@dataclass
class KVAuditEntry:
    """A single record describing one audited KV cache operation.

    The four leading fields are required. The remaining fields carry
    defaults (``-1`` for the integer fields, ``True`` for ``success`` and
    an empty string for ``details``) so callers may omit them.
    """

    # Required fields.
    timestamp: str
    operation: str  # 'encrypt' | 'decrypt' | 'rotate' | 'isolation-violation'
    tenant_id: str
    session_id: str

    # Optional fields; -1 is the default when no value is supplied.
    layer_idx: int = -1
    position: int = -1
    sequence_number: int = -1
    success: bool = True
    details: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the entry's fields as a dictionary keyed by field name."""
        field_map: dict[str, Any] = asdict(self)
        return field_map
| 25 | |
| 26 | |
class KVAuditLog:
    """Bounded, in-memory log of KV cache audit entries.

    Entries are kept in insertion order. Once ``max_entries`` entries are
    held, logging another one discards the oldest entry.
    """

    def __init__(self, max_entries: int = 1_000_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained. ``0`` creates a
                log that retains nothing.

        Raises:
            ValueError: If ``max_entries`` is negative.
        """
        if max_entries < 0:
            raise ValueError(f"max_entries must be >= 0, got {max_entries}")
        self._max = max_entries
        # A bounded deque evicts the oldest entry in O(1); list.pop(0) has to
        # shift every remaining element on each eviction.
        self._entries: deque[KVAuditEntry] = deque(maxlen=max_entries)

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO 8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def log(self, entry: KVAuditEntry) -> None:
        """Append ``entry``, discarding the oldest entry if the log is full."""
        self._entries.append(entry)

    def log_encrypt(
        self,
        tenant_id: str,
        session_id: str,
        layer_idx: int,
        position: int,
        seq: int,
    ) -> None:
        """Record a successful ``encrypt`` operation.

        Args:
            tenant_id: Tenant the operation is attributed to.
            session_id: Session the operation is attributed to.
            layer_idx: Stored as the entry's ``layer_idx``.
            position: Stored as the entry's ``position``.
            seq: Stored as the entry's ``sequence_number``.
        """
        self.log(
            KVAuditEntry(
                timestamp=self._now(),
                operation="encrypt",
                tenant_id=tenant_id,
                session_id=session_id,
                layer_idx=layer_idx,
                position=position,
                sequence_number=seq,
                success=True,
            )
        )

    def log_decrypt(
        self,
        tenant_id: str,
        session_id: str,
        layer_idx: int,
        position: int,
        seq: int,
        success: bool,
        details: str = "",
    ) -> None:
        """Record a ``decrypt`` operation and its outcome.

        Args:
            tenant_id: Tenant the operation is attributed to.
            session_id: Session the operation is attributed to.
            layer_idx: Stored as the entry's ``layer_idx``.
            position: Stored as the entry's ``position``.
            seq: Stored as the entry's ``sequence_number``.
            success: Whether the decrypt succeeded.
            details: Free-form text stored with the entry.
        """
        self.log(
            KVAuditEntry(
                timestamp=self._now(),
                operation="decrypt",
                tenant_id=tenant_id,
                session_id=session_id,
                layer_idx=layer_idx,
                position=position,
                sequence_number=seq,
                success=success,
                details=details,
            )
        )

    def log_rotate(self, tenant_id: str, session_id: str, trigger: str) -> None:
        """Record a ``rotate`` operation; ``trigger`` is stored in ``details``."""
        self.log(
            KVAuditEntry(
                timestamp=self._now(),
                operation="rotate",
                tenant_id=tenant_id,
                session_id=session_id,
                success=True,
                details=f"trigger={trigger}",
            )
        )

    def log_isolation_violation(
        self, attacker_tenant: str, target_tenant: str, details: str = ""
    ) -> None:
        """Record an ``isolation-violation`` entry with ``success=False``.

        The entry's ``tenant_id`` is ``attacker_tenant`` and its
        ``session_id`` is empty; ``target_tenant`` and ``details`` are
        combined into the entry's ``details`` text.
        """
        self.log(
            KVAuditEntry(
                timestamp=self._now(),
                operation="isolation-violation",
                tenant_id=attacker_tenant,
                session_id="",
                success=False,
                details=f"target={target_tenant}; {details}",
            )
        )

    def entries(
        self,
        limit: int = 100,
        tenant_id: str | None = None,
        operation: str | None = None,
    ) -> list[KVAuditEntry]:
        """Return the most recent matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value returns an empty list.
            tenant_id: If given and non-empty, keep only entries with this
                ``tenant_id``.
            operation: If given and non-empty, keep only entries with this
                ``operation``.

        Returns:
            A new list of at most ``limit`` entries, most recent first.
        """
        # Guard explicitly: with plain slicing, ``seq[-0:]`` is the whole
        # sequence, so a zero limit would return everything.
        if limit <= 0:
            return []
        matching = (
            entry
            for entry in reversed(self._entries)
            if (not tenant_id or entry.tenant_id == tenant_id)
            and (not operation or entry.operation == operation)
        )
        # Walking newest-first lets the scan stop after ``limit`` matches.
        return list(islice(matching, limit))

    def export_json(self) -> str:
        """Return all retained entries, oldest first, as an indented JSON array."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
| 126 | |