src/pqc_agent_wallet/audit.py
3.8 KB · 129 lines · python Raw
1 """Audit log for wallet operations. ML-DSA signed entries."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 from dataclasses import asdict, dataclass
8 from datetime import datetime, timezone
9 from typing import Any
10
11 from quantumshield.core.algorithms import SignatureAlgorithm
12 from quantumshield.core.signatures import sign, verify
13 from quantumshield.identity.agent import AgentIdentity
14
15
@dataclass
class WalletAuditEntry:
    """A single audit record: which actor performed which wallet operation.

    The first six fields describe the event and are the ones covered by the
    signature. The last three hold signature metadata and are filled in by
    ``sign``.
    """

    # --- event data (included in the signed bytes) ---
    timestamp: str
    operation: str  # 'unlock' | 'lock' | 'put' | 'get' | 'delete' | 'rotate'
    actor_did: str
    credential_name: str
    success: bool
    details: str = ""
    # --- signature metadata (excluded from the signed bytes) ---
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""  # hex

    def canonical_bytes(self) -> bytes:
        """Return the deterministic UTF-8 JSON encoding that gets signed.

        Only the event fields are serialised; ``signer_did``, ``algorithm``
        and ``signature`` are left out. Keys are sorted and separators are
        compact so the same entry always yields the same bytes.
        """
        signed_fields = (
            "timestamp",
            "operation",
            "actor_did",
            "credential_name",
            "success",
            "details",
        )
        body = {name: getattr(self, name) for name in signed_fields}
        text = json.dumps(
            body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        )
        return text.encode("utf-8")

    def sign(self, identity: AgentIdentity) -> None:
        """Sign this entry in place with *identity*'s signing keypair.

        The SHA3-256 digest of ``canonical_bytes()`` is what gets signed. The
        signer DID, algorithm value and hex-encoded signature are stored on
        the entry.
        """
        message_hash = hashlib.sha3_256(self.canonical_bytes()).digest()
        # Inside a method body ``sign`` resolves to the module-level function,
        # not to this method.
        raw_signature = sign(message_hash, identity.signing_keypair)
        self.signer_did = identity.did
        self.algorithm = identity.signing_keypair.algorithm.value
        self.signature = raw_signature.hex()

    def verify_signature(self, public_key_hex: str) -> bool:
        """Check the stored signature against the hex-encoded public key.

        Returns ``False`` instead of raising when the entry is unsigned, when
        ``algorithm`` is not a valid ``SignatureAlgorithm`` value, or when hex
        decoding or verification raises any exception.
        """
        if not self.signature:
            return False

        try:
            scheme = SignatureAlgorithm(self.algorithm)
        except ValueError:
            return False

        message_hash = hashlib.sha3_256(self.canonical_bytes()).digest()
        try:
            signature_bytes = bytes.fromhex(self.signature)
            key_bytes = bytes.fromhex(public_key_hex)
            return verify(message_hash, signature_bytes, key_bytes, scheme)
        except Exception:
            # Malformed hex or a failure inside the verifier both count as
            # "not verified".
            return False

    def to_dict(self) -> dict[str, Any]:
        """Return every field, signature metadata included, as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> WalletAuditEntry:
        """Build an entry from a mapping of field names to values."""
        return cls(**data)
77
78
class WalletAuditLog:
    """Bounded, append-only, in-memory audit log of signed wallet events.

    Entries are stored oldest-first. When the log would exceed
    ``max_entries`` the oldest entries are discarded. Nothing is written to
    disk; production deployments should persist entries elsewhere.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log that retains at most *max_entries* entries."""
        self._entries: list[WalletAuditEntry] = []
        self._max = max_entries

    def log(
        self,
        operation: str,
        actor: AgentIdentity,
        credential_name: str,
        success: bool,
        details: str = "",
    ) -> WalletAuditEntry:
        """Create, sign and record one audit entry.

        The entry is stamped with the current UTC time in ISO-8601 form and
        signed by *actor* before being appended.

        Returns:
            The signed entry. It is returned even when the log's capacity is
            zero or negative, in which case nothing is retained.
        """
        entry = WalletAuditEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            operation=operation,
            actor_did=actor.did,
            credential_name=credential_name,
            success=success,
            details=details,
        )
        entry.sign(actor)
        self._entries.append(entry)
        # Trim after appending so a non-positive capacity simply retains
        # nothing instead of popping from an empty list.
        overflow = len(self._entries) - self._max
        if overflow > 0:
            del self._entries[:overflow]
        return entry

    def entries(
        self,
        limit: int = 100,
        operation: str | None = None,
        credential_name: str | None = None,
    ) -> list[WalletAuditEntry]:
        """Return up to *limit* matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            operation: If given (and non-empty), keep only entries with this
                operation.
            credential_name: If given (and non-empty), keep only entries for
                this credential.
        """
        # ``seq[-0:]`` is the whole sequence, so a non-positive limit has to
        # be handled before slicing.
        if limit <= 0:
            return []
        matches = self._entries
        if operation:
            matches = [e for e in matches if e.operation == operation]
        if credential_name:
            matches = [
                e for e in matches if e.credential_name == credential_name
            ]
        # Take the newest ``limit`` entries, then reverse to newest-first.
        return matches[-limit:][::-1]

    def export_json(self) -> str:
        """Serialise every entry, oldest first, as an indented JSON array."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Remove all entries from the log."""
        self._entries.clear()

    def __len__(self) -> int:
        """Return the number of entries currently held."""
        return len(self._entries)
129