src/pqc_audit_log_fs/appender.py
| 1 | """LogAppender - append-only writer producing rotating AuditSegments on disk.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | import json |
| 7 | import os |
| 8 | import uuid |
| 9 | from dataclasses import dataclass |
| 10 | from datetime import datetime, timezone |
| 11 | from types import TracebackType |
| 12 | |
| 13 | from quantumshield.core.signatures import sign |
| 14 | from quantumshield.identity.agent import AgentIdentity |
| 15 | |
| 16 | from pqc_audit_log_fs.errors import AppendToSealedSegmentError |
| 17 | from pqc_audit_log_fs.event import InferenceEvent |
| 18 | from pqc_audit_log_fs.segment import AuditSegment, SegmentHeader |
| 19 | |
| 20 | |
@dataclass
class RotationPolicy:
    """Decide when the active segment should be sealed and a new one started.

    A segment rotates as soon as ANY one of the thresholds is reached
    (each comparison is inclusive).

    Attributes:
        max_events_per_segment: event-count threshold.
        max_bytes_per_segment: threshold on bytes written for the segment;
            defaults to 10 MiB.
        max_segment_age_seconds: threshold on the segment's wall-clock age,
            measured from its ``created_at`` timestamp; defaults to 1 hour.
    """

    max_events_per_segment: int = 10_000
    max_bytes_per_segment: int = 10 * 1024 * 1024  # 10 MiB default
    max_segment_age_seconds: int = 3600  # 1 hour

    def should_rotate(
        self, event_count: int, bytes_written: int, created_at: str
    ) -> bool:
        """Return True if a segment in the described state should rotate.

        Args:
            event_count: number of events already in the segment.
            bytes_written: number of bytes already written for the segment.
            created_at: ISO-8601 creation timestamp of the segment. A
                trailing ``Z`` is accepted, and a naive timestamp (no UTC
                offset) is interpreted as UTC. An unparseable timestamp
                disables only the age check; it never raises.
        """
        if event_count >= self.max_events_per_segment:
            return True
        if bytes_written >= self.max_bytes_per_segment:
            return True
        created = self._parse_timestamp(created_at)
        if created is None:
            return False
        age = (datetime.now(timezone.utc) - created).total_seconds()
        return age >= self.max_segment_age_seconds

    @staticmethod
    def _parse_timestamp(value: str) -> datetime | None:
        """Parse *value* into a timezone-aware datetime, or None if invalid."""
        # datetime.fromisoformat only understands a trailing "Z" from 3.11 on.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            return None
        # Subtracting a naive datetime from an aware one raises TypeError,
        # so treat offset-less timestamps as UTC (what the appender emits).
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
| 44 | |
| 45 | |
class LogAppender:
    """Append-only writer that rotates into numbered segments on disk.

    Directory layout::

        <log_dir>/
            segment-00001.log         (jsonl events, one per line)
            segment-00001.sig.json    (signed SegmentHeader)
            segment-00002.log
            segment-00002.sig.json
            ...

    Segment numbers are zero-padded to five digits; numbers above 99999
    simply use more digits. Segments chain via ``previous_segment_root`` in
    the header: each header records the Merkle root of the segment sealed
    before it.

    Usable as a context manager; leaving the ``with`` block calls
    :meth:`close`, which seals any pending events.
    """

    def __init__(
        self,
        log_dir: str,
        signer: AgentIdentity,
        log_id: str | None = None,
        rotation: RotationPolicy | None = None,
    ) -> None:
        """Open (or resume) an append-only log in *log_dir*.

        Args:
            log_dir: directory holding the segment files; created if missing.
            signer: identity whose signing keypair signs every segment header.
            log_id: identifier written into every header; a random
                ``urn:pqc-audit-log:<hex>`` is generated when omitted.
            rotation: rotation thresholds; defaults to ``RotationPolicy()``.
        """
        os.makedirs(log_dir, exist_ok=True)
        self.log_dir = log_dir
        self.signer = signer
        self.log_id = log_id or f"urn:pqc-audit-log:{uuid.uuid4().hex}"
        self.rotation = rotation or RotationPolicy()

        # State for the segment being written. Numbering resumes after the
        # highest sealed segment on disk. Age-based rotation is measured from
        # this timestamp (construction / previous seal), not the first event.
        self._current_segment_number: int = self._detect_next_segment_number()
        self._current_events: list[InferenceEvent] = []
        self._current_created_at: str = datetime.now(timezone.utc).isoformat()
        self._current_bytes_written: int = 0
        self._previous_segment_root: str = self._read_previous_segment_root()

        # Open the jsonl file for the current segment in append mode.
        # NOTE(review): if an earlier process died before sealing, this file
        # may already hold events that are not in _current_events; they stay
        # in the .log but the sealed header (event_count, Merkle root) is
        # built only from _current_events. Confirm the intended recovery.
        self._jsonl_path = self._segment_jsonl_path(self._current_segment_number)
        self._jsonl_file = open(self._jsonl_path, "a", encoding="utf-8")
        self._sealed = False

    # -- paths --------------------------------------------------------------

    def _segment_jsonl_path(self, n: int) -> str:
        """Return the path of segment *n*'s jsonl event file."""
        return os.path.join(self.log_dir, f"segment-{n:05d}.log")

    def _segment_sig_path(self, n: int) -> str:
        """Return the path of segment *n*'s signed header file."""
        return os.path.join(self.log_dir, f"segment-{n:05d}.sig.json")

    # -- state recovery -----------------------------------------------------

    def _detect_next_segment_number(self) -> int:
        """Return one past the highest sealed segment number in ``log_dir``.

        Only ``segment-<digits>.sig.json`` files count, so an unsealed
        ``.log`` does not advance numbering. Returns 1 for an empty or
        missing directory.
        """
        prefix, suffix = "segment-", ".sig.json"
        names = os.listdir(self.log_dir) if os.path.isdir(self.log_dir) else []
        max_n = 0
        for name in names:
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            # Take everything between prefix and suffix instead of a fixed
            # five characters: "{n:05d}" widens past 99999, and a fixed
            # slice would misread e.g. segment-100000 as 10000.
            digits = name[len(prefix):-len(suffix)]
            if not digits.isdigit():
                continue
            try:
                max_n = max(max_n, int(digits))
            except ValueError:  # non-decimal digit characters such as "²"
                continue
        return max_n + 1

    def _read_previous_segment_root(self) -> str:
        """Return the Merkle root stored in the previous segment's header.

        Returns "" when there is no previous segment, or when its header is
        missing, unreadable, not valid JSON, or not a JSON object.
        NOTE(review): an empty root silently starts a new chain; confirm
        that is preferred over failing loudly for an audit log.
        """
        if self._current_segment_number <= 1:
            return ""
        prev_sig = self._segment_sig_path(self._current_segment_number - 1)
        try:
            with open(prev_sig, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            return ""
        if not isinstance(data, dict):
            return ""
        return str(data.get("merkle_root", ""))

    # -- append / rotate ----------------------------------------------------

    def append(self, event: InferenceEvent) -> None:
        """Write *event* to the current segment, sealing it if rotation is due.

        The event is written as one jsonl line, flushed and fsync'd
        (best-effort) before being counted; the rotation policy is then
        consulted with the updated counters.

        Raises:
            AppendToSealedSegmentError: if the appender has been closed.
        """
        if self._sealed:
            raise AppendToSealedSegmentError("cannot append to a closed appender")
        line = event.to_jsonl() + "\n"
        self._jsonl_file.write(line)
        self._jsonl_file.flush()
        try:
            os.fsync(self._jsonl_file.fileno())
        except (OSError, ValueError):
            pass  # fsync failures are deliberately ignored (best-effort)
        self._current_events.append(event)
        self._current_bytes_written += len(line.encode("utf-8"))

        if self.rotation.should_rotate(
            len(self._current_events),
            self._current_bytes_written,
            self._current_created_at,
        ):
            self.seal_current_segment()

    def seal_current_segment(self) -> SegmentHeader | None:
        """Seal the current segment with a signed header and start a new one.

        Builds a SegmentHeader for the buffered events, computes the Merkle
        root via ``AuditSegment.recompute_root()``, signs the SHA3-256 digest
        of ``header.canonical_bytes()``, writes the header to the segment's
        ``.sig.json`` (temp file + atomic rename), then rolls over to the
        next segment number with a fresh jsonl file.

        Returns:
            The signed header, or None if there were no events to seal.
        """
        if not self._current_events:
            return None

        header = SegmentHeader(
            segment_id=f"segment-{self._current_segment_number:05d}",
            segment_number=self._current_segment_number,
            created_at=self._current_created_at,
            sealed_at=datetime.now(timezone.utc).isoformat(),
            event_count=len(self._current_events),
            merkle_root="",
            previous_segment_root=self._previous_segment_root,
            log_id=self.log_id,
        )
        segment = AuditSegment(header=header, events=list(self._current_events))
        # Presumably fills header.merkle_root in place (it is read below as
        # the next segment's previous_segment_root) - confirm in AuditSegment.
        segment.recompute_root()

        # The digest is taken before the signature fields are assigned;
        # presumably canonical_bytes() excludes them - confirm in SegmentHeader.
        digest = hashlib.sha3_256(header.canonical_bytes()).digest()
        sig = sign(digest, self.signer.signing_keypair)
        header.signer_did = self.signer.did
        header.algorithm = self.signer.signing_keypair.algorithm.value
        header.signature = sig.hex()
        header.public_key = self.signer.signing_keypair.public_key.hex()

        # Atomic write: a crash mid-write must not leave a truncated header
        # under the final name (it would also break chain recovery on restart).
        sig_path = self._segment_sig_path(self._current_segment_number)
        self._write_json_atomically(sig_path, header.to_dict())

        self._jsonl_file.close()

        # Roll over state for the next segment.
        self._previous_segment_root = header.merkle_root
        self._current_segment_number += 1
        self._current_events = []
        self._current_created_at = datetime.now(timezone.utc).isoformat()
        self._current_bytes_written = 0
        self._jsonl_path = self._segment_jsonl_path(self._current_segment_number)
        self._jsonl_file = open(self._jsonl_path, "a", encoding="utf-8")
        return header

    @staticmethod
    def _write_json_atomically(path: str, payload: object) -> None:
        """Write *payload* as indented JSON to *path* via a temp file + rename."""
        tmp_path = path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            try:
                os.fsync(f.fileno())
            except (OSError, ValueError):
                pass  # best-effort durability, same policy as append()
        # os.replace overwrites any existing target; on POSIX it is atomic.
        os.replace(tmp_path, path)

    def close(self) -> SegmentHeader | None:
        """Seal pending events, close the file, and reject further appends.

        Safe to call more than once. Also removes the empty jsonl file that
        sealing pre-creates for the next segment.

        Returns:
            The header sealed by this call, or None if nothing was pending.
        """
        header = self.seal_current_segment()  # None when no events are pending
        self._sealed = True
        try:
            self._jsonl_file.close()
        except OSError:
            pass
        # Drop the empty jsonl left for the never-used next segment.
        try:
            if os.path.getsize(self._jsonl_path) == 0:
                os.remove(self._jsonl_path)
        except OSError:
            pass
        return header

    def __enter__(self) -> LogAppender:
        """Return self so the appender can be used in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Close the appender (sealing pending events); never suppresses errors."""
        self.close()
| 213 | |