src/pqc_ai_governance/audit.py
| 1 | """Append-only audit log for governance operations.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | from dataclasses import asdict, dataclass, field |
| 7 | from datetime import datetime, timezone |
| 8 | from typing import Any |
| 9 | |
| 10 | from pqc_ai_governance.proposal import GovernanceProposal, ProposalKind |
| 11 | from pqc_ai_governance.round import ConsensusResult |
| 12 | from pqc_ai_governance.vote import SignedVote |
| 13 | |
| 14 | |
| 15 | # Operation name constants (stable strings written to the audit trail). |
| 16 | OP_PROPOSAL_CREATED = "proposal_created" |
| 17 | OP_VOTE_CAST = "vote_cast" |
| 18 | OP_CONSENSUS_REACHED = "consensus_reached" |
| 19 | OP_BYZANTINE_DETECTED = "byzantine_detected" |
| 20 | OP_NODE_ADDED = "node_added" |
| 21 | OP_NODE_REMOVED = "node_removed" |
| 22 | OP_AUTHORIZATION_GRANTED = "authorization_granted" |
| 23 | OP_AUTHORIZATION_REVOKED = "authorization_revoked" |
| 24 | |
| 25 | |
| 26 | @dataclass |
| 27 | class GovernanceAuditEntry: |
| 28 | """A single governance event logged for audit.""" |
| 29 | |
| 30 | timestamp: str |
| 31 | operation: str |
| 32 | proposal_id: str | None = None |
| 33 | subject_id: str | None = None |
| 34 | kind: str | None = None |
| 35 | actor_did: str | None = None |
| 36 | decision: str | None = None |
| 37 | details: dict[str, Any] = field(default_factory=dict) |
| 38 | |
| 39 | def to_dict(self) -> dict[str, Any]: |
| 40 | return asdict(self) |
| 41 | |
| 42 | |
| 43 | class GovernanceAuditLog: |
| 44 | """Append-only audit log for governance operations. |
| 45 | |
| 46 | Production usage: persist entries to a real log backend. This class gives |
| 47 | you an in-memory structure with filtering and JSON export for integrations. |
| 48 | """ |
| 49 | |
| 50 | def __init__(self, max_entries: int = 100_000) -> None: |
| 51 | self._entries: list[GovernanceAuditEntry] = [] |
| 52 | self._max = max_entries |
| 53 | |
| 54 | def log(self, entry: GovernanceAuditEntry) -> None: |
| 55 | if len(self._entries) >= self._max: |
| 56 | self._entries.pop(0) |
| 57 | self._entries.append(entry) |
| 58 | |
| 59 | # -- convenience helpers ----------------------------------------------- |
| 60 | |
| 61 | def log_proposal_created(self, proposal: GovernanceProposal) -> None: |
| 62 | self.log( |
| 63 | GovernanceAuditEntry( |
| 64 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 65 | operation=OP_PROPOSAL_CREATED, |
| 66 | proposal_id=proposal.proposal_id, |
| 67 | subject_id=proposal.subject_id, |
| 68 | kind=proposal.kind.value, |
| 69 | actor_did=proposal.proposer_did, |
| 70 | details={"title": proposal.title}, |
| 71 | ) |
| 72 | ) |
| 73 | |
| 74 | def log_vote_cast(self, signed: SignedVote) -> None: |
| 75 | self.log( |
| 76 | GovernanceAuditEntry( |
| 77 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 78 | operation=OP_VOTE_CAST, |
| 79 | proposal_id=signed.vote.proposal_id, |
| 80 | actor_did=signed.vote.voter_did, |
| 81 | decision=signed.vote.decision.value, |
| 82 | details={"vote_id": signed.vote.vote_id}, |
| 83 | ) |
| 84 | ) |
| 85 | |
| 86 | def log_consensus_reached(self, result: ConsensusResult) -> None: |
| 87 | self.log( |
| 88 | GovernanceAuditEntry( |
| 89 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 90 | operation=OP_CONSENSUS_REACHED, |
| 91 | proposal_id=result.proposal_id, |
| 92 | actor_did=result.signer_did, |
| 93 | decision=result.decision, |
| 94 | details={ |
| 95 | "reason": result.reason, |
| 96 | "approve_weight": result.approve_weight, |
| 97 | "reject_weight": result.reject_weight, |
| 98 | "abstain_weight": result.abstain_weight, |
| 99 | "total_weight": result.total_weight, |
| 100 | "vote_count": len(result.included_vote_ids), |
| 101 | }, |
| 102 | ) |
| 103 | ) |
| 104 | |
| 105 | def log_byzantine_detected( |
| 106 | self, voter_did: str, proposal_id: str, prior: str, now: str |
| 107 | ) -> None: |
| 108 | self.log( |
| 109 | GovernanceAuditEntry( |
| 110 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 111 | operation=OP_BYZANTINE_DETECTED, |
| 112 | proposal_id=proposal_id, |
| 113 | actor_did=voter_did, |
| 114 | details={"prior_decision": prior, "conflicting_decision": now}, |
| 115 | ) |
| 116 | ) |
| 117 | |
| 118 | def log_node_added(self, did: str, name: str, weight: int) -> None: |
| 119 | self.log( |
| 120 | GovernanceAuditEntry( |
| 121 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 122 | operation=OP_NODE_ADDED, |
| 123 | actor_did=did, |
| 124 | details={"name": name, "weight": weight}, |
| 125 | ) |
| 126 | ) |
| 127 | |
| 128 | def log_node_removed(self, did: str) -> None: |
| 129 | self.log( |
| 130 | GovernanceAuditEntry( |
| 131 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 132 | operation=OP_NODE_REMOVED, |
| 133 | actor_did=did, |
| 134 | ) |
| 135 | ) |
| 136 | |
| 137 | def log_authorization_granted( |
| 138 | self, subject_id: str, kind: ProposalKind, proposal_id: str |
| 139 | ) -> None: |
| 140 | self.log( |
| 141 | GovernanceAuditEntry( |
| 142 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 143 | operation=OP_AUTHORIZATION_GRANTED, |
| 144 | proposal_id=proposal_id, |
| 145 | subject_id=subject_id, |
| 146 | kind=kind.value, |
| 147 | ) |
| 148 | ) |
| 149 | |
| 150 | def log_authorization_revoked( |
| 151 | self, subject_id: str, kind: ProposalKind, proposal_id: str |
| 152 | ) -> None: |
| 153 | self.log( |
| 154 | GovernanceAuditEntry( |
| 155 | timestamp=datetime.now(timezone.utc).isoformat(), |
| 156 | operation=OP_AUTHORIZATION_REVOKED, |
| 157 | proposal_id=proposal_id, |
| 158 | subject_id=subject_id, |
| 159 | kind=kind.value, |
| 160 | ) |
| 161 | ) |
| 162 | |
| 163 | # -- query / export ---------------------------------------------------- |
| 164 | |
| 165 | def entries( |
| 166 | self, |
| 167 | limit: int = 100, |
| 168 | operation: str | None = None, |
| 169 | proposal_id: str | None = None, |
| 170 | actor_did: str | None = None, |
| 171 | ) -> list[GovernanceAuditEntry]: |
| 172 | filtered = self._entries |
| 173 | if operation: |
| 174 | filtered = [e for e in filtered if e.operation == operation] |
| 175 | if proposal_id: |
| 176 | filtered = [e for e in filtered if e.proposal_id == proposal_id] |
| 177 | if actor_did: |
| 178 | filtered = [e for e in filtered if e.actor_did == actor_did] |
| 179 | return filtered[-limit:][::-1] |
| 180 | |
| 181 | def export_json(self) -> str: |
| 182 | return json.dumps([e.to_dict() for e in self._entries], indent=2) |
| 183 | |
| 184 | def clear(self) -> None: |
| 185 | self._entries.clear() |
| 186 | |
| 187 | def __len__(self) -> int: |
| 188 | return len(self._entries) |
| 189 | |