tests/test_integration.py
2.5 KB · 81 lines · python Raw
1 """End-to-end integration tests for pqc-ebpf-attestation."""
2
3 from __future__ import annotations
4
5 from dataclasses import replace
6
7 from pqc_ebpf_attestation import (
8 AttestationLog,
9 BPFProgram,
10 BPFProgramType,
11 BPFSigner,
12 LoadPolicy,
13 PolicyDecision,
14 PolicyRule,
15 )
16
17
def test_full_sign_enforce_and_log(
    signer, untrusted_identity, sample_bpf_metadata, sample_bytecode
) -> None:
    """Run one program through signing, policy evaluation and audit logging.

    The same program is signed twice: once by the ``signer`` fixture and once
    by a ``BPFSigner`` built from ``untrusted_identity``. The policy lists only
    the first signer's DID for KPROBE programs, so the test expects ALLOW for
    the first artifact, DENY for the second, and one log entry per evaluation.
    """
    # Step 1 -- build the program and sign it with the trusted signer.
    bpf_program = BPFProgram.from_bytes(sample_bpf_metadata, sample_bytecode)
    trusted_artifact = signer.sign(bpf_program)

    # Step 2 -- policy whose single rule allow-lists the trusted DID for KPROBE.
    load_policy = LoadPolicy()
    trusted_only_rule = PolicyRule(
        program_types=(BPFProgramType.KPROBE,),
        allowed_signers=frozenset({signer.identity.did}),
    )
    load_policy = load_policy.add_rule(trusted_only_rule)

    # Step 3 -- evaluate the trusted artifact; record the outcome before
    # asserting so the entry is written regardless of the verdict.
    audit_log = AttestationLog()
    trusted_verdict, trusted_reason = load_policy.evaluate(trusted_artifact)
    audit_log.log(trusted_artifact, trusted_verdict, trusted_reason)
    assert trusted_verdict == PolicyDecision.ALLOW

    # Step 4 -- same program signed by an identity outside the allow-list.
    outsider = BPFSigner(untrusted_identity)
    outsider_artifact = outsider.sign(bpf_program)
    outsider_verdict, outsider_reason = load_policy.evaluate(outsider_artifact)
    audit_log.log(outsider_artifact, outsider_verdict, outsider_reason)
    assert outsider_verdict == PolicyDecision.DENY
    assert "not in allow-list" in outsider_reason

    # Step 5 -- the log holds both outcomes: one allow and one deny.
    recorded = audit_log.entries()
    assert len(recorded) == 2
    seen_decisions = {entry.decision for entry in recorded}
    assert seen_decisions == {"allow", "deny"}
52
53
def test_tampered_bytecode_rejected_and_logged(
    signer, sample_bpf_metadata, sample_bytecode
) -> None:
    """Check that bytecode altered after signing is denied and logged.

    A program is signed, then four extra bytes are appended to its bytecode
    while the rest of the signed artifact is kept as-is. Although the signer is
    on the policy's allow-list, the test expects DENY with a reason mentioning
    "hash" or "signature", and exactly one "deny" log entry carrying the
    program's name.
    """
    original_program = BPFProgram.from_bytes(sample_bpf_metadata, sample_bytecode)
    signed_artifact = signer.sign(original_program)

    # Swap in a program whose bytecode no longer matches what was signed.
    signed_program = signed_artifact.program
    injected_suffix = b"\xde\xad\xbe\xef"
    altered_program = replace(
        signed_program, bytecode=signed_program.bytecode + injected_suffix
    )
    altered_artifact = replace(signed_artifact, program=altered_program)

    # The signer itself is allow-listed, so a denial must come from the
    # artifact's contents rather than from who signed it.
    load_policy = LoadPolicy()
    trusted_only_rule = PolicyRule(
        program_types=(BPFProgramType.KPROBE,),
        allowed_signers=frozenset({signer.identity.did}),
    )
    load_policy = load_policy.add_rule(trusted_only_rule)

    audit_log = AttestationLog()
    verdict, explanation = load_policy.evaluate(altered_artifact)
    audit_log.log(altered_artifact, verdict, explanation)

    assert verdict == PolicyDecision.DENY
    explanation_lower = explanation.lower()
    assert "hash" in explanation_lower or "signature" in explanation_lower

    denied_entries = audit_log.entries(decision="deny")
    assert len(denied_entries) == 1
    sole_entry = denied_entries[0]
    assert sole_entry.program_name == sample_bpf_metadata.name
81