examples/detect_dataset_swap.py
3.3 KB · 92 lines · python Raw
1 """Detect a training-dataset swap between two MBOM versions.
2
3 Simulates an attacker (or careless pipeline) that replaces a training-data
4 component in-place: same component_id, different content_hash. The MBOM diff
5 flags it, and any tampering with the *signed* MBOM fails verification.
6 """
7
8 from __future__ import annotations
9
10 import hashlib
11
12 from quantumshield.identity.agent import AgentIdentity
13
14 from pqc_mbom import (
15 ComponentType,
16 MBOM,
17 MBOMBuilder,
18 MBOMSigner,
19 MBOMVerifier,
20 diff_mboms,
21 )
22
23
24 def _h(label: str) -> str:
25 return hashlib.sha3_256(label.encode()).hexdigest()
26
27
def _poison_training_data(mbom: MBOM) -> None:
    """Overwrite the content hash of every training-data component in *mbom*.

    Only the ``content_hash`` fields are mutated. The root is deliberately not
    recomputed and nothing is re-signed here, so each caller decides how far
    the simulated attack goes.
    """
    poisoned_hash = _h("poisoned-dataset")
    for component in mbom.components:
        if component.component_type == ComponentType.TRAINING_DATA:
            component.content_hash = poisoned_hash


def main() -> None:
    """Run the dataset-swap demo end to end.

    Builds and signs a v1 MBOM, derives a re-signed v2 whose training-data
    hash differs, checks that ``diff_mboms`` reports the change, and then
    exercises two attack variants: editing a signed MBOM without re-signing,
    and re-signing the edited MBOM under a different identity.
    """
    identity = AgentIdentity.create("publisher")
    attacker_identity = AgentIdentity.create("attacker")

    # v1 - published
    v1 = (
        MBOMBuilder("MyModel", "1.0.0", supplier="Acme")
        .add_base_architecture("transformer-decoder", version="1", content_hash=_h("arch"))
        .add_training_data("trusted-corpus", content_hash=_h("good-dataset"), content_size=10**9)
        .add_weights("model.safetensors", content_hash=_h("weights"), content_size=8 * 10**9)
        .build()
    )
    MBOMSigner(identity).sign(v1)
    # Serialised once; every later scenario starts from this signed snapshot.
    v1_json = v1.to_json()

    # v2 - same component_id for training data, different hash (a swap).
    v2 = MBOM.from_json(v1_json)
    v2.model_version = "1.0.1"
    _poison_training_data(v2)
    v2.recompute_root()
    MBOMSigner(identity).sign(v2)

    diff = diff_mboms(v1, v2)
    changed_summary = [
        (old.name, old.content_hash[:8], '->', new.content_hash[:8])
        for old, new in diff.changed
    ]
    print("Diff v1 -> v2:")
    print(f" added: {[c.name for c in diff.added]}")
    print(f" removed: {[c.name for c in diff.removed]}")
    print(f" changed: {changed_summary}")
    assert diff.changed, "dataset swap should be detected as a change"
    assert diff.changed[0][0].component_type == ComponentType.TRAINING_DATA

    # Both v1 and v2 verify on their own - they are each legitimately signed.
    assert MBOMVerifier.verify(v1).valid
    assert MBOMVerifier.verify(v2).valid

    # Now the attacker scenario: tamper with a *signed* MBOM without re-signing.
    tampered = MBOM.from_json(v1_json)
    _poison_training_data(tampered)
    # Does NOT recompute root, does NOT re-sign.
    result = MBOMVerifier.verify(tampered)
    print("\nUnsigned tampering attempt (same signer, no re-sign):")
    print(f" valid={result.valid} error={result.error}")
    assert not result.valid

    # Adversarial re-sign by a *different* identity still verifies
    # cryptographically, but the signer_did reveals the swap - trust policy
    # layer should reject unknown signers.
    forged = MBOM.from_json(v1_json)
    _poison_training_data(forged)
    # NOTE: mirrors the v2 flow above. Without this call the forged MBOM would
    # be signed while still carrying the root loaded from v1_json (assuming
    # sign() does not recompute the root itself).
    forged.recompute_root()
    MBOMSigner(attacker_identity).sign(forged)
    result = MBOMVerifier.verify(forged)
    print("\nForged MBOM re-signed by attacker:")
    print(f" valid={result.valid} signer_did={result.signer_did}")
    print(f" original signer was {identity.did}")
    # Back the printed claim with a check: the forged document must not be
    # attributed to the publisher.
    assert result.signer_did != identity.did, "forged MBOM must not carry the publisher's DID"
    print(" -> trust-policy layer rejects: signer_did not in allow-list")
88
89
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
92