src/pqc_agent_wallet/audit.py
| 1 | """Audit log for wallet operations. ML-DSA signed entries.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | import json |
| 7 | from dataclasses import asdict, dataclass |
| 8 | from datetime import datetime, timezone |
| 9 | from typing import Any |
| 10 | |
| 11 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 12 | from quantumshield.core.signatures import sign, verify |
| 13 | from quantumshield.identity.agent import AgentIdentity |
| 14 | |
| 15 | |
@dataclass
class WalletAuditEntry:
    """A single signed record of one wallet operation.

    The first six fields describe the event and make up the signed payload.
    ``signer_did``, ``algorithm`` and ``signature`` are populated by
    :meth:`sign` and are *not* part of the bytes that get signed.
    """

    timestamp: str
    operation: str  # 'unlock' | 'lock' | 'put' | 'get' | 'delete' | 'rotate'
    actor_did: str
    credential_name: str
    success: bool
    details: str = ""
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""  # hex

    def canonical_bytes(self) -> bytes:
        """Return the deterministic UTF-8 JSON encoding of the signed fields.

        Keys are sorted and separators are compact, so the same field values
        always serialize to the same bytes. The signature-related fields are
        deliberately left out.
        """
        signed_fields = (
            "timestamp",
            "operation",
            "actor_did",
            "credential_name",
            "success",
            "details",
        )
        body = {name: getattr(self, name) for name in signed_fields}
        text = json.dumps(
            body, ensure_ascii=False, sort_keys=True, separators=(",", ":")
        )
        return text.encode("utf-8")

    def sign(self, identity: AgentIdentity) -> None:
        """Sign this entry in place with ``identity``'s signing keypair.

        The SHA3-256 digest of :meth:`canonical_bytes` is what gets signed.
        On return ``signer_did``, ``algorithm`` and ``signature`` (hex) are
        set on this instance.
        """
        # ``sign`` here is the module-level signing function, not this method.
        message_digest = hashlib.sha3_256(self.canonical_bytes()).digest()
        raw_signature = sign(message_digest, identity.signing_keypair)
        self.signer_did = identity.did
        self.algorithm = identity.signing_keypair.algorithm.value
        self.signature = raw_signature.hex()

    def verify_signature(self, public_key_hex: str) -> bool:
        """Check the stored signature against a hex-encoded public key.

        Returns ``False`` (never raises for these cases) when the entry is
        unsigned, when ``algorithm`` is not a valid ``SignatureAlgorithm``
        value, or when decoding/verification raises any exception.
        """
        if not self.signature:
            return False

        try:
            scheme = SignatureAlgorithm(self.algorithm)
        except ValueError:
            return False

        message_digest = hashlib.sha3_256(self.canonical_bytes()).digest()
        try:
            signature_bytes = bytes.fromhex(self.signature)
            key_bytes = bytes.fromhex(public_key_hex)
            outcome = verify(message_digest, signature_bytes, key_bytes, scheme)
        except Exception:
            # Malformed hex or a backend failure is treated as "not valid".
            return False
        return outcome

    def to_dict(self) -> dict[str, Any]:
        """Return every field of this entry, signature fields included, as a dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> WalletAuditEntry:
        """Build an entry from a mapping whose keys are the dataclass field names."""
        return cls(**data)
| 77 | |
| 78 | |
class WalletAuditLog:
    """Append-only in-memory audit log. Production should persist entries.

    At most ``max_entries`` entries are retained; when the cap is exceeded
    the oldest entries are discarded first.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Retention cap. A value of zero or less means nothing
                is retained; ``log()`` still builds, signs and returns the
                entry in that case.
        """
        self._entries: list[WalletAuditEntry] = []
        self._max = max_entries

    def log(
        self,
        operation: str,
        actor: AgentIdentity,
        credential_name: str,
        success: bool,
        details: str = "",
    ) -> WalletAuditEntry:
        """Record one operation, signed by ``actor``, and return the entry.

        The entry is timestamped with the current UTC time in ISO-8601 form,
        signed in place via ``entry.sign(actor)``, then appended to the log.
        """
        entry = WalletAuditEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            operation=operation,
            actor_did=actor.did,
            credential_name=credential_name,
            success=success,
            details=details,
        )
        entry.sign(actor)
        self._entries.append(entry)
        # Trim after appending so a cap of zero (or less) cannot trigger a
        # pop on an empty list; for positive caps this drops exactly the
        # oldest entry once the log is full.
        overflow = len(self._entries) - self._max
        if overflow > 0:
            del self._entries[:overflow]
        return entry

    def entries(
        self,
        limit: int = 100,
        operation: str | None = None,
        credential_name: str | None = None,
    ) -> list[WalletAuditEntry]:
        """Return up to ``limit`` matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            operation: If truthy, keep only entries with this operation.
            credential_name: If truthy, keep only entries for this credential.

        Returns:
            A new list; mutating it does not affect the log.
        """
        # Guard explicitly: ``seq[-0:]`` is the whole sequence, not an empty one.
        if limit <= 0:
            return []
        selected = self._entries
        if operation:
            selected = [e for e in selected if e.operation == operation]
        if credential_name:
            selected = [e for e in selected if e.credential_name == credential_name]
        return selected[-limit:][::-1]

    def export_json(self) -> str:
        """Serialize all retained entries, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Remove every retained entry."""
        self._entries.clear()

    def __len__(self) -> int:
        """Return the number of retained entries."""
        return len(self._entries)
| 129 | |