src/pqc_ai_governance/round.py
5.8 KB · 166 lines · python Raw
1 """ConsensusRound - one voting round with quorum policy and ML-DSA signed result."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 from dataclasses import asdict, dataclass, field
8 from datetime import datetime, timezone
9 from typing import TYPE_CHECKING, Any
10
11 from quantumshield.core.algorithms import SignatureAlgorithm
12 from quantumshield.core.signatures import sign, verify
13
14 from pqc_ai_governance.errors import ProposalExpiredError
15 from pqc_ai_governance.node import NodeRegistry
16 from pqc_ai_governance.proposal import GovernanceProposal, ProposalStatus
17 from pqc_ai_governance.tally import VoteTally
18 from pqc_ai_governance.vote import SignedVote
19
20 if TYPE_CHECKING:
21 from pqc_ai_governance.node import GovernanceNode
22
23
@dataclass
class QuorumPolicy:
    """Participation and approval thresholds applied to a vote tally.

    Both thresholds are fractions in the same units as the ratios they are
    compared against:

    * ``min_participation_fraction`` is compared with
      ``tally.total_cast_weight() / total_weight``.
    * ``min_approval_fraction`` is compared with
      ``approve_weight / (approve_weight + reject_weight)``; abstain weight
      is left out of that ratio.

    The defaults (2/3 and 2/3) give a PBFT-style supermajority rule.
    """

    min_participation_fraction: float = 2 / 3
    min_approval_fraction: float = 2 / 3

    def check(self, tally: VoteTally, total_weight: int) -> tuple[bool, str]:
        """Evaluate ``tally`` against this policy.

        Args:
            tally: Object exposing ``total_cast_weight()``, ``approve_weight``
                and ``reject_weight``.
            total_weight: Denominator for the participation ratio.

        Returns:
            ``(True, reason)`` when both thresholds are satisfied, otherwise
            ``(False, reason)`` where ``reason`` names the first failed check.
        """
        participation = tally.total_cast_weight()

        # Guard the division below; a zero denominator can never meet quorum.
        if total_weight == 0:
            return False, "total weight is zero"

        turnout_ratio = participation / total_weight
        if turnout_ratio < self.min_participation_fraction:
            turnout_shortfall = (
                f"participation {participation}/{total_weight} "
                f"< {self.min_participation_fraction:.2%}"
            )
            return False, turnout_shortfall

        # Only approve + reject weight enters the approval ratio (abstain is excluded).
        effective = tally.approve_weight + tally.reject_weight
        if effective == 0:
            return False, "all votes are abstain"

        approval_ratio = tally.approve_weight / effective
        if approval_ratio < self.min_approval_fraction:
            approval_shortfall = (
                f"approval {tally.approve_weight}/{effective} "
                f"< {self.min_approval_fraction:.2%}"
            )
            return False, approval_shortfall

        return True, "quorum met, supermajority approve"
53
54
@dataclass
class ConsensusResult:
    """Outcome record of a consensus round, plus its signature envelope.

    The first ten fields (through ``decided_at``) describe the decision and
    are what ``canonical_bytes`` serializes. The last four fields
    (``signer_did``, ``algorithm``, ``signature``, ``public_key``) are the
    signature envelope and are not part of the canonical payload.
    """

    proposal_id: str
    proposal_hash: str
    decision: str  # "passed" | "rejected"
    reason: str
    approve_weight: int
    reject_weight: int
    abstain_weight: int
    total_weight: int
    included_vote_ids: list[str] = field(default_factory=list)
    decided_at: str = ""
    signer_did: str = ""
    algorithm: str = ""
    signature: str = ""
    public_key: str = ""

    def canonical_bytes(self) -> bytes:
        """Return the deterministic UTF-8 JSON encoding of the decision fields.

        Keys are sorted, separators are compact, non-ASCII text is emitted
        as-is, and ``included_vote_ids`` is serialized in sorted order (the
        stored list is not modified).
        """
        payload_fields = (
            "proposal_id",
            "proposal_hash",
            "decision",
            "reason",
            "approve_weight",
            "reject_weight",
            "abstain_weight",
            "total_weight",
            "decided_at",
        )
        payload: dict[str, Any] = {name: getattr(self, name) for name in payload_fields}
        # Vote ids are order-normalized so insertion order cannot change the bytes.
        payload["included_vote_ids"] = sorted(self.included_vote_ids)

        encoded = json.dumps(
            payload,
            sort_keys=True,
            separators=(",", ":"),
            ensure_ascii=False,
        )
        return encoded.encode("utf-8")

    def to_dict(self) -> dict[str, Any]:
        """Return every field, envelope included, as a plain dictionary."""
        return asdict(self)

    def to_json(self) -> str:
        """Return ``to_dict()`` rendered as JSON indented by two spaces."""
        as_mapping = self.to_dict()
        return json.dumps(as_mapping, indent=2)
96
97
@dataclass
class ConsensusRound:
    """One proposal, the votes cast on it, and the signed decision.

    ``tally`` is not a constructor argument; it is created in
    ``__post_init__`` from ``proposal`` and ``registry``.
    """

    proposal: GovernanceProposal
    registry: NodeRegistry
    policy: QuorumPolicy = field(default_factory=QuorumPolicy)
    tally: VoteTally = field(init=False)

    def __post_init__(self) -> None:
        """Create the vote tally bound to this round's proposal and registry."""
        self.tally = VoteTally(proposal=self.proposal, registry=self.registry)

    def cast(self, signed_vote: SignedVote) -> None:
        """Add ``signed_vote`` to the tally.

        Raises:
            ProposalExpiredError: if ``proposal.is_expired()`` is true; the
                vote is not passed to the tally in that case.
        """
        if not self.proposal.is_expired():
            self.tally.add(signed_vote)
            return
        raise ProposalExpiredError(
            f"proposal {self.proposal.proposal_id} is expired"
        )

    def finalize(self, coordinator: GovernanceNode) -> ConsensusResult:
        """Apply the quorum policy, record the outcome, and sign it.

        Sets ``proposal.status`` to PASSED or REJECTED, builds a
        ``ConsensusResult`` from the tally, and signs the SHA3-256 digest of
        its canonical bytes with the coordinator's signing keypair.

        Args:
            coordinator: Node whose ``identity`` supplies the DID and the
                signing keypair.

        Returns:
            The result with ``signer_did``, ``algorithm``, ``signature`` (hex)
            and ``public_key`` (hex) filled in.
        """
        total_weight = self.registry.total_weight()
        quorum_ok, reason = self.policy.check(self.tally, total_weight)

        # The status is updated before proposal_hash() is taken below; keep
        # that order in case the hash covers the status.
        self.proposal.status = (
            ProposalStatus.PASSED if quorum_ok else ProposalStatus.REJECTED
        )

        outcome = ConsensusResult(
            proposal_id=self.proposal.proposal_id,
            proposal_hash=self.proposal.proposal_hash(),
            decision="passed" if quorum_ok else "rejected",
            reason=reason,
            approve_weight=self.tally.approve_weight,
            reject_weight=self.tally.reject_weight,
            abstain_weight=self.tally.abstain_weight,
            total_weight=total_weight,
            included_vote_ids=[
                signed.vote.vote_id for signed in self.tally.valid_votes
            ],
            decided_at=datetime.now(timezone.utc).isoformat(),
        )

        # The signature covers the digest of the canonical payload, not the
        # envelope fields assigned afterwards.
        payload_digest = hashlib.sha3_256(outcome.canonical_bytes()).digest()
        raw_signature = sign(payload_digest, coordinator.identity.signing_keypair)

        identity = coordinator.identity
        keypair = identity.signing_keypair
        outcome.signer_did = identity.did
        outcome.algorithm = keypair.algorithm.value
        outcome.signature = raw_signature.hex()
        outcome.public_key = keypair.public_key.hex()
        return outcome

    @staticmethod
    def verify_result(result: ConsensusResult) -> bool:
        """Check ``result.signature`` against ``result.public_key``.

        The check uses only the key embedded in ``result``; nothing here
        ties that key to ``result.signer_did`` or to a registry.

        Returns:
            ``False`` when the signature is empty, when ``result.algorithm``
            is not a valid ``SignatureAlgorithm`` value, or when hex decoding
            or verification raises; otherwise whatever ``verify`` returns.
        """
        if not result.signature:
            return False

        try:
            algorithm = SignatureAlgorithm(result.algorithm)
        except ValueError:
            return False

        payload_digest = hashlib.sha3_256(result.canonical_bytes()).digest()
        try:
            signature_bytes = bytes.fromhex(result.signature)
            public_key_bytes = bytes.fromhex(result.public_key)
            return verify(payload_digest, signature_bytes, public_key_bytes, algorithm)
        except Exception:
            # Any decoding or verification failure counts as "not verified".
            return False
166