src/pqc_audit_log_fs/segment.py
2.1 KB · 70 lines · python Raw
1 """AuditSegment - a sealed batch of InferenceEvents with Merkle root + ML-DSA sig."""
2
3 from __future__ import annotations
4
5 import json
6 from dataclasses import asdict, dataclass, field
7 from typing import Any
8
9 from pqc_audit_log_fs.event import InferenceEvent
10 from pqc_audit_log_fs.merkle import compute_merkle_root
11
12
@dataclass
class SegmentHeader:
    """Signed header for an AuditSegment.

    Carries the segment's identity, its Merkle root, the link to the previous
    segment's root and, once populated, the signature material.
    """

    segment_id: str  # e.g. "segment-00001"
    segment_number: int  # sequential
    created_at: str
    sealed_at: str = ""
    event_count: int = 0
    merkle_root: str = ""  # hex SHA3-256
    previous_segment_root: str = ""  # chain link
    log_id: str = ""  # stable ID of the log this segment belongs to

    # Populated by LogAppender when sealed:
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""
    public_key: str = ""

    def canonical_bytes(self) -> bytes:
        """Return the deterministic UTF-8 JSON encoding of the header core.

        Only the eight identity/content fields are serialized; ``signer_did``,
        ``algorithm``, ``signature`` and ``public_key`` are left out. Keys are
        sorted and separators are compact, so equal field values always give
        equal bytes.

        NOTE(review): presumably these are the bytes that get signed; confirm
        against LogAppender. If so, ``signer_did``/``algorithm``/``public_key``
        are not bound by the signature. Adding them here would change the
        bytes of existing segments, so it is deliberately not done.
        """
        payload = {
            "segment_id": self.segment_id,
            "segment_number": self.segment_number,
            "created_at": self.created_at,
            "sealed_at": self.sealed_at,
            "event_count": self.event_count,
            "merkle_root": self.merkle_root,
            "previous_segment_root": self.previous_segment_root,
            "log_id": self.log_id,
        }
        return json.dumps(
            payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        ).encode("utf-8")

    def to_dict(self) -> dict[str, Any]:
        """Return every field, signature fields included, as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SegmentHeader:
        """Build a header from a mapping such as the output of ``to_dict``.

        Keys that are not fields of this dataclass are ignored, so a header
        carrying extra keys still loads instead of failing with an
        "unexpected keyword argument" ``TypeError``.

        Raises:
            TypeError: if a required field (``segment_id``,
                ``segment_number``, ``created_at``) is missing.
        """
        # Local import: the module-level import does not bring in ``fields``.
        from dataclasses import fields

        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

    def to_json(self) -> str:
        """Return ``to_dict()`` as JSON text indented by two spaces."""
        return json.dumps(self.to_dict(), indent=2)
56
57
@dataclass
class AuditSegment:
    """A signed header together with the batch of events it covers."""

    header: SegmentHeader
    events: list[InferenceEvent] = field(default_factory=list)

    def recompute_root(self) -> str:
        """Refresh the header's Merkle root and event count from ``events``.

        Hashes every event with ``leaf_hash()`` and stores the resulting
        Merkle root and the number of events on ``self.header``, mutating it
        in place. A segment with no events gets an empty-string root.

        Returns:
            The root that was just stored on the header.
        """
        digests = [event.leaf_hash() for event in self.events]
        if digests:
            root = compute_merkle_root(digests)
        else:
            # No leaves: the empty string marks "no root".
            root = ""

        hdr = self.header
        hdr.merkle_root = root
        hdr.event_count = len(self.events)
        return root
70