src/pqc_ebpf_attestation/signer.py
| 1 | """BPFSigner produces SignedBPFProgram envelopes; BPFVerifier checks them.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | from dataclasses import dataclass |
| 7 | from datetime import datetime, timezone |
| 8 | from typing import Any |
| 9 | |
| 10 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 11 | from quantumshield.core.signatures import sign, verify |
| 12 | from quantumshield.identity.agent import AgentIdentity |
| 13 | |
| 14 | from pqc_ebpf_attestation.program import BPFProgram, BPFProgramMetadata, BPFProgramType |
| 15 | |
| 16 | |
@dataclass
class SignedBPFProgram:
    """Envelope pairing a BPFProgram with a detached signature.

    What gets signed is the program's canonical manifest (metadata + hash)
    rather than the raw bytecode, which keeps the envelope small.
    """

    program: BPFProgram
    signer_did: str
    algorithm: str
    signature: str  # hex
    public_key: str  # hex
    signed_at: str

    def to_dict(self, include_bytecode: bool = True) -> dict[str, Any]:
        """Return the envelope as a plain dict.

        ``include_bytecode`` is passed straight through to the wrapped
        program's own ``to_dict``; the remaining keys mirror the envelope
        fields one-to-one.
        """
        envelope = {
            "program": self.program.to_dict(include_bytecode=include_bytecode),
        }
        for field_name in (
            "signer_did",
            "algorithm",
            "signature",
            "public_key",
            "signed_at",
        ):
            envelope[field_name] = getattr(self, field_name)
        return envelope

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SignedBPFProgram:
        """Rebuild an envelope from its dict form.

        Required keys (a missing one raises ``KeyError``): ``program`` with a
        ``metadata`` mapping holding ``name`` and ``program_type``, plus
        ``signer_did``, ``algorithm``, ``signature`` and ``public_key``.
        Every other key is optional and falls back to a default.
        """
        import base64

        program_fields = data["program"]
        meta = program_fields["metadata"]

        # Resolve the mandatory metadata first so a missing key fails early.
        name = meta["name"]
        program_type = BPFProgramType(meta["program_type"])

        # Optional metadata and the value used when the key is absent.
        optional_defaults = {
            "license": "GPL",
            "author": "",
            "description": "",
            "version": "",
            "kernel_min": "",
            "attach_point": "",
        }
        optional = {
            key: meta.get(key, fallback)
            for key, fallback in optional_defaults.items()
        }
        metadata = BPFProgramMetadata(
            name=name,
            program_type=program_type,
            **optional,
        )

        # The bytecode itself is optional; without it the payload is empty.
        if "bytecode_base64" in program_fields:
            raw_bytecode = base64.b64decode(program_fields["bytecode_base64"])
        else:
            raw_bytecode = b""

        program = BPFProgram(
            metadata=metadata,
            bytecode=raw_bytecode,
            bytecode_hash=program_fields.get("bytecode_hash", ""),
            bytecode_size=int(program_fields.get("bytecode_size", 0)),
        )
        return cls(
            program=program,
            signer_did=data["signer_did"],
            algorithm=data["algorithm"],
            signature=data["signature"],
            public_key=data["public_key"],
            signed_at=data.get("signed_at", ""),
        )
| 76 | |
| 77 | |
@dataclass(frozen=True)
class VerificationResult:
    """Immutable outcome of checking one signed program envelope."""

    # Overall verdict.
    valid: bool
    # Outcome of the cryptographic signature check alone.
    signature_valid: bool
    # True when the stored bytecode_hash equals hash(actual bytecode).
    hash_consistent: bool
    # DID copied from the envelope that was checked.
    signer_did: str | None
    # Name taken from the program's metadata.
    program_name: str
    # Human-readable failure reason; None when nothing went wrong.
    error: str | None = None
| 86 | |
| 87 | |
class BPFSigner:
    """Produces SignedBPFProgram envelopes on behalf of an AgentIdentity."""

    def __init__(self, identity: AgentIdentity):
        # Identity supplying the DID and the signing keypair used by sign().
        self.identity = identity

    def sign(self, program: BPFProgram) -> SignedBPFProgram:
        """Sign ``program`` and wrap it in a SignedBPFProgram.

        The signature is computed over the SHA3-256 digest of the program's
        canonical manifest bytes. The envelope records the signer's DID, the
        algorithm name, the hex-encoded signature and public key, and the
        current UTC time in ISO-8601 form.
        """
        manifest_digest = hashlib.sha3_256(
            program.canonical_manifest_bytes()
        ).digest()
        keypair = self.identity.signing_keypair
        raw_signature = sign(manifest_digest, keypair)
        return SignedBPFProgram(
            program=program,
            signer_did=self.identity.did,
            algorithm=keypair.algorithm.value,
            signature=raw_signature.hex(),
            public_key=keypair.public_key.hex(),
            signed_at=datetime.now(tz=timezone.utc).isoformat(),
        )
| 106 | |
| 107 | |
class BPFVerifier:
    """Independently verify SignedBPFProgram envelopes."""

    @staticmethod
    def verify(signed: SignedBPFProgram) -> VerificationResult:
        """Check the bytecode hash and the signature of ``signed``.

        The envelope is valid only when both checks pass. An unknown
        algorithm name, or any exception raised while decoding the hex
        fields or running the signature backend, is reported through the
        result's ``error`` field rather than propagated.
        """
        # The hash can only be re-checked when the bytecode is attached;
        # an envelope without bytecode counts as consistent.
        payload = signed.program.bytecode
        hash_consistent = True
        if payload:
            recomputed = BPFProgram.hash_bytecode(payload)
            hash_consistent = recomputed == signed.program.bytecode_hash

        def build_result(signature_valid, error):
            # Single place that assembles the result for every exit path.
            return VerificationResult(
                valid=signature_valid and hash_consistent,
                signature_valid=signature_valid,
                hash_consistent=hash_consistent,
                signer_did=signed.signer_did,
                program_name=signed.program.metadata.name,
                error=error,
            )

        try:
            algorithm = SignatureAlgorithm(signed.algorithm)
        except ValueError:
            return build_result(False, f"unknown algorithm {signed.algorithm}")

        # Same digest construction the signer uses: SHA3-256 of the manifest.
        manifest_digest = hashlib.sha3_256(
            signed.program.canonical_manifest_bytes()
        ).digest()
        try:
            signature_valid = verify(
                manifest_digest,
                bytes.fromhex(signed.signature),
                bytes.fromhex(signed.public_key),
                algorithm,
            )
        except Exception as exc:  # noqa: BLE001 - surface backend failures uniformly
            return build_result(False, f"signature verify failed: {exc}")

        # A signature failure takes precedence over a hash mismatch.
        # NOTE(review): the message names ML-DSA whatever signed.algorithm
        # is - confirm whether other algorithms can reach this point.
        if not signature_valid:
            error = "invalid ML-DSA signature"
        elif not hash_consistent:
            error = "bytecode hash does not match stored hash"
        else:
            error = None

        return build_result(signature_valid, error)
| 166 | |