src/pqc_ai_governance/audit.py
6.3 KB · 189 lines · python Raw
1 """Append-only audit log for governance operations."""
2
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any

from pqc_ai_governance.proposal import GovernanceProposal, ProposalKind
from pqc_ai_governance.round import ConsensusResult
from pqc_ai_governance.vote import SignedVote
13
14
# Operation name constants (stable strings written to the audit trail).
# Each value is stored verbatim in ``GovernanceAuditEntry.operation`` by the
# matching ``GovernanceAuditLog.log_*`` helper and can be passed as the
# ``operation`` filter of ``GovernanceAuditLog.entries``.
OP_PROPOSAL_CREATED = "proposal_created"
OP_VOTE_CAST = "vote_cast"
OP_CONSENSUS_REACHED = "consensus_reached"
OP_BYZANTINE_DETECTED = "byzantine_detected"
OP_NODE_ADDED = "node_added"
OP_NODE_REMOVED = "node_removed"
OP_AUTHORIZATION_GRANTED = "authorization_granted"
OP_AUTHORIZATION_REVOKED = "authorization_revoked"
24
25
@dataclass
class GovernanceAuditEntry:
    """One governance event recorded in the audit trail.

    Only ``timestamp`` and ``operation`` are required; every other field is
    optional context and defaults to ``None`` (or an empty dict for
    ``details``).
    """

    # When the event was recorded. The logging helpers in this module write
    # ``datetime.now(timezone.utc).isoformat()`` here.
    timestamp: str
    # Operation name; the logging helpers use the module's ``OP_*`` constants.
    operation: str
    proposal_id: str | None = None
    subject_id: str | None = None
    kind: str | None = None
    actor_did: str | None = None
    decision: str | None = None
    # Free-form, operation-specific extras. ``default_factory`` gives each
    # entry its own dict rather than one shared mutable default.
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the entry as a plain dict built by ``dataclasses.asdict``."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping
41
42
class GovernanceAuditLog:
    """Append-only, bounded, in-memory audit log for governance operations.

    Production usage: persist entries to a real log backend. This class gives
    you an in-memory structure with filtering and JSON export for integrations.

    At most ``max_entries`` entries are retained; once the log is full, each
    new entry evicts the oldest one.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log.

        Args:
            max_entries: Maximum number of entries retained. Must be >= 1.

        Raises:
            ValueError: If ``max_entries`` is less than 1. (Previously a
                value of 0 only failed later, with ``IndexError`` on the
                first ``log`` call.)
        """
        if max_entries < 1:
            raise ValueError(f"max_entries must be >= 1, got {max_entries!r}")
        self._max = max_entries
        # A bounded deque drops the oldest entry in O(1) when full, unlike
        # ``list.pop(0)`` which shifts every remaining element.
        self._entries: deque[GovernanceAuditEntry] = deque(maxlen=max_entries)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def log(self, entry: GovernanceAuditEntry) -> None:
        """Append ``entry``, evicting the oldest entry if the log is full."""
        self._entries.append(entry)

    # -- convenience helpers -----------------------------------------------

    def log_proposal_created(self, proposal: GovernanceProposal) -> None:
        """Record that ``proposal`` was created by its proposer."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_PROPOSAL_CREATED,
                proposal_id=proposal.proposal_id,
                subject_id=proposal.subject_id,
                kind=proposal.kind.value,
                actor_did=proposal.proposer_did,
                details={"title": proposal.title},
            )
        )

    def log_vote_cast(self, signed: SignedVote) -> None:
        """Record the vote carried by ``signed`` (voter, decision, vote id)."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_VOTE_CAST,
                proposal_id=signed.vote.proposal_id,
                actor_did=signed.vote.voter_did,
                decision=signed.vote.decision.value,
                details={"vote_id": signed.vote.vote_id},
            )
        )

    def log_consensus_reached(self, result: ConsensusResult) -> None:
        """Record a consensus ``result`` together with its weight tallies."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_CONSENSUS_REACHED,
                proposal_id=result.proposal_id,
                actor_did=result.signer_did,
                decision=result.decision,
                details={
                    "reason": result.reason,
                    "approve_weight": result.approve_weight,
                    "reject_weight": result.reject_weight,
                    "abstain_weight": result.abstain_weight,
                    "total_weight": result.total_weight,
                    "vote_count": len(result.included_vote_ids),
                },
            )
        )

    def log_byzantine_detected(
        self, voter_did: str, proposal_id: str, prior: str, now: str
    ) -> None:
        """Record that ``voter_did`` issued conflicting decisions on a proposal.

        Args:
            voter_did: DID of the voter, stored as the entry's actor.
            proposal_id: Proposal the conflicting votes refer to.
            prior: Stored under ``details["prior_decision"]``.
            now: Stored under ``details["conflicting_decision"]``.
        """
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_BYZANTINE_DETECTED,
                proposal_id=proposal_id,
                actor_did=voter_did,
                details={"prior_decision": prior, "conflicting_decision": now},
            )
        )

    def log_node_added(self, did: str, name: str, weight: int) -> None:
        """Record that the node ``did`` was added with ``name`` and ``weight``."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_NODE_ADDED,
                actor_did=did,
                details={"name": name, "weight": weight},
            )
        )

    def log_node_removed(self, did: str) -> None:
        """Record that the node ``did`` was removed."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_NODE_REMOVED,
                actor_did=did,
            )
        )

    def log_authorization_granted(
        self, subject_id: str, kind: ProposalKind, proposal_id: str
    ) -> None:
        """Record an authorization grant for ``subject_id`` under ``proposal_id``."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_AUTHORIZATION_GRANTED,
                proposal_id=proposal_id,
                subject_id=subject_id,
                kind=kind.value,
            )
        )

    def log_authorization_revoked(
        self, subject_id: str, kind: ProposalKind, proposal_id: str
    ) -> None:
        """Record an authorization revocation for ``subject_id`` under ``proposal_id``."""
        self.log(
            GovernanceAuditEntry(
                timestamp=self._utc_timestamp(),
                operation=OP_AUTHORIZATION_REVOKED,
                proposal_id=proposal_id,
                subject_id=subject_id,
                kind=kind.value,
            )
        )

    # -- query / export ----------------------------------------------------

    def entries(
        self,
        limit: int = 100,
        operation: str | None = None,
        proposal_id: str | None = None,
        actor_did: str | None = None,
    ) -> list[GovernanceAuditEntry]:
        """Return the most recent matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.
            operation: If truthy, keep only entries with this operation.
            proposal_id: If truthy, keep only entries with this proposal id.
            actor_did: If truthy, keep only entries with this actor DID.

        Returns:
            Up to ``limit`` entries satisfying every supplied filter, ordered
            from newest to oldest.
        """
        # ``seq[-0:]`` is the whole sequence, so a zero limit must be handled
        # explicitly rather than by slicing.
        if limit <= 0:
            return []

        matches: list[GovernanceAuditEntry] = []
        # Walk newest-first and stop once ``limit`` matches are collected;
        # this equals "filter, take the last ``limit``, reverse".
        for entry in reversed(self._entries):
            if operation and entry.operation != operation:
                continue
            if proposal_id and entry.proposal_id != proposal_id:
                continue
            if actor_did and entry.actor_did != actor_did:
                continue
            matches.append(entry)
            if len(matches) >= limit:
                break
        return matches

    def export_json(self) -> str:
        """Serialize all retained entries, oldest first, as indented JSON."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Remove every entry from the log."""
        self._entries.clear()

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
189