src/pqc_audit_log_fs/prover.py
| 1 | """InclusionProver - generate and verify proofs that an event is in a segment.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from pqc_audit_log_fs.errors import SegmentNotFoundError |
| 6 | from pqc_audit_log_fs.event import InferenceEvent |
| 7 | from pqc_audit_log_fs.merkle import ( |
| 8 | InclusionProof, |
| 9 | build_merkle_proof, |
| 10 | verify_inclusion, |
| 11 | ) |
| 12 | from pqc_audit_log_fs.reader import LogReader |
| 13 | |
| 14 | |
| 15 | class InclusionProver: |
| 16 | """Produce inclusion proofs for events stored in a log.""" |
| 17 | |
| 18 | def __init__(self, reader: LogReader) -> None: |
| 19 | self.reader = reader |
| 20 | |
| 21 | def prove_event( |
| 22 | self, segment_number: int, event_id: str |
| 23 | ) -> InclusionProof: |
| 24 | segment = self.reader.read_segment(segment_number) |
| 25 | idx: int | None = None |
| 26 | for i, e in enumerate(segment.events): |
| 27 | if e.event_id == event_id: |
| 28 | idx = i |
| 29 | break |
| 30 | if idx is None: |
| 31 | raise SegmentNotFoundError( |
| 32 | f"event {event_id} not in segment {segment_number}" |
| 33 | ) |
| 34 | leaves = [e.leaf_hash() for e in segment.events] |
| 35 | return build_merkle_proof(leaves, idx, segment.header.merkle_root) |
| 36 | |
| 37 | @staticmethod |
| 38 | def verify_proof(event: InferenceEvent, proof: InclusionProof) -> bool: |
| 39 | if event.leaf_hash() != proof.leaf_hash: |
| 40 | return False |
| 41 | return verify_inclusion(proof) |
| 42 | |