src/pqc_ai_governance/node.py
| 1 | """GovernanceNode - one voting participant in the consensus.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import hashlib |
| 6 | from dataclasses import dataclass, field |
| 7 | |
| 8 | from quantumshield.core.algorithms import SignatureAlgorithm |
| 9 | from quantumshield.core.signatures import sign, verify |
| 10 | from quantumshield.identity.agent import AgentIdentity |
| 11 | |
| 12 | from pqc_ai_governance.errors import UnknownNodeError |
| 13 | from pqc_ai_governance.proposal import GovernanceProposal |
| 14 | from pqc_ai_governance.vote import SignedVote, Vote, VoteDecision |
| 15 | |
| 16 | |
@dataclass
class GovernanceNode:
    """A voting node in the governance consensus.

    Wraps an ``AgentIdentity``; produces signed proposals and signed votes.
    ``weight`` of 1 gives a one-node-one-vote system; >1 yields weighted voting.

    Every signature is made over the SHA3-256 digest of the signed object's
    ``canonical_bytes()`` and is stored hex-encoded, together with the
    signer's hex-encoded public key and the algorithm's ``value``.
    """

    identity: AgentIdentity
    name: str
    weight: int = 1

    @property
    def did(self) -> str:
        """Return the DID of the wrapped identity."""
        return self.identity.did

    def sign_proposal(self, proposal: GovernanceProposal) -> GovernanceProposal:
        """Sign the proposal as the proposer.

        The proposal is modified in place (``signer_did``, ``algorithm``,
        ``signature`` and ``public_key`` are overwritten) and the same object
        is returned for convenience.
        """
        # NOTE(review): the digest is taken before the signer fields below are
        # written, so this relies on canonical_bytes() not covering them -
        # confirm against GovernanceProposal.
        digest = hashlib.sha3_256(proposal.canonical_bytes()).digest()
        keypair = self.identity.signing_keypair
        sig = sign(digest, keypair)
        proposal.signer_did = self.identity.did
        proposal.algorithm = keypair.algorithm.value
        proposal.signature = sig.hex()
        proposal.public_key = keypair.public_key.hex()
        return proposal

    def cast_vote(
        self,
        proposal: GovernanceProposal,
        decision: VoteDecision,
        rationale: str = "",
    ) -> SignedVote:
        """Cast a signed vote on a proposal.

        The vote records the proposal's id and ``proposal_hash()`` together
        with this node's DID, and is returned wrapped in a ``SignedVote``
        carrying the signature, public key and algorithm name.
        """
        vote = Vote.create(
            proposal_id=proposal.proposal_id,
            proposal_hash=proposal.proposal_hash(),
            voter_did=self.identity.did,
            decision=decision,
            rationale=rationale,
        )
        digest = hashlib.sha3_256(vote.canonical_bytes()).digest()
        keypair = self.identity.signing_keypair
        sig = sign(digest, keypair)
        return SignedVote(
            vote=vote,
            algorithm=keypair.algorithm.value,
            signature=sig.hex(),
            public_key=keypair.public_key.hex(),
        )

    @staticmethod
    def _verify_signature(
        signable,
        algorithm_name: str,
        signature_hex: str,
        public_key_hex: str,
    ) -> bool:
        """Check a hex-encoded signature over ``signable.canonical_bytes()``.

        Returns ``False`` (never raises for bad signature material) when the
        signature or public key is missing, the algorithm name is unknown,
        the hex fields do not decode, or the backend rejects the signature.
        """
        # An unsigned object can never verify; reject it before touching the
        # crypto backend.
        if not signature_hex or not public_key_hex:
            return False
        try:
            algorithm = SignatureAlgorithm(algorithm_name)
        except ValueError:
            return False
        digest = hashlib.sha3_256(signable.canonical_bytes()).digest()
        try:
            signature = bytes.fromhex(signature_hex)
            public_key = bytes.fromhex(public_key_hex)
        except (ValueError, TypeError):
            return False
        try:
            return verify(digest, signature, public_key, algorithm)
        except Exception:
            # Deliberately broad: any backend failure on attacker-supplied
            # key or signature bytes means "not valid", not a crash.
            return False

    @staticmethod
    def verify_vote(signed: SignedVote) -> bool:
        """Return whether ``signed.signature`` is valid for ``signed.vote``.

        Only the signature is checked, against the public key embedded in
        ``signed``. This method does not check that the key belongs to the
        vote's ``voter_did`` or that the voter is a registered node.
        """
        return GovernanceNode._verify_signature(
            signed.vote,
            signed.algorithm,
            signed.signature,
            signed.public_key,
        )

    @staticmethod
    def verify_proposal(proposal: GovernanceProposal) -> bool:
        """Return whether the proposal carries a valid signature.

        Only the signature is checked, against the public key embedded in
        the proposal. This method does not check that the key belongs to
        ``proposal.signer_did``. An unsigned proposal yields ``False``.
        """
        return GovernanceNode._verify_signature(
            proposal,
            proposal.algorithm,
            proposal.signature,
            proposal.public_key,
        )
| 101 | |
| 102 | |
@dataclass
class NodeRegistry:
    """Allow-list of governance nodes, keyed by DID.

    ``nodes`` maps each registered node's DID to the node object.
    """

    nodes: dict[str, GovernanceNode] = field(default_factory=dict)

    def _require(self, did: str) -> None:
        """Raise ``UnknownNodeError`` unless ``did`` is registered."""
        if did not in self.nodes:
            raise UnknownNodeError(f"no node with did {did}")

    def register(self, node: GovernanceNode) -> None:
        """Store ``node`` under its DID, replacing any node already there."""
        key = node.did
        self.nodes[key] = node

    def remove(self, did: str) -> None:
        """Drop the node registered under ``did``.

        Raises ``UnknownNodeError`` if no such node is registered.
        """
        self._require(did)
        self.nodes.pop(did)

    def get(self, did: str) -> GovernanceNode:
        """Return the node registered under ``did``.

        Raises ``UnknownNodeError`` if no such node is registered.
        """
        self._require(did)
        return self.nodes[did]

    def is_member(self, did: str) -> bool:
        """Return whether a node is registered under ``did``."""
        return did in self.nodes

    def total_weight(self) -> int:
        """Return the sum of ``weight`` over all registered nodes."""
        total = 0
        for member in self.nodes.values():
            total += member.weight
        return total

    def list_dids(self) -> list[str]:
        """Return the registered DIDs in sorted order."""
        dids = list(self.nodes)
        dids.sort()
        return dids

    def __len__(self) -> int:
        """Return the number of registered nodes."""
        return len(self.nodes)
| 133 | |