src/pqc_ebpf_attestation/policy.py
3.0 KB · 91 lines · python Raw
1 """LoadPolicy - decide whether a signed program may be loaded."""
2
3 from __future__ import annotations
4
5 from dataclasses import dataclass, field
6 from enum import Enum
7
8 from pqc_ebpf_attestation.errors import PolicyDeniedError, UntrustedSignerError
9 from pqc_ebpf_attestation.program import BPFProgramType
10 from pqc_ebpf_attestation.signer import BPFVerifier, SignedBPFProgram
11
12
13 class PolicyDecision(str, Enum):
14 ALLOW = "allow"
15 DENY = "deny"
16
17
18 @dataclass
19 class PolicyRule:
20 """One rule in a LoadPolicy."""
21
22 program_types: tuple[BPFProgramType, ...] # which types the rule applies to
23 allowed_signers: frozenset[str] # DIDs permitted to load these types
24 require_signature: bool = True
25 max_bytecode_size: int = 2 * 1024 * 1024 # 2 MB default cap
26
27 def applies_to(self, program_type: BPFProgramType) -> bool:
28 return program_type in self.program_types
29
30
31 @dataclass
32 class LoadPolicy:
33 """An ordered list of rules. First matching rule wins.
34
35 Default (with no rules) denies everything.
36 """
37
38 rules: list[PolicyRule] = field(default_factory=list)
39 default_decision: PolicyDecision = PolicyDecision.DENY
40
41 def add_rule(self, rule: PolicyRule) -> LoadPolicy:
42 self.rules.append(rule)
43 return self
44
45 def evaluate(self, signed: SignedBPFProgram) -> tuple[PolicyDecision, str]:
46 """Decide whether this signed program may load. Returns (decision, reason)."""
47 # Find first matching rule
48 matching: PolicyRule | None = None
49 for rule in self.rules:
50 if rule.applies_to(signed.program.metadata.program_type):
51 matching = rule
52 break
53
54 if matching is None:
55 return (
56 self.default_decision,
57 f"no rule for program_type={signed.program.metadata.program_type.value}",
58 )
59
60 # Size check
61 if signed.program.bytecode_size > matching.max_bytecode_size:
62 return (
63 PolicyDecision.DENY,
64 f"bytecode size {signed.program.bytecode_size} exceeds cap "
65 f"{matching.max_bytecode_size}",
66 )
67
68 # Signature check
69 if matching.require_signature:
70 result = BPFVerifier.verify(signed)
71 if not result.valid:
72 return PolicyDecision.DENY, result.error or "signature invalid"
73
74 # Signer allow-list
75 if matching.allowed_signers and signed.signer_did not in matching.allowed_signers:
76 return (
77 PolicyDecision.DENY,
78 f"signer {signed.signer_did} not in allow-list",
79 )
80
81 return PolicyDecision.ALLOW, "policy rule matched; all checks passed"
82
83 def enforce(self, signed: SignedBPFProgram) -> None:
84 """Raise if the program would be denied."""
85 decision, reason = self.evaluate(signed)
86 if decision == PolicyDecision.DENY:
87 # Distinguish untrusted signer vs general deny
88 if "not in allow-list" in reason:
89 raise UntrustedSignerError(reason)
90 raise PolicyDeniedError(reason)
91