tests/test_cli.py
2.1 KB · 73 lines · python Raw
1 """Tests for the pqc-audit CLI."""
2
3 from __future__ import annotations
4
5 import json
6 import os
7 from collections.abc import Callable
8
9 from click.testing import CliRunner
10 from quantumshield.identity.agent import AgentIdentity
11
12 from pqc_audit_log_fs.appender import LogAppender, RotationPolicy
13 from pqc_audit_log_fs.cli import main
14 from pqc_audit_log_fs.event import InferenceEvent
15
16
def _seed(
    log_dir: str,
    signer: AgentIdentity,
    factory: Callable[..., InferenceEvent],
    n: int = 5,
) -> None:
    """Append ``n`` events built by ``factory`` to a log under ``log_dir``.

    Args:
        log_dir: Directory passed to ``LogAppender``.
        signer: Identity passed to ``LogAppender``.
        factory: Callable invoked with no arguments, once per event.
        n: Number of events to append.
    """
    app = LogAppender(
        log_dir, signer,
        # NOTE(review): 1000 is presumably large enough that the handful of
        # events seeded by these tests never triggers rotation -- confirm.
        rotation=RotationPolicy(max_events_per_segment=1000),
    )
    try:
        for _ in range(n):
            app.append(factory())
    finally:
        # Close even when an append fails so the appender is never left open
        # behind a failing test.
        app.close()
30
31
def test_verify_exits_0_for_good_log(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """``verify`` on a freshly seeded log exits 0 and prints ``OK``."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=3)

    cli_args = ["verify", tmp_log_dir]
    outcome = CliRunner().invoke(main, cli_args)

    assert outcome.exit_code == 0
    assert "OK" in outcome.output
42
43
def test_verify_exits_1_after_tamper(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """``verify`` exits 1 once the first record of the segment is altered."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=3)
    segment_path = os.path.join(tmp_log_dir, "segment-00001.log")

    # Load every record line of the first segment file.
    with open(segment_path, encoding="utf-8") as src:
        records = src.readlines()

    # Change one field of the first record and re-serialize it compactly.
    head = json.loads(records[0])
    head["decision_label"] = "TAMPERED"
    tampered = json.dumps(head, separators=(",", ":"))
    records = [tampered + "\n", *records[1:]]

    # Write the modified records back over the same file.
    with open(segment_path, "w", encoding="utf-8") as dst:
        dst.write("".join(records))

    outcome = CliRunner().invoke(main, ["verify", tmp_log_dir])
    assert outcome.exit_code == 1
61
62
def test_info_prints_segment_count(
    signer_identity: AgentIdentity,
    tmp_log_dir: str,
    event_factory: Callable[..., InferenceEvent],
) -> None:
    """``info`` exits 0 and reports ``segments: 1`` for a four-event log."""
    _seed(tmp_log_dir, signer_identity, event_factory, n=4)

    cli_args = ["info", tmp_log_dir]
    outcome = CliRunner().invoke(main, cli_args)

    assert outcome.exit_code == 0
    assert "segments: 1" in outcome.output
73