src/pqc_mcp_transport/signer.py
3.4 KB · 99 lines · python Raw
1 """Message signing and verification for PQC MCP Transport."""
2
3 from __future__ import annotations
4
5 import hashlib
6 import json
7 import os
8 from dataclasses import dataclass
9 from datetime import datetime, timezone
10
11 from quantumshield.core.algorithms import SignatureAlgorithm
12 from quantumshield.core.signatures import sign, verify
13 from quantumshield.identity.agent import AgentIdentity
14
15
@dataclass
class VerificationResult:
    """Outcome of checking the ``_pqc`` envelope attached to an MCP message.

    Only ``valid`` is required; every other field defaults to ``None`` and
    is filled in by whoever builds the result.
    """

    # Whether the signature check succeeded.
    valid: bool
    # ``signer_did`` value reported for the message, if any.
    signer_did: str | None = None
    # Name of the signature algorithm reported for the message, if any.
    algorithm: str | None = None
    # Timestamp string reported for the message, if any.
    timestamp: str | None = None
    # Nonce string reported for the message, if any.
    nonce: str | None = None
    # Human-readable reason for a failed verification, if one was recorded.
    error: str | None = None
26
27
class MessageSigner:
    """Signs and verifies MCP JSON-RPC messages using ML-DSA post-quantum signatures.

    Signing attaches a ``_pqc`` envelope to a message; verification reads
    that envelope back and checks the signature against the message body.
    """

    def __init__(self, identity: AgentIdentity) -> None:
        """Bind the signer to *identity*, whose keypair and DID are used when signing."""
        self.identity = identity

    @staticmethod
    def canonicalize(message: dict) -> bytes:
        """Return the deterministic byte form of *message* used for signing.

        The ``_pqc`` envelope (if present) is dropped, keys are sorted, and
        compact separators are used, so the result does not depend on key
        insertion order or whitespace. The JSON text is UTF-8 encoded.

        Raises whatever ``json.dumps`` raises for values it cannot serialize.
        """
        # Reuse strip_pqc so "what is excluded from the signature" is defined once.
        body = MessageSigner.strip_pqc(message)
        return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")

    def sign_message(self, message: dict) -> dict:
        """Add a ``_pqc`` envelope with a signature to *message*.

        The signature is computed over the SHA3-256 digest of
        :meth:`canonicalize` applied to *message*.

        Returns a new dict; the top-level mapping of the original is not
        mutated (the copy is shallow, so nested values are shared).
        """
        canonical = self.canonicalize(message)
        msg_hash = hashlib.sha3_256(canonical).digest()
        signature = sign(msg_hash, self.identity.signing_keypair)
        nonce = os.urandom(16).hex()

        # NOTE(review): the digest above excludes the whole ``_pqc`` envelope,
        # so signer_did, timestamp, nonce and public_key are not part of the
        # signed bytes. Binding them would change the wire format; confirm
        # whether replay/identity checks are enforced elsewhere.
        signed = dict(message)
        signed["_pqc"] = {
            "signer_did": self.identity.did,
            "algorithm": self.identity.signing_keypair.algorithm.value,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "nonce": nonce,
            "signature": signature.hex(),
            "public_key": self.identity.signing_keypair.public_key.hex(),
        }
        return signed

    @staticmethod
    def verify_message(message: dict) -> VerificationResult:
        """Verify the ``_pqc`` envelope on *message*.

        Returns a :class:`VerificationResult` whose ``.valid`` flag indicates
        whether the signature verified successfully. This method does not
        raise for malformed input: a payload that is not a JSON object, a
        missing or malformed envelope, bad hex, an unknown algorithm, or a
        failure inside the verifier all yield ``valid=False`` with ``error``
        set.

        NOTE(review): the public key used for the check is read from the
        envelope itself and nothing here ties it to ``signer_did``; confirm
        that callers establish that binding before trusting the identity.
        """
        # Payloads that are not objects (e.g. a JSON array) have no ``.get``;
        # report them as a failed verification instead of raising.
        try:
            pqc = message.get("_pqc")
        except AttributeError:
            return VerificationResult(valid=False, error="Message is not a JSON object")

        if not pqc:
            return VerificationResult(valid=False, error="No _pqc envelope")

        # Pull the required fields first so a missing one gets a readable
        # error rather than the bare repr of a KeyError.
        try:
            sig_hex = pqc["signature"]
            pub_hex = pqc["public_key"]
            alg_name = pqc["algorithm"]
        except KeyError as exc:
            return VerificationResult(
                valid=False, error=f"Missing _pqc field: {exc.args[0]}"
            )
        except TypeError:
            # Envelope is a string, list, number, ... rather than a mapping.
            return VerificationResult(valid=False, error="Malformed _pqc envelope")

        # Deliberately broad: any failure while decoding or verifying must
        # surface as an invalid result, never as an exception to the caller.
        try:
            canonical = MessageSigner.canonicalize(message)
            msg_hash = hashlib.sha3_256(canonical).digest()
            sig_bytes = bytes.fromhex(sig_hex)
            pub_bytes = bytes.fromhex(pub_hex)
            algorithm = SignatureAlgorithm(alg_name)

            is_valid = verify(msg_hash, sig_bytes, pub_bytes, algorithm)
            return VerificationResult(
                valid=bool(is_valid),
                signer_did=pqc.get("signer_did"),
                algorithm=pqc.get("algorithm"),
                timestamp=pqc.get("timestamp"),
                nonce=pqc.get("nonce"),
            )
        except Exception as exc:
            return VerificationResult(valid=False, error=str(exc))

    @staticmethod
    def strip_pqc(message: dict) -> dict:
        """Return a shallow copy of *message* without the ``_pqc`` envelope."""
        return {k: v for k, v in message.items() if k != "_pqc"}
99