src/pqc_ebpf_attestation/program.py
2.8 KB · 99 lines · python Raw
1 """eBPF program data structures."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 from dataclasses import asdict, dataclass
8 from enum import Enum
9 from typing import Any
10
11
class BPFProgramType(str, Enum):
    """Closed set of eBPF program type labels.

    Every member's value is the lowercase spelling of its name. The class
    also derives from ``str``, so a member compares equal to that plain
    string and can be looked up from it, e.g. ``BPFProgramType("xdp")``.
    """

    KPROBE = "kprobe"
    KRETPROBE = "kretprobe"
    TRACEPOINT = "tracepoint"
    XDP = "xdp"
    SOCKET_FILTER = "socket_filter"
    CGROUP_SKB = "cgroup_skb"
    LSM = "lsm"
    # Covers fentry/fexit programs.
    TRACING = "tracing"
    PERF_EVENT = "perf_event"
    SCHED_CLS = "sched_cls"
    SYSCALL = "syscall"
    # Userspace helper rather than an in-kernel program.
    USER = "user"
    UNKNOWN = "unknown"
28
29
@dataclass(frozen=True)
class BPFProgramMetadata:
    """Immutable descriptive record for an eBPF program, excluding bytecode.

    ``name`` and ``program_type`` are required; every other field is an
    optional string that defaults to empty, except ``license`` which
    defaults to ``"GPL"``.
    """

    name: str
    program_type: BPFProgramType
    license: str = "GPL"
    author: str = ""
    description: str = ""
    version: str = ""
    kernel_min: str = ""  # e.g. "5.15"
    attach_point: str = ""  # e.g. "sys_bpf", "sys_enter_read"

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict in declaration order.

        ``program_type`` is replaced by its ``.value`` so the mapping holds
        a plain value for that key instead of the enum member.
        """
        # Overriding a key inside the literal keeps the key's original
        # position, so ordering matches ``asdict``.
        return {**asdict(self), "program_type": self.program_type.value}
47
48
@dataclass
class BPFProgram:
    """A single eBPF program: bytecode + metadata + hash.

    ``bytecode_hash`` and ``bytecode_size`` may be supplied explicitly. When
    either is left at its default (``""`` / ``0``) and ``bytecode`` is a
    bytes-like object, the missing value is derived from ``bytecode`` at
    construction time, so a directly constructed instance produces the same
    manifest as one built through :meth:`from_bytes`.
    """

    metadata: BPFProgramMetadata
    bytecode: bytes  # the raw ELF bytes or BPF instructions
    bytecode_hash: str = ""  # hex SHA3-256 of bytecode
    bytecode_size: int = 0

    def __post_init__(self) -> None:
        """Fill in hash/size that the caller did not provide.

        Explicitly supplied values are never overwritten, so callers that
        deliberately build a record whose hash or size disagrees with the
        bytecode keep that behaviour.
        """
        # Skip derivation for non-bytes payloads (e.g. ``None``) so that
        # constructions which worked before this hook still work.
        if not isinstance(self.bytecode, (bytes, bytearray, memoryview)):
            return
        if not self.bytecode_hash:
            self.bytecode_hash = self.hash_bytecode(self.bytecode)
        if not self.bytecode_size:
            self.bytecode_size = len(self.bytecode)

    @staticmethod
    def hash_bytecode(bytecode: bytes) -> str:
        """Return the hex-encoded SHA3-256 digest of ``bytecode``."""
        return hashlib.sha3_256(bytecode).hexdigest()

    @classmethod
    def from_bytes(cls, metadata: BPFProgramMetadata, bytecode: bytes) -> BPFProgram:
        """Build a program from in-memory bytecode, computing hash and size."""
        return cls(
            metadata=metadata,
            bytecode=bytecode,
            bytecode_hash=cls.hash_bytecode(bytecode),
            bytecode_size=len(bytecode),
        )

    @classmethod
    def from_file(cls, metadata: BPFProgramMetadata, path: str) -> BPFProgram:
        """Read ``path`` in binary mode and build a program from its contents.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        with open(path, "rb") as f:
            bytecode = f.read()
        return cls.from_bytes(metadata, bytecode)

    def canonical_manifest_bytes(self) -> bytes:
        """Deterministic bytes that are signed (metadata + hash, NOT raw bytecode).

        The payload is JSON with sorted keys and no insignificant whitespace,
        encoded as UTF-8 without ASCII escaping, so equal field values always
        produce identical bytes.
        """
        payload = {
            "metadata": self.metadata.to_dict(),
            "bytecode_hash": self.bytecode_hash,
            "bytecode_size": self.bytecode_size,
        }
        return json.dumps(
            payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        ).encode("utf-8")

    def to_dict(self, include_bytecode: bool = False) -> dict[str, Any]:
        """Return a plain-dict view of the program.

        Args:
            include_bytecode: when true, add the raw bytecode under
                ``"bytecode_base64"`` as an ASCII base64 string.
        """
        d: dict[str, Any] = {
            "metadata": self.metadata.to_dict(),
            "bytecode_hash": self.bytecode_hash,
            "bytecode_size": self.bytecode_size,
        }
        if include_bytecode:
            # Imported lazily: only needed when the bytecode is embedded.
            import base64

            d["bytecode_base64"] = base64.b64encode(self.bytecode).decode("ascii")
        return d
99