src/pqc_kv_cache/encryptor.py
3.4 KB · 99 lines · python Raw
1 """CacheEncryptor / CacheDecryptor - the AES-256-GCM wrappers."""
2
3 from __future__ import annotations
4
5 import json
6 import os
7
8 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
9
10 from pqc_kv_cache.entry import EncryptedEntry, EntryMetadata, KVCacheEntry
11 from pqc_kv_cache.errors import (
12 DecryptionError,
13 NonceReplayError,
14 TenantIsolationError,
15 )
16 from pqc_kv_cache.session import TenantSession
17
18 NONCE_SIZE = 12
19
20
21 def _aad(metadata: EntryMetadata, sequence_number: int, key_len: int) -> bytes:
22 """Associated data binding metadata + sequence + key_len to the ciphertext."""
23 payload = {
24 "metadata": metadata.to_dict(),
25 "sequence_number": sequence_number,
26 "key_len": key_len,
27 }
28 return json.dumps(
29 payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
30 ).encode("utf-8")
31
32
class CacheEncryptor:
    """Turns KVCacheEntry objects into EncryptedEntry objects.

    Encryption uses AES-GCM keyed with the wrapped session's
    ``symmetric_key``.
    """

    def __init__(self, session: TenantSession):
        self.session = session

    def encrypt_entry(self, entry: KVCacheEntry) -> EncryptedEntry:
        """Encrypt *entry* under the session key.

        The key tensor bytes and value tensor bytes are concatenated into a
        single plaintext; the key length is stored on the result so the two
        parts can be separated again after decryption. The entry metadata,
        the consumed sequence number and the key length are passed to AES-GCM
        as associated data.

        Raises:
            TenantIsolationError: if the entry's tenant differs from the
                session's tenant.
        """
        session = self.session
        session.check_valid()

        metadata = entry.metadata
        entry_tenant = metadata.tenant_id
        session_tenant = session.tenant.tenant_id
        if entry_tenant != session_tenant:
            raise TenantIsolationError(
                f"entry tenant {entry_tenant} != session tenant "
                f"{session_tenant}"
            )

        # The sequence number is consumed only after the tenant check passes.
        sequence_number = session.consume_sequence()
        iv = os.urandom(NONCE_SIZE)

        key_part = entry.key_tensor_bytes
        split_at = len(key_part)
        message = key_part + entry.value_tensor_bytes

        associated = _aad(metadata, sequence_number, split_at)
        sealed = AESGCM(session.symmetric_key).encrypt(iv, message, associated)

        return EncryptedEntry(
            metadata=metadata,
            nonce=iv.hex(),
            ciphertext=sealed.hex(),
            key_len=split_at,
            sequence_number=sequence_number,
        )
60
61
class CacheDecryptor:
    """Wraps a TenantSession to decrypt EncryptedEntry objects.

    Enforces strict tenant isolation: refuses to decrypt entries whose
    metadata.tenant_id does not match the session's tenant_id, even if the
    symmetric key would somehow work.

    Also remembers the nonce of every entry this instance has decrypted
    successfully and refuses to decrypt an entry carrying the same nonce
    again.
    """

    def __init__(self, session: TenantSession):
        self.session = session
        # Canonical hex (as produced by ``bytes.hex()``) of every nonce that
        # has already been decrypted successfully by this instance.
        self._seen_nonces: set[str] = set()

    def decrypt_entry(self, enc: EncryptedEntry) -> KVCacheEntry:
        """Authenticate and decrypt *enc*, returning the plaintext entry.

        Raises:
            TenantIsolationError: if the entry's tenant differs from the
                session's tenant.
            NonceReplayError: if an entry with the same nonce bytes was
                already decrypted by this instance.
            DecryptionError: if the nonce or ciphertext is not valid hex, or
                AES-GCM decryption/authentication fails.
        """
        self.session.check_valid()
        if enc.metadata.tenant_id != self.session.tenant.tenant_id:
            raise TenantIsolationError(
                f"entry tenant {enc.metadata.tenant_id} != session tenant "
                f"{self.session.tenant.tenant_id}"
            )

        # Decode the nonce BEFORE the replay check. bytes.fromhex accepts
        # upper- or lowercase digits and skips ASCII whitespace, so several
        # distinct strings decode to the same nonce bytes; comparing raw
        # strings would let a replayed entry through by re-encoding its nonce.
        try:
            nonce = bytes.fromhex(enc.nonce)
        except (TypeError, ValueError) as exc:
            raise DecryptionError(f"AES-GCM decrypt failed: {exc}") from exc
        nonce_id = nonce.hex()
        if nonce_id in self._seen_nonces:
            raise NonceReplayError(f"nonce {enc.nonce} already consumed")

        aad = _aad(enc.metadata, enc.sequence_number, enc.key_len)
        aes = AESGCM(self.session.symmetric_key)
        try:
            pt = aes.decrypt(nonce, bytes.fromhex(enc.ciphertext), aad)
        except Exception as exc:
            # Deliberately broad: any failure while decoding the ciphertext or
            # decrypting is reported to callers as a single DecryptionError.
            raise DecryptionError(f"AES-GCM decrypt failed: {exc}") from exc

        # Record the nonce only after successful authentication, so a failed
        # attempt does not block a later valid entry with that nonce.
        self._seen_nonces.add(nonce_id)

        # key_len is part of the associated data, so it has been
        # authenticated by the time the plaintext is split here.
        key_bytes = pt[: enc.key_len]
        val_bytes = pt[enc.key_len:]
        return KVCacheEntry(
            metadata=enc.metadata,
            key_tensor_bytes=key_bytes,
            value_tensor_bytes=val_bytes,
        )
99