examples/multi_tenant_isolation.py
| 1 | """Multi-tenant isolation example. |
| 2 | |
| 3 | Demonstrates: |
| 4 | - Two tenants (Alice and Bob) share the same inference process. |
| 5 | - Each has their own TenantSession managed by TenantIsolationManager. |
| 6 | - Alice tries to decrypt Bob's cache entry via her session -> rejected. |
| 7 | - Violation is logged to the audit trail. |
| 8 | """ |
| 9 | |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | import os |
| 13 | |
| 14 | from pqc_kv_cache import ( |
| 15 | EntryMetadata, |
| 16 | KVAuditLog, |
| 17 | KVCacheEntry, |
| 18 | TenantIdentity, |
| 19 | TenantIsolationError, |
| 20 | TenantIsolationManager, |
| 21 | ) |
| 22 | |
| 23 | |
def make_entry(tenant_id: str, session_id: str, pos: int) -> KVCacheEntry:
    """Build a demo KV cache entry for one tenant session.

    The metadata is fixed to layer 0, uses *pos* as the position and
    ``100 + pos`` as the token id. Key and value payloads are 32 bytes each,
    drawn from ``os.urandom``.
    """
    token = 100 + pos
    metadata = EntryMetadata(
        tenant_id=tenant_id,
        session_id=session_id,
        layer_idx=0,
        position=pos,
        token_id=token,
    )

    # Random stand-ins for the tensor payloads; key bytes are drawn first.
    key_bytes = os.urandom(32)
    value_bytes = os.urandom(32)

    entry = KVCacheEntry(
        metadata=metadata,
        key_tensor_bytes=key_bytes,
        value_tensor_bytes=value_bytes,
    )
    return entry
| 37 | |
| 38 | |
def main() -> None:
    """Run the two-tenant isolation demo, printing the outcome of each step.

    Steps, in order:

    1. Create one ``TenantIsolationManager`` and one ``KVAuditLog``.
    2. Open a session for Alice and one for Bob on the same manager.
    3. Encrypt one entry per tenant through the manager.
    4. Have Alice try to decrypt Bob's ciphertext. ``TenantIsolationError`` is
       the expected outcome; it is printed and passed to
       ``audit.log_isolation_violation``.
    5. Decrypt each tenant's own entry and compare it with the original bytes.
    6. Print the audit entries filtered by ``operation="isolation-violation"``.

    Raises:
        RuntimeError: If the cross-tenant decrypt returns without raising
            ``TenantIsolationError``.
        AssertionError: If a tenant's own round trip does not return the
            original bytes (only when assertions are enabled).
    """
    mgr = TenantIsolationManager()
    audit = KVAuditLog()

    alice = TenantIdentity(tenant_id="tenant-alice", display_name="Alice Inc.")
    bob = TenantIdentity(tenant_id="tenant-bob", display_name="Bob LLC")

    s_alice = mgr.create_session(alice)
    s_bob = mgr.create_session(bob)
    print(f"Alice session: {s_alice.session_id}")
    print(f"Bob session: {s_bob.session_id}")
    print(f"Active tenants: {mgr.list_active_tenants()}")

    # Each tenant encrypts their own KV entry
    alice_entry = make_entry(alice.tenant_id, s_alice.session_id, pos=0)
    bob_entry = make_entry(bob.tenant_id, s_bob.session_id, pos=0)

    alice_enc = mgr.encrypt(alice.tenant_id, alice_entry)
    bob_enc = mgr.encrypt(bob.tenant_id, bob_entry)
    print("Alice encrypted her entry.")
    print("Bob encrypted his entry.")

    # Alice attempts to decrypt Bob's ciphertext through her session
    try:
        mgr.decrypt(alice.tenant_id, bob_enc)
    except TenantIsolationError as exc:
        print(f"\nExpected isolation violation: {exc}")
        audit.log_isolation_violation(
            attacker_tenant=alice.tenant_id,
            target_tenant=bob.tenant_id,
            details="attempted cross-tenant decrypt",
        )
    else:
        # The cross-tenant decrypt returned without raising, so the property
        # this demo exists to show did not hold. Stop here rather than go on
        # to print success messages over a failed isolation check.
        raise RuntimeError(
            "isolation breach: cross-tenant decrypt did not raise "
            "TenantIsolationError"
        )

    # Alice can still decrypt her own
    dec_a = mgr.decrypt(alice.tenant_id, alice_enc)
    assert dec_a.key_tensor_bytes == alice_entry.key_tensor_bytes
    print("Alice successfully decrypted her own entry.")

    # Bob can still decrypt his own
    dec_b = mgr.decrypt(bob.tenant_id, bob_enc)
    assert dec_b.value_tensor_bytes == bob_entry.value_tensor_bytes
    print("Bob successfully decrypted his own entry.")

    print("\nIsolation audit entries:")
    for entry in audit.entries(operation="isolation-violation"):
        print(f" {entry.timestamp} attacker={entry.tenant_id} {entry.details}")
| 85 | |
| 86 | |
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
| 89 | |