tests/test_corpus.py
| 1 | """Tests for Corpus and CorpusManifest.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from dataclasses import replace |
| 6 | |
| 7 | from quantumshield.identity.agent import AgentIdentity |
| 8 | |
| 9 | from pqc_rag_signing import ( |
| 10 | ChunkMetadata, |
| 11 | ChunkSigner, |
| 12 | Corpus, |
| 13 | CorpusManifest, |
| 14 | ) |
| 15 | |
| 16 | |
def _make_corpus(
    identity: AgentIdentity,
    texts: dict[str, list[str]],
) -> Corpus:
    """Build a corpus named ``"test"`` containing every document in *texts*.

    Args:
        identity: Identity handed to the ``Corpus`` constructor.
        texts: Mapping of source name to that document's chunk strings; each
            entry is forwarded to ``Corpus.add_document``.

    Returns:
        The populated corpus. This helper does not call ``sign_all``.
    """
    corpus = Corpus(name="test", identity=identity)
    for source_name, chunk_texts in texts.items():
        corpus.add_document(source_name, chunk_texts)
    return corpus
| 25 | |
| 26 | |
def test_corpus_sign_all(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """``sign_all`` returns one signed chunk per input chunk.

    Every returned chunk must carry the corpus id and the signer's DID.
    """
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()

    # One signed chunk is expected for each chunk string across all documents.
    expected_count = sum(map(len, sample_corpus_texts.values()))
    assert len(signed_chunks) == expected_count

    for chunk in signed_chunks:
        assert chunk.corpus_id == corpus.corpus_id
        assert chunk.signer_did == ingest_identity.did
| 38 | |
| 39 | |
def test_build_manifest(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """``build_manifest`` reflects the corpus id, name, size, root and signer."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    manifest = corpus.build_manifest()

    assert manifest.corpus_id == corpus.corpus_id
    assert manifest.name == "test"

    expected_chunk_count = sum(map(len, sample_corpus_texts.values()))
    assert manifest.chunk_count == expected_chunk_count

    # The stored root must equal the root recomputed from the manifest's own
    # chunk-hash list.
    recomputed_root = CorpusManifest.compute_root(manifest.chunk_hashes)
    assert manifest.root == recomputed_root
    assert manifest.signer_did == ingest_identity.did
| 55 | |
| 56 | |
def test_verify_manifest_valid(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """An unmodified manifest built from a signed corpus verifies."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    assert Corpus.verify_manifest(corpus.build_manifest())
| 65 | |
| 66 | |
def test_verify_manifest_tampered_root_fails(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Replacing the manifest root with a bogus value makes verification fail."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    genuine = corpus.build_manifest()

    # Copy of the manifest whose root is 64 zero characters; all other fields
    # are left as built.
    forged = replace(genuine, root="0" * 64)

    is_valid = Corpus.verify_manifest(forged)
    assert not is_valid
| 77 | |
| 78 | |
def test_verify_manifest_tampered_chunk_list_fails(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Adding an entry to ``chunk_hashes`` makes verification fail."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    corpus.sign_all()
    genuine = corpus.build_manifest()

    # Append a fake chunk pair while leaving the stored root untouched, so a
    # root recomputed from the extended list no longer matches it.
    extended_hashes = [*genuine.chunk_hashes, ("fake-chunk", "00" * 32)]
    forged = replace(genuine, chunk_hashes=extended_hashes)

    is_valid = Corpus.verify_manifest(forged)
    assert not is_valid
| 91 | |
| 92 | |
def test_verify_chunks_against_manifest_all_present(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """Chunks signed by the corpus all check out against its own manifest."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()
    manifest = corpus.build_manifest()

    result = Corpus.verify_chunks_against_manifest(signed_chunks, manifest)
    all_ok, missing_ids = result

    assert all_ok
    assert missing_ids == []
| 103 | |
| 104 | |
def test_verify_chunks_against_manifest_extra_chunk_detected(
    ingest_identity: AgentIdentity,
    sample_corpus_texts: dict[str, list[str]],
) -> None:
    """A signed chunk that the manifest does not commit to is reported."""
    corpus = _make_corpus(ingest_identity, sample_corpus_texts)
    signed_chunks = corpus.sign_all()
    manifest = corpus.build_manifest()

    # Sign a chunk outside the corpus, using the same identity, so that it is
    # NOT committed in the manifest.
    rogue_metadata = ChunkMetadata(
        source="rogue.txt", chunk_index=0, total_chunks=1
    )
    rogue_chunk = ChunkSigner(ingest_identity).sign_chunk(
        "not in manifest", rogue_metadata
    )

    all_ok, missing_ids = Corpus.verify_chunks_against_manifest(
        signed_chunks + [rogue_chunk], manifest
    )

    assert not all_ok
    assert rogue_chunk.chunk_id in missing_ids
| 126 | |
| 127 | |
def test_manifest_root_deterministic(ingest_identity: AgentIdentity) -> None:
    """``compute_root`` yields the same root for the same pairs in any order."""
    forward_pairs = [
        ("chunk-a", "aa" * 32),
        ("chunk-b", "bb" * 32),
        ("chunk-c", "cc" * 32),
    ]
    # Same entries, opposite order.
    backward_pairs = forward_pairs[::-1]

    forward_root = CorpusManifest.compute_root(forward_pairs)
    backward_root = CorpusManifest.compute_root(backward_pairs)
    assert forward_root == backward_root
| 138 | |