tests/test_server.py
| 1 | """Tests for PQCMCPServer — tool registration, verification, signing.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import pytest |
| 6 | from quantumshield.identity.agent import AgentIdentity |
| 7 | |
| 8 | from pqc_mcp_transport.server import PQCMCPServer |
| 9 | from pqc_mcp_transport.signer import MessageSigner |
| 10 | |
| 11 | |
| 12 | @pytest.fixture |
| 13 | def server(server_identity: AgentIdentity) -> PQCMCPServer: |
| 14 | srv = PQCMCPServer(identity=server_identity, require_auth=True) |
| 15 | |
| 16 | @srv.tool("greet", description="Greet someone") |
| 17 | async def greet(name: str) -> str: |
| 18 | return f"Hello, {name}!" |
| 19 | |
| 20 | return srv |
| 21 | |
| 22 | |
| 23 | class TestToolRegistration: |
| 24 | def test_server_registers_tool(self, server: PQCMCPServer) -> None: |
| 25 | tools = server.get_tool_list() |
| 26 | assert len(tools) == 1 |
| 27 | assert tools[0]["name"] == "greet" |
| 28 | assert tools[0]["description"] == "Greet someone" |
| 29 | |
| 30 | |
| 31 | @pytest.mark.asyncio |
| 32 | class TestRequestHandling: |
| 33 | async def test_server_verifies_incoming_call( |
| 34 | self, |
| 35 | server: PQCMCPServer, |
| 36 | client_identity: AgentIdentity, |
| 37 | server_identity: AgentIdentity, |
| 38 | ) -> None: |
| 39 | """A properly signed request is accepted and produces a signed response.""" |
| 40 | # First do a handshake |
| 41 | from pqc_mcp_transport.handshake import PQCHandshake |
| 42 | |
| 43 | request, nonce = PQCHandshake.initiate(client_identity) |
| 44 | hs_response = await server.handle_handshake(request.to_dict()) |
| 45 | session_id = hs_response["session_id"] |
| 46 | |
| 47 | # Now send a signed tool call |
| 48 | signer = MessageSigner(client_identity) |
| 49 | message = { |
| 50 | "jsonrpc": "2.0", |
| 51 | "method": "tools/call", |
| 52 | "id": "req-1", |
| 53 | "params": {"name": "greet", "arguments": {"name": "Alice"}}, |
| 54 | } |
| 55 | signed = signer.sign_message(message) |
| 56 | signed["_pqc"]["session_id"] = session_id |
| 57 | |
| 58 | response = await server.handle_request(signed) |
| 59 | assert "_pqc" in response |
| 60 | # Verify the server's response signature |
| 61 | vr = MessageSigner.verify_message(response) |
| 62 | assert vr.valid is True |
| 63 | assert vr.signer_did == server_identity.did |
| 64 | |
| 65 | stripped = MessageSigner.strip_pqc(response) |
| 66 | assert stripped["result"]["content"] == "Hello, Alice!" |
| 67 | |
| 68 | async def test_server_rejects_unsigned_call( |
| 69 | self, server: PQCMCPServer |
| 70 | ) -> None: |
| 71 | """An unsigned request is rejected when require_auth is True.""" |
| 72 | message = { |
| 73 | "jsonrpc": "2.0", |
| 74 | "method": "tools/call", |
| 75 | "id": "req-2", |
| 76 | "params": {"name": "greet", "arguments": {"name": "Bob"}}, |
| 77 | } |
| 78 | response = await server.handle_request(message) |
| 79 | # Should get an error response (no _pqc signature is also not signed) |
| 80 | assert "error" in response |
| 81 | assert response["error"]["code"] == -32600 |
| 82 | |
| 83 | async def test_server_signs_response( |
| 84 | self, |
| 85 | server: PQCMCPServer, |
| 86 | client_identity: AgentIdentity, |
| 87 | server_identity: AgentIdentity, |
| 88 | ) -> None: |
| 89 | """Every response from the server carries a _pqc envelope.""" |
| 90 | from pqc_mcp_transport.handshake import PQCHandshake |
| 91 | |
| 92 | request, nonce = PQCHandshake.initiate(client_identity) |
| 93 | hs_response = await server.handle_handshake(request.to_dict()) |
| 94 | session_id = hs_response["session_id"] |
| 95 | |
| 96 | signer = MessageSigner(client_identity) |
| 97 | message = { |
| 98 | "jsonrpc": "2.0", |
| 99 | "method": "tools/list", |
| 100 | "id": "req-3", |
| 101 | } |
| 102 | signed = signer.sign_message(message) |
| 103 | signed["_pqc"]["session_id"] = session_id |
| 104 | |
| 105 | response = await server.handle_request(signed) |
| 106 | assert "_pqc" in response |
| 107 | assert response["_pqc"]["signer_did"] == server_identity.did |
| 108 | |