# src/pqc_rag_signing/audit.py
"""Audit log for RAG retrieval events."""
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
| 9 | |
@dataclass
class RAGAuditEntry:
    """One audited RAG event (chunk signing, verification, or retrieval)."""

    # ISO-8601 timestamp of when the event was recorded.
    timestamp: str
    # Operation label, e.g. "sign_chunk" / "verify_chunk" / "retrieve".
    operation: str
    corpus_id: str | None
    chunk_id: str | None
    signer_did: str | None
    algorithm: str | None
    # Whether the operation succeeded / all signatures checked out.
    verified: bool
    query_hash: str | None = None
    details: str | None = None

    def to_dict(self) -> dict:
        """Serialize every field into a plain, JSON-friendly dict."""
        payload = asdict(self)
        return payload
| 26 | |
| 27 | |
class RAGAuditLog:
    """Append-only, bounded audit log for RAG operations.

    Entries live in memory with a fixed capacity; once full, the oldest
    entry is evicted on each append. Production usage: persist entries to a
    real log backend. This class gives you an in-memory structure with
    export for integrations.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        # deque(maxlen=...) evicts the oldest entry in O(1) on append; the
        # previous list.pop(0) eviction was O(n) per append once at capacity
        # and raised IndexError when max_entries == 0 (len >= 0 is always
        # true, so it popped from an empty list).
        self._entries: deque[RAGAuditEntry] = deque(maxlen=max_entries)
        self._max = max_entries

    def log(self, entry: RAGAuditEntry) -> None:
        """Append *entry*, silently dropping the oldest entry at capacity."""
        self._entries.append(entry)

    def log_sign(
        self,
        corpus_id: str,
        chunk_id: str,
        signer_did: str,
        algorithm: str,
    ) -> None:
        """Record a successful chunk-signing event, timestamped now (UTC)."""
        self.log(
            RAGAuditEntry(
                timestamp=datetime.now(timezone.utc).isoformat(),
                operation="sign_chunk",
                corpus_id=corpus_id,
                chunk_id=chunk_id,
                signer_did=signer_did,
                algorithm=algorithm,
                verified=True,
            )
        )

    def log_verify(
        self,
        chunk_id: str,
        signer_did: str | None,
        algorithm: str | None,
        verified: bool,
        details: str | None = None,
    ) -> None:
        """Record a chunk verification attempt and its outcome."""
        self.log(
            RAGAuditEntry(
                timestamp=datetime.now(timezone.utc).isoformat(),
                operation="verify_chunk",
                corpus_id=None,
                chunk_id=chunk_id,
                signer_did=signer_did,
                algorithm=algorithm,
                verified=verified,
                details=details,
            )
        )

    def log_retrieval(
        self,
        query_hash: str,
        verified_count: int,
        failed_count: int,
    ) -> None:
        """Record a retrieval; marked verified only when no chunk failed."""
        self.log(
            RAGAuditEntry(
                timestamp=datetime.now(timezone.utc).isoformat(),
                operation="retrieve",
                corpus_id=None,
                chunk_id=None,
                signer_did=None,
                algorithm=None,
                verified=failed_count == 0,
                query_hash=query_hash,
                details=f"{verified_count} verified, {failed_count} failed",
            )
        )

    def entries(
        self,
        limit: int = 100,
        operation: str | None = None,
        signer_did: str | None = None,
    ) -> list[RAGAuditEntry]:
        """Return up to *limit* most-recent matching entries, newest first.

        Args:
            limit: maximum number of results; values <= 0 return [].
            operation: keep only entries with this operation, if given.
            signer_did: keep only entries with this signer DID, if given.
        """
        if limit <= 0:
            # Guard the slice pitfall: filtered[-0:] is filtered[0:], which
            # would have returned *every* entry instead of none.
            return []
        filtered = list(self._entries)
        if operation:
            filtered = [e for e in filtered if e.operation == operation]
        if signer_did:
            filtered = [e for e in filtered if e.signer_did == signer_did]
        return filtered[-limit:][::-1]

    def export_json(self) -> str:
        """Serialize the full log (oldest first) as pretty-printed JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Discard all entries."""
        self._entries.clear()

    def __len__(self) -> int:
        return len(self._entries)
| 125 | |