tests/test_integration.py
| 1 | """End-to-end integration tests.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from pqc_reasoning_ledger import ( |
| 6 | ReasoningProver, |
| 7 | ReasoningRecorder, |
| 8 | SealedTrace, |
| 9 | TraceVerifier, |
| 10 | ) |
| 11 | |
| 12 | |
def test_full_lifecycle(sample_trace_started: ReasoningRecorder) -> None:
    """Exercise record -> seal -> verify -> JSON round-trip -> step proof.

    Five steps are recorded on the fixture-provided recorder; the sealed
    trace must report five steps, verify fully both before and after a JSON
    round-trip, and yield a verifiable proof for its third step.
    """
    recorder = sample_trace_started
    recorder.record_observation("Patient reports chest pain and shortness of breath")
    recorder.record_retrieval("Relevant guideline: ACC/AHA 2021 chest-pain evaluation")
    recorder.record_hypothesis("Possible acute coronary syndrome")
    recorder.record_deduction("Given observation + guideline, ACS cannot be ruled out")
    recorder.record_decision("Recommend immediate troponin + ECG")

    sealed_trace = recorder.seal()
    assert sealed_trace.step_count == 5

    # The freshly sealed trace must pass every verification check.
    verification = TraceVerifier.verify(sealed_trace)
    assert verification.fully_verified

    # Serialize to JSON and back; the restored trace must verify as well.
    restored_trace = SealedTrace.from_json(sealed_trace.to_json())
    restored_verification = TraceVerifier.verify(restored_trace)
    assert restored_verification.fully_verified

    # Build and check a proof for the third recorded step (the hypothesis).
    hypothesis_step = sealed_trace.steps[2]
    step_proof = ReasoningProver.prove_step(sealed_trace, hypothesis_step.step_id)
    assert ReasoningProver.verify_proof(step_proof)
| 37 | |
| 38 | |
def test_tampered_step_between_seal_and_verify_flagged(
    sample_trace_started: ReasoningRecorder,
) -> None:
    """Editing a step's content_hash after sealing must fail verification.

    The verifier is expected to report both an invalid result and a broken
    chain once the stored hash of ``steps[1]`` no longer matches.
    """
    recorder = sample_trace_started
    recorder.record_observation("A")
    recorder.record_deduction("B")
    recorder.record_decision("C")
    sealed_trace = recorder.seal()

    # Replace the first character of steps[1].content_hash with a different
    # one ("0", or "f" when it already is "0") so the value is guaranteed to
    # change. Presumably the hash is a hex digest - the test only relies on
    # the first character being altered.
    tampered_step = sealed_trace.steps[1]
    original_hash = tampered_step.content_hash
    new_first_char = "f" if original_hash[0] == "0" else "0"
    tampered_step.content_hash = new_first_char + original_hash[1:]

    outcome = TraceVerifier.verify(sealed_trace)
    assert outcome.valid is False
    assert outcome.chain_intact is False
| 56 | |
| 57 | |
def test_byzantine_swap_final_decision(
    sample_trace_started: ReasoningRecorder,
) -> None:
    """Byzantine scenario: the final decision step is replaced wholesale.

    The adversary rewrites the last step's content, recomputes its
    content_hash and step_hash, and patches final_chain_hash to match. The
    Merkle root and the ML-DSA signature were produced over the ORIGINAL
    step hashes, so at least one of those two checks has to fail and the
    trace must not come back fully verified.
    """
    recorder = sample_trace_started
    recorder.record_observation("Contract is standard")
    recorder.record_deduction("Clause X is enforceable")
    recorder.record_decision("Approve the contract")
    sealed_trace = recorder.seal()

    # Forge the last step: new content, then hashes recomputed so the step
    # is self-consistent.
    forged_step = sealed_trace.steps[-1]
    forged_step.content = "REJECT the contract"
    forged_step.content_hash = forged_step.hash_content(forged_step.content)
    forged_step.step_hash = forged_step.compute_step_hash()

    # The adversary also points the trace's final chain hash at the forgery.
    sealed_trace.final_chain_hash = forged_step.step_hash

    # merkle_root is left untouched, so it still reflects the original step
    # hashes and should mismatch; had the attacker rewritten it too, the
    # signature (which covers merkle_root) would be the failing check.
    outcome = TraceVerifier.verify(sealed_trace)
    assert outcome.fully_verified is False
| 85 | |