src/pqc_bootloader/signer.py
| 1 | """Firmware signing and verification using ML-DSA.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | from dataclasses import dataclass |
| 7 | from datetime import datetime, timezone |
| 8 | |
| 9 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 10 | from quantumshield.core.signatures import sign, verify |
| 11 | from quantumshield.identity.agent import AgentIdentity |
| 12 | |
| 13 | from pqc_bootloader.errors import FirmwareVerificationError |
| 14 | from pqc_bootloader.firmware import FirmwareImage, SignedFirmware |
| 15 | from pqc_bootloader.key_ring import KeyRing |
| 16 | |
| 17 | |
@dataclass(frozen=True)
class VerificationResult:
    """Outcome of verifying a SignedFirmware.

    Immutable record (frozen dataclass) returned by
    ``FirmwareVerifier.verify``. The per-check flags describe how far
    verification got; on any failure ``valid`` is False and ``error``
    carries a human-readable reason.
    """

    # Overall verdict: True only when every check that was run succeeded.
    valid: bool
    # True only when the signature over the firmware manifest verified.
    # Reported as False on every failure path, including failures that
    # happen before the signature is actually evaluated.
    signature_valid: bool
    # False when delivered bytes were supplied and their hash differed from
    # the signed image_hash; stays True when no bytes were supplied.
    hash_consistent: bool
    # False when a key-ring was supplied and rejected the key (or the
    # embedded public key did not match its claimed id); stays True when no
    # key-ring was supplied, i.e. True does not prove a trust check ran.
    key_trusted: bool
    # Signer DID copied from the envelope.
    signer_did: str | None
    # Firmware name copied from the envelope's firmware metadata.
    firmware_name: str | None
    # Failure description; None on success.
    error: str | None = None
| 29 | |
| 30 | |
class FirmwareSigner:
    """Signs firmware images on behalf of one manufacturer AgentIdentity.

    The identity is fixed at construction time; every envelope produced by
    this signer carries that identity's DID, public key and key fingerprint.

    Example:
        manufacturer = AgentIdentity.create("acme-appliance-vendor")
        signer = FirmwareSigner(manufacturer)
        signed = signer.sign(firmware)
        # distribute signed.to_dict() with the firmware update
    """

    def __init__(self, identity: AgentIdentity) -> None:
        """Store *identity*; its signing keypair is used for all signatures."""
        self.identity = identity

    @property
    def key_id(self) -> str:
        """Return the hex SHA3-256 digest of the manufacturer public key."""
        public_key = self.identity.signing_keypair.public_key
        digest = hashlib.sha3_256(public_key)
        return digest.hexdigest()

    def sign(
        self,
        firmware: FirmwareImage,
        previous_firmware_hash: str = "",
    ) -> SignedFirmware:
        """Wrap *firmware* in a SignedFirmware envelope.

        Args:
            firmware: Image to sign. The signature is computed over
                ``firmware.canonical_manifest_bytes()``.
            previous_firmware_hash: Value stored verbatim in the envelope;
                defaults to the empty string.

        Returns:
            A SignedFirmware holding the image, this signer's key
            fingerprint and DID, the algorithm value, the hex-encoded
            signature and public key, and a UTC ISO-8601 timestamp.
        """
        # NOTE(review): only the manifest bytes are handed to sign(); the
        # remaining envelope fields (previous_firmware_hash, signed_at, ...)
        # are not passed to the signing call in this method.
        manifest_bytes = firmware.canonical_manifest_bytes()
        raw_signature = sign(manifest_bytes, self.identity.signing_keypair)

        envelope_fields = {
            "firmware": firmware,
            "manufacturer_key_id": self.key_id,
            "signer_did": self.identity.did,
            "algorithm": self.identity.signing_keypair.algorithm.value,
            "signature": raw_signature.hex(),
            "public_key": self.identity.signing_keypair.public_key.hex(),
            "signed_at": datetime.now(timezone.utc).isoformat(),
            "previous_firmware_hash": previous_firmware_hash,
        }
        return SignedFirmware(**envelope_fields)
| 67 | |
| 68 | |
class FirmwareVerifier:
    """Checks SignedFirmware envelopes, optionally against a key-ring.

    The signature check always runs. Two further checks are opt-in and may
    be combined:
      * Hash consistency - when ``actual_bytes`` is given, the hash of the
        delivered image must equal the signed ``image_hash``.
      * Trust - when ``key_ring`` is given, the envelope's key id must be
        trusted by the ring and must match the fingerprint of the embedded
        public key.
    With neither argument, only the signature is checked.
    """

    @staticmethod
    def verify(
        signed: SignedFirmware,
        actual_bytes: bytes | None = None,
        key_ring: KeyRing | None = None,
    ) -> VerificationResult:
        """Run the configured checks and report the outcome without raising.

        Checks run in a fixed order (hash consistency, trust, signature) and
        the first failure is returned immediately.

        Args:
            signed: Envelope to verify.
            actual_bytes: Delivered image bytes; None skips the hash check.
            key_ring: Manufacturer allow-list; None skips the trust check.

        Returns:
            A VerificationResult. A skipped check leaves its flag at True,
            so ``hash_consistent`` / ``key_trusted`` being True does not by
            itself show that the check was performed.
        """
        firmware_name = signed.firmware.metadata.name
        signer_did = signed.signer_did

        def reject(
            reason: str,
            *,
            hash_ok: bool = True,
            trusted: bool = True,
        ) -> VerificationResult:
            # Every failure shares the same shape: invalid overall, signature
            # reported as not valid, plus the state of the earlier gates.
            return VerificationResult(
                valid=False,
                signature_valid=False,
                hash_consistent=hash_ok,
                key_trusted=trusted,
                signer_did=signer_did,
                firmware_name=firmware_name,
                error=reason,
            )

        # Gate 1: the delivered bytes must hash to the signed image_hash.
        if actual_bytes is not None:
            delivered_hash = FirmwareImage.hash_bytes(actual_bytes)
            signed_hash = signed.firmware.image_hash
            if delivered_hash != signed_hash:
                # Later gates never ran, so key_trusted is reported False too.
                return reject(
                    f"image hash mismatch (expected "
                    f"{signed_hash[:16]}..., got "
                    f"{delivered_hash[:16]}...)",
                    hash_ok=False,
                    trusted=False,
                )

        # Gate 2: the key id must be allow-listed and must really be the
        # fingerprint of the public key that will verify the signature.
        if key_ring is not None:
            claimed_kid = signed.manufacturer_key_id
            if not key_ring.is_trusted(claimed_kid):
                return reject(
                    f"manufacturer key {claimed_kid[:16]}... "
                    f"not trusted by key-ring",
                    trusted=False,
                )
            if KeyRing.fingerprint(signed.public_key) != claimed_kid:
                return reject(
                    "embedded public_key fingerprint does not match "
                    "manufacturer_key_id",
                    trusted=False,
                )

        # Gate 3: the signature over the canonical manifest.
        try:
            algorithm = SignatureAlgorithm(signed.algorithm)
        except ValueError:
            return reject(f"unknown algorithm {signed.algorithm}")

        manifest = signed.firmware.canonical_manifest_bytes()
        try:
            signature_ok = verify(
                manifest,
                bytes.fromhex(signed.signature),
                bytes.fromhex(signed.public_key),
                algorithm,
            )
        except Exception as exc:  # noqa: BLE001
            # Deliberately broad: malformed hex or any backend error is
            # reported as a failed verification instead of propagating.
            return reject(f"signature verify failed: {exc}")

        if not signature_ok:
            return reject("invalid ML-DSA signature")

        return VerificationResult(
            valid=True,
            signature_valid=True,
            hash_consistent=True,
            key_trusted=True,
            signer_did=signer_did,
            firmware_name=firmware_name,
        )

    @staticmethod
    def verify_or_raise(
        signed: SignedFirmware,
        actual_bytes: bytes | None = None,
        key_ring: KeyRing | None = None,
    ) -> VerificationResult:
        """Verify like ``verify()`` but raise instead of returning a failure.

        Args:
            signed: Envelope to verify.
            actual_bytes: Delivered image bytes; None skips the hash check.
            key_ring: Manufacturer allow-list; None skips the trust check.

        Returns:
            The successful VerificationResult.

        Raises:
            FirmwareVerificationError: If the result is not valid; the
                message includes the firmware name and the failure reason.
        """
        outcome = FirmwareVerifier.verify(signed, actual_bytes, key_ring)
        if outcome.valid:
            return outcome
        raise FirmwareVerificationError(
            f"firmware {outcome.firmware_name!r} failed verification: {outcome.error}"
        )
| 206 | |