src/pqc_audit_log_fs/appender.py
7.5 KB · 213 lines · python Raw
1 """LogAppender - append-only writer producing rotating AuditSegments on disk."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 import os
8 import uuid
9 from dataclasses import dataclass
10 from datetime import datetime, timezone
11 from types import TracebackType
12
13 from quantumshield.core.signatures import sign
14 from quantumshield.identity.agent import AgentIdentity
15
16 from pqc_audit_log_fs.errors import AppendToSealedSegmentError
17 from pqc_audit_log_fs.event import InferenceEvent
18 from pqc_audit_log_fs.segment import AuditSegment, SegmentHeader
19
20
@dataclass
class RotationPolicy:
    """Thresholds that decide when the current segment must be rotated.

    A segment is rotated as soon as it reaches any one of the three limits
    below (event count, bytes written, or age).
    """

    max_events_per_segment: int = 10_000
    max_bytes_per_segment: int = 10 * 1024 * 1024  # 10 MB default
    max_segment_age_seconds: int = 3600  # 1 hour

    def should_rotate(
        self, event_count: int, bytes_written: int, created_at: str
    ) -> bool:
        """Return True when any rotation threshold has been reached.

        Args:
            event_count: Number of events in the current segment.
            bytes_written: Number of bytes written to the current segment.
            created_at: ISO-8601 creation timestamp of the current segment.

        Returns:
            True if the event count, the byte count, or the segment age is at
            or above its limit; False otherwise. A ``created_at`` value that
            cannot be parsed disables only the age check.
        """
        if event_count >= self.max_events_per_segment:
            return True
        if bytes_written >= self.max_bytes_per_segment:
            return True

        try:
            created = datetime.fromisoformat(created_at)
        except ValueError:
            # Unparseable timestamp: skip the age check rather than fail.
            return False

        # A timestamp without an offset is interpreted as UTC. Subtracting a
        # naive datetime from an aware one raises TypeError, which would
        # otherwise escape this method.
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)

        age = (datetime.now(timezone.utc) - created).total_seconds()
        return age >= self.max_segment_age_seconds
44
45
class LogAppender:
    """Append-only writer that rotates into numbered segments on disk.

    Directory layout::

        <log_dir>/
            segment-00001.log        (jsonl events, one per line)
            segment-00001.sig.json   (signed SegmentHeader)
            segment-00002.log
            segment-00002.sig.json
            ...

    Segments chain via previous_segment_root in the header.
    """

    # File-name pieces used when scanning the directory for sealed segments.
    _SEGMENT_PREFIX = "segment-"
    _SIG_SUFFIX = ".sig.json"

    def __init__(
        self,
        log_dir: str,
        signer: AgentIdentity,
        log_id: str | None = None,
        rotation: RotationPolicy | None = None,
    ) -> None:
        """Create the log directory if needed and open the next segment.

        Args:
            log_dir: Directory holding the segment files; created if missing.
            signer: Identity whose signing keypair signs each segment header.
            log_id: Identifier stored in every header; a random
                ``urn:pqc-audit-log:`` URN is generated when omitted.
            rotation: Rotation thresholds; defaults to ``RotationPolicy()``.
        """
        os.makedirs(log_dir, exist_ok=True)
        self.log_dir = log_dir
        self.signer = signer
        self.log_id = log_id or f"urn:pqc-audit-log:{uuid.uuid4().hex}"
        self.rotation = rotation or RotationPolicy()

        # State
        self._current_segment_number: int = self._detect_next_segment_number()
        self._current_events: list[InferenceEvent] = []
        self._current_created_at: str = datetime.now(timezone.utc).isoformat()
        self._current_bytes_written: int = 0
        self._previous_segment_root: str = self._read_previous_segment_root()

        # Open jsonl file for current segment
        # NOTE(review): the file is opened in append mode, so lines left in an
        # unsealed .log by an earlier run stay in the file but are not loaded
        # into _current_events - confirm whether recovery should handle this.
        self._jsonl_path = self._segment_jsonl_path(self._current_segment_number)
        self._jsonl_file = open(self._jsonl_path, "a", encoding="utf-8")
        self._sealed = False

    # -- paths --------------------------------------------------------------

    def _segment_jsonl_path(self, n: int) -> str:
        """Return the path of the jsonl event file for segment ``n``."""
        return os.path.join(self.log_dir, f"segment-{n:05d}.log")

    def _segment_sig_path(self, n: int) -> str:
        """Return the path of the signed-header file for segment ``n``."""
        return os.path.join(self.log_dir, f"segment-{n:05d}.sig.json")

    # -- state recovery -----------------------------------------------------

    def _detect_next_segment_number(self) -> int:
        """Return one more than the highest sealed segment number on disk.

        Only ``segment-<digits>.sig.json`` files count as sealed segments.
        The whole text between prefix and suffix is parsed, so numbers wider
        than the five-digit zero padding are read back in full.
        """
        highest = 0
        names = os.listdir(self.log_dir) if os.path.isdir(self.log_dir) else []
        prefix, suffix = self._SEGMENT_PREFIX, self._SIG_SUFFIX
        for name in names:
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            try:
                number = int(name[len(prefix):-len(suffix)])
            except ValueError:
                # Not a file name this writer produces; ignore it.
                continue
            highest = max(highest, number)
        return highest + 1

    def _read_previous_segment_root(self) -> str:
        """Return the ``merkle_root`` stored in the previous segment header.

        Returns an empty string for the first segment, or when the previous
        header file is missing, unreadable, or not a JSON object.
        """
        if self._current_segment_number <= 1:
            return ""
        prev_sig = self._segment_sig_path(self._current_segment_number - 1)
        try:
            with open(prev_sig, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing file, I/O error, invalid UTF-8 or invalid JSON.
            return ""
        if not isinstance(data, dict):
            return ""
        return str(data.get("merkle_root", ""))

    def _write_json_atomically(self, path: str, payload: object) -> None:
        """Write ``payload`` as indented JSON to ``path`` via a temp file.

        The data is written to ``path + ".tmp"`` and then moved into place
        with ``os.replace``, so ``path`` never holds a partially written
        document. The temp name does not end in ``.sig.json``, so a leftover
        is not mistaken for a sealed segment by the directory scan.
        """
        tmp_path = path + ".tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
                f.flush()
                try:
                    os.fsync(f.fileno())
                except (OSError, ValueError):
                    # Best-effort durability, same policy as append().
                    pass
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave a stale temp file behind; then propagate.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    # -- append / rotate ----------------------------------------------------

    def append(self, event: InferenceEvent) -> None:
        """Write one event to the current segment and rotate if required.

        Raises:
            AppendToSealedSegmentError: If ``close()`` has already been called.
        """
        if self._sealed:
            raise AppendToSealedSegmentError("cannot append to a closed appender")
        line = event.to_jsonl() + "\n"
        self._jsonl_file.write(line)
        self._jsonl_file.flush()
        try:
            os.fsync(self._jsonl_file.fileno())
        except (OSError, ValueError):
            # fsync is best-effort; the line has already been flushed.
            pass
        self._current_events.append(event)
        self._current_bytes_written += len(line.encode("utf-8"))

        if self.rotation.should_rotate(
            len(self._current_events),
            self._current_bytes_written,
            self._current_created_at,
        ):
            self.seal_current_segment()

    def seal_current_segment(self) -> SegmentHeader | None:
        """Seal the current segment with a signed header and start a new one.

        Returns:
            The signed header, or None when the current segment has no events
            (nothing is written in that case).
        """
        if not self._current_events:
            return None

        header = SegmentHeader(
            segment_id=f"segment-{self._current_segment_number:05d}",
            segment_number=self._current_segment_number,
            created_at=self._current_created_at,
            sealed_at=datetime.now(timezone.utc).isoformat(),
            event_count=len(self._current_events),
            merkle_root="",
            previous_segment_root=self._previous_segment_root,
            log_id=self.log_id,
        )
        segment = AuditSegment(header=header, events=list(self._current_events))
        # Presumably stores the computed root on the shared header object
        # (header.merkle_root is read below) - TODO confirm in AuditSegment.
        segment.recompute_root()

        # Sign the SHA3-256 digest of the header's canonical bytes; the signer
        # fields are assigned only after the digest has been taken.
        digest = hashlib.sha3_256(header.canonical_bytes()).digest()
        sig = sign(digest, self.signer.signing_keypair)
        header.signer_did = self.signer.did
        header.algorithm = self.signer.signing_keypair.algorithm.value
        header.signature = sig.hex()
        header.public_key = self.signer.signing_keypair.public_key.hex()

        # Publish the header atomically: a truncated .sig.json would be
        # counted as a sealed segment and its root read back as "".
        sig_path = self._segment_sig_path(self._current_segment_number)
        self._write_json_atomically(sig_path, header.to_dict())

        # Close current jsonl file
        self._jsonl_file.close()

        # Roll over state for next segment
        self._previous_segment_root = header.merkle_root
        self._current_segment_number += 1
        self._current_events = []
        self._current_created_at = datetime.now(timezone.utc).isoformat()
        self._current_bytes_written = 0
        self._jsonl_path = self._segment_jsonl_path(self._current_segment_number)
        self._jsonl_file = open(self._jsonl_path, "a", encoding="utf-8")
        return header

    def close(self) -> SegmentHeader | None:
        """Seal the current segment (if any) and close the file.

        Returns:
            The header of the segment sealed by this call, or None when there
            were no pending events.
        """
        header: SegmentHeader | None = None
        if self._current_events:
            header = self.seal_current_segment()
        self._sealed = True
        try:
            self._jsonl_file.close()
        except Exception:
            # Best-effort close; the appender is already marked as closed.
            pass
        # Remove the empty jsonl file created for the rolled-over next segment
        try:
            if os.path.exists(self._jsonl_path) and os.path.getsize(self._jsonl_path) == 0:
                os.remove(self._jsonl_path)
        except OSError:
            pass
        return header

    def __enter__(self) -> LogAppender:
        """Return the appender itself for use in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Close the appender when the ``with`` block ends."""
        self.close()
213