examples/tamper_detection.py
1 """tamper_detection.py - show that LogReader flags any mutation to a sealed log.
2
3 Run::
4
5 python examples/tamper_detection.py
6 """
7
8 from __future__ import annotations
9
10 import json
11 import os
12 import tempfile
13
14 from quantumshield.identity.agent import AgentIdentity
15
16 from pqc_audit_log_fs import (
17 InferenceEvent,
18 LogAppender,
19 LogReader,
20 RotationPolicy,
21 )
22
23
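def main() -> None:
    """Write a signed log, edit one sealed record, and check that verification fails.

    The demo appends 20 ``InferenceEvent`` records through a ``LogAppender``,
    closes it, rewrites the ``decision_label`` of the third line of
    ``segment-00001.log`` and then runs ``LogReader.verify_chain()``.

    Scope: only a single in-place field edit is exercised. The result says
    nothing about how other changes to the log directory are handled.

    Raises:
        SystemExit: with status 1 when the untampered log does not verify,
            or when the edited log still verifies, so a wrapper script or CI
            job sees the failure instead of a zero exit status.
    """
    with tempfile.TemporaryDirectory() as log_dir:
        signer = AgentIdentity.create(name="demo-signer")
        appender = LogAppender(
            log_dir,
            signer,
            rotation=RotationPolicy(max_events_per_segment=1000),
        )
        try:
            for i in range(20):
                event = InferenceEvent.create(
                    model_did="did:pqaid:demo-model",
                    model_version="1.0.0",
                    input_bytes=f"in-{i}".encode(),
                    output_bytes=f"out-{i}".encode(),
                    decision_label="approve",
                )
                appender.append(event)
        finally:
            # Close even when an append fails, so the temporary directory
            # can be removed cleanly on exit from the ``with`` block.
            appender.close()

        # Baseline: the untouched log must verify. Without this, errors
        # reported after the edit could not be attributed to the edit.
        clean_ok, clean_errors = LogReader(log_dir).verify_chain()
        if not clean_ok:
            print("[FAIL] untampered log did not verify:")
            for e in clean_errors:
                print(f" - {e}")
            raise SystemExit(1)

        # Mutate one line in the sealed segment.
        # NOTE(review): assumes the first segment is named segment-00001.log
        # and holds one JSON record per line - confirm against LogAppender.
        jsonl = os.path.join(log_dir, "segment-00001.log")
        with open(jsonl, "r", encoding="utf-8") as f:
            lines = f.readlines()
        third = json.loads(lines[2])
        third["decision_label"] = "deny"  # forger swaps the outcome
        lines[2] = json.dumps(third, separators=(",", ":")) + "\n"
        with open(jsonl, "w", encoding="utf-8") as f:
            f.writelines(lines)
        print("[tamper] mutated line 3 of segment-00001.log")

        reader = LogReader(log_dir)
        ok, errors = reader.verify_chain()
        if ok:
            print("[FAIL] tamper was NOT detected")
            # A verifier that accepts the edited log is the failure this
            # demo exists to catch; do not exit with status 0.
            raise SystemExit(1)
        print(f"[OK] tamper detected across {len(errors)} error(s):")
        for e in errors:
            print(f" - {e}")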
63
64
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
67