tests/test_session.py
| 1 | """Tests for TenantSession and establish_tenant_session.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | from datetime import datetime, timedelta, timezone |
| 6 | |
| 7 | import pytest |
| 8 | |
| 9 | from pqc_kv_cache.errors import SessionExpiredError |
| 10 | from pqc_kv_cache.session import ( |
| 11 | TenantIdentity, |
| 12 | establish_tenant_session, |
| 13 | ) |
| 14 | |
| 15 | |
def test_establish_creates_valid_session() -> None:
    """Check the basic shape of a freshly established session.

    The session must report itself valid, carry a symmetric key of
    length 32, reference the tenant it was created for, and use the
    ``urn:pqc-kv-sess:`` session-id prefix.
    """
    identity = TenantIdentity(tenant_id="t1", display_name="T 1")
    session = establish_tenant_session(identity)

    assert session.is_valid()
    assert len(session.symmetric_key) == 32
    assert session.tenant == identity
    assert session.session_id.startswith("urn:pqc-kv-sess:")
| 23 | |
| 24 | |
def test_session_id_unique_across_calls() -> None:
    """Two sessions established for the same tenant get different ids."""
    identity = TenantIdentity(tenant_id="t1")
    # Establish both sessions first, then compare their identifiers.
    first, second = (establish_tenant_session(identity) for _ in range(2))
    assert first.session_id != second.session_id
| 30 | |
| 31 | |
def test_consume_sequence_increments() -> None:
    """consume_sequence returns 1, 2, 3 in turn and the entry count follows."""
    session = establish_tenant_session(TenantIdentity(tenant_id="t1"))

    for expected in (1, 2, 3):
        assert session.consume_sequence() == expected

    assert session.entries_encrypted == 3
| 38 | |
| 39 | |
def test_rotate_key_resets_counters() -> None:
    """rotate_key installs the new key and resets the usage counters.

    After two consumed sequence numbers, rotating must leave
    ``entries_encrypted`` at 0 and ``next_sequence`` at 1.
    """
    session = establish_tenant_session(TenantIdentity(tenant_id="t1"))

    # Use the session twice so there is non-zero state to reset.
    for _ in range(2):
        session.consume_sequence()
    assert session.entries_encrypted == 2

    replacement_key = b"\x11" * 32
    session.rotate_key(replacement_key)

    assert session.symmetric_key == replacement_key
    assert session.entries_encrypted == 0
    assert session.next_sequence == 1
| 50 | |
| 51 | |
def test_is_valid_true_initially() -> None:
    """A session reports itself valid right after it is established."""
    identity = TenantIdentity(tenant_id="t1")
    fresh_session = establish_tenant_session(identity)
    assert fresh_session.is_valid()
| 55 | |
| 56 | |
def test_expired_session_check_valid_raises() -> None:
    """A past expiry makes is_valid() false and check_valid() raise.

    The expiry is forced by overwriting ``expires_at`` with an ISO-8601
    UTC timestamp one second in the past.
    """
    session = establish_tenant_session(TenantIdentity(tenant_id="t1"))

    one_second_ago = datetime.now(timezone.utc) - timedelta(seconds=1)
    session.expires_at = one_second_ago.isoformat()

    assert not session.is_valid()
    with pytest.raises(SessionExpiredError):
        session.check_valid()
| 64 | |