tests/test_signer.py
| 1 | """Tests for UpdateSigner.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from quantumshield.identity.agent import AgentIdentity |
| 6 | |
| 7 | from pqc_federated_learning import ( |
| 8 | ClientUpdate, |
| 9 | ClientUpdateMetadata, |
| 10 | GradientTensor, |
| 11 | UpdateSigner, |
| 12 | ) |
| 13 | |
| 14 | |
def _make_update(did: str = "did:pqaid:abc") -> ClientUpdate:
    """Build a ``ClientUpdate`` for *did* without signing it.

    The update uses fixed metadata (round ``r1``, model ``m1``, 10 samples)
    and a single gradient tensor named ``w`` holding two values.
    """
    metadata = ClientUpdateMetadata(
        client_did=did, round_id="r1", model_id="m1", num_samples=10
    )
    return ClientUpdate.create(
        metadata,
        [GradientTensor(name="w", shape=(2,), values=(1.0, 2.0))],
    )
| 24 | |
| 25 | |
def test_sign_populates_fields(client_a_identity: AgentIdentity) -> None:
    """``sign`` fills in signer DID, algorithm, signature, key and timestamp."""
    update = _make_update(client_a_identity.did)
    signer = UpdateSigner(client_a_identity)
    signed_update = signer.sign(update)

    # Identity-derived fields must mirror the signing identity.
    assert signed_update.signer_did == client_a_identity.did
    assert (
        signed_update.algorithm
        == client_a_identity.signing_keypair.algorithm.value
    )

    # The remaining signature fields only need to be non-empty.
    assert signed_update.signature != ""
    assert signed_update.public_key != ""
    assert signed_update.signed_at != ""
| 34 | |
| 35 | |
def test_verify_success(client_a_identity: AgentIdentity) -> None:
    """A freshly signed, unmodified update verifies with no error."""
    update = _make_update(client_a_identity.did)
    signed_update = UpdateSigner(client_a_identity).sign(update)

    outcome = UpdateSigner.verify(signed_update)

    assert outcome.valid is True
    assert outcome.content_hash_ok is True
    assert outcome.signature_ok is True
    assert outcome.error is None
| 44 | |
| 45 | |
def test_tamper_detection_on_values(client_a_identity: AgentIdentity) -> None:
    """Replacing tensor values after signing makes verification fail.

    The test accepts either failure mode: a content-hash mismatch or a
    signature check failure.
    """
    update = _make_update(client_a_identity.did)
    signed_update = UpdateSigner(client_a_identity).sign(update)

    # Tamper after signing: same tensor name and shape, different values.
    # The signature fields are left untouched.
    tampered_tensor = GradientTensor(name="w", shape=(2,), values=(9.0, 9.0))
    signed_update.tensors = [tampered_tensor]

    outcome = UpdateSigner.verify(signed_update)

    assert outcome.valid is False
    # At least one of the two integrity checks must report the tampering.
    assert outcome.content_hash_ok is False or outcome.signature_ok is False
| 55 | |
| 56 | |
def test_wrong_public_key_fails(
    client_a_identity: AgentIdentity, attacker_identity: AgentIdentity
) -> None:
    """A signature checked against another identity's public key is rejected."""
    update = _make_update(client_a_identity.did)
    signed_update = UpdateSigner(client_a_identity).sign(update)

    # Substitute the attacker's hex-encoded public key for the signer's.
    attacker_key_hex = attacker_identity.signing_keypair.public_key.hex()
    signed_update.public_key = attacker_key_hex

    outcome = UpdateSigner.verify(signed_update)

    assert outcome.valid is False
    assert outcome.signature_ok is False
| 67 | |
| 68 | |
def test_missing_signature_fields_invalid() -> None:
    """Verifying an update that was never signed reports missing fields."""
    # sign() is deliberately skipped, so the update goes to verify() as built.
    unsigned_update = _make_update()

    outcome = UpdateSigner.verify(unsigned_update)

    assert outcome.valid is False
    assert outcome.error == "missing signature fields"
| 75 | |