src/pqc_content_provenance/signer.py
| 1 | """Signing and verification for ContentManifests.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | from dataclasses import dataclass |
| 7 | from datetime import datetime, timezone |
| 8 | |
| 9 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 10 | from quantumshield.core.signatures import sign, verify |
| 11 | from quantumshield.identity.agent import AgentIdentity |
| 12 | |
| 13 | from pqc_content_provenance.errors import ContentHashMismatchError |
| 14 | from pqc_content_provenance.manifest import ContentManifest |
| 15 | |
| 16 | |
@dataclass(frozen=True)
class VerificationResult:
    """Immutable report produced when a ContentManifest is verified.

    Instances are frozen, so a result cannot be altered after it is
    created. The field order below is also the positional order of the
    generated constructor.
    """

    # Overall verdict; ManifestSigner.verify sets it to
    # ``signature_match and content_hash_match`` on the success path and
    # to False on every early-exit failure path.
    valid: bool
    # Identifier copied from the manifest that was checked.
    manifest_id: str
    # Signer DID and algorithm name as recorded on the manifest; either
    # may be None.
    signer_did: str | None
    algorithm: str | None
    # Outcome of the content-hash comparison. ManifestSigner.verify leaves
    # this True when no content was supplied to compare against.
    content_hash_match: bool
    # Outcome of the cryptographic signature check.
    signature_match: bool
    # Human-readable failure reason, or None when nothing went wrong.
    error: str | None = None
| 28 | |
| 29 | |
class ManifestSigner:
    """Attach signatures to ContentManifests and check them afterwards.

    Signing uses the keypair of the AgentIdentity given at construction
    (usually the model's identity). Verification is a static operation
    that relies only on the data stored on the manifest.

    Usage:
        identity = AgentIdentity.create("llama-3-8b-signer")
        signer = ManifestSigner(identity)
        content = b"Hello, this is AI-generated text."
        manifest = ContentManifest.create(content, "text/plain", attribution, ctx)
        signed = signer.sign(manifest)
    """

    def __init__(self, identity: AgentIdentity):
        self.identity = identity

    def sign(self, manifest: ContentManifest) -> ContentManifest:
        """Sign *manifest* in place and hand the same object back.

        The signature covers the SHA3-256 digest of the manifest's
        canonical bytes, taken before any signature field is written.
        Afterwards the manifest carries the signer DID, the algorithm
        name, the hex-encoded signature and public key, and a UTC
        ISO-8601 ``signed_at`` timestamp.
        """
        # Hash first, then fetch the keypair: same evaluation order as the
        # digest-then-sign sequence this method has always followed.
        payload = hashlib.sha3_256(manifest.canonical_bytes()).digest()
        keypair = self.identity.signing_keypair
        raw_signature = sign(payload, keypair)

        manifest.signer_did = self.identity.did
        manifest.algorithm = keypair.algorithm.value
        manifest.signature = raw_signature.hex()
        manifest.public_key = keypair.public_key.hex()
        manifest.signed_at = datetime.now(timezone.utc).isoformat()
        return manifest

    @staticmethod
    def verify(
        manifest: ContentManifest, content: bytes | None = None
    ) -> VerificationResult:
        """Check the manifest signature and, if given, the content hash.

        Args:
            manifest: The manifest whose stored signature is checked.
            content: Optional raw content. When provided, its SHA3-256 hex
                digest is compared with ``manifest.content_hash``; when
                omitted, the hash comparison is reported as a match.

        Returns:
            A VerificationResult. Failures (unknown algorithm, an
            exception while decoding or verifying the signature, a bad
            signature, a content mismatch) are reported through the
            result's ``valid`` / ``error`` fields rather than raised.
        """
        # Nothing to compare against counts as a match.
        hash_ok = (
            content is None
            or hashlib.sha3_256(content).hexdigest() == manifest.content_hash
        )

        def outcome(signature_ok, message):
            # Single place that assembles the report; only the signature
            # verdict and the error text vary between exit paths.
            return VerificationResult(
                valid=signature_ok and hash_ok,
                manifest_id=manifest.manifest_id,
                signer_did=manifest.signer_did,
                algorithm=manifest.algorithm,
                content_hash_match=hash_ok,
                signature_match=signature_ok,
                error=message,
            )

        try:
            algo = SignatureAlgorithm(manifest.algorithm)
        except ValueError:
            return outcome(False, f"unknown algorithm {manifest.algorithm}")

        payload = hashlib.sha3_256(manifest.canonical_bytes()).digest()

        try:
            # ``verify`` here is the module-level signature primitive, not
            # this static method: class attributes are not in scope inside
            # a method body.
            signature_ok = verify(
                payload,
                bytes.fromhex(manifest.signature),
                bytes.fromhex(manifest.public_key),
                algo,
            )
        except Exception as exc:
            # Malformed hex, missing fields, or a backend failure all end
            # up here and are reported instead of propagated.
            return outcome(False, f"signature verify failed: {exc}")

        # A signature failure takes precedence over a content mismatch.
        if not signature_ok:
            message = "invalid ML-DSA signature"
        elif not hash_ok:
            message = "content hash does not match manifest"
        else:
            message = None
        return outcome(signature_ok, message)

    def sign_and_raise_on_mismatch(
        self, manifest: ContentManifest, content: bytes
    ) -> ContentManifest:
        """Sign *manifest* only if its content hash matches *content*.

        The hash is recomputed with ``ContentManifest.compute_content_hash``
        before anything is signed (defensive signing).

        Raises:
            ContentHashMismatchError: If the recomputed hash differs from
                ``manifest.content_hash``; the manifest is left unsigned.
        """
        computed = ContentManifest.compute_content_hash(content)
        if computed == manifest.content_hash:
            return self.sign(manifest)
        raise ContentHashMismatchError(
            f"content hash in manifest ({manifest.content_hash[:16]}...) "
            f"does not match actual content ({computed[:16]}...)"
        )
| 127 | |