src/pqc_kv_cache/rotation.py
| 1 | """KeyRotationPolicy - decides when to rotate a TenantSession's key.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import os |
| 6 | from dataclasses import dataclass |
| 7 | from datetime import datetime, timezone |
| 8 | from enum import Enum |
| 9 | |
| 10 | from pqc_kv_cache.session import TenantSession |
| 11 | |
| 12 | |
class RotationTrigger(str, Enum):
    """Reason attached to a key-rotation decision.

    Members are also ``str`` instances, so each one compares equal to its
    hyphenated wire value (for example ``"entry-count"``).
    """

    # Reported by KeyRotationPolicy.should_rotate when the session's
    # encrypted-entry counter has reached the policy's max_entries.
    ENTRY_COUNT = "entry-count"

    # Reported by KeyRotationPolicy.should_rotate when the session's age has
    # reached the policy's max_age_seconds.
    TIME_ELAPSED = "time-elapsed"

    # Not produced by KeyRotationPolicy itself; presumably used by callers
    # that force a rotation explicitly -- TODO confirm against call sites.
    MANUAL = "manual"
| 17 | |
| 18 | |
@dataclass
class KeyRotationPolicy:
    """Rotate session keys after N entries or T seconds, whichever comes first.

    Attributes:
        max_entries: Rotation is due once ``session.entries_encrypted`` reaches
            this value.
        max_age_seconds: Rotation is due once this many seconds have passed
            since ``session.created_at``.
    """

    max_entries: int = 100_000
    max_age_seconds: int = 300

    def should_rotate(
        self, session: TenantSession
    ) -> tuple[bool, RotationTrigger | None]:
        """Report whether ``session`` is due for a key rotation, and why.

        The entry-count limit is checked first, then the age limit.

        Args:
            session: Object exposing ``entries_encrypted`` and ``created_at``
                (an ISO-8601 string or a ``datetime``).

        Returns:
            ``(True, trigger)`` when a limit has been reached, otherwise
            ``(False, None)``.
        """
        if session.entries_encrypted >= self.max_entries:
            return True, RotationTrigger.ENTRY_COUNT

        created = self._parse_created_at(session.created_at)
        if created is None:
            # Best effort: an unusable timestamp disables only the age check;
            # the entry-count limit above still bounds the key's lifetime.
            return False, None

        age = (datetime.now(timezone.utc) - created).total_seconds()
        if age >= self.max_age_seconds:
            return True, RotationTrigger.TIME_ELAPSED
        return False, None

    @staticmethod
    def _parse_created_at(raw: object) -> datetime | None:
        """Convert a creation timestamp to an aware datetime, or None if unusable."""
        if isinstance(raw, datetime):
            created = raw
        elif isinstance(raw, str):
            text = raw.strip()
            # datetime.fromisoformat() rejects a trailing "Z" before Python
            # 3.11; spell the UTC designator as an explicit offset instead.
            if text[-1:] in ("Z", "z"):
                text = text[:-1] + "+00:00"
            try:
                created = datetime.fromisoformat(text)
            except ValueError:
                return None
        else:
            return None

        # Subtracting a naive datetime from an aware one raises TypeError, so
        # timestamps without an offset are interpreted as UTC.
        if created.tzinfo is None or created.utcoffset() is None:
            created = created.replace(tzinfo=timezone.utc)
        return created

    def rotate(self, session: TenantSession) -> bytes:
        """Rotate the session key in place.

        Draws 32 bytes from ``os.urandom`` and hands them to
        ``session.rotate_key``.

        Returns:
            The new key (opaque 32 bytes).
        """
        new_key = os.urandom(32)
        session.rotate_key(new_key)
        return new_key
| 45 | |