src/pqc_reasoning_ledger/verifier.py
| 1 | """TraceVerifier - independently check a SealedTrace.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | from dataclasses import dataclass |
| 7 | |
| 8 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 9 | from quantumshield.core.signatures import verify |
| 10 | |
| 11 | from pqc_reasoning_ledger.errors import SignatureVerificationError |
| 12 | from pqc_reasoning_ledger.merkle import compute_merkle_root |
| 13 | from pqc_reasoning_ledger.trace import SealedTrace |
| 14 | |
| 15 | |
| 16 | @dataclass(frozen=True) |
| 17 | class VerificationResult: |
| 18 | valid: bool |
| 19 | signature_valid: bool |
| 20 | chain_intact: bool |
| 21 | merkle_root_valid: bool |
| 22 | step_count: int |
| 23 | error: str | None = None |
| 24 | |
| 25 | @property |
| 26 | def fully_verified(self) -> bool: |
| 27 | return self.signature_valid and self.chain_intact and self.merkle_root_valid |
| 28 | |
| 29 | |
| 30 | class TraceVerifier: |
| 31 | """Independently verify a SealedTrace: chain + merkle + signature.""" |
| 32 | |
| 33 | @staticmethod |
| 34 | def verify(sealed: SealedTrace) -> VerificationResult: |
| 35 | # 1. Verify chain integrity: each step's previous_step_hash references prior step_hash |
| 36 | chain_ok = True |
| 37 | prev = "0" * 64 |
| 38 | for s in sealed.steps: |
| 39 | expected_hash = s.compute_step_hash() |
| 40 | if s.step_hash != expected_hash: |
| 41 | chain_ok = False |
| 42 | break |
| 43 | if s.previous_step_hash != prev: |
| 44 | chain_ok = False |
| 45 | break |
| 46 | prev = s.step_hash |
| 47 | |
| 48 | if sealed.steps and prev != sealed.final_chain_hash: |
| 49 | chain_ok = False |
| 50 | |
| 51 | # 2. Verify Merkle root |
| 52 | recomputed = ( |
| 53 | compute_merkle_root([s.step_hash for s in sealed.steps]) |
| 54 | if sealed.steps |
| 55 | else "" |
| 56 | ) |
| 57 | merkle_ok = recomputed == sealed.merkle_root |
| 58 | |
| 59 | # 3. Verify signature |
| 60 | sig_ok = False |
| 61 | err: str | None = None |
| 62 | if not sealed.signature or not sealed.algorithm: |
| 63 | err = "missing signature" |
| 64 | else: |
| 65 | try: |
| 66 | algorithm = SignatureAlgorithm(sealed.algorithm) |
| 67 | digest = hashlib.sha3_256(sealed.canonical_bytes()).digest() |
| 68 | sig_ok = verify( |
| 69 | digest, |
| 70 | bytes.fromhex(sealed.signature), |
| 71 | bytes.fromhex(sealed.public_key), |
| 72 | algorithm, |
| 73 | ) |
| 74 | if not sig_ok: |
| 75 | err = "ML-DSA signature invalid" |
| 76 | except ValueError: |
| 77 | err = f"unknown algorithm {sealed.algorithm}" |
| 78 | except Exception as exc: # noqa: BLE001 |
| 79 | err = f"signature verify failed: {exc}" |
| 80 | |
| 81 | if not chain_ok and err is None: |
| 82 | err = "chain integrity broken" |
| 83 | elif not merkle_ok and err is None: |
| 84 | err = "merkle root mismatch" |
| 85 | |
| 86 | valid = sig_ok and chain_ok and merkle_ok |
| 87 | return VerificationResult( |
| 88 | valid=valid, |
| 89 | signature_valid=sig_ok, |
| 90 | chain_intact=chain_ok, |
| 91 | merkle_root_valid=merkle_ok, |
| 92 | step_count=sealed.step_count, |
| 93 | error=err, |
| 94 | ) |
| 95 | |
| 96 | @staticmethod |
| 97 | def verify_or_raise(sealed: SealedTrace) -> None: |
| 98 | result = TraceVerifier.verify(sealed) |
| 99 | if not result.valid: |
| 100 | raise SignatureVerificationError(result.error or "verification failed") |
| 101 | |