src/pqc_hypervisor_attestation/continuous.py
2.6 KB · 75 lines · python Raw
1 """ContinuousAttester — periodic attestation loop."""
2
3 from __future__ import annotations
4
5 import os
6 import time
7 from collections.abc import Callable
8 from dataclasses import dataclass, field
9
10 from pqc_hypervisor_attestation.backends.base import AttestationBackend
11 from pqc_hypervisor_attestation.claim import AttestationClaim, AttestationReport
12 from pqc_hypervisor_attestation.signer import Attester
13
14
@dataclass
class ContinuousAttester:
    """Produce a fresh signed AttestationReport every N seconds.

    In production this runs in a daemon thread inside the workload process.
    For testing and tight loops, call :meth:`attest_once` directly.
    """

    # Signs each finished report; ``attester.identity.did`` is used as the
    # report's ``attester_id``.
    attester: Attester
    # Lists the workload's regions, snapshots them, and supplies ``platform``.
    backend: AttestationBackend
    # Passed to ``backend.list_regions`` and stamped on every claim.
    workload_id: str
    # region_id -> expected_hash. Regions without an entry are submitted with
    # an empty expected hash ("").
    # NOTE(review): how AttestationClaim treats "" is not visible here — confirm.
    expected_hashes: dict[str, str] = field(default_factory=dict)
    # Forwarded unchanged to ``AttestationReport.create(ttl_seconds=...)``.
    ttl_seconds: int = 300

    def attest_once(self, nonce: str | None = None) -> AttestationReport:
        """Enumerate regions, snapshot each, build and sign a report.

        Args:
            nonce: Value attached to every claim of this round. When omitted
                (or falsy, e.g. an empty string) a random 16-byte hex string
                is generated instead.

        Returns:
            Whatever ``self.attester.sign`` returns for the freshly built
            report.
        """
        if not nonce:
            # os.urandom draws from the OS CSPRNG; 16 bytes -> 32 hex chars.
            nonce = os.urandom(16).hex()

        # One claim per region; every claim of a round shares the same nonce.
        claims: list[AttestationClaim] = [
            AttestationClaim.create(
                region=region,
                snapshot=self.backend.snapshot(region),
                expected_hash=self.expected_hashes.get(region.region_id, ""),
                workload_id=self.workload_id,
                platform=self.backend.platform,
                nonce=nonce,
            )
            for region in self.backend.list_regions(self.workload_id)
        ]
        report = AttestationReport.create(
            claims=claims,
            attester_id=self.attester.identity.did,
            platform=self.backend.platform,
            ttl_seconds=self.ttl_seconds,
        )
        return self.attester.sign(report)

    def run_for(
        self,
        seconds: float,
        interval: float = 1.0,
        on_report: Callable[[AttestationReport], None] | None = None,
    ) -> list[AttestationReport]:
        """Run the attestation loop for a bounded number of seconds.

        Returns the list of reports produced. Intended for tests / demos.
        Production deployments wire this into a systemd timer or equivalent.

        Args:
            seconds: Time budget for the loop. A value <= 0 produces no
                reports.
            interval: Pause between consecutive attestations, in seconds.
            on_report: Optional callback invoked with each report right after
                it is produced.

        Returns:
            All reports produced, in order.
        """
        reports: list[AttestationReport] = []
        # Monotonic clock: the deadline must not move when the wall clock is
        # adjusted (NTP step, manual change) while the loop is running.
        deadline = time.monotonic() + seconds
        while time.monotonic() < deadline:
            report = self.attest_once()
            reports.append(report)
            if on_report is not None:
                on_report(report)

            remaining = deadline - time.monotonic()
            if remaining <= interval:
                # No further attestation fits before the deadline. Wait out
                # only what is left instead of a full interval, so the call
                # never overruns ``seconds`` by up to ``interval``.
                if remaining > 0:
                    time.sleep(remaining)
                break
            time.sleep(interval)
        return reports
75