src/pqc_ai_governance/node.py
4.1 KB · 133 lines · python Raw
1 """GovernanceNode - one voting participant in the consensus."""
2
3 from __future__ import annotations
4
5 import hashlib
6 from dataclasses import dataclass, field
7
8 from quantumshield.core.algorithms import SignatureAlgorithm
9 from quantumshield.core.signatures import sign, verify
10 from quantumshield.identity.agent import AgentIdentity
11
12 from pqc_ai_governance.errors import UnknownNodeError
13 from pqc_ai_governance.proposal import GovernanceProposal
14 from pqc_ai_governance.vote import SignedVote, Vote, VoteDecision
15
16
@dataclass
class GovernanceNode:
    """A voting node in the governance consensus.

    Wraps an ``AgentIdentity``; produces signed proposals and signed votes.
    ``weight`` of 1 gives a one-node-one-vote system; >1 yields weighted voting.

    Every signature is computed over the SHA3-256 digest of the payload's
    ``canonical_bytes()`` and is carried hex-encoded next to the signer's
    hex-encoded public key and the algorithm identifier.
    """

    identity: AgentIdentity
    name: str
    weight: int = 1

    @property
    def did(self) -> str:
        """Return the DID of the wrapped identity."""
        return self.identity.did

    def _sign_canonical(self, payload: bytes) -> str:
        """Sign the SHA3-256 digest of *payload*; return the hex signature."""
        digest = hashlib.sha3_256(payload).digest()
        return sign(digest, self.identity.signing_keypair).hex()

    def sign_proposal(self, proposal: GovernanceProposal) -> GovernanceProposal:
        """Sign the proposal as the proposer.

        The proposal is mutated in place: ``signer_did``, ``algorithm``,
        ``signature`` and ``public_key`` are overwritten with this node's
        values. The same object is returned for convenience.
        """
        # The digest is taken before any field is touched.
        # NOTE(review): verify_proposal() re-hashes canonical_bytes() after
        # these fields are set, so this presumably relies on canonical_bytes()
        # not covering them -- confirm against GovernanceProposal.
        signature_hex = self._sign_canonical(proposal.canonical_bytes())
        keypair = self.identity.signing_keypair
        proposal.signer_did = self.identity.did
        proposal.algorithm = keypair.algorithm.value
        proposal.signature = signature_hex
        proposal.public_key = keypair.public_key.hex()
        return proposal

    def cast_vote(
        self,
        proposal: GovernanceProposal,
        decision: VoteDecision,
        rationale: str = "",
    ) -> SignedVote:
        """Cast a signed vote on a proposal.

        The vote records the proposal's id and ``proposal_hash()`` together
        with this node's DID, then is signed with this node's signing keypair.
        """
        vote = Vote.create(
            proposal_id=proposal.proposal_id,
            proposal_hash=proposal.proposal_hash(),
            voter_did=self.identity.did,
            decision=decision,
            rationale=rationale,
        )
        signature_hex = self._sign_canonical(vote.canonical_bytes())
        keypair = self.identity.signing_keypair
        return SignedVote(
            vote=vote,
            algorithm=keypair.algorithm.value,
            signature=signature_hex,
            public_key=keypair.public_key.hex(),
        )

    @staticmethod
    def _verify_envelope(
        signable: Vote | GovernanceProposal,
        algorithm_name: str,
        signature_hex: str,
        public_key_hex: str,
    ) -> bool:
        """Check a hex signature over ``signable.canonical_bytes()``.

        Returns ``False`` (never raises for bad signature material) when the
        signature or public key is missing, the algorithm name is unknown,
        the hex is malformed, or verification itself fails.
        """
        # An absent signature or key can never verify; reject before parsing.
        if not signature_hex or not public_key_hex:
            return False
        try:
            algorithm = SignatureAlgorithm(algorithm_name)
        except ValueError:
            return False
        digest = hashlib.sha3_256(signable.canonical_bytes()).digest()
        try:
            return verify(
                digest,
                bytes.fromhex(signature_hex),
                bytes.fromhex(public_key_hex),
                algorithm,
            )
        except Exception:
            # Deliberately broad: any decoding or backend failure while
            # checking untrusted input is treated as "signature not valid".
            return False

    @staticmethod
    def verify_vote(signed: SignedVote) -> bool:
        """Return ``True`` if the vote's signature matches its embedded key.

        Only the signature, public key and algorithm carried by *signed* are
        consulted; whether that key belongs to the claimed voter is not
        checked here.
        """
        return GovernanceNode._verify_envelope(
            signed.vote,
            signed.algorithm,
            signed.signature,
            signed.public_key,
        )

    @staticmethod
    def verify_proposal(proposal: GovernanceProposal) -> bool:
        """Return ``True`` if the proposal's signature matches its embedded key.

        An unsigned proposal (empty ``signature``) is reported as invalid.
        Whether the key belongs to ``signer_did`` is not checked here.
        """
        return GovernanceNode._verify_envelope(
            proposal,
            proposal.algorithm,
            proposal.signature,
            proposal.public_key,
        )
101
102
@dataclass
class NodeRegistry:
    """Allow-list of governance nodes, keyed by DID."""

    nodes: dict[str, GovernanceNode] = field(default_factory=dict)

    def register(self, node: GovernanceNode) -> None:
        """Add *node* under its DID, replacing any node already stored there."""
        self.nodes[node.did] = node

    def remove(self, did: str) -> None:
        """Drop the node registered under *did*.

        Raises:
            UnknownNodeError: if no node is registered under *did*.
        """
        try:
            del self.nodes[did]
        except KeyError:
            raise UnknownNodeError(f"no node with did {did}") from None

    def get(self, did: str) -> GovernanceNode:
        """Return the node registered under *did*.

        Raises:
            UnknownNodeError: if no node is registered under *did*.
        """
        try:
            return self.nodes[did]
        except KeyError:
            raise UnknownNodeError(f"no node with did {did}") from None

    def is_member(self, did: str) -> bool:
        """Return whether a node is registered under *did*."""
        return did in self.nodes

    def total_weight(self) -> int:
        """Return the sum of ``weight`` over all registered nodes (0 if empty)."""
        return sum(node.weight for node in self.nodes.values())

    def list_dids(self) -> list[str]:
        """Return the registered DIDs in sorted order."""
        return sorted(self.nodes)

    def __len__(self) -> int:
        """Return the number of registered nodes."""
        return len(self.nodes)
133