# Source listing: src/pqc_rag_signing/audit.py (3.3 KB, 125 lines, Python)
"""Audit log for RAG retrieval events."""

from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
8
9
@dataclass
class RAGAuditEntry:
    """Record describing one audited RAG operation.

    The first seven fields are required; ``query_hash`` and ``details``
    are optional and default to ``None``.
    """

    # When the event was recorded, as a string.
    timestamp: str
    # Name of the audited operation.
    operation: str
    # Identifiers of the corpus / chunk involved, when applicable.
    corpus_id: str | None
    chunk_id: str | None
    # Signer identity and signature algorithm, when known.
    signer_did: str | None
    algorithm: str | None
    # Outcome flag for the operation.
    verified: bool
    # Optional hash of the query that triggered the event.
    query_hash: str | None = None
    # Optional free-form extra information.
    details: str | None = None

    def to_dict(self) -> dict:
        """Return the entry's fields as a plain dictionary keyed by name."""
        payload = asdict(self)
        return payload
26
27
class RAGAuditLog:
    """Bounded, append-only, in-memory audit log for RAG operations.

    Entries are kept in arrival order. Once ``max_entries`` entries are
    held, logging another entry evicts the oldest one.

    Production usage: persist entries to a real log backend. This class
    gives you an in-memory structure with export for integrations.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log that retains at most ``max_entries`` entries.

        Raises:
            ValueError: If ``max_entries`` is less than 1. A non-positive
                capacity previously made the first ``log()`` call fail with
                ``IndexError``; it is now rejected up front.
        """
        if max_entries < 1:
            raise ValueError(
                f"max_entries must be at least 1, got {max_entries}"
            )
        self._max = max_entries
        # A deque with ``maxlen`` drops the oldest entry in O(1) when full,
        # unlike ``list.pop(0)`` which shifts every remaining element.
        self._entries: deque[RAGAuditEntry] = deque(maxlen=max_entries)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def log(self, entry: RAGAuditEntry) -> None:
        """Append ``entry``, evicting the oldest entry if the log is full."""
        self._entries.append(entry)

    def log_sign(
        self,
        corpus_id: str,
        chunk_id: str,
        signer_did: str,
        algorithm: str,
    ) -> None:
        """Record a ``"sign_chunk"`` event; it is always stored as verified."""
        self.log(
            RAGAuditEntry(
                timestamp=self._utc_timestamp(),
                operation="sign_chunk",
                corpus_id=corpus_id,
                chunk_id=chunk_id,
                signer_did=signer_did,
                algorithm=algorithm,
                verified=True,
            )
        )

    def log_verify(
        self,
        chunk_id: str,
        signer_did: str | None,
        algorithm: str | None,
        verified: bool,
        details: str | None = None,
    ) -> None:
        """Record a ``"verify_chunk"`` event with the given outcome.

        The entry's ``corpus_id`` is stored as ``None``.
        """
        self.log(
            RAGAuditEntry(
                timestamp=self._utc_timestamp(),
                operation="verify_chunk",
                corpus_id=None,
                chunk_id=chunk_id,
                signer_did=signer_did,
                algorithm=algorithm,
                verified=verified,
                details=details,
            )
        )

    def log_retrieval(
        self,
        query_hash: str,
        verified_count: int,
        failed_count: int,
    ) -> None:
        """Record a ``"retrieve"`` event summarising a retrieval.

        The entry is marked verified only when ``failed_count`` is zero;
        both counts are written into the entry's ``details`` text.
        """
        self.log(
            RAGAuditEntry(
                timestamp=self._utc_timestamp(),
                operation="retrieve",
                corpus_id=None,
                chunk_id=None,
                signer_did=None,
                algorithm=None,
                verified=failed_count == 0,
                query_hash=query_hash,
                details=f"{verified_count} verified, {failed_count} failed",
            )
        )

    def entries(
        self,
        limit: int = 100,
        operation: str | None = None,
        signer_did: str | None = None,
    ) -> list[RAGAuditEntry]:
        """Return up to ``limit`` matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            operation: If truthy, keep only entries with this operation.
            signer_did: If truthy, keep only entries with this signer.
        """
        # Guard explicitly: slicing with ``[-0:]`` would return everything.
        if limit <= 0:
            return []

        newest_first: list[RAGAuditEntry] = []
        # Walk from the newest entry so the scan can stop once full.
        for entry in reversed(self._entries):
            if operation and entry.operation != operation:
                continue
            if signer_did and entry.signer_did != signer_did:
                continue
            newest_first.append(entry)
            if len(newest_first) == limit:
                break
        return newest_first

    def export_json(self) -> str:
        """Serialise all entries, oldest first, as an indented JSON array."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Remove every entry from the log."""
        self._entries.clear()

    def __len__(self) -> int:
        """Return the number of entries currently held."""
        return len(self._entries)
125