src/pqc_lint/scanner.py
| 1 | """Filesystem scanner.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import fnmatch |
| 6 | import os |
| 7 | import time |
| 8 | from dataclasses import dataclass, field |
| 9 | from datetime import datetime, timezone |
| 10 | from typing import Iterable |
| 11 | |
| 12 | from pqc_lint.findings import Finding, ScanReport |
| 13 | from pqc_lint.patterns import ALL_MATCHERS, MATCHERS_BY_LANGUAGE, PatternMatcher |
| 14 | from pqc_lint.rules import RULE_BY_ID |
| 15 | |
# Glob patterns (fnmatch-style, "/"-separated — see _matches_any) that are
# always skipped: VCS metadata, dependency trees, virtualenvs, caches, build
# output, and minified bundles.
DEFAULT_EXCLUDES = (
    "**/.git/**",
    "**/node_modules/**",
    "**/__pycache__/**",
    "**/.venv/**",
    "**/venv/**",
    "**/dist/**",
    "**/build/**",
    "**/.pytest_cache/**",
    "**/.ruff_cache/**",
    "**/*.min.js",
)

# Hard size cap so we don't try to scan 500 MB binaries
MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024  # 2 MB
| 31 | |
| 32 | |
| 33 | def _matches_any(path: str, globs: Iterable[str]) -> bool: |
| 34 | normalized = path.replace(os.sep, "/") |
| 35 | return any(fnmatch.fnmatch(normalized, g) for g in globs) |
| 36 | |
| 37 | |
@dataclass
class Scanner:
    """Walks a directory tree and runs pattern matchers against each file.

    Attributes:
        excludes: fnmatch-style, "/"-separated glob patterns for paths to skip.
        languages: Language keys to restrict scanning to; empty means all.
        matchers: Explicit matchers to use; derived from ``languages`` (or all
            known matchers) when left empty.
        max_file_size: Files larger than this many bytes are skipped entirely.
    """
    excludes: tuple[str, ...] = DEFAULT_EXCLUDES
    languages: tuple[str, ...] = ()  # empty = all
    matchers: list[PatternMatcher] = field(default_factory=list)
    max_file_size: int = MAX_FILE_SIZE_BYTES

    def __post_init__(self) -> None:
        """Resolve ``matchers`` from ``languages`` when none were supplied."""
        if not self.matchers:
            if self.languages:
                # Unknown language keys are silently ignored rather than raising.
                self.matchers = [
                    MATCHERS_BY_LANGUAGE[lang]
                    for lang in self.languages
                    if lang in MATCHERS_BY_LANGUAGE
                ]
            else:
                self.matchers = list(ALL_MATCHERS)

    def _pick_matcher(self, path: str) -> PatternMatcher | None:
        """Return the first matcher that claims *path*, or None if none do."""
        for m in self.matchers:
            if m.matches_file(path):
                return m
        return None

    def scan_file(self, file_path: str, root: str | None = None) -> list[Finding]:
        """Scan a single file and return its findings.

        Unmatched, oversized, and unreadable files yield no findings. Paths in
        findings are reported relative to *root* (when given), with "/" as the
        separator.
        """
        matcher = self._pick_matcher(file_path)
        if not matcher:
            return []
        try:
            # Skip huge files up front; reading them would dominate scan time.
            if os.path.getsize(file_path) > self.max_file_size:
                return []
            with open(file_path, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            # Unreadable files (permissions, vanished mid-walk) are not errors.
            return []

        rel = os.path.relpath(file_path, root) if root else file_path
        rel = rel.replace(os.sep, "/")
        return list(matcher.scan(rel, content, RULE_BY_ID))

    def scan_path(self, path: str) -> ScanReport:
        """Scan *path* — a single file or a directory tree — into a ScanReport."""
        started = time.time()
        report = ScanReport(
            scan_root=path,
            started_at=datetime.now(timezone.utc).isoformat(),
        )

        if os.path.isfile(path):
            # Single-file mode: excludes are not applied, report paths are
            # relative to the file's own directory.
            root = os.path.dirname(path) or "."
            findings = self.scan_file(path, root=root)
            report.findings.extend(findings)
            report.files_scanned += 1
            report.duration_ms = int((time.time() - started) * 1000)
            return report

        for dirpath, dirnames, filenames in os.walk(path):
            # Prune excluded directories so os.walk never descends into them.
            # BUG FIX: patterns like "**/.git/**" only match paths *inside* the
            # directory, so the bare directory path never matched and pruning
            # was a no-op — the excluded tree was still walked and its files
            # filtered one by one. Also testing the path with a trailing "/"
            # lets the directory itself match "**/name/**"-style patterns.
            kept_dirs = []
            for d in dirnames:
                candidate = os.path.join(dirpath, d)
                if _matches_any(candidate, self.excludes) or _matches_any(
                    candidate + "/", self.excludes
                ):
                    continue
                kept_dirs.append(d)
            dirnames[:] = kept_dirs  # in-place so os.walk honors the pruning

            for fn in filenames:
                fp = os.path.join(dirpath, fn)
                if _matches_any(fp, self.excludes):
                    report.files_skipped += 1
                    continue
                matcher = self._pick_matcher(fp)
                if not matcher:
                    report.files_skipped += 1
                    continue
                findings = self.scan_file(fp, root=path)
                report.findings.extend(findings)
                report.files_scanned += 1

        report.duration_ms = int((time.time() - started) * 1000)
        return report
| 118 | |