src/pqc_enclave_sdk/audit.py
5.0 KB · 173 lines · python Raw
1 """Append-only audit log for enclave operations."""
2
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any
9
10
@dataclass
class EnclaveAuditEntry:
    """A single audit record: which device did what, to which artifact, and when.

    Only ``timestamp``, ``operation`` and ``device_id`` are required; the
    artifact fields and ``details`` default to empty strings and ``success``
    defaults to ``True``.
    """

    # Time the event was recorded, as a string (EnclaveAuditLog supplies an
    # ISO-8601 UTC timestamp).
    timestamp: str
    # 'unlock' | 'lock' | 'put' | 'get' | 'delete' | 'policy_violation' | 'attest'
    operation: str
    # Identifier of the device that performed the operation.
    device_id: str
    # Artifact the operation touched; empty when no artifact is involved.
    artifact_id: str = ""
    artifact_name: str = ""
    artifact_kind: str = ""
    # Whether the operation succeeded.
    success: bool = True
    # Free-form extra context for the event.
    details: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return this entry's fields as a plain dictionary keyed by field name."""
        field_map: dict[str, Any] = asdict(self)
        return field_map

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EnclaveAuditEntry:
        """Build an entry from a mapping of field names to values.

        The mapping is passed straight to the constructor as keyword
        arguments, so unknown keys or missing required fields raise
        ``TypeError``.
        """
        entry = cls(**data)
        return entry
30
31
class EnclaveAuditLog:
    """Bounded in-memory audit log for enclave operations.

    Entries are stored in the order they are recorded. Once ``max_entries``
    entries are held, recording another one evicts the oldest. Nothing is
    written to disk; production should persist entries.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        """Create an empty log that retains at most ``max_entries`` entries.

        Raises:
            ValueError: If ``max_entries`` is less than 1. A log with no
                capacity cannot record anything, so this is rejected up
                front instead of failing on the first logged event.
        """
        if max_entries < 1:
            raise ValueError(f"max_entries must be at least 1, got {max_entries}")
        self._max = max_entries
        # A bounded deque evicts from the left in O(1); list.pop(0) is O(n).
        self._entries: deque[EnclaveAuditEntry] = deque(maxlen=max_entries)

    def _append(self, entry: EnclaveAuditEntry) -> EnclaveAuditEntry:
        """Store ``entry`` as the newest record and return it.

        When the log is already full the oldest entry is dropped (handled by
        the deque's ``maxlen``).
        """
        self._entries.append(entry)
        return entry

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _record(
        self,
        operation: str,
        device_id: str,
        *,
        artifact_id: str = "",
        artifact_name: str = "",
        artifact_kind: str = "",
        success: bool = True,
        details: str = "",
    ) -> EnclaveAuditEntry:
        """Build a timestamped entry for ``operation``, store it, and return it."""
        return self._append(
            EnclaveAuditEntry(
                timestamp=self._now(),
                operation=operation,
                device_id=device_id,
                artifact_id=artifact_id,
                artifact_name=artifact_name,
                artifact_kind=artifact_kind,
                success=success,
                details=details,
            )
        )

    # -- operation-specific log helpers ------------------------------------

    def log_unlock(self, device_id: str, key_id: str) -> EnclaveAuditEntry:
        """Record a successful 'unlock'; ``key_id`` is noted in ``details``."""
        return self._record("unlock", device_id, details=f"key_id={key_id}")

    def log_lock(self, device_id: str) -> EnclaveAuditEntry:
        """Record a successful 'lock' by ``device_id``."""
        return self._record("lock", device_id)

    def log_put(
        self, device_id: str, artifact_id: str, artifact_name: str, artifact_kind: str
    ) -> EnclaveAuditEntry:
        """Record a successful 'put' of the given artifact."""
        return self._record(
            "put",
            device_id,
            artifact_id=artifact_id,
            artifact_name=artifact_name,
            artifact_kind=artifact_kind,
        )

    def log_get(
        self,
        device_id: str,
        artifact_id: str,
        success: bool = True,
        details: str = "",
    ) -> EnclaveAuditEntry:
        """Record a 'get' of ``artifact_id`` with the given outcome and details."""
        return self._record(
            "get",
            device_id,
            artifact_id=artifact_id,
            success=success,
            details=details,
        )

    def log_delete(
        self, device_id: str, artifact_id: str, artifact_name: str
    ) -> EnclaveAuditEntry:
        """Record a successful 'delete' of the given artifact."""
        return self._record(
            "delete",
            device_id,
            artifact_id=artifact_id,
            artifact_name=artifact_name,
        )

    def log_policy_violation(
        self, device_id: str, artifact_id: str, details: str
    ) -> EnclaveAuditEntry:
        """Record a 'policy_violation'; always stored with ``success=False``."""
        return self._record(
            "policy_violation",
            device_id,
            artifact_id=artifact_id,
            success=False,
            details=details,
        )

    def log_attest(
        self, device_id: str, artifact_id: str, details: str = ""
    ) -> EnclaveAuditEntry:
        """Record a successful 'attest' for ``artifact_id``."""
        return self._record(
            "attest",
            device_id,
            artifact_id=artifact_id,
            details=details,
        )

    # -- query -------------------------------------------------------------

    def entries(
        self,
        limit: int | None = 100,
        operation: str | None = None,
        device_id: str | None = None,
        artifact_id: str | None = None,
    ) -> list[EnclaveAuditEntry]:
        """Return matching entries, newest first.

        Args:
            limit: Maximum number of entries to return. ``None`` means no
                cap; zero or a negative value yields an empty list.
            operation: If non-empty, keep only entries with this operation.
            device_id: If non-empty, keep only entries from this device.
            artifact_id: If non-empty, keep only entries for this artifact.

        Returns:
            A new list holding up to ``limit`` of the most recent matches.
        """
        # Guard explicitly: the slice ``out[-0:]`` would return everything.
        if limit is not None and limit <= 0:
            return []

        matches: list[EnclaveAuditEntry] = []
        # Walk newest-to-oldest so the scan can stop once ``limit`` is reached.
        for entry in reversed(self._entries):
            if operation and entry.operation != operation:
                continue
            if device_id and entry.device_id != device_id:
                continue
            if artifact_id and entry.artifact_id != artifact_id:
                continue
            matches.append(entry)
            if limit is not None and len(matches) >= limit:
                break
        return matches

    def export_json(self) -> str:
        """Serialize every retained entry, oldest first, as an indented JSON array."""
        return json.dumps([e.to_dict() for e in self._entries], indent=2)

    def clear(self) -> None:
        """Remove all retained entries."""
        self._entries.clear()

    def __len__(self) -> int:
        """Return the number of entries currently retained."""
        return len(self._entries)
173