tests/test_integration.py
2.3 KB · 70 lines · python Raw
1 """End-to-end integration tests."""
2
3 from __future__ import annotations
4
5 import json
6 import os
7 from collections.abc import Callable
8
9 from quantumshield.identity.agent import AgentIdentity
10
11 from pqc_audit_log_fs.appender import LogAppender, RotationPolicy
12 from pqc_audit_log_fs.event import InferenceEvent
13 from pqc_audit_log_fs.reader import LogReader
14
15
def test_full_flow_25_events_produces_3_segments(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """Append 25 events with a 10-event rotation policy and verify the chain.

    The test expects a ``.sig.json`` file for segments 1 and 2 to exist while
    the appender is still open, the one for segment 3 to appear only after
    ``close()``, and ``LogReader`` to then list segments ``[1, 2, 3]`` and
    report a valid chain.

    Args:
        signer_identity: Fixture providing the identity handed to the appender.
        tmp_log_dir: Fixture providing the directory the log is written to.
        event_factory: Fixture returning a new event on each call.
    """

    def sig_path(segment: int) -> str:
        # Signature files are named with a zero-padded, 5-digit segment number.
        return os.path.join(tmp_log_dir, f"segment-{segment:05d}.sig.json")

    app = LogAppender(
        tmp_log_dir,
        signer_identity,
        rotation=RotationPolicy(max_events_per_segment=10),
    )
    try:
        for _ in range(25):
            app.append(event_factory())
        # After 25 events with max=10: segments 1 and 2 are sealed; 5 events
        # in segment 3 (not yet sealed).
        assert os.path.exists(sig_path(1))
        assert os.path.exists(sig_path(2))
        assert not os.path.exists(sig_path(3))
    finally:
        # Close even when an append or assertion above fails, so the appender
        # is never left open for the rest of the test session.
        app.close()
    # After close, segment 3 is also sealed.
    assert os.path.exists(sig_path(3))

    reader = LogReader(tmp_log_dir)
    assert reader.list_segments() == [1, 2, 3]
    ok, errors = reader.verify_chain()
    assert ok, f"unexpected errors: {errors}"
40
41
def test_tampering_flow_detected_by_verify_chain(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """Rewrite one logged event on disk and expect ``verify_chain`` to fail.

    Twelve events are appended with a 5-event rotation policy and the appender
    is closed. The first record of ``segment-00001.log`` is then edited in
    place, after which ``LogReader.verify_chain`` must report failure with at
    least one error message that names segment 1.

    Args:
        signer_identity: Fixture providing the identity handed to the appender.
        tmp_log_dir: Fixture providing the directory the log is written to.
        event_factory: Fixture returning a new event on each call.
    """
    app = LogAppender(
        tmp_log_dir,
        signer_identity,
        rotation=RotationPolicy(max_events_per_segment=5),
    )
    try:
        for _ in range(12):
            app.append(event_factory())
    finally:
        # Always close, even if an append raises, so the appender is not
        # leaked.
        app.close()

    # Tamper with segment 1's jsonl (changes merkle root vs declared).
    jsonl = os.path.join(tmp_log_dir, "segment-00001.log")
    with open(jsonl, "r", encoding="utf-8") as f:
        lines = f.readlines()
    first = json.loads(lines[0])
    first["decision_label"] = "FORGED"
    lines[0] = json.dumps(first, separators=(",", ":")) + "\n"
    with open(jsonl, "w", encoding="utf-8") as f:
        f.writelines(lines)

    reader = LogReader(tmp_log_dir)
    ok, errors = reader.verify_chain()
    assert not ok, "verify_chain did not detect the tampered segment"
    # At least one error identifies segment 1.
    assert any(
        "segment 1" in e or "segment 00001" in e for e in errors
    ), f"no error mentions segment 1: {errors}"
70