src/pqc_content_provenance/chain.py
2.1 KB · 60 lines · python Raw
1 """Provenance chain -- link manifests across edits/derivations."""
2
3 from __future__ import annotations
4
5 from collections.abc import Iterable
6 from dataclasses import dataclass, field
7
8 from pqc_content_provenance.errors import ChainBrokenError
9 from pqc_content_provenance.manifest import ContentManifest
10 from pqc_content_provenance.signer import ManifestSigner
11
12
@dataclass
class ProvenanceLink:
    """A single entry of a provenance chain, wrapping one manifest.

    ``ProvenanceChain.links`` stores instances of this class in chain order.
    """

    # The manifest occupying this position in the chain.
    manifest: ContentManifest
18
19
@dataclass
class ProvenanceChain:
    """Ordered list of provenance links, in insertion order.

    Every manifest after the first is expected to carry, in its
    ``previous_manifest_id`` attribute, the ``manifest_id`` of the manifest
    stored immediately before it.
    """

    # Chain entries; index 0 is the first manifest that was added.
    links: list[ProvenanceLink] = field(default_factory=list)

    def add(self, manifest: ContentManifest) -> None:
        """Append *manifest* to the end of the chain.

        When the chain is empty the manifest is accepted without any check.
        Otherwise its ``previous_manifest_id`` must equal the ``manifest_id``
        of the current last manifest. Signatures are not checked here; see
        ``verify_chain`` for that.

        Raises:
            ChainBrokenError: if the manifest does not point at the current
                last manifest of the chain.
        """
        if self.links:
            tail = self.links[-1].manifest
            declared_prev = manifest.previous_manifest_id
            expected_prev = tail.manifest_id
            if declared_prev != expected_prev:
                raise ChainBrokenError(
                    f"manifest {manifest.manifest_id} previous_manifest_id "
                    f"({declared_prev}) does not match "
                    f"prior manifest id ({expected_prev})"
                )
        self.links.append(ProvenanceLink(manifest=manifest))

    def verify_chain(self) -> tuple[bool, list[str]]:
        """Check every link reference and every manifest signature.

        All links are inspected even after a failure, so the returned list
        contains every problem found, in chain order.

        Returns:
            A ``(ok, errors)`` pair: ``ok`` is ``True`` only when ``errors``
            is empty. An empty chain therefore yields ``(True, [])``.
        """
        problems: list[str] = []
        # ``None`` until the first manifest has been visited, which is why
        # the first manifest's own back-reference is never compared.
        expected_prev: str | None = None
        for entry in self.links:
            current = entry.manifest
            if expected_prev is not None and current.previous_manifest_id != expected_prev:
                problems.append(
                    f"link break at {current.manifest_id}: expected prev {expected_prev}"
                )
            outcome = ManifestSigner.verify(current)
            if not outcome.valid:
                problems.append(
                    f"signature invalid at {current.manifest_id}: {outcome.error}"
                )
            expected_prev = current.manifest_id
        return not problems, problems

    def to_dicts(self) -> list[dict]:
        """Return each manifest's ``to_dict()`` output, in chain order."""
        return [entry.manifest.to_dict() for entry in self.links]

    @classmethod
    def from_dicts(cls, items: Iterable[dict]) -> ProvenanceChain:
        """Build a chain from an iterable of manifest dictionaries.

        Each item is converted with ``ContentManifest.from_dict`` and stored
        directly in ``links``; ``add`` is not used, so link references are
        not validated while loading. Call ``verify_chain`` to check them.
        """
        chain = cls()
        chain.links.extend(
            ProvenanceLink(manifest=ContentManifest.from_dict(raw)) for raw in items
        )
        return chain
60