src/pqc_ebpf_attestation/signer.py
5.5 KB · 166 lines · python Raw
1 """BPFSigner produces SignedBPFProgram envelopes; BPFVerifier checks them."""
2
3 from __future__ import annotations
4
5 import hashlib
6 from dataclasses import dataclass
7 from datetime import datetime, timezone
8 from typing import Any
9
10 from quantumshield.core.algorithms import SignatureAlgorithm
11 from quantumshield.core.signatures import sign, verify
12 from quantumshield.identity.agent import AgentIdentity
13
14 from pqc_ebpf_attestation.program import BPFProgram, BPFProgramMetadata, BPFProgramType
15
16
@dataclass
class SignedBPFProgram:
    """Envelope pairing a BPFProgram with a signature over its manifest.

    What gets signed is the program's canonical manifest (metadata + hash)
    rather than the raw bytecode, which keeps the envelope small.
    """

    program: BPFProgram
    signer_did: str
    algorithm: str
    signature: str  # hex
    public_key: str  # hex
    signed_at: str

    def to_dict(self, include_bytecode: bool = True) -> dict[str, Any]:
        """Serialize the envelope to a plain dict.

        Args:
            include_bytecode: Forwarded unchanged to ``program.to_dict``.

        Returns:
            A dict with the serialized program under ``"program"`` followed
            by the envelope's string fields.
        """
        envelope: dict[str, Any] = {
            "program": self.program.to_dict(include_bytecode=include_bytecode),
        }
        # Copy the scalar envelope fields across in declaration order.
        for attr in (
            "signer_did",
            "algorithm",
            "signature",
            "public_key",
            "signed_at",
        ):
            envelope[attr] = getattr(self, attr)
        return envelope

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SignedBPFProgram:
        """Rebuild an envelope from a dict.

        Args:
            data: Mapping with a ``"program"`` entry (itself holding a
                ``"metadata"`` mapping) plus the envelope fields.

        Returns:
            A new ``SignedBPFProgram``.

        Raises:
            KeyError: If ``program``, ``metadata``, ``name``,
                ``program_type``, ``signer_did``, ``algorithm``,
                ``signature`` or ``public_key`` is missing.
        """
        import base64

        program_fields = data["program"]
        meta_fields = program_fields["metadata"]

        # Optional metadata keys mapped to the value used when absent.
        fallbacks = {
            "license": "GPL",
            "author": "",
            "description": "",
            "version": "",
            "kernel_min": "",
            "attach_point": "",
        }
        metadata = BPFProgramMetadata(
            name=meta_fields["name"],
            program_type=BPFProgramType(meta_fields["program_type"]),
            **{
                key: meta_fields.get(key, fallback)
                for key, fallback in fallbacks.items()
            },
        )

        # The bytecode is optional in the dict; default to empty bytes.
        raw_bytecode = (
            base64.b64decode(program_fields["bytecode_base64"])
            if "bytecode_base64" in program_fields
            else b""
        )
        rebuilt_program = BPFProgram(
            metadata=metadata,
            bytecode=raw_bytecode,
            bytecode_hash=program_fields.get("bytecode_hash", ""),
            bytecode_size=int(program_fields.get("bytecode_size", 0)),
        )

        return cls(
            program=rebuilt_program,
            signer_did=data["signer_did"],
            algorithm=data["algorithm"],
            signature=data["signature"],
            public_key=data["public_key"],
            signed_at=data.get("signed_at", ""),
        )
76
77
@dataclass(frozen=True)
class VerificationResult:
    """Immutable outcome of checking a signed program envelope."""

    # Overall verdict; BPFVerifier.verify sets it from the signature check
    # combined with the hash check.
    valid: bool
    # Whether the signature check itself passed.
    signature_valid: bool
    # Stored bytecode_hash == hash(actual bytecode). BPFVerifier.verify also
    # leaves this True when the envelope carries no bytecode to re-hash.
    hash_consistent: bool
    signer_did: str | None
    program_name: str
    # Human-readable failure reason; None when nothing went wrong.
    error: str | None = None
86
87
class BPFSigner:
    """Wraps BPFPrograms in signed envelopes using an AgentIdentity."""

    def __init__(self, identity: AgentIdentity):
        self.identity = identity

    def sign(self, program: BPFProgram) -> SignedBPFProgram:
        """Sign ``program`` and return the resulting envelope.

        The signature is computed over the SHA3-256 digest of the program's
        canonical manifest bytes, using the identity's signing keypair.

        Args:
            program: The program to sign.

        Returns:
            A ``SignedBPFProgram`` carrying the hex-encoded signature and
            public key, the signer's DID, the algorithm value, and a UTC
            ISO-8601 timestamp.
        """
        signer = self.identity
        manifest_digest = hashlib.sha3_256(
            program.canonical_manifest_bytes()
        ).digest()
        raw_signature = sign(manifest_digest, signer.signing_keypair)

        # Stamp the envelope only after signing has succeeded.
        envelope_fields = {
            "program": program,
            "signer_did": signer.did,
            "algorithm": signer.signing_keypair.algorithm.value,
            "signature": raw_signature.hex(),
            "public_key": signer.signing_keypair.public_key.hex(),
            "signed_at": datetime.now(timezone.utc).isoformat(),
        }
        return SignedBPFProgram(**envelope_fields)
106
107
class BPFVerifier:
    """Stateless, independent checker for SignedBPFProgram envelopes."""

    @staticmethod
    def verify(signed: SignedBPFProgram) -> VerificationResult:
        """Check an envelope's bytecode hash and signature.

        Two checks are performed:

        * if the program carries bytecode, its hash is recomputed and
          compared with the stored ``bytecode_hash``;
        * the signature is verified over the SHA3-256 digest of the
          program's canonical manifest, using the public key and algorithm
          named in the envelope itself.

        Args:
            signed: The envelope to verify.

        Returns:
            A ``VerificationResult``. An unknown algorithm, or any exception
            raised while decoding or verifying the signature, is reported
            through ``error`` with ``valid=False`` rather than raised.
        """
        # NOTE(review): the public key comes from the envelope and is not
        # checked against signer_did here; signer_did is only echoed back.
        # NOTE(review): the invalid-signature message below says "ML-DSA"
        # whatever algorithm the envelope names.
        program = signed.program

        # With no bytecode attached there is nothing to re-hash, so the
        # stored hash is taken as consistent.
        hash_consistent = (
            not program.bytecode
            or BPFProgram.hash_bytecode(program.bytecode) == program.bytecode_hash
        )

        def outcome(signature_valid, error):
            # Single place that assembles the result for every exit path.
            return VerificationResult(
                valid=signature_valid and hash_consistent,
                signature_valid=signature_valid,
                hash_consistent=hash_consistent,
                signer_did=signed.signer_did,
                program_name=program.metadata.name,
                error=error,
            )

        try:
            algorithm = SignatureAlgorithm(signed.algorithm)
        except ValueError:
            return outcome(False, f"unknown algorithm {signed.algorithm}")

        manifest_digest = hashlib.sha3_256(
            program.canonical_manifest_bytes()
        ).digest()
        try:
            signature_valid = verify(
                manifest_digest,
                bytes.fromhex(signed.signature),
                bytes.fromhex(signed.public_key),
                algorithm,
            )
        except Exception as exc:  # noqa: BLE001 - surface backend failures uniformly
            return outcome(False, f"signature verify failed: {exc}")

        # A bad signature takes precedence over a hash mismatch in the message.
        if not signature_valid:
            error = "invalid ML-DSA signature"
        elif not hash_consistent:
            error = "bytecode hash does not match stored hash"
        else:
            error = None
        return outcome(signature_valid, error)
166