src/pqc_enclave_sdk/attestation.py
| 1 | """Device attestation - signed claim that an artifact was stored on a genuine device.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | import json |
| 7 | from dataclasses import asdict, dataclass |
| 8 | from datetime import datetime, timezone |
| 9 | from typing import Any |
| 10 | |
| 11 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 12 | from quantumshield.core.signatures import sign, verify |
| 13 | from quantumshield.identity.agent import AgentIdentity |
| 14 | |
| 15 | from pqc_enclave_sdk.errors import AttestationError |
| 16 | |
| 17 | |
@dataclass
class DeviceAttestation:
    """Record stating that one artifact was stored in one device enclave.

    The first six fields form the claim that gets signed (see
    ``canonical_bytes``). The remaining four describe the signature and
    default to empty strings until a signer fills them in.
    """

    # --- claim fields (part of the signed payload) ---
    device_id: str
    device_model: str
    enclave_vendor: str
    artifact_id: str
    artifact_content_hash: str
    issued_at: str
    # --- signature metadata (NOT part of the signed payload) ---
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""
    public_key: str = ""

    def canonical_bytes(self) -> bytes:
        """Return the deterministic UTF-8 JSON encoding of the claim fields.

        Keys are sorted and separators are compact, so equal claims always
        serialize to the same bytes. ``signer_did``, ``algorithm``,
        ``signature`` and ``public_key`` are left out of the payload.
        """
        signed_fields = (
            "device_id",
            "device_model",
            "enclave_vendor",
            "artifact_id",
            "artifact_content_hash",
            "issued_at",
        )
        claim = {name: getattr(self, name) for name in signed_fields}
        encoded = json.dumps(
            claim,
            sort_keys=True,
            separators=(",", ":"),
            ensure_ascii=False,  # keep non-ASCII text as-is; it is UTF-8 encoded below
        )
        return encoded.encode("utf-8")

    def to_dict(self) -> dict[str, Any]:
        """Return every field (claim and signature metadata) as a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> DeviceAttestation:
        """Build an attestation from a mapping whose keys are field names.

        The mapping is passed straight to the constructor as keyword
        arguments, so unknown keys or missing required fields raise
        ``TypeError``.
        """
        return cls(**data)
| 52 | |
| 53 | |
class DeviceAttester:
    """Create and check DeviceAttestations with an AgentIdentity tied to a device."""

    def __init__(
        self,
        identity: AgentIdentity,
        device_id: str,
        device_model: str,
        enclave_vendor: str,
    ) -> None:
        """Store the signing identity and the device description.

        The three device strings are copied verbatim into every attestation
        produced by ``attest``.
        """
        self.identity = identity
        self.device_id = device_id
        self.device_model = device_model
        self.enclave_vendor = enclave_vendor

    def attest(self, artifact_id: str, content_hash: str) -> DeviceAttestation:
        """Issue a signed attestation for the given artifact.

        The claim is stamped with the current UTC time in ISO-8601 form. The
        SHA3-256 digest of its canonical bytes is signed with the identity's
        signing keypair. The signature and public key are stored hex-encoded.
        """
        claim = DeviceAttestation(
            device_id=self.device_id,
            device_model=self.device_model,
            enclave_vendor=self.enclave_vendor,
            artifact_id=artifact_id,
            artifact_content_hash=content_hash,
            issued_at=datetime.now(tz=timezone.utc).isoformat(),
        )
        keypair = self.identity.signing_keypair
        # The signature metadata is excluded from canonical_bytes(), so the
        # digest can be computed before those fields are filled in.
        payload_hash = hashlib.sha3_256(claim.canonical_bytes()).digest()
        raw_signature = sign(payload_hash, keypair)

        claim.signer_did = self.identity.did
        claim.algorithm = keypair.algorithm.value
        claim.signature = raw_signature.hex()
        claim.public_key = keypair.public_key.hex()
        return claim

    @staticmethod
    def verify(attestation: DeviceAttestation) -> bool:
        """Report whether the attestation's signature checks out.

        Returns ``False`` when the signature is empty, the algorithm name is
        not a known ``SignatureAlgorithm``, the hex fields cannot be decoded,
        or the underlying verification raises.

        NOTE(review): the public key is read from the attestation itself, and
        ``signer_did`` is not part of the signed payload, so a positive result
        shows only that the record is self-consistent. Presumably callers
        check the key/DID against a trusted source - confirm.
        """
        signature_hex = attestation.signature
        if not signature_hex:
            return False

        try:
            scheme = SignatureAlgorithm(attestation.algorithm)
        except ValueError:
            return False

        payload_hash = hashlib.sha3_256(attestation.canonical_bytes()).digest()
        try:
            signature_bytes = bytes.fromhex(signature_hex)
            key_bytes = bytes.fromhex(attestation.public_key)
            # Inside a method body the bare name `verify` resolves to the
            # module-level imported function, not to this static method.
            outcome = verify(payload_hash, signature_bytes, key_bytes, scheme)
        except Exception:
            # Any decoding or verification failure counts as "not valid".
            return False
        return outcome

    @staticmethod
    def verify_or_raise(attestation: DeviceAttestation) -> None:
        """Raise ``AttestationError`` unless ``verify`` accepts the attestation."""
        if DeviceAttester.verify(attestation):
            return
        raise AttestationError("device attestation signature invalid")
| 109 | |