tests/test_audit.py
| 1 | """Tests for RAGAuditLog.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | |
| 7 | from pqc_rag_signing import RAGAuditLog |
| 8 | |
| 9 | |
def test_log_sign_creates_entry(audit_log: RAGAuditLog) -> None:
    """A sign operation is recorded with its metadata and marked verified."""
    audit_log.log_sign(
        corpus_id="corpus-1",
        chunk_id="chunk-1",
        signer_did="did:pqaid:abc",
        algorithm="ML-DSA-65",
    )
    assert len(audit_log) == 1
    entry = audit_log.entries()[0]
    assert entry.operation == "sign_chunk"
    assert entry.chunk_id == "chunk-1"
    assert entry.signer_did == "did:pqaid:abc"
    assert entry.verified is True
| 23 | |
| 24 | |
def test_log_verify_records_result(audit_log: RAGAuditLog) -> None:
    """A failed verification is stored with verified=False and its details."""
    audit_log.log_verify(
        chunk_id="chunk-1",
        signer_did="did:pqaid:abc",
        algorithm="ML-DSA-65",
        verified=False,
        details="bad signature",
    )
    entry = audit_log.entries()[0]
    assert entry.operation == "verify_chunk"
    assert entry.verified is False
    assert entry.details == "bad signature"
| 37 | |
| 38 | |
def test_log_retrieval_aggregates(audit_log: RAGAuditLog) -> None:
    """A retrieval with any failures is logged unverified, keeping both counts."""
    query_hash = "a" * 64
    audit_log.log_retrieval(
        query_hash=query_hash,
        verified_count=4,
        failed_count=1,
    )
    entry = audit_log.entries()[0]
    assert entry.operation == "retrieve"
    assert entry.verified is False
    assert entry.query_hash == query_hash
    assert "4 verified, 1 failed" in (entry.details or "")
| 50 | |
| 51 | |
def test_entries_filter_by_operation(audit_log: RAGAuditLog) -> None:
    """entries(operation=...) returns only entries of the requested kind."""
    audit_log.log_sign("c1", "k1", "did:x", "ML-DSA-65")
    audit_log.log_verify("k1", "did:x", "ML-DSA-65", verified=True)
    audit_log.log_retrieval("qh", 1, 0)
    # One entry of each kind was logged; each filter must isolate exactly it.
    for op in ("sign_chunk", "verify_chunk", "retrieve"):
        matched = audit_log.entries(operation=op)
        assert len(matched) == 1
        assert matched[0].operation == op
| 62 | |
| 63 | |
def test_entries_filter_by_signer(audit_log: RAGAuditLog) -> None:
    """entries(signer_did=...) returns only that signer's entries."""
    audit_log.log_sign("c1", "k1", "did:alice", "ML-DSA-65")
    audit_log.log_sign("c1", "k2", "did:bob", "ML-DSA-65")
    audit_log.log_sign("c1", "k3", "did:alice", "ML-DSA-65")
    matched = audit_log.entries(signer_did="did:alice")
    assert len(matched) == 2
    assert all(entry.signer_did == "did:alice" for entry in matched)
| 71 | |
| 72 | |
def test_export_json_valid(audit_log: RAGAuditLog) -> None:
    """export_json produces a parseable JSON array in insertion order."""
    audit_log.log_sign("c1", "k1", "did:x", "ML-DSA-65")
    audit_log.log_verify("k1", "did:x", "ML-DSA-65", verified=True)
    parsed = json.loads(audit_log.export_json())
    assert isinstance(parsed, list)
    assert len(parsed) == 2
    assert parsed[0]["operation"] == "sign_chunk"
| 81 | |
| 82 | |
def test_max_entries_respected() -> None:
    """When capacity is exceeded, the oldest entries are evicted first."""
    log = RAGAuditLog(max_entries=3)
    for i in range(5):
        log.log_sign("c1", f"k{i}", "did:x", "ML-DSA-65")
    assert len(log) == 3
    # k0/k1 were the oldest and must have been dropped; k4 is the newest.
    chunk_ids = {entry.chunk_id for entry in log.entries()}
    assert "k0" not in chunk_ids
    assert "k4" in chunk_ids
| 92 | |