tests/test_isolation.py
| 1 | """Tests for TenantIsolationManager.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from datetime import datetime, timedelta, timezone |
| 6 | |
| 7 | import pytest |
| 8 | |
| 9 | from pqc_kv_cache.errors import TenantIsolationError, UnknownTenantError |
| 10 | from pqc_kv_cache.isolation import TenantIsolationManager |
| 11 | |
| 12 | |
def test_create_session_stores_by_tenant(tenant_alice) -> None:
    """Check that a created session is stored under the tenant id and reused.

    The manager's ``sessions`` mapping must hold the very object returned by
    ``create_session``, and a repeat call for the same tenant must return
    that same object.
    """
    manager = TenantIsolationManager()
    first_session = manager.create_session(tenant_alice)
    assert manager.sessions[tenant_alice.tenant_id] is first_session
    # Asking again for the same tenant must hand back the existing session.
    assert manager.create_session(tenant_alice) is first_session
| 20 | |
| 21 | |
def test_get_session_unknown_raises() -> None:
    """Check that ``get_session`` raises ``UnknownTenantError`` for an unregistered id."""
    manager = TenantIsolationManager()
    unregistered_id = "no-such-tenant"
    with pytest.raises(UnknownTenantError):
        manager.get_session(unregistered_id)
| 26 | |
| 27 | |
def test_manager_encrypt_decrypt_roundtrip(
    tenant_alice, sample_entry_factory
) -> None:
    """Check that encrypting then decrypting for one tenant restores the entry.

    Both the key and value tensor bytes of the decrypted entry must equal
    those of the entry that was encrypted.
    """
    manager = TenantIsolationManager()
    alice_session = manager.create_session(tenant_alice)
    original = sample_entry_factory(alice_session, 1, 2)
    ciphertext = manager.encrypt(tenant_alice.tenant_id, original)
    restored = manager.decrypt(tenant_alice.tenant_id, ciphertext)
    assert restored.key_tensor_bytes == original.key_tensor_bytes
    assert restored.value_tensor_bytes == original.value_tensor_bytes
| 38 | |
| 39 | |
def test_decrypt_alice_entry_with_bob_session_raises(
    tenant_alice, tenant_bob, sample_entry_factory
) -> None:
    """Check that decrypting Alice's ciphertext under Bob's tenant id is rejected.

    Both tenants have sessions in the same manager; the decrypt call made
    with Bob's tenant id must raise ``TenantIsolationError``.
    """
    manager = TenantIsolationManager()
    alice_session = manager.create_session(tenant_alice)
    manager.create_session(tenant_bob)
    alice_entry = sample_entry_factory(alice_session, 0, 0)
    alice_ciphertext = manager.encrypt(tenant_alice.tenant_id, alice_entry)
    # Cross-tenant attempt: Bob's id paired with Alice's ciphertext.
    with pytest.raises(TenantIsolationError):
        manager.decrypt(tenant_bob.tenant_id, alice_ciphertext)
| 51 | |
| 52 | |
def test_list_active_tenants_excludes_expired(tenant_alice, tenant_bob) -> None:
    """Check that ``list_active_tenants`` omits a tenant whose session has expired.

    Alice's session expiry is overwritten with a timestamp one second in the
    past; Bob's session is left untouched and must still be listed.
    """
    manager = TenantIsolationManager()
    alice_session = manager.create_session(tenant_alice)
    manager.create_session(tenant_bob)
    # Force expiry by backdating Alice's session to one second ago (UTC).
    one_second_ago = datetime.now(timezone.utc) - timedelta(seconds=1)
    alice_session.expires_at = one_second_ago.isoformat()
    active_ids = manager.list_active_tenants()
    assert tenant_bob.tenant_id in active_ids
    assert tenant_alice.tenant_id not in active_ids
| 63 | |