examples/key_rotation.py
2.4 KB · 81 lines · python Raw
1 """Key rotation example.
2
3 Demonstrates:
4 - KeyRotationPolicy with a low entry threshold (5).
5 - Encrypting 10 entries in one session.
6 - Rotating the key after 5 entries.
7 - Verifying the new key materially differs from the old one.
8 """
9
10 from __future__ import annotations
11
12 import os
13
14 from pqc_kv_cache import (
15 CacheEncryptor,
16 EntryMetadata,
17 KeyRotationPolicy,
18 KVAuditLog,
19 KVCacheEntry,
20 TenantIdentity,
21 establish_tenant_session,
22 )
23
24
25 def main() -> None:
26 tenant = TenantIdentity(tenant_id="tenant-alice", display_name="Alice")
27 session = establish_tenant_session(tenant)
28 policy = KeyRotationPolicy(max_entries=5, max_age_seconds=3600)
29 audit = KVAuditLog()
30
31 original_key_prefix = session.symmetric_key[:4].hex()
32 print(f"Original key prefix: {original_key_prefix}")
33
34 rotated_once = False
35 for pos in range(10):
36 meta = EntryMetadata(
37 tenant_id=tenant.tenant_id,
38 session_id=session.session_id,
39 layer_idx=0,
40 position=pos,
41 )
42 entry = KVCacheEntry(
43 metadata=meta,
44 key_tensor_bytes=os.urandom(32),
45 value_tensor_bytes=os.urandom(32),
46 )
47 enc = CacheEncryptor(session).encrypt_entry(entry)
48 audit.log_encrypt(
49 tenant.tenant_id,
50 session.session_id,
51 0,
52 pos,
53 enc.sequence_number,
54 )
55 print(
56 f"pos={pos} encrypted seq={enc.sequence_number} "
57 f"entries_encrypted={session.entries_encrypted}"
58 )
59
60 should, trigger = policy.should_rotate(session)
61 if should and not rotated_once:
62 print(f" -> rotation triggered by {trigger.value}")
63 old_key = session.symmetric_key
64 new_key = policy.rotate(session)
65 audit.log_rotate(tenant.tenant_id, session.session_id, trigger.value)
66 assert new_key != old_key
67 print(f" -> new key prefix: {new_key[:4].hex()}")
68 print(
69 f" -> session reset: entries_encrypted="
70 f"{session.entries_encrypted} next_sequence={session.next_sequence}"
71 )
72 rotated_once = True
73
74 print("\nAudit operations:")
75 for entry in audit.entries(limit=20):
76 print(f" {entry.operation:8s} seq={entry.sequence_number} details={entry.details}")
77
78
79 if __name__ == "__main__":
80 main()
81