tests/test_audit.py
3.1 KB · 92 lines · python Raw
1 """Tests for RAGAuditLog."""
2
3 from __future__ import annotations
4
5 import json
6
7 from pqc_rag_signing import RAGAuditLog
8
9
10 def test_log_sign_creates_entry(audit_log: RAGAuditLog) -> None:
11 audit_log.log_sign(
12 corpus_id="corpus-1",
13 chunk_id="chunk-1",
14 signer_did="did:pqaid:abc",
15 algorithm="ML-DSA-65",
16 )
17 assert len(audit_log) == 1
18 entries = audit_log.entries()
19 assert entries[0].operation == "sign_chunk"
20 assert entries[0].chunk_id == "chunk-1"
21 assert entries[0].signer_did == "did:pqaid:abc"
22 assert entries[0].verified is True
23
24
25 def test_log_verify_records_result(audit_log: RAGAuditLog) -> None:
26 audit_log.log_verify(
27 chunk_id="chunk-1",
28 signer_did="did:pqaid:abc",
29 algorithm="ML-DSA-65",
30 verified=False,
31 details="bad signature",
32 )
33 entries = audit_log.entries()
34 assert entries[0].operation == "verify_chunk"
35 assert entries[0].verified is False
36 assert entries[0].details == "bad signature"
37
38
39 def test_log_retrieval_aggregates(audit_log: RAGAuditLog) -> None:
40 audit_log.log_retrieval(
41 query_hash="a" * 64,
42 verified_count=4,
43 failed_count=1,
44 )
45 entries = audit_log.entries()
46 assert entries[0].operation == "retrieve"
47 assert entries[0].verified is False
48 assert entries[0].query_hash == "a" * 64
49 assert "4 verified, 1 failed" in (entries[0].details or "")
50
51
52 def test_entries_filter_by_operation(audit_log: RAGAuditLog) -> None:
53 audit_log.log_sign("c1", "k1", "did:x", "ML-DSA-65")
54 audit_log.log_verify("k1", "did:x", "ML-DSA-65", verified=True)
55 audit_log.log_retrieval("qh", 1, 0)
56 signs = audit_log.entries(operation="sign_chunk")
57 verifies = audit_log.entries(operation="verify_chunk")
58 retrievals = audit_log.entries(operation="retrieve")
59 assert len(signs) == 1 and signs[0].operation == "sign_chunk"
60 assert len(verifies) == 1 and verifies[0].operation == "verify_chunk"
61 assert len(retrievals) == 1 and retrievals[0].operation == "retrieve"
62
63
64 def test_entries_filter_by_signer(audit_log: RAGAuditLog) -> None:
65 audit_log.log_sign("c1", "k1", "did:alice", "ML-DSA-65")
66 audit_log.log_sign("c1", "k2", "did:bob", "ML-DSA-65")
67 audit_log.log_sign("c1", "k3", "did:alice", "ML-DSA-65")
68 alice_entries = audit_log.entries(signer_did="did:alice")
69 assert len(alice_entries) == 2
70 assert all(e.signer_did == "did:alice" for e in alice_entries)
71
72
73 def test_export_json_valid(audit_log: RAGAuditLog) -> None:
74 audit_log.log_sign("c1", "k1", "did:x", "ML-DSA-65")
75 audit_log.log_verify("k1", "did:x", "ML-DSA-65", verified=True)
76 js = audit_log.export_json()
77 parsed = json.loads(js)
78 assert isinstance(parsed, list)
79 assert len(parsed) == 2
80 assert parsed[0]["operation"] == "sign_chunk"
81
82
83 def test_max_entries_respected() -> None:
84 log = RAGAuditLog(max_entries=3)
85 for i in range(5):
86 log.log_sign("c1", f"k{i}", "did:x", "ML-DSA-65")
87 assert len(log) == 3
88 # Oldest entries dropped, newest kept
89 ids = [e.chunk_id for e in log.entries()]
90 assert "k0" not in ids
91 assert "k4" in ids
92