examples/key_rotation.py
| 1 | """Key rotation example. |
| 2 | |
| 3 | Demonstrates: |
| 4 | - KeyRotationPolicy with a low entry threshold (5). |
| 5 | - Encrypting 10 entries in one session. |
| 6 | - Rotating the key after 5 entries. |
| 7 | - Verifying the new key materially differs from the old one. |
| 8 | """ |
| 9 | |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | import os |
| 13 | |
| 14 | from pqc_kv_cache import ( |
| 15 | CacheEncryptor, |
| 16 | EntryMetadata, |
| 17 | KeyRotationPolicy, |
| 18 | KVAuditLog, |
| 19 | KVCacheEntry, |
| 20 | TenantIdentity, |
| 21 | establish_tenant_session, |
| 22 | ) |
| 23 | |
| 24 | |
def main() -> None:
    """Run the key-rotation demo for a single tenant session.

    Builds a session for one tenant and a ``KeyRotationPolicy`` configured
    with ``max_entries=5``, encrypts ten entries holding random tensor bytes,
    and records every encryption in a ``KVAuditLog``.  The first time the
    policy reports that rotation is due, the key is rotated and the new key
    is checked against the old one.  Later rotation signals are deliberately
    ignored so the demo rotates at most once.  Finally the audit log is
    printed.

    Raises:
        AssertionError: If ``policy.rotate`` hands back a key equal to the
            key that was in use before the rotation.
    """
    tenant = TenantIdentity(tenant_id="tenant-alice", display_name="Alice")
    session = establish_tenant_session(tenant)
    policy = KeyRotationPolicy(max_entries=5, max_age_seconds=3600)
    audit = KVAuditLog()

    # NOTE: printing key bytes is for demonstration only; real code should
    # never write key material to logs or stdout.
    original_key_prefix = session.symmetric_key[:4].hex()
    print(f"Original key prefix: {original_key_prefix}")

    # Single source of truth for the layer index used by both the entry
    # metadata and the audit record.
    layer_idx = 0
    rotated_once = False
    for pos in range(10):
        meta = EntryMetadata(
            tenant_id=tenant.tenant_id,
            session_id=session.session_id,
            layer_idx=layer_idx,
            position=pos,
        )
        entry = KVCacheEntry(
            metadata=meta,
            key_tensor_bytes=os.urandom(32),
            value_tensor_bytes=os.urandom(32),
        )
        # A fresh encryptor is built per entry (as in the original) so that
        # it always sees the session's current state, including after a
        # rotation.
        enc = CacheEncryptor(session).encrypt_entry(entry)
        audit.log_encrypt(
            tenant.tenant_id,
            session.session_id,
            layer_idx,
            pos,
            enc.sequence_number,
        )
        print(
            f"pos={pos} encrypted seq={enc.sequence_number} "
            f"entries_encrypted={session.entries_encrypted}"
        )

        should, trigger = policy.should_rotate(session)
        if should and not rotated_once:
            print(f" -> rotation triggered by {trigger.value}")
            old_key = session.symmetric_key
            new_key = policy.rotate(session)
            audit.log_rotate(tenant.tenant_id, session.session_id, trigger.value)
            # Explicit raise rather than a bare `assert`, so the check still
            # runs when Python is started with -O (which strips asserts).
            if new_key == old_key:
                raise AssertionError("key rotation returned an unchanged key")
            print(f" -> new key prefix: {new_key[:4].hex()}")
            print(
                " -> session reset: entries_encrypted="
                f"{session.entries_encrypted} next_sequence={session.next_sequence}"
            )
            rotated_once = True

    print("\nAudit operations:")
    # Distinct name: `entry` above is a KVCacheEntry, these are audit records.
    for record in audit.entries(limit=20):
        print(f" {record.operation:8s} seq={record.sequence_number} details={record.details}")
| 78 | |
# Execute the demo only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
| 81 | |