examples/retrieve_and_verify.py
| 1 | """ |
| 2 | Retrieve + Verify Example |
| 3 | |
| 4 | Shows the retrieval side: load signed chunks from a vector DB, |
| 5 | verify signatures, pass only verified content to the LLM. |
| 6 | """ |
| 7 | |
| 8 | import hashlib |
| 9 | |
| 10 | from quantumshield import AgentIdentity |
| 11 | |
| 12 | from pqc_rag_signing import ( |
| 13 | Corpus, |
| 14 | InMemoryAdapter, |
| 15 | RAGAuditLog, |
| 16 | RetrievalVerifier, |
| 17 | ) |
| 18 | |
| 19 | |
def fake_embed(text: str, dim: int = 32) -> list[float]:
    """Deterministic fake embedder for demo purposes.

    Maps *text* to a vector of exactly ``dim`` floats derived from SHA-256
    output, so the same text always produces the same vector. Every
    component lies in ``[-1.0, 1.0)``.

    Args:
        text: Text to embed.
        dim: Number of components to return. Values above 32 are supported
            by extending the hash stream; the first 32 components are the
            same as for ``dim=32``.

    Returns:
        A list of ``dim`` floats.

    Raises:
        ValueError: If ``dim`` is negative.
    """
    if dim < 0:
        # A negative slice bound would silently yield a wrong-length vector.
        raise ValueError(f"dim must be non-negative, got {dim}")

    data = text.encode()
    # The first block is the plain SHA-256 digest, so results for dim <= 32
    # match the single-digest behaviour.
    stream = bytearray(hashlib.sha256(data).digest())

    # A single digest is only 32 bytes; append counter-suffixed digests until
    # there are enough bytes to honour the requested dimension.
    counter = 1
    while len(stream) < dim:
        stream.extend(hashlib.sha256(data + counter.to_bytes(4, "big")).digest())
        counter += 1

    # Map each byte 0..255 onto [-1.0, 1.0).
    return [(b - 128) / 128.0 for b in stream[:dim]]
| 24 | |
| 25 | |
def main() -> None:
    """Run the demo: sign and store three chunks, then retrieve and verify them.

    Ingest side: creates an identity, signs the chunks of one document, and
    upserts them with fake embeddings into an in-memory store.
    Retrieval side: queries the store, verifies the retrieved chunks against
    the ingest identity, records the outcome in an audit log, and prints
    either the verified texts or the per-chunk failures.
    """
    # --- Ingest side ---
    signer = AgentIdentity.create("rag-ingest")
    demo_corpus = Corpus(name="demo", identity=signer)
    source_chunks = [
        "Post-quantum cryptography is required by CNSA 2.0.",
        "ML-DSA-87 provides 256-bit post-quantum security.",
        "NIST standardized ML-DSA in FIPS 204.",
    ]
    demo_corpus.add_document("doc1.txt", chunks=source_chunks)
    signed = demo_corpus.sign_all()

    vector_db = InMemoryAdapter()
    vectors = []
    for item in signed:
        vectors.append(fake_embed(item.text))
    vector_db.upsert(signed, vectors)
    stored_total = vector_db.count()
    print(f"Stored {stored_total} signed chunks in vector DB")

    # --- Retrieval side ---
    # Only the identity that signed the corpus is trusted.
    trusted = {signer.did}
    checker = RetrievalVerifier(trusted_signers=trusted, strict=True)
    audit_log = RAGAuditLog()

    query = "What post-quantum algorithm did NIST standardize?"
    candidates = vector_db.query(fake_embed(query), top_k=3)
    candidate_total = len(candidates)
    print(f"\nRetrieved {candidate_total} candidate chunks for query:")
    print(f' "{query}"')

    # Verify everything before using it
    outcome = checker.verify_retrieved(candidates)

    # The audit entry stores a SHA3-256 digest of the query, not the raw text.
    query_digest = hashlib.sha3_256(query.encode()).hexdigest()
    audit_log.log_retrieval(
        query_hash=query_digest,
        verified_count=outcome.verified_count,
        failed_count=outcome.failed_count,
    )

    print("\nVerification result:")
    print(f" verified: {outcome.verified_count}")
    print(f" failed: {outcome.failed_count}")
    print(f" all valid: {outcome.all_verified}")

    if not outcome.all_verified:
        # Any failure blocks the whole batch; report each failing chunk.
        print("\n[WARN] Some chunks failed verification - DO NOT pass to LLM.")
        for bad_chunk, verdict in outcome.failed:
            print(f" - {bad_chunk.chunk_id}: {verdict.error}")
        return

    print("\n[OK] All chunks verified. Safe to pass to LLM:")
    for verified_text in outcome.verified_texts():
        print(f" - {verified_text}")
| 79 | |
| 80 | |
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
| 83 | |