tests/test_corpus.py
4.2 KB · 138 lines · python Raw
1 """Tests for Corpus and CorpusManifest."""
2
3 from __future__ import annotations
4
5 from dataclasses import replace
6
7 from quantumshield.identity.agent import AgentIdentity
8
9 from pqc_rag_signing import (
10 ChunkMetadata,
11 ChunkSigner,
12 Corpus,
13 CorpusManifest,
14 )
15
16
def _make_corpus(
    identity: AgentIdentity,
    texts: dict[str, list[str]],
) -> Corpus:
    """Return a corpus named ``"test"`` populated from *texts*.

    Every key of *texts* is handed to ``Corpus.add_document`` as the source,
    paired with its list of chunk strings, in the mapping's iteration order.
    The corpus is returned unsigned; callers sign it themselves.
    """
    corpus = Corpus(name="test", identity=identity)
    for source_name in texts:
        corpus.add_document(source_name, texts[source_name])
    return corpus
25
26
def test_corpus_sign_all(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that ``sign_all`` returns one attributed chunk per input chunk."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()

    # One signed chunk is expected for every chunk string across all sources.
    expected_count = sum(map(len, sample_corpus_texts.values()))
    assert len(signed_chunks) == expected_count

    # Each signed chunk must carry this corpus's id and the signer's DID.
    for chunk in signed_chunks:
        assert chunk.corpus_id == corpus.corpus_id
        assert chunk.signer_did == ingest_identity.did
38
39
def test_build_manifest(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check the fields of a manifest built from a fully signed corpus."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    manifest = corpus.build_manifest()

    assert manifest.corpus_id == corpus.corpus_id
    assert manifest.name == "test"

    expected_chunks = sum(map(len, sample_corpus_texts.values()))
    assert manifest.chunk_count == expected_chunks

    # The stored root must equal the root recomputed from the manifest's own
    # chunk-hash list.
    recomputed_root = CorpusManifest.compute_root(manifest.chunk_hashes)
    assert manifest.root == recomputed_root

    assert manifest.signer_did == ingest_identity.did
55
56
def test_verify_manifest_valid(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that an untouched, freshly built manifest verifies."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    untouched_manifest = corpus.build_manifest()

    is_valid = Corpus.verify_manifest(untouched_manifest)
    assert is_valid
65
66
def test_verify_manifest_tampered_root_fails(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that a manifest whose root was overwritten fails verification."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    original = corpus.build_manifest()

    # Copy the manifest, swapping only the root for a 64-character string of
    # zeros; every other field is left as built.
    forged = replace(original, root="0" * 64)

    verified = Corpus.verify_manifest(forged)
    assert not verified
77
78
def test_verify_manifest_tampered_chunk_list_fails(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that a manifest with an extra chunk entry fails verification."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    original = corpus.build_manifest()

    # Add a fabricated (chunk id, hash) pair while keeping the original root.
    # Presumably verification recomputes the root from the list and sees that
    # it no longer matches the stored one - confirm against verify_manifest.
    padded_hashes = [*original.chunk_hashes, ("fake-chunk", "00" * 32)]
    forged = replace(original, chunk_hashes=padded_hashes)

    verified = Corpus.verify_manifest(forged)
    assert not verified
91
92
def test_verify_chunks_against_manifest_all_present(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that the corpus's own signed chunks all match its manifest."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()
    manifest = corpus.build_manifest()

    outcome = Corpus.verify_chunks_against_manifest(signed_chunks, manifest)
    all_matched, unmatched = outcome

    # Success flag set and nothing reported as unmatched.
    assert all_matched
    assert unmatched == []
103
104
def test_verify_chunks_against_manifest_extra_chunk_detected(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Check that a signed chunk absent from the manifest is reported."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()
    manifest = corpus.build_manifest()

    # Sign one more chunk with the same identity but outside the corpus, so
    # the manifest built above does not commit to it.
    stray_metadata = ChunkMetadata(
        source="rogue.txt", chunk_index=0, total_chunks=1
    )
    stray_chunk = ChunkSigner(ingest_identity).sign_chunk(
        "not in manifest", stray_metadata
    )
    candidate_chunks = signed_chunks + [stray_chunk]

    all_matched, unmatched = Corpus.verify_chunks_against_manifest(
        candidate_chunks, manifest
    )

    # The check must fail and name the stray chunk by its id.
    assert not all_matched
    assert stray_chunk.chunk_id in unmatched
126
127
def test_manifest_root_deterministic(ingest_identity: AgentIdentity) -> None:
    """Check that ``compute_root`` gives the same root for reversed input.

    The ``ingest_identity`` fixture is requested but not used by the body.
    """
    forward_pairs = [
        (chunk_id, hex_byte * 32)
        for chunk_id, hex_byte in (
            ("chunk-a", "aa"),
            ("chunk-b", "bb"),
            ("chunk-c", "cc"),
        )
    ]
    # Same pairs, opposite order.
    backward_pairs = forward_pairs[::-1]

    forward_root = CorpusManifest.compute_root(forward_pairs)
    backward_root = CorpusManifest.compute_root(backward_pairs)
    assert forward_root == backward_root
138