src/pqc_hypervisor_attestation/continuous.py
| 1 | """ContinuousAttester — periodic attestation loop.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import os |
| 6 | import time |
| 7 | from collections.abc import Callable |
| 8 | from dataclasses import dataclass, field |
| 9 | |
| 10 | from pqc_hypervisor_attestation.backends.base import AttestationBackend |
| 11 | from pqc_hypervisor_attestation.claim import AttestationClaim, AttestationReport |
| 12 | from pqc_hypervisor_attestation.signer import Attester |
| 13 | |
| 14 | |
@dataclass
class ContinuousAttester:
    """Produce a fresh signed AttestationReport every N seconds.

    In production this runs in a daemon thread inside the workload process.
    For testing and tight loops, call :meth:`attest_once` directly.
    """

    # Signs each report; its ``identity.did`` is recorded as the attester id.
    attester: Attester
    # Enumerates and snapshots the regions belonging to ``workload_id``.
    backend: AttestationBackend
    # Identifier handed to the backend and embedded in every claim.
    workload_id: str
    # region_id -> expected_hash. Regions without an entry are attested
    # against an empty expected hash ("").
    expected_hashes: dict[str, str] = field(default_factory=dict)
    # Forwarded unchanged to ``AttestationReport.create``.
    ttl_seconds: int = 300

    def attest_once(self, nonce: str | None = None) -> AttestationReport:
        """Enumerate regions, snapshot each, build and sign a report.

        Args:
            nonce: Value embedded in every claim of this report. When it is
                ``None`` or empty, 16 random bytes from ``os.urandom`` are
                hex-encoded and used instead.

        Returns:
            Whatever ``self.attester.sign`` returns for the freshly built
            report.
        """
        # A single nonce is shared by all claims produced in this pass.
        nonce = nonce or os.urandom(16).hex()
        claims: list[AttestationClaim] = [
            AttestationClaim.create(
                region=region,
                snapshot=self.backend.snapshot(region),
                expected_hash=self.expected_hashes.get(region.region_id, ""),
                workload_id=self.workload_id,
                platform=self.backend.platform,
                nonce=nonce,
            )
            for region in self.backend.list_regions(self.workload_id)
        ]
        report = AttestationReport.create(
            claims=claims,
            attester_id=self.attester.identity.did,
            platform=self.backend.platform,
            ttl_seconds=self.ttl_seconds,
        )
        return self.attester.sign(report)

    def run_for(
        self,
        seconds: float,
        interval: float = 1.0,
        on_report: Callable[[AttestationReport], None] | None = None,
    ) -> list[AttestationReport]:
        """Run the attestation loop for a bounded number of seconds.

        Returns the list of reports produced. Intended for tests / demos.
        Production deployments wire this into a systemd timer or equivalent.

        Args:
            seconds: Upper bound on how long the loop keeps starting new
                attestations. A value of zero or less produces no reports.
            interval: Pause between consecutive attestations, in seconds.
                The pause is shortened so it never extends past the deadline.
            on_report: Optional callback invoked with each report right
                after it has been produced.

        Returns:
            The reports in the order they were produced.
        """
        reports: list[AttestationReport] = []
        # Monotonic clock: the bound must not move when the wall clock is
        # adjusted (NTP step, manual change) while the loop is running.
        deadline = time.monotonic() + seconds
        while time.monotonic() < deadline:
            report = self.attest_once()
            reports.append(report)
            if on_report is not None:
                on_report(report)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline; a long interval must not make
            # the call block for longer than ``seconds``.
            time.sleep(min(interval, remaining))
        return reports
| 75 | |