tests/test_reader.py
3.7 KB · 122 lines · python Raw
1 """Tests for LogReader."""
2
3 from __future__ import annotations
4
5 import json
6 import os
7 from collections.abc import Callable
8
9 import pytest
10 from quantumshield.identity.agent import AgentIdentity
11
12 from pqc_audit_log_fs.appender import LogAppender, RotationPolicy
13 from pqc_audit_log_fs.errors import SegmentCorruptedError, SignatureVerificationError
14 from pqc_audit_log_fs.event import InferenceEvent
15 from pqc_audit_log_fs.reader import LogReader
16
17
def _seed(
    log_dir: str,
    signer: AgentIdentity,
    factory: Callable[..., InferenceEvent],
    n: int = 3,
    max_events: int = 100,
) -> None:
    """Populate ``log_dir`` with ``n`` events and close the appender.

    Args:
        log_dir: Directory handed to ``LogAppender``.
        signer: Identity handed to ``LogAppender``.
        factory: Zero-argument callable producing one event per call.
        n: Number of events to append.
        max_events: Forwarded as ``RotationPolicy.max_events_per_segment``.
    """
    app = LogAppender(
        log_dir,
        signer,
        rotation=RotationPolicy(max_events_per_segment=max_events),
    )
    try:
        for _ in range(n):
            app.append(factory())
    finally:
        # Always release the appender, even when building or appending an
        # event fails part-way through, so a failing test does not leave
        # it open.
        app.close()
32
33
def test_list_segments_sorted(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """``list_segments`` returns ``[1, 2, 3]`` after three rotated segments."""
    # Create 3 rotated segments: with a limit of one event per segment,
    # three appends are used to force the rotation. The shared helper does
    # the appender setup, appends, and close.
    _seed(tmp_log_dir, signer_identity, event_factory, n=3, max_events=1)
    reader = LogReader(tmp_log_dir)
    assert reader.list_segments() == [1, 2, 3]
49
50
def test_read_header_and_segment(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """Header and full-segment reads of segment 1 agree after three appends."""
    expected_events = 3
    _seed(tmp_log_dir, signer_identity, event_factory, n=expected_events)

    log_reader = LogReader(tmp_log_dir)
    hdr = log_reader.read_header(1)
    seg = log_reader.read_segment(1)

    # The standalone header must equal the one embedded in the segment,
    # and both views must report every seeded event.
    assert seg.header == hdr
    assert len(seg.events) == expected_events
    assert hdr.event_count == expected_events
63
64
def test_verify_segment_passes(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """``verify_segment`` returns ``True`` for an untouched five-event segment."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=5)
    outcome = LogReader(tmp_log_dir).verify_segment(1)
    # Identity check on purpose: a merely truthy return value is not enough.
    assert outcome is True
73
74
def test_tampered_jsonl_raises(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """Editing an event line on disk makes ``verify_segment`` raise."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=4)
    segment_path = os.path.join(tmp_log_dir, "segment-00001.log")

    with open(segment_path, encoding="utf-8") as src:
        records = list(src)

    # Rewrite the first line with a bogus decision_label (changes leaf hash).
    tampered = json.loads(records[0])
    tampered["decision_label"] = "TAMPERED"
    records[0] = json.dumps(tampered, separators=(",", ":")) + "\n"

    with open(segment_path, "w", encoding="utf-8") as dst:
        dst.write("".join(records))

    log_reader = LogReader(tmp_log_dir)
    with pytest.raises(SegmentCorruptedError):
        log_reader.verify_segment(1)
93
94
def test_tampered_signature_raises(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """A corrupted signature raises, or the test skips under the stub backend."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=3)
    sig_file = os.path.join(tmp_log_dir, "segment-00001.sig.json")

    with open(sig_file, encoding="utf-8") as src:
        payload = json.load(src)

    # Flip a byte in the signature hex: XOR with 0xFF inverts every bit of
    # the leading byte, so the result always differs from the original.
    original_sig = payload["signature"]
    inverted_head = format(int(original_sig[:2], 16) ^ 0xFF, "02x")
    payload["signature"] = inverted_head + original_sig[2:]

    with open(sig_file, "w", encoding="utf-8") as dst:
        json.dump(payload, dst, indent=2)

    log_reader = LogReader(tmp_log_dir)
    # If liboqs or ed25519 backend is in use this raises; stub backend always
    # returns True, in which case we skip the check.
    try:
        log_reader.verify_segment(1)
    except SignatureVerificationError:
        pass  # tampering was detected: the test passes
    else:
        pytest.skip(
            "signature backend is STUB (no liboqs or ed25519); "
            "tampered signatures cannot be detected in stub mode"
        )
122