src/pqc_audit_log_fs/cli.py
| 1 | """pqc-audit CLI.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | import sys |
| 7 | |
| 8 | import click |
| 9 | |
| 10 | from pqc_audit_log_fs import __version__ |
| 11 | from pqc_audit_log_fs.prover import InclusionProver |
| 12 | from pqc_audit_log_fs.reader import LogReader |
| 13 | |
| 14 | |
# Root command group for the CLI. Subcommands register themselves on it with
# ``@main.command()``. The docstring is what click prints as the top-level
# ``--help`` text, so it stays short and user-facing.
@click.group()
@click.version_option(
    version=__version__,
    prog_name="pqc-audit",
)
def main() -> None:
    """pqc-audit - immutable AI inference audit log."""
| 19 | |
| 20 | |
# ``file_okay=False`` makes click reject a regular file with a usage error,
# since LOG_DIR is expected to be a directory of log segments.
@main.command()
@click.argument("log_dir", type=click.Path(exists=True, file_okay=False))
def verify(log_dir: str) -> None:
    """Verify all segment signatures and the chain."""
    # Exit status: 0 when the reader reports the chain as valid, 1 otherwise.
    reader = LogReader(log_dir)
    ok, errors = reader.verify_chain()

    if not ok:
        for e in errors:
            click.echo(f"[FAIL] {e}", err=True)
        if not errors:
            # Never fail silently: a non-zero exit with no output is very
            # hard to diagnose from a shell or a CI log.
            click.echo("[FAIL] verification failed (no details reported)", err=True)
        sys.exit(1)

    click.echo(f"[OK] all {len(reader.list_segments())} segments verify")
    sys.exit(0)
| 34 | |
| 35 | |
# ``file_okay=False`` makes click reject a regular file with a usage error,
# since LOG_DIR is expected to be a directory of log segments.
@main.command()
@click.argument("log_dir", type=click.Path(exists=True, file_okay=False))
@click.argument("segment_number", type=int)
@click.argument("event_id")
def prove(log_dir: str, segment_number: int, event_id: str) -> None:
    """Produce a Merkle inclusion proof for EVENT_ID in SEGMENT_NUMBER."""
    # The proof is written to stdout as indented JSON.
    prover = InclusionProver(LogReader(log_dir))
    proof = prover.prove_event(segment_number, event_id)

    # Prefer the proof object's own serialiser when it provides one;
    # otherwise fall back to its plain attribute dictionary.
    if hasattr(proof, "to_dict"):
        payload = proof.to_dict()
    else:
        payload = proof.__dict__

    click.echo(json.dumps(payload, indent=2))
| 47 | |
| 48 | |
# ``file_okay=False`` makes click reject a regular file with a usage error,
# since LOG_DIR is expected to be a directory of log segments.
@main.command()
@click.argument("log_dir", type=click.Path(exists=True, file_okay=False))
def info(log_dir: str) -> None:
    """Show info about a log directory."""
    # Prints a two-line summary followed by one line per segment header.
    reader = LogReader(log_dir)
    segments = reader.list_segments()
    click.echo(f"log_dir: {log_dir}")
    click.echo(f"segments: {len(segments)}")

    for n in segments:
        h = reader.read_header(n)
        # A segment with an empty/missing previous root is shown as the
        # chain's starting point; otherwise show a 16-character prefix.
        if h.previous_segment_root:
            prev = h.previous_segment_root[:16] + "..."
        else:
            prev = "<genesis>"
        click.echo(
            f" segment {n:05d} events={h.event_count:>6} "
            f"root={h.merkle_root[:16]}... prev={prev} "
            f"sealed_at={h.sealed_at}"
        )
| 65 | |
| 66 | |
# Allow the module to be executed directly (e.g. ``python cli.py``) in
# addition to being invoked through an installed entry point.
if __name__ == "__main__":
    main()
| 69 | |