src/pqc_ebpf_attestation/policy.py
| 1 | """LoadPolicy - decide whether a signed program may be loaded.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from dataclasses import dataclass, field |
| 6 | from enum import Enum |
| 7 | |
| 8 | from pqc_ebpf_attestation.errors import PolicyDeniedError, UntrustedSignerError |
| 9 | from pqc_ebpf_attestation.program import BPFProgramType |
| 10 | from pqc_ebpf_attestation.signer import BPFVerifier, SignedBPFProgram |
| 11 | |
| 12 | |
class PolicyDecision(str, Enum):
    """Outcome of evaluating a program against a load policy.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    ALLOW = "allow"  # the program may be loaded
    DENY = "deny"  # the program must not be loaded
| 16 | |
| 17 | |
@dataclass
class PolicyRule:
    """One rule in a LoadPolicy.

    Attributes:
        program_types: Program types this rule applies to.
        allowed_signers: DIDs permitted to load these types.
        require_signature: Whether the program's signature must verify.
        max_bytecode_size: Largest bytecode size, in bytes, the rule accepts.

    Raises:
        TypeError: If ``program_types`` is neither iterable nor a single
            string/enum member, or ``allowed_signers`` is neither iterable
            nor ``None``.
    """

    program_types: tuple[BPFProgramType, ...]  # which types the rule applies to
    allowed_signers: frozenset[str]  # DIDs permitted to load these types
    require_signature: bool = True
    max_bytecode_size: int = 2 * 1024 * 1024  # 2 MB default cap

    def __post_init__(self) -> None:
        """Normalise the collection fields so membership tests are exact."""
        # Annotations are not enforced at runtime. A bare string is itself a
        # container of characters, so `did in "did:example:alice"` would be a
        # substring test and let a partial DID through the allow-list.
        signers = self.allowed_signers
        if signers is None:
            # Keep None behaving like an empty (falsy) allow-list.
            self.allowed_signers = frozenset()
        elif isinstance(signers, str):
            self.allowed_signers = frozenset({signers})
        else:
            self.allowed_signers = frozenset(signers)

        # Same hazard for a single str-based enum member or plain string
        # passed where a tuple of program types was expected.
        types = self.program_types
        if isinstance(types, (str, Enum)):
            self.program_types = (types,)
        else:
            self.program_types = tuple(types)

    def applies_to(self, program_type: BPFProgramType) -> bool:
        """Return True when ``program_type`` is one of this rule's types."""
        return program_type in self.program_types
| 29 | |
| 30 | |
@dataclass
class LoadPolicy:
    """An ordered list of rules. First matching rule wins.

    When no rule applies to a program's type the outcome is
    ``default_decision``; that defaults to DENY, so a policy with no rules
    denies everything.
    """

    rules: list[PolicyRule] = field(default_factory=list)
    default_decision: PolicyDecision = PolicyDecision.DENY

    def add_rule(self, rule: PolicyRule) -> LoadPolicy:
        """Append ``rule`` after the existing rules and return ``self`` for chaining."""
        self.rules.append(rule)
        return self

    def _check(self, signed: SignedBPFProgram) -> tuple[PolicyDecision, str, bool]:
        """Run the policy checks and report why a denial happened.

        Returns:
            ``(decision, reason, untrusted_signer)``. The flag is True only
            when the denial came from the signer allow-list, so callers can
            classify the failure without parsing the reason text.
        """
        program = signed.program
        program_type = program.metadata.program_type

        # First rule whose program_types contains this type wins.
        matching = next((rule for rule in self.rules if rule.applies_to(program_type)), None)
        if matching is None:
            return (
                self.default_decision,
                f"no rule for program_type={program_type.value}",
                False,
            )

        # Size check
        size = program.bytecode_size
        if size > matching.max_bytecode_size:
            return (
                PolicyDecision.DENY,
                f"bytecode size {size} exceeds cap {matching.max_bytecode_size}",
                False,
            )

        # Signature check
        if matching.require_signature:
            result = BPFVerifier.verify(signed)
            if not result.valid:
                return PolicyDecision.DENY, result.error or "signature invalid", False

        # Signer allow-list. An empty allow-list skips this check, i.e. any
        # signer passes.
        # NOTE(review): when require_signature is False the DID compared here
        # has not been verified in this method - confirm that combination is
        # never used with a non-empty allow-list.
        if matching.allowed_signers and signed.signer_did not in matching.allowed_signers:
            return (
                PolicyDecision.DENY,
                f"signer {signed.signer_did} not in allow-list",
                True,
            )

        return PolicyDecision.ALLOW, "policy rule matched; all checks passed", False

    def evaluate(self, signed: SignedBPFProgram) -> tuple[PolicyDecision, str]:
        """Decide whether this signed program may load. Returns (decision, reason)."""
        decision, reason, _untrusted_signer = self._check(signed)
        return decision, reason

    def enforce(self, signed: SignedBPFProgram) -> None:
        """Raise if the program would be denied.

        Raises:
            UntrustedSignerError: The signer is not in the matching rule's
                allow-list.
            PolicyDeniedError: Any other denial (no matching rule under a
                DENY default, size cap exceeded, signature invalid).
        """
        decision, reason, untrusted_signer = self._check(signed)
        if decision != PolicyDecision.DENY:
            return
        # Classify from the structured flag rather than the message text, so
        # a verifier error that happens to mention the allow-list cannot be
        # misreported as an untrusted signer.
        if untrusted_signer:
            raise UntrustedSignerError(reason)
        raise PolicyDeniedError(reason)
| 91 | |