tests/test_server.py
3.6 KB · 108 lines · python Raw
1 """Tests for PQCMCPServer — tool registration, verification, signing."""
2
3 from __future__ import annotations
4
5 import pytest
6 from quantumshield.identity.agent import AgentIdentity
7
8 from pqc_mcp_transport.server import PQCMCPServer
9 from pqc_mcp_transport.signer import MessageSigner
10
11
@pytest.fixture
def server(server_identity: AgentIdentity) -> PQCMCPServer:
    """Build an auth-requiring ``PQCMCPServer`` with one registered tool.

    The server is constructed from the ``server_identity`` fixture with
    ``require_auth=True`` and exposes a single async tool named ``greet``.
    """
    mcp_server = PQCMCPServer(identity=server_identity, require_auth=True)

    # The only tool the tests in this module call or expect to see listed.
    @mcp_server.tool("greet", description="Greet someone")
    async def greet(name: str) -> str:
        return f"Hello, {name}!"

    return mcp_server
21
22
class TestToolRegistration:
    """Checks the tool list reported by the ``server`` fixture."""

    def test_server_registers_tool(self, server: PQCMCPServer) -> None:
        """Exactly one tool is listed, carrying the registered name and description."""
        registered = server.get_tool_list()
        assert len(registered) == 1

        entry = registered[0]
        assert entry["name"] == "greet"
        assert entry["description"] == "Greet someone"
29
30
@pytest.mark.asyncio
class TestRequestHandling:
    """Request-path tests: signed calls, unsigned calls, and response signing."""

    @staticmethod
    async def _sign_for_session(
        server: PQCMCPServer,
        client_identity: AgentIdentity,
        message: dict[str, object],
    ):
        """Handshake with *server*, then sign *message* and bind it to the session.

        Args:
            server: Server under test; receives the handshake request.
            client_identity: Identity used both to initiate the handshake and
                to sign *message*.
            message: Unsigned JSON-RPC payload to sign.

        Returns:
            The value produced by ``MessageSigner.sign_message`` with the
            handshake's ``session_id`` stored in its ``_pqc`` envelope.
        """
        # NOTE(review): return annotation omitted — sign_message's return type
        # is not visible from this module.
        from pqc_mcp_transport.handshake import PQCHandshake

        # initiate() also returns a nonce; none of these tests use it.
        handshake_request, _ = PQCHandshake.initiate(client_identity)
        handshake_reply = await server.handle_handshake(handshake_request.to_dict())
        session_id = handshake_reply["session_id"]

        signed = MessageSigner(client_identity).sign_message(message)
        # The session id is attached after signing, exactly as the tests
        # originally did inline.
        signed["_pqc"]["session_id"] = session_id
        return signed

    async def test_server_verifies_incoming_call(
        self,
        server: PQCMCPServer,
        client_identity: AgentIdentity,
        server_identity: AgentIdentity,
    ) -> None:
        """A properly signed request is accepted and produces a signed response."""
        signed = await self._sign_for_session(
            server,
            client_identity,
            {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "id": "req-1",
                "params": {"name": "greet", "arguments": {"name": "Alice"}},
            },
        )

        response = await server.handle_request(signed)
        assert "_pqc" in response

        # Verify the server's response signature
        vr = MessageSigner.verify_message(response)
        assert vr.valid is True
        assert vr.signer_did == server_identity.did

        # With the envelope removed, the payload holds the tool's return value.
        stripped = MessageSigner.strip_pqc(response)
        assert stripped["result"]["content"] == "Hello, Alice!"

    async def test_server_rejects_unsigned_call(
        self, server: PQCMCPServer
    ) -> None:
        """An unsigned request is rejected when require_auth is True."""
        message = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "id": "req-2",
            "params": {"name": "greet", "arguments": {"name": "Bob"}},
        }
        response = await server.handle_request(message)
        # The request carries no _pqc envelope, so the server must answer with
        # an error; -32600 is JSON-RPC 2.0's "Invalid Request" code.
        assert "error" in response
        assert response["error"]["code"] == -32600

    async def test_server_signs_response(
        self,
        server: PQCMCPServer,
        client_identity: AgentIdentity,
        server_identity: AgentIdentity,
    ) -> None:
        """Every response from the server carries a _pqc envelope."""
        signed = await self._sign_for_session(
            server,
            client_identity,
            {
                "jsonrpc": "2.0",
                "method": "tools/list",
                "id": "req-3",
            },
        )

        response = await server.handle_request(signed)
        assert "_pqc" in response
        assert response["_pqc"]["signer_did"] == server_identity.did
108