src/pqc_ebpf_attestation/program.py
| 1 | """eBPF program data structures.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | import json |
| 7 | from dataclasses import asdict, dataclass |
| 8 | from enum import Enum |
| 9 | from typing import Any |
| 10 | |
| 11 | |
| 12 | class BPFProgramType(str, Enum): |
| 13 | """eBPF program attach types the verifier understands.""" |
| 14 | |
| 15 | KPROBE = "kprobe" |
| 16 | KRETPROBE = "kretprobe" |
| 17 | TRACEPOINT = "tracepoint" |
| 18 | XDP = "xdp" |
| 19 | SOCKET_FILTER = "socket_filter" |
| 20 | CGROUP_SKB = "cgroup_skb" |
| 21 | LSM = "lsm" |
| 22 | TRACING = "tracing" # fentry/fexit |
| 23 | PERF_EVENT = "perf_event" |
| 24 | SCHED_CLS = "sched_cls" |
| 25 | SYSCALL = "syscall" |
| 26 | USER = "user" # userspace helper |
| 27 | UNKNOWN = "unknown" |
| 28 | |
| 29 | |
| 30 | @dataclass(frozen=True) |
| 31 | class BPFProgramMetadata: |
| 32 | """Non-bytecode metadata about an eBPF program.""" |
| 33 | |
| 34 | name: str |
| 35 | program_type: BPFProgramType |
| 36 | license: str = "GPL" |
| 37 | author: str = "" |
| 38 | description: str = "" |
| 39 | version: str = "" |
| 40 | kernel_min: str = "" # e.g. "5.15" |
| 41 | attach_point: str = "" # e.g. "sys_bpf", "sys_enter_read" |
| 42 | |
| 43 | def to_dict(self) -> dict[str, Any]: |
| 44 | d = asdict(self) |
| 45 | d["program_type"] = self.program_type.value |
| 46 | return d |
| 47 | |
| 48 | |
| 49 | @dataclass |
| 50 | class BPFProgram: |
| 51 | """A single eBPF program: bytecode + metadata + hash.""" |
| 52 | |
| 53 | metadata: BPFProgramMetadata |
| 54 | bytecode: bytes # the raw ELF bytes or BPF instructions |
| 55 | bytecode_hash: str = "" # hex SHA3-256 of bytecode |
| 56 | bytecode_size: int = 0 |
| 57 | |
| 58 | @staticmethod |
| 59 | def hash_bytecode(bytecode: bytes) -> str: |
| 60 | return hashlib.sha3_256(bytecode).hexdigest() |
| 61 | |
| 62 | @classmethod |
| 63 | def from_bytes(cls, metadata: BPFProgramMetadata, bytecode: bytes) -> BPFProgram: |
| 64 | return cls( |
| 65 | metadata=metadata, |
| 66 | bytecode=bytecode, |
| 67 | bytecode_hash=cls.hash_bytecode(bytecode), |
| 68 | bytecode_size=len(bytecode), |
| 69 | ) |
| 70 | |
| 71 | @classmethod |
| 72 | def from_file(cls, metadata: BPFProgramMetadata, path: str) -> BPFProgram: |
| 73 | with open(path, "rb") as f: |
| 74 | bytecode = f.read() |
| 75 | return cls.from_bytes(metadata, bytecode) |
| 76 | |
| 77 | def canonical_manifest_bytes(self) -> bytes: |
| 78 | """Deterministic bytes that are signed (metadata + hash, NOT raw bytecode).""" |
| 79 | payload = { |
| 80 | "metadata": self.metadata.to_dict(), |
| 81 | "bytecode_hash": self.bytecode_hash, |
| 82 | "bytecode_size": self.bytecode_size, |
| 83 | } |
| 84 | return json.dumps( |
| 85 | payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False |
| 86 | ).encode("utf-8") |
| 87 | |
| 88 | def to_dict(self, include_bytecode: bool = False) -> dict[str, Any]: |
| 89 | d: dict[str, Any] = { |
| 90 | "metadata": self.metadata.to_dict(), |
| 91 | "bytecode_hash": self.bytecode_hash, |
| 92 | "bytecode_size": self.bytecode_size, |
| 93 | } |
| 94 | if include_bytecode: |
| 95 | import base64 |
| 96 | |
| 97 | d["bytecode_base64"] = base64.b64encode(self.bytecode).decode("ascii") |
| 98 | return d |
| 99 | |