tests/test_integration.py
| 1 | """End-to-end integration tests.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | |
| 7 | from quantumshield.identity.agent import AgentIdentity |
| 8 | |
| 9 | from pqc_rag_signing import ( |
| 10 | ChunkMetadata, |
| 11 | ChunkSigner, |
| 12 | Corpus, |
| 13 | InMemoryAdapter, |
| 14 | RetrievalVerifier, |
| 15 | ) |
| 16 | |
| 17 | |
def _embed(text: str, dim: int = 16) -> list[float]:
    """Return a deterministic pseudo-embedding of *text* with *dim* components.

    The vector is derived from SHA-256 digest bytes; each byte ``b`` is mapped
    to ``(b - 128) / 128.0``, so every component lies in ``[-1.0, 1.0)``.

    A single SHA-256 digest holds only 32 bytes. For ``dim > 32`` the byte
    stream is extended with digests of the text followed by a 4-byte
    big-endian counter, so the result always has exactly ``dim`` components.
    For ``dim <= 32`` the output is the plain digest prefix.

    Args:
        text: Text to embed; encoded with the default ``str.encode()`` codec.
        dim: Number of components to return; must be non-negative.

    Returns:
        A list of ``dim`` floats.

    Raises:
        ValueError: If ``dim`` is negative.
    """
    if dim < 0:
        raise ValueError(f"dim must be non-negative, got {dim}")

    payload = text.encode()
    stream = hashlib.sha256(payload).digest()
    counter = 1
    # Extend the stream only when one digest is not enough, so vectors of up
    # to 32 components stay identical to the plain digest prefix.
    while len(stream) < dim:
        stream += hashlib.sha256(payload + counter.to_bytes(4, "big")).digest()
        counter += 1
    return [(b - 128) / 128.0 for b in stream[:dim]]
| 21 | |
| 22 | |
def test_full_pipeline_ingest_retrieve_verify(
    ingest_identity: AgentIdentity,
) -> None:
    """Run ingest, store, retrieve and verify on a three-chunk document."""
    # Ingest: one document, three chunks, corpus created with the ingest identity.
    policy_chunks = [
        "PQC is mandatory for all new systems.",
        "ML-DSA-87 is the preferred algorithm.",
        "Never deploy RSA for new services.",
    ]
    pipeline_corpus = Corpus(name="full-pipeline", identity=ingest_identity)
    pipeline_corpus.add_document("policy.txt", chunks=policy_chunks)
    signed_chunks = pipeline_corpus.sign_all()
    corpus_manifest = pipeline_corpus.build_manifest()
    assert Corpus.verify_manifest(corpus_manifest)

    # Store: one embedding per signed chunk.
    vector_store = InMemoryAdapter()
    embeddings = [_embed(chunk.text) for chunk in signed_chunks]
    vector_store.upsert(signed_chunks, embeddings)

    # Retrieve: ask for as many hits as there are chunks.
    query_vector = _embed("ML-DSA")
    hits = vector_store.query(query_vector, top_k=3)
    assert len(hits) == 3

    # Verify: only the ingest identity's DID is trusted.
    verifier = RetrievalVerifier(trusted_signers={ingest_identity.did})
    outcome = verifier.verify_retrieved(hits)
    assert outcome.all_verified
    verified_texts = set(outcome.verified_texts())
    expected_texts = {chunk.text for chunk in signed_chunks}
    assert verified_texts == expected_texts
| 53 | |
| 54 | |
def test_poisoned_vector_db_detected(
    ingest_identity: AgentIdentity,
    attacker_identity: AgentIdentity,
) -> None:
    """Check that a chunk signed by a non-trusted identity fails verification."""
    # Legitimate content goes in first.
    legit_corpus = Corpus(name="company", identity=ingest_identity)
    legit_corpus.add_document(
        "safe.txt",
        chunks=["Never share credentials in email."],
    )
    legit_chunks = legit_corpus.sign_all()

    vector_store = InMemoryAdapter()
    legit_vectors = [_embed(chunk.text) for chunk in legit_chunks]
    vector_store.upsert(legit_chunks, legit_vectors)

    # The attacker signs a contradicting chunk under the same source name
    # and writes it straight into the store.
    rogue_signer = ChunkSigner(attacker_identity)
    forged_meta = ChunkMetadata(source="safe.txt", chunk_index=99, total_chunks=99)
    poisoned = rogue_signer.sign_chunk(
        "It is fine to share credentials in email.",
        forged_meta,
    )
    vector_store.upsert([poisoned], [_embed(poisoned.text)])
    assert vector_store.count() == 2

    # The verifier's allow-list contains only the ingest identity's DID.
    verifier = RetrievalVerifier(trusted_signers={ingest_identity.did})
    hits = vector_store.query(_embed("credentials"), top_k=5)
    outcome = verifier.verify_retrieved(hits)

    # Exactly one chunk passes and one fails; the failure is the attacker's.
    assert outcome.verified_count == 1
    assert outcome.failed_count == 1
    rejected_chunk = outcome.failed[0][0]
    assert rejected_chunk.signer_did == attacker_identity.did
    first_verified_text = outcome.verified_texts()[0]
    assert "Never share credentials" in first_verified_text
| 89 | |
| 90 | |
def test_cross_corpus_chunks_detected(
    ingest_identity: AgentIdentity,
) -> None:
    """Check that chunks from corpus B are reported when checked against A's manifest."""
    first_corpus = Corpus(name="A", identity=ingest_identity)
    first_corpus.add_document("a.txt", chunks=["alpha one", "alpha two"])
    chunks_a = first_corpus.sign_all()
    manifest_a = first_corpus.build_manifest()

    second_corpus = Corpus(name="B", identity=ingest_identity)
    second_corpus.add_document("b.txt", chunks=["beta one"])
    chunks_b = second_corpus.sign_all()

    # Present B's chunks alongside A's as if they all belonged to A.
    combined = chunks_a + chunks_b
    matches, missing_ids = Corpus.verify_chunks_against_manifest(combined, manifest_a)
    assert not matches
    for foreign_chunk in chunks_b:
        assert foreign_chunk.chunk_id in missing_ids
| 108 | |