src/pqc_audit_log_fs/guard.py
| 1 | """FilesystemGuard - enforce append-only semantics where the OS supports it.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import os |
| 6 | import platform |
| 7 | import stat |
| 8 | import subprocess |
| 9 | |
| 10 | from pqc_audit_log_fs.errors import ImmutabilityViolationError |
| 11 | |
| 12 | |
| 13 | class FilesystemGuard: |
| 14 | """Try to mark sealed segment files append-only / immutable where possible. |
| 15 | |
| 16 | Platform behavior: |
| 17 | - Linux with chattr: sets +a (append-only) on .log and +i (immutable) on .sig.json |
| 18 | - macOS: sets uchg (user immutable) via chflags |
| 19 | - Windows / others: in-process check only (drops write bits via chmod) |
| 20 | |
| 21 | Failures to apply OS-level flags are non-fatal unless `strict=True`. |
| 22 | """ |
| 23 | |
| 24 | def __init__(self, strict: bool = False) -> None: |
| 25 | self.strict = strict |
| 26 | self._platform = platform.system() |
| 27 | |
| 28 | def seal(self, path: str, mode: str = "immutable") -> None: |
| 29 | """Mark `path` as immutable (or append-only) where possible.""" |
| 30 | # 1. Unix file permissions: drop write bits |
| 31 | try: |
| 32 | current = os.stat(path).st_mode |
| 33 | os.chmod(path, current & ~0o222) # remove write for u/g/o |
| 34 | except OSError as exc: |
| 35 | if self.strict: |
| 36 | raise ImmutabilityViolationError( |
| 37 | f"chmod failed for {path}: {exc}" |
| 38 | ) from exc |
| 39 | |
| 40 | # 2. Platform-specific immutable flag |
| 41 | if self._platform == "Linux": |
| 42 | try: |
| 43 | flag = "+a" if mode == "append-only" else "+i" |
| 44 | subprocess.run( |
| 45 | ["chattr", flag, path], |
| 46 | check=False, capture_output=True, timeout=5, |
| 47 | ) |
| 48 | except (OSError, subprocess.SubprocessError) as exc: |
| 49 | if self.strict: |
| 50 | raise ImmutabilityViolationError( |
| 51 | f"chattr failed for {path}" |
| 52 | ) from exc |
| 53 | elif self._platform == "Darwin": |
| 54 | try: |
| 55 | subprocess.run( |
| 56 | ["chflags", "uchg", path], |
| 57 | check=False, capture_output=True, timeout=5, |
| 58 | ) |
| 59 | except (OSError, subprocess.SubprocessError) as exc: |
| 60 | if self.strict: |
| 61 | raise ImmutabilityViolationError( |
| 62 | f"chflags failed for {path}" |
| 63 | ) from exc |
| 64 | # Windows / others: no-op beyond chmod |
| 65 | |
| 66 | def verify_read_only(self, path: str) -> bool: |
| 67 | """Return True if the file appears read-only from our process's POV.""" |
| 68 | if not os.path.exists(path): |
| 69 | return False |
| 70 | try: |
| 71 | st = os.stat(path) |
| 72 | writable = bool(st.st_mode & stat.S_IWUSR) |
| 73 | return not writable |
| 74 | except OSError: |
| 75 | return False |
| 76 | |