tests/test_integration.py
| 1 | """End-to-end integration tests for pqc-ebpf-attestation.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from dataclasses import replace |
| 6 | |
| 7 | from pqc_ebpf_attestation import ( |
| 8 | AttestationLog, |
| 9 | BPFProgram, |
| 10 | BPFProgramType, |
| 11 | BPFSigner, |
| 12 | LoadPolicy, |
| 13 | PolicyDecision, |
| 14 | PolicyRule, |
| 15 | ) |
| 16 | |
| 17 | |
def test_full_sign_enforce_and_log(
    signer,
    untrusted_identity,
    sample_bpf_metadata,
    sample_bytecode,
) -> None:
    """Exercise signing, policy evaluation and audit logging for two signers.

    The policy holds a single KPROBE rule whose allow-list contains only the
    DID of the ``signer`` fixture. The program signed by that fixture must
    evaluate to ALLOW, the same program signed with ``untrusted_identity``
    must evaluate to DENY, and both outcomes must be present in the log.
    """
    # Sign the program with the identity that the policy will allow.
    program = BPFProgram.from_bytes(sample_bpf_metadata, sample_bytecode)
    trusted_signed = signer.sign(program)

    # One rule: KPROBE programs, allow-list holding only the signer's DID.
    base_policy = LoadPolicy()
    kprobe_rule = PolicyRule(
        program_types=(BPFProgramType.KPROBE,),
        allowed_signers=frozenset({signer.identity.did}),
    )
    policy = base_policy.add_rule(kprobe_rule)

    audit_log = AttestationLog()

    # Trusted path: record the outcome first, then check it.
    trusted_decision, trusted_reason = policy.evaluate(trusted_signed)
    audit_log.log(trusted_signed, trusted_decision, trusted_reason)
    assert trusted_decision == PolicyDecision.ALLOW

    # Untrusted path: same program, signed with an identity outside the rule.
    rogue_signer = BPFSigner(untrusted_identity)
    rogue_signed = rogue_signer.sign(program)
    rogue_decision, rogue_reason = policy.evaluate(rogue_signed)
    audit_log.log(rogue_signed, rogue_decision, rogue_reason)
    assert rogue_decision == PolicyDecision.DENY
    assert "not in allow-list" in rogue_reason

    # Both evaluations must have been recorded, one per decision kind.
    recorded = audit_log.entries()
    assert len(recorded) == 2
    seen_decisions = set()
    for entry in recorded:
        seen_decisions.add(entry.decision)
    assert seen_decisions == {"allow", "deny"}
| 52 | |
| 53 | |
def test_tampered_bytecode_rejected_and_logged(
    signer,
    sample_bpf_metadata,
    sample_bytecode,
) -> None:
    """Check that bytecode altered after signing is denied and logged.

    Four extra bytes are appended to the signed program's bytecode. The policy
    must return DENY with a reason mentioning the hash or the signature, and
    the log must hold exactly one "deny" entry carrying the program's name.
    """
    original_program = BPFProgram.from_bytes(sample_bpf_metadata, sample_bytecode)
    signed = signer.sign(original_program)

    # Build a copy of the signed artifact whose bytecode no longer matches
    # what was signed.
    altered_bytecode = signed.program.bytecode + b"\xde\xad\xbe\xef"
    altered_program = replace(signed.program, bytecode=altered_bytecode)
    tampered = replace(signed, program=altered_program)

    # The signer itself is on the allow-list, so any denial below cannot be
    # attributed to an allow-list miss.
    base_policy = LoadPolicy()
    kprobe_rule = PolicyRule(
        program_types=(BPFProgramType.KPROBE,),
        allowed_signers=frozenset({signer.identity.did}),
    )
    policy = base_policy.add_rule(kprobe_rule)

    audit_log = AttestationLog()
    decision, reason = policy.evaluate(tampered)
    audit_log.log(tampered, decision, reason)

    assert decision == PolicyDecision.DENY
    reason_lowered = reason.lower()
    assert "hash" in reason_lowered or "signature" in reason_lowered

    denied = audit_log.entries(decision="deny")
    assert len(denied) == 1
    first_denied = denied[0]
    assert first_denied.program_name == sample_bpf_metadata.name
| 81 | |