src/pqc_content_provenance/manifest.py
6.2 KB · 162 lines · python Raw
1 """ContentManifest -- the core provenance record attached to every output."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 import uuid
8 from dataclasses import asdict, dataclass, field
9 from datetime import datetime, timezone
10 from typing import Any
11
12 from pqc_content_provenance.assertions import ASSERTION_REGISTRY, Assertion
13 from pqc_content_provenance.errors import InvalidManifestError, UnknownAssertionError
14
15
@dataclass
class ModelAttribution:
    """Describe the model responsible for a piece of generated content.

    Attributes:
        model_did: Identifier of the model, e.g. ``did:pqaid:...``.
        model_name: Model name, e.g. ``"Llama-3-8B-Instruct"``.
        model_version: Model version string, e.g. ``"1.0"``.
        registry_url: Optional registry location, e.g.
            ``https://quantamrkt.com/models/...``. Defaults to ``""``.
        model_manifest_hash: Optional hash of the model manifest in the
            Shield Registry. Defaults to ``""``.
    """

    model_did: str
    model_name: str
    model_version: str
    registry_url: str = ""
    model_manifest_hash: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the attribution as a plain dict keyed by field name."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping
28
29
@dataclass
class GenerationContext:
    """Record what went into producing the content (prompt hash, parameters, ...).

    Attributes:
        prompt_hash: SHA3-256 of the prompt. Defaults to ``""``.
        input_content_hashes: Hashes of reference inputs. A fresh empty list
            is created for every instance.
        parameters: Generation parameters. A fresh empty dict is created for
            every instance.
        generated_at: ISO-8601 timestamp string. Defaults to ``""``.
    """

    prompt_hash: str = ""
    input_content_hashes: list[str] = field(default_factory=list)
    parameters: dict = field(default_factory=dict)
    generated_at: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the context as a plain dict keyed by field name."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping
41
42
@dataclass
class ContentManifest:
    """The signed provenance manifest attached to AI-generated content.

    Stores: manifest ID, content hash, model attribution, generation context,
    assertions (pluggable claims), signature chain (see ProvenanceChain).

    The signature-related fields (``signer_did``, ``algorithm``, ``signature``,
    ``public_key``, ``signed_at``) default to empty strings and are meant to
    be filled in by ManifestSigner.
    """

    manifest_id: str
    content_hash: str  # SHA3-256 of the output bytes
    content_type: str  # mime-type (text/plain, image/png, ...)
    content_size: int  # bytes
    model_attribution: ModelAttribution
    generation_context: GenerationContext
    assertions: list[Assertion] = field(default_factory=list)
    created_at: str = ""
    previous_manifest_id: str | None = None  # prior link in chain (re-signing, editing)

    # Filled in by ManifestSigner
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""  # hex
    public_key: str = ""  # hex
    signed_at: str = ""

    @staticmethod
    def compute_content_hash(content: bytes) -> str:
        """Return the hex SHA3-256 digest of ``content``."""
        return hashlib.sha3_256(content).hexdigest()

    @classmethod
    def create(
        cls,
        content: bytes,
        content_type: str,
        model_attribution: ModelAttribution,
        generation_context: GenerationContext,
        assertions: list[Assertion] | None = None,
        previous_manifest_id: str | None = None,
    ) -> ContentManifest:
        """Build an unsigned manifest for ``content``.

        A fresh ``urn:pqc-prov:<uuid4 hex>`` ID is generated, the content hash
        and size are derived from ``content``, and ``created_at`` is set to
        the current UTC time in ISO-8601 form. The ``assertions`` argument is
        copied into a new list, so the caller's list is never shared.
        """
        return cls(
            manifest_id=f"urn:pqc-prov:{uuid.uuid4().hex}",
            content_hash=cls.compute_content_hash(content),
            content_type=content_type,
            content_size=len(content),
            model_attribution=model_attribution,
            generation_context=generation_context,
            assertions=list(assertions or []),
            created_at=datetime.now(timezone.utc).isoformat(),
            previous_manifest_id=previous_manifest_id,
        )

    def canonical_bytes(self) -> bytes:
        """Deterministic bytes used for signing (excludes the signature itself).

        The payload is serialised as compact JSON with sorted keys and encoded
        as UTF-8. Only the fields listed below are covered: ``signer_did``,
        ``algorithm``, ``signature``, ``public_key`` and ``signed_at`` are not
        part of the payload.
        """
        payload = {
            "manifest_id": self.manifest_id,
            "content_hash": self.content_hash,
            "content_type": self.content_type,
            "content_size": self.content_size,
            "model_attribution": self.model_attribution.to_dict(),
            "generation_context": self.generation_context.to_dict(),
            "assertions": [a.to_dict() for a in self.assertions],
            "created_at": self.created_at,
            "previous_manifest_id": self.previous_manifest_id,
        }
        return json.dumps(
            payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        ).encode("utf-8")

    def to_dict(self) -> dict[str, Any]:
        """Return every field of the manifest as a plain dict.

        Nested sections and assertions are rendered through their own
        ``to_dict`` methods, overriding whatever ``asdict`` produced for them.
        """
        d = asdict(self)
        d["model_attribution"] = self.model_attribution.to_dict()
        d["generation_context"] = self.generation_context.to_dict()
        d["assertions"] = [a.to_dict() for a in self.assertions]
        return d

    def to_json(self) -> str:
        """Serialise :meth:`to_dict` as JSON indented by two spaces."""
        return json.dumps(self.to_dict(), indent=2)

    @staticmethod
    def _section_from_mapping(section_cls: type, name: str, raw: Any) -> Any:
        """Instantiate a nested dataclass section from ``raw``.

        ``section_cls(**raw)`` raises ``TypeError`` when ``raw`` is not a
        mapping, lacks a required field, or carries an unexpected key; all of
        those mean the manifest is malformed, so they are reported as
        ``InvalidManifestError``.
        """
        try:
            return section_cls(**raw)
        except TypeError as e:
            raise InvalidManifestError(f"Invalid {name}: {e}") from e

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ContentManifest:
        """Rebuild a manifest from a mapping shaped like :meth:`to_dict` output.

        Raises:
            InvalidManifestError: if ``data`` is not a mapping, a required
                field is missing (top-level or inside a nested section),
                ``assertions`` is not a list, an assertion entry is not a
                mapping, or a nested section has unexpected keys.
            UnknownAssertionError: if an assertion's ``label`` is not present
                in ``ASSERTION_REGISTRY``.
        """
        # Local import so the file's top-level import block stays untouched.
        from collections.abc import Mapping

        if not isinstance(data, Mapping):
            raise InvalidManifestError(
                f"Manifest must be a mapping, got {type(data).__name__}"
            )
        try:
            raw_assertions = data.get("assertions", [])
            if not isinstance(raw_assertions, (list, tuple)):
                raise InvalidManifestError(
                    "'assertions' must be a list, got "
                    f"{type(raw_assertions).__name__}"
                )
            assertions: list[Assertion] = []
            for a in raw_assertions:
                if not isinstance(a, Mapping):
                    raise InvalidManifestError(
                        f"Assertion entry must be a mapping, got {type(a).__name__}"
                    )
                label = a.get("label")
                cls_ = ASSERTION_REGISTRY.get(label or "")
                if not cls_:
                    raise UnknownAssertionError(f"Unknown assertion label: {label}")
                assertions.append(cls_.from_dict(a))

            model_attribution = cls._section_from_mapping(
                ModelAttribution, "model_attribution", data["model_attribution"]
            )
            generation_context = cls._section_from_mapping(
                GenerationContext, "generation_context", data["generation_context"]
            )
            return cls(
                manifest_id=data["manifest_id"],
                content_hash=data["content_hash"],
                content_type=data["content_type"],
                content_size=data["content_size"],
                model_attribution=model_attribution,
                generation_context=generation_context,
                assertions=assertions,
                created_at=data.get("created_at", ""),
                previous_manifest_id=data.get("previous_manifest_id"),
                signer_did=data.get("signer_did", ""),
                algorithm=data.get("algorithm", ""),
                signature=data.get("signature", ""),
                public_key=data.get("public_key", ""),
                signed_at=data.get("signed_at", ""),
            )
        except KeyError as e:
            raise InvalidManifestError(f"Missing required field: {e}") from e

    @classmethod
    def from_json(cls, blob: str) -> ContentManifest:
        """Parse ``blob`` as JSON and delegate to :meth:`from_dict`.

        Raises:
            InvalidManifestError: if ``blob`` is not valid JSON, or if the
                decoded value is rejected by :meth:`from_dict` (for example a
                top-level JSON array).
        """
        try:
            data = json.loads(blob)
        except json.JSONDecodeError as e:
            raise InvalidManifestError(f"Invalid JSON: {e}") from e
        return cls.from_dict(data)
162