src/pqc_content_provenance/chain.py
| 1 | """Provenance chain -- link manifests across edits/derivations.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from collections.abc import Iterable |
| 6 | from dataclasses import dataclass, field |
| 7 | |
| 8 | from pqc_content_provenance.errors import ChainBrokenError |
| 9 | from pqc_content_provenance.manifest import ContentManifest |
| 10 | from pqc_content_provenance.signer import ManifestSigner |
| 11 | |
| 12 | |
@dataclass
class ProvenanceLink:
    """One link in a provenance chain, wrapping a single ContentManifest.

    Only the manifest itself is stored on the link; no verification result
    is kept here.
    """

    # The manifest held at this position in the chain.
    manifest: ContentManifest
| 18 | |
| 19 | |
@dataclass
class ProvenanceChain:
    """Ordered sequence of manifests in which each entry points at its predecessor.

    Linkage is expressed through ``previous_manifest_id``: every manifest
    after the first is expected to carry the ``manifest_id`` of the manifest
    stored immediately before it.
    """

    # Links in insertion order, oldest first.
    links: list[ProvenanceLink] = field(default_factory=list)

    def add(self, manifest: ContentManifest) -> None:
        """Append *manifest* to the end of the chain.

        The first manifest added to an empty chain is accepted as-is. Any
        later manifest must reference the current tail through its
        ``previous_manifest_id``.

        Raises:
            ChainBrokenError: if the chain is non-empty and the manifest's
                ``previous_manifest_id`` differs from the tail's
                ``manifest_id``. The chain is left unchanged in that case.
        """
        if self.links:
            tail = self.links[-1].manifest
            claimed_prev = manifest.previous_manifest_id
            tail_id = tail.manifest_id
            if claimed_prev != tail_id:
                raise ChainBrokenError(
                    f"manifest {manifest.manifest_id} previous_manifest_id"
                    f" ({claimed_prev}) does not match"
                    f" prior manifest id ({tail_id})"
                )
        self.links.append(ProvenanceLink(manifest=manifest))

    def verify_chain(self) -> tuple[bool, list[str]]:
        """Check every link and every signature, collecting all failures.

        Walks the chain from first to last. The first manifest's
        ``previous_manifest_id`` is not checked; each later manifest is
        compared against the id of the one before it. Every manifest is
        passed to ``ManifestSigner.verify``. Checking does not stop at the
        first failure.

        Returns:
            A ``(ok, errors)`` pair where ``ok`` is True exactly when
            ``errors`` is empty. An empty chain yields ``(True, [])``.
        """
        problems: list[str] = []
        expected_prev: str | None = None
        for entry in self.links:
            current = entry.manifest
            # No predecessor id recorded yet means there is nothing to compare.
            if expected_prev is not None:
                if current.previous_manifest_id != expected_prev:
                    problems.append(
                        f"link break at {current.manifest_id}: expected prev {expected_prev}"
                    )
            outcome = ManifestSigner.verify(current)
            if not outcome.valid:
                problems.append(
                    f"signature invalid at {current.manifest_id}: {outcome.error}"
                )
            expected_prev = current.manifest_id
        return (not problems), problems

    def to_dicts(self) -> list[dict]:
        """Return each manifest's ``to_dict()`` output, in chain order."""
        return [entry.manifest.to_dict() for entry in self.links]

    @classmethod
    def from_dicts(cls, items: Iterable[dict]) -> ProvenanceChain:
        """Build a chain from an iterable of manifest dicts.

        Each item is turned into a manifest with ``ContentManifest.from_dict``
        and appended directly, without the linkage check that ``add``
        performs; use ``verify_chain`` to validate the result.
        """
        rebuilt = cls()
        rebuilt.links.extend(
            ProvenanceLink(manifest=ContentManifest.from_dict(raw)) for raw in items
        )
        return rebuilt
| 59 | return chain |
| 60 | |