diff --git a/.gitignore b/.gitignore index 9c9974b60..85078d77c 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,4 @@ skill-strategy.md apm_modules/ build/tmp/ scout-pipeline-result.png +.copilot/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 37dd70a71..607dae1c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Content security scanning: `apm audit` command with `--file`, `--strip`; install-time pre-deployment gate that blocks critical hidden Unicode characters (override with `--force`); advisory scanning in `compile` and `pack` (#313) - Native Cursor IDE integration — `apm install` deploys primitives to `.cursor/` when the directory exists: instructions→rules (`.mdc`), agents, skills, hooks (`hooks.json`), and MCP (`mcp.json`) - Native OpenCode integration — `apm install` deploys primitives to `.opencode/` when the directory exists: agents, commands (from prompts), skills, and MCP (`opencode.json`) — inspired by @timvw (#257, #306) - `TargetProfile` data layer (`src/apm_cli/integration/targets.py`) — data-driven target definitions for scalable multi-target architecture diff --git a/README.md b/README.md index ee6449493..9de6cfa88 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ apm install # every agent is configured - **Install from anywhere** — GitHub, GitLab, Bitbucket, Azure DevOps, GitHub Enterprise, any git host - **Transitive dependencies** — packages can depend on packages; APM resolves the full tree - **Compile to standards** — `apm compile` produces `AGENTS.md` (GitHub Copilot, OpenCode), `CLAUDE.md` (Claude Code), and `.cursor/rules/` (Cursor) +- **Content security** — `apm audit` scans for hidden Unicode characters; `apm install` blocks compromised packages before agents can read them - **Create & share** — `apm pack` bundles your current configuration as a zipped package - **CI/CD ready** — [GitHub Action](https://github.com/microsoft/apm-action) for automated workflows diff --git a/docs/src/content/docs/enterprise/governance.md b/docs/src/content/docs/enterprise/governance.md index 59144cb51..b2e0f3ad2 100644 --- a/docs/src/content/docs/enterprise/governance.md +++ b/docs/src/content/docs/enterprise/governance.md @@ -100,6 +100,35 @@ No additional tooling is required. The lock file turns git into an agent configu --- +## Content scanning with `apm audit` + +APM scans for hidden Unicode characters that can embed invisible instructions in prompt files. For background on the threat model, severity levels, and the pre-deployment gate that blocks critical findings during `apm install`, see [Content scanning](../security/#content-scanning) in the security model. + +`apm audit` provides on-demand scanning independent of the install flow. + +### Usage + +```bash +apm audit # Scan all installed packages +apm audit # Scan a specific package +apm audit --file .cursorrules # Scan any file (even non-APM-managed) +apm audit --strip # Remove non-critical characters +``` + +### Exit codes + +| Code | Meaning | +|------|---------| +| 0 | Clean — no findings, or info-only | +| 1 | Critical findings — tag characters or bidi overrides detected | +| 2 | Warnings only — zero-width characters or mid-file BOM | + +### The `--file` escape hatch + +`apm audit --file .cursorrules` scans any file, not just APM-managed ones. This is useful for inspecting files obtained outside the APM workflow — downloaded rules files, copy-pasted instructions, or files from PRs. + +--- + ## CI enforcement with `apm audit --ci` :::note[Planned Feature] @@ -348,6 +377,7 @@ No agent configuration change can reach a protected branch without passing throu | Audit trail | Git history of `apm.lock.yaml` | Available | | Constitution injection | `memory/constitution.md` with hash verification | Available | | Transitive MCP trust control | `--trust-transitive-mcp` flag | Available | +| Content scanning | Pre-deploy gate blocks critical hidden Unicode; `apm audit` for on-demand checks | Available | | CI enforcement | `apm audit --ci` as required status check | Planned | | Drift detection | `apm audit --drift` | Planned | | Approved source policies | CODEOWNERS + PR review | Available (manual) | diff --git a/docs/src/content/docs/enterprise/security.md b/docs/src/content/docs/enterprise/security.md index 6af600fb9..ea5e9236a 100644 --- a/docs/src/content/docs/enterprise/security.md +++ b/docs/src/content/docs/enterprise/security.md @@ -1,15 +1,23 @@ --- title: "Security Model" -description: "How APM handles dependency provenance, path security, and supply chain integrity." +description: "How APM handles supply chain security for AI agents — attack surface boundaries, content scanning, dependency provenance, path safety, and MCP trust." sidebar: order: 3 --- This page documents APM's security posture for enterprise security reviews, compliance audits, and supply chain assessments. +## The prompt supply chain is different + +Traditional package managers install code that sits inert until a developer or CI pipeline explicitly executes it. Between `npm install` and `npm start`, there is a gap — time for `npm audit`, code review, and policy checks. + +**Agent configuration has no such gap.** The moment a skill, instruction, or prompt file lands in `.github/prompts/` or `.claude/agents/`, any IDE agent watching the filesystem — Copilot, Cursor, Claude Code — may already be ingesting it. There is no "execution step." File presence IS execution. + +This changes the security model fundamentally. APM treats package deployment as a **pre-deployment gate**: scan first, deploy only if clean. + ## What APM does -APM is a build-time dependency manager for AI prompts and configuration. It performs four operations: +APM is a build-time dependency manager for AI agent configuration. It performs four operations: 1. **Resolves git repositories** — clones or sparse-checks-out packages from GitHub or Azure DevOps. 2. **Deploys static files** — copies markdown, JSON, and YAML files into project directories (`.github/`, `.claude/`, `.cursor/`, `.opencode/`). @@ -53,17 +61,77 @@ The `resolved_commit` field is a full 40-character SHA, not a branch name or tag APM does not use a package registry. Dependencies are specified as git repository URLs in `apm.yaml`. This eliminates the registry compromise vector entirely — there is no centralized service that can be poisoned to redirect installs. -### Reproducible installs +## Content scanning + +### The threat + +Researchers have found hidden Unicode characters embedded in popular shared rules files. Tag characters (U+E0001–E007F) map 1:1 to invisible ASCII. Bidirectional overrides can reorder visible text. Zero-width joiners create invisible gaps. LLMs tokenize all of these individually, meaning models process instructions that developers cannot see on screen. + +### What APM detects + +| Severity | Characters | Risk | +|----------|-----------|------| +| Critical | Tag characters (U+E0001–E007F), bidi overrides (U+202A–E, U+2066–9) | Hidden instruction embedding. Zero legitimate use in prompt files. | +| Warning | Zero-width spaces/joiners (U+200B–D), mid-file BOM (U+FEFF) | Common copy-paste debris, but can hide content. | +| Info | Non-breaking spaces (U+00A0), unusual whitespace (U+2000–200A) | Mostly harmless, flagged for awareness. | + +### Pre-deployment gate + +During `apm install`, source files in `apm_modules/` are scanned **before** any integrator copies them to target directories: + +``` +download → scan source → block or deploy → report +``` + +- **Critical findings block deployment.** The package is downloaded and cached so you can inspect it (`apm_modules/owner/package/`), but nothing reaches agent-readable directories. +- **Warnings are non-blocking.** Zero-width characters are flagged in the diagnostics summary. Files are deployed normally. +- **`--force` overrides the block.** Consistent with existing collision semantics — an explicit "I know what I'm doing." +- **Multi-package installs continue.** A blocked package doesn't stop other packages from installing. + +### Compile and pack scanning + +Content scanning extends beyond install: + +- **`apm compile`** scans compiled output (AGENTS.md, CLAUDE.md, commands) before writing to disk. This is defense-in-depth — source files were already scanned at install, but compilation assembles content from multiple sources and the final output is what agents read. +- **`apm pack`** scans files before bundling. This catches hidden characters before a package is published, preventing authors from accidentally distributing tainted content. + +### On-demand scanning + +`apm audit` scans deployed files or any arbitrary file, independent of the install flow: + +```bash +apm audit # Scan all installed packages +apm audit --file .cursorrules # Scan any file +apm audit --strip # Remove non-critical characters +``` + +The `--file` flag is useful for inspecting files obtained outside APM — downloaded rules files, copy-pasted instructions, or files from pull requests. + +See [Content scanning with `apm audit`](../governance/#content-scanning-with-apm-audit) for usage details and exit codes. + +### Limitations + +Content scanning detects hidden Unicode characters. It does not detect: + +- Plain-text prompt injection (visible but malicious instructions) +- Homoglyph substitution (visually similar characters from different scripts) +- Semantic manipulation (subtly misleading but syntactically normal text) +- Binary payload embedding + +`--strip` removes non-critical characters from deployed copies. It does not modify the source package — the next `apm install` restores them. For persistent remediation, fix the upstream package or pin to a clean commit. + +### Planned hardening -Given the same `apm.lock.yaml`, `apm install` produces identical file output regardless of when or where it runs. The lock file is the single source of truth for dependency state. +- **Content integrity hashing** — SHA-256 checksums stored in `apm.lock.yaml` to verify downloaded content hasn't been tampered with. +- **Hook transparency** — display hook script contents during install so developers can review what will execute. ## Path security -APM deploys files only to controlled subdirectories within the project root. Three mechanisms enforce this boundary. +APM deploys files only to controlled subdirectories within the project root. ### Path traversal prevention -All deploy paths are validated before any file operation. The `validate_deploy_path` check enforces three rules: +All deploy paths are validated before any file operation: 1. **No `..` segments.** Any path containing `..` is rejected outright. 2. **Allowed prefixes only.** Paths must start with an allowed prefix (`.github/`, `.claude/`, `.cursor/`, or `.opencode/`). @@ -89,16 +157,6 @@ When APM deploys a file, it checks whether a file already exists at the target p - If the file is **not tracked** (user-authored or created by another tool), APM skips it and prints a warning. - The `--force` flag overrides collision detection, allowing APM to overwrite untracked files. -Managed file lookups use pre-normalized paths for O(1) set membership checks. - -### Managed files tracking - -The lock file records every file deployed by APM in the `deployed_files` list for each dependency. This enables: - -- **Clean uninstall.** `apm uninstall` removes exactly the files APM deployed, nothing more. -- **Orphan detection.** Files present on disk but absent from the lock file are flagged. -- **Collision awareness.** The managed set distinguishes APM-deployed files from user-authored files. - ## MCP server trust model APM integrates MCP (Model Context Protocol) server configurations from packages. Trust is explicit and scoped by dependency depth. @@ -109,43 +167,29 @@ MCP servers declared by your direct dependencies (packages listed in your `apm.y ### Transitive dependencies -MCP servers declared by transitive dependencies (dependencies of your dependencies) are **blocked by default**. APM prints a warning and skips the MCP server entry. +MCP servers declared by transitive dependencies (dependencies of your dependencies) are **blocked by default**. Transitive MCP servers can request tool access, file system permissions, or network capabilities — blocking them ensures that adding a prompt package cannot silently grant MCP access to an unknown transitive dependency. To allow transitive MCP servers, you must either: - **Re-declare the dependency** in your own `apm.yaml`, promoting it to a direct dependency. - **Pass `--trust-transitive-mcp`** to explicitly opt in to transitive MCP servers for that install. -### Design rationale - -Transitive MCP servers can request tool access, file system permissions, or network capabilities from the AI assistant. Blocking them by default ensures that adding a prompt package cannot silently grant MCP access to an unknown transitive dependency. - ## Token handling APM authenticates to git hosts using personal access tokens (PATs) read from environment variables. -### Token resolution - | Purpose | Environment variables (checked in order) | |---|---| | GitHub packages | `GITHUB_APM_PAT`, `GITHUB_TOKEN`, `GH_TOKEN` | | Azure DevOps packages | `ADO_APM_PAT` | -### Security properties - - **Never stored in files.** Tokens are read from the environment at runtime. They are never written to `apm.yaml`, `apm.lock.yaml`, or any generated file. - **Never logged.** Token values are not included in console output, error messages, or debug logs. - **Scoped to their git host.** A GitHub token is only sent to GitHub. An Azure DevOps token is only sent to Azure DevOps. Tokens are never transmitted to any other endpoint. -### Recommended token scope - -For GitHub, a fine-grained PAT with read-only `Contents` permission on the repositories you depend on is sufficient. APM only performs git clone and fetch operations. - -## Supply chain considerations - -### Attack surface comparison +For GitHub, a fine-grained PAT with read-only `Contents` permission on the repositories you depend on is sufficient. -APM's design eliminates several supply chain attack vectors common in traditional package managers: +## Attack surface comparison | Vector | Traditional package manager | APM | |---|---|---| @@ -154,58 +198,21 @@ APM's design eliminates several supply chain attack vectors common in traditiona | Post-install scripts | Arbitrary code runs after install | No code execution | | Typosquatting | Similar package names on registry | Dependencies are full git URLs | | Build-time injection | Malicious build steps execute | No build step — files are copied | - -### Auditing dependency changes - -Because `apm.lock.yaml` is a plain YAML file checked into version control, standard git tooling provides a full audit trail: - -```bash -# View all dependency changes over time -git log --oneline apm.lock.yaml - -# See exactly what changed in a specific commit -git diff HEAD~1 -- apm.lock.yaml - -# Find when a specific dependency was added -git log --all -p -- apm.lock.yaml | grep -A5 "owner/repo" -``` - -### Pinning and updates - -- `apm install` respects the existing lock file. Dependencies are not re-resolved unless explicitly requested. -- `apm update` re-resolves dependencies and updates the lock file. The diff is visible in version control before merging. -- There is no automatic update mechanism. Dependency changes require a deliberate action and a code review. +| Hidden content injection | Not applicable (binary packages) | Pre-deploy scan blocks critical hidden Unicode; `apm audit` for on-demand checks | ## Frequently asked questions -### Does APM execute any code from packages? - -No. APM copies static files (markdown, JSON, YAML) and generates compiled output from templates. It does not execute scripts, evaluate expressions, or run any code from the packages it installs. +### Can a package embed hidden instructions? -### Does APM phone home or collect telemetry? - -No. APM makes no network requests beyond git clone/fetch operations to resolve dependencies. There is no telemetry, analytics, or usage reporting of any kind. - -### Can a malicious package write files outside the project? - -No. All deploy paths are validated against the project root using path traversal checks, prefix allowlists, and resolved path containment. Symlinks are skipped entirely. A package cannot write files outside `.github/`, `.claude/`, `.cursor/`, or `.opencode/` within the project root. - -### Can a transitive dependency inject MCP servers? - -Not by default. Transitive MCP server declarations are blocked unless you explicitly opt in with `--trust-transitive-mcp` or re-declare the dependency as a direct dependency. +Not without detection. APM scans all package source files before deployment. Critical hidden characters (tag characters, bidi overrides) block deployment. `apm audit` provides on-demand scanning for any file, including those obtained outside APM. ### How do I audit what APM installed? -The `apm.lock.yaml` file records every dependency (with exact commit SHA) and every file deployed. It is a plain YAML file suitable for automated policy checks, diff review, and compliance tooling. +The `apm.lock.yaml` file records every dependency (with exact commit SHA) and every file deployed. It is a plain YAML file suitable for automated policy checks, diff review, and compliance tooling. See [Governance & Compliance](../governance/) for audit workflows. ### Is the APM binary signed? -APM is distributed as: - -- A PyPI package (`apm-cli`) built and published through GitHub Actions CI/CD. -- Pre-built binaries attached to GitHub Releases under the `microsoft` GitHub organization. - -Both distribution channels use GitHub Actions workflows with pinned dependencies and are auditable through the public repository. +APM is distributed as a PyPI package (`apm-cli`) and as pre-built binaries attached to GitHub Releases under the `microsoft` organization. Both distribution channels use GitHub Actions workflows with pinned dependencies and are auditable through the public repository. ### Where is the source code? diff --git a/docs/src/content/docs/index.mdx b/docs/src/content/docs/index.mdx index f152cfda8..f7abe885d 100644 --- a/docs/src/content/docs/index.mdx +++ b/docs/src/content/docs/index.mdx @@ -25,8 +25,8 @@ AI coding agents need context and capabilities to be useful — instructions, sk APM fixes this. You declare your project's agent configuration once in `apm.yml` — skills, prompts, instructions, agents, hooks, plugins, MCP servers — and every developer who clones your repo gets a fully configured agent setup in seconds. New developer joins the team? `git clone`, `cd`, `apm install`. Done. - - `apm.yml` declares skills, instructions, prompts, agents, hooks, plugins, and MCP servers. Like `package.json` for agent configuration. + + `apm.yml` declares skills, instructions, prompts, agents, hooks, plugins, and MCP servers — deployed to Copilot, Claude Code, Cursor, and OpenCode from a single source of truth. Packages depend on packages. APM resolves the full tree — transitive dependencies just work, like npm or pip. @@ -34,8 +34,8 @@ APM fixes this. You declare your project's agent configuration once in `apm.yml` Install from GitHub, GitLab, Bitbucket, Azure DevOps, GitHub Enterprise, or any self-hosted git server. - - Works with GitHub Copilot, Claude, Cursor, and OpenCode. The same manifest, the same setup, across agents. + + Skills, prompts, instructions, hooks — everything agents execute is an attack surface. APM scans packages before deployment, blocking threats before they reach your agents. diff --git a/docs/src/content/docs/reference/cli-commands.md b/docs/src/content/docs/reference/cli-commands.md index 61e821932..6292daac5 100644 --- a/docs/src/content/docs/reference/cli-commands.md +++ b/docs/src/content/docs/reference/cli-commands.md @@ -82,7 +82,7 @@ apm install [PACKAGES...] [OPTIONS] - `--exclude TEXT` - Exclude specific runtime from installation - `--only [apm|mcp]` - Install only specific dependency type - `--update` - Update dependencies to latest Git references -- `--force` - Overwrite locally-authored files on collision +- `--force` - Overwrite locally-authored files on collision; bypass security scan blocks - `--dry-run` - Show what would be installed without installing - `--parallel-downloads INTEGER` - Max concurrent package downloads (default: 4, 0 to disable) - `--verbose` - Show individual file paths and full error details in the diagnostic summary @@ -207,6 +207,7 @@ When you run `apm install`, APM automatically integrates primitives from install - **Smart updates**: Only updates when package version/commit changes - **Hooks**: Hook `.json` files → `.github/hooks/*.json` with scripts bundled - **Collision detection**: Skips local files that aren't managed by APM; use `--force` to overwrite +- **Security scanning**: Source files are scanned for hidden Unicode characters before deployment. Critical findings (tag characters, bidi overrides) block deployment; use `--force` to override **Diagnostic Summary:** @@ -329,6 +330,52 @@ apm prune --dry-run - Removes deployed integration files (prompts, agents, hooks, etc.) for pruned packages using the `deployed_files` manifest in `apm.lock.yaml` - Updates `apm.lock.yaml` to reflect the pruned state +### `apm audit` - Scan for hidden Unicode characters + +Scan installed packages or arbitrary files for hidden Unicode characters that could embed invisible instructions in prompt files. + +```bash +apm audit [PACKAGE] [OPTIONS] +``` + +**Arguments:** +- `PACKAGE` - Optional package key to scan (repo URL from lockfile). If omitted, scans all installed packages. + +**Options:** +- `--file PATH` - Scan an arbitrary file instead of installed packages +- `--strip` - Strip non-critical hidden characters (zero-width spaces, unusual whitespace). Critical findings are preserved for manual review. +- `-v, --verbose` - Show info-level findings and file details + +**Examples:** +```bash +# Scan all installed packages +apm audit + +# Scan a specific package +apm audit https://github.com/owner/repo + +# Scan any file (even non-APM-managed) +apm audit --file .cursorrules + +# Auto-strip zero-width characters +apm audit --strip + +# Verbose output with info-level findings +apm audit --verbose +``` + +**Exit codes:** +| Code | Meaning | +|------|---------| +| 0 | Clean — no findings, or info-only | +| 1 | Critical findings — tag characters or bidi overrides detected | +| 2 | Warnings only — zero-width characters or mid-file BOM | + +**What it detects:** +- **Critical**: Unicode tag characters (U+E0001–E007F), bidirectional overrides — these have zero legitimate use in prompt files +- **Warning**: Zero-width spaces/joiners, mid-file BOM — common copy-paste debris +- **Info**: Non-breaking spaces, unusual whitespace — mostly harmless + ### `apm pack` - Create a portable bundle Create a self-contained bundle from installed APM dependencies using the `deployed_files` recorded in `apm.lock.yaml` as the source of truth. @@ -364,6 +411,7 @@ apm pack -o dist/ **Behavior:** - Reads `apm.lock.yaml` to enumerate all `deployed_files` from installed dependencies +- Scans files for hidden Unicode characters before bundling — warns if findings are detected - Copies files preserving directory structure - Writes an enriched `apm.lock.yaml` inside the bundle with a `pack:` metadata section (the project's own `apm.lock.yaml` is never modified) @@ -869,6 +917,9 @@ apm compile --no-constitution - Exits with error code 1 if validation fails - No output file generation in validation-only mode +**Content Scanning:** +Compiled output is scanned for hidden Unicode characters before writing to disk. If findings are detected, a warning is displayed with instructions to run `apm audit --file` for details. This is defense-in-depth — source files are already scanned during `apm install`. + **Configuration Integration:** The compile command supports configuration via `apm.yml`: diff --git a/src/apm_cli/bundle/packer.py b/src/apm_cli/bundle/packer.py index 4b4820918..806d4bdc8 100644 --- a/src/apm_cli/bundle/packer.py +++ b/src/apm_cli/bundle/packer.py @@ -1,5 +1,6 @@ """Bundle packer -- creates self-contained APM bundles from the resolved dependency tree.""" +import os import shutil import tarfile from dataclasses import dataclass, field @@ -155,6 +156,34 @@ def pack_bundle( lockfile_enriched=True, ) + # 5b. Scan files for hidden characters before bundling + from ..security.content_scanner import ContentScanner + from ..utils.console import _rich_warning + + _scan_findings_total = 0 + for rel_path in unique_files: + src = project_root / rel_path + if src.is_symlink(): + continue + if src.is_file(): + findings = ContentScanner.scan_file(src) + if findings: + _scan_findings_total += len(findings) + elif src.is_dir(): + for dirpath, _dirnames, filenames in os.walk(src, followlinks=False): + for fname in filenames: + fpath = Path(dirpath) / fname + if fpath.is_symlink(): + continue + findings = ContentScanner.scan_file(fpath) + if findings: + _scan_findings_total += len(findings) + if _scan_findings_total: + _rich_warning( + f"Bundle contains {_scan_findings_total} hidden character(s) across source files " + f"— run 'apm audit' to inspect before publishing" + ) + # 6. Build output directory bundle_dir = output_dir / f"{pkg_name}-{pkg_version}" bundle_dir.mkdir(parents=True, exist_ok=True) diff --git a/src/apm_cli/cli.py b/src/apm_cli/cli.py index bdc0a4ef3..2a958159d 100644 --- a/src/apm_cli/cli.py +++ b/src/apm_cli/cli.py @@ -15,6 +15,7 @@ _check_and_notify_updates, print_version, ) +from apm_cli.commands.audit import audit from apm_cli.commands.compile import compile as compile_cmd from apm_cli.commands.config import config from apm_cli.commands.deps import deps @@ -52,6 +53,7 @@ def cli(ctx): # Register command groups +cli.add_command(audit) cli.add_command(deps) cli.add_command(pack_cmd, name="pack") cli.add_command(unpack_cmd, name="unpack") diff --git a/src/apm_cli/commands/audit.py b/src/apm_cli/commands/audit.py new file mode 100644 index 000000000..376f56ed8 --- /dev/null +++ b/src/apm_cli/commands/audit.py @@ -0,0 +1,421 @@ +"""APM audit command — content integrity scanning for prompt files. + +Scans installed APM packages (or arbitrary files) for hidden Unicode +characters that could embed invisible instructions. This is the first +pillar of ``apm audit``; lock-file consistency (``--ci``) and drift +detection (``--drift``) are planned as future modes. + +Exit codes: + 0 — clean (no findings, or info-only) + 1 — critical findings (tag characters, bidi overrides) + 2 — warnings (zero-width chars, no critical) +""" + +import sys +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import click + +from ..deps.lockfile import LockFile, get_lockfile_path +from ..integration.base_integrator import BaseIntegrator +from ..security.content_scanner import ContentScanner, ScanFinding +from ..utils.console import ( + _get_console, + _rich_echo, + _rich_error, + _rich_info, + _rich_success, + _rich_warning, + STATUS_SYMBOLS, +) + + +# ── Helpers ──────────────────────────────────────────────────────── + + +def _is_safe_lockfile_path(rel_path: str, project_root: Path) -> bool: + """Return True if a relative path from the lockfile is safe to read. + + Reuses the same logic as ``BaseIntegrator.validate_deploy_path`` + (no ``..``, allowed prefix, resolves within root). + """ + return BaseIntegrator.validate_deploy_path(rel_path, project_root) + + +def _scan_files_in_dir( + dir_path: Path, + base_label: str, +) -> Tuple[Dict[str, List[ScanFinding]], int]: + """Recursively scan all files under a directory. + + Uses ``os.walk(followlinks=False)`` to avoid following symlinked + directories outside the intended package tree. + + Returns (findings_by_file, files_scanned). + """ + import os + + findings: Dict[str, List[ScanFinding]] = {} + count = 0 + for dirpath, _dirs, filenames in os.walk(dir_path, followlinks=False): + for fname in filenames: + f = Path(dirpath) / fname + if f.is_symlink(): + continue + count += 1 + result = ContentScanner.scan_file(f) + if result: + label = f"{base_label}/{f.relative_to(dir_path).as_posix()}" + findings[label] = result + return findings, count + + +def _scan_lockfile_packages( + project_root: Path, + package_filter: Optional[str] = None, +) -> Tuple[Dict[str, List[ScanFinding]], int]: + """Scan deployed files tracked in apm.lock.yaml. + + Returns: + (findings_by_file, files_scanned) — findings grouped by file path + and total number of files scanned. + """ + lockfile_path = get_lockfile_path(project_root) + lock = LockFile.read(lockfile_path) + if lock is None: + return {}, 0 + + all_findings: Dict[str, List[ScanFinding]] = {} + files_scanned = 0 + + for dep_key, dep in lock.dependencies.items(): + # Filter to a specific package if requested + if package_filter and dep_key != package_filter: + continue + + for rel_path in dep.deployed_files: + # Path safety: reject traversal attempts from crafted lockfiles + if not _is_safe_lockfile_path(rel_path.rstrip("/"), project_root): + continue + + abs_path = project_root / rel_path + if not abs_path.exists(): + continue + + # Recurse into directories (e.g. skill folders) + if abs_path.is_dir(): + dir_findings, dir_count = _scan_files_in_dir(abs_path, rel_path.rstrip("/")) + files_scanned += dir_count + all_findings.update(dir_findings) + continue + + files_scanned += 1 + findings = ContentScanner.scan_file(abs_path) + if findings: + all_findings[rel_path] = findings + + return all_findings, files_scanned + + +def _scan_single_file(file_path: Path) -> Tuple[Dict[str, List[ScanFinding]], int]: + """Scan a single arbitrary file. + + Returns (findings_by_file, files_scanned). + """ + if not file_path.exists(): + _rich_error(f"File not found: {file_path}") + sys.exit(1) + if file_path.is_dir(): + _rich_error(f"Path is a directory, not a file: {file_path}") + sys.exit(1) + + findings = ContentScanner.scan_file(file_path) + files_scanned = 1 + if findings: + # Resolve to absolute so --strip can locate the file reliably + return {str(file_path.resolve()): findings}, files_scanned + return {}, files_scanned + + +def _has_actionable_findings( + findings_by_file: Dict[str, List[ScanFinding]], +) -> bool: + """Return True if any finding is critical or warning (not just info).""" + return any( + f.severity in ("critical", "warning") + for ff in findings_by_file.values() + for f in ff + ) + + +def _render_findings_table( + findings_by_file: Dict[str, List[ScanFinding]], + verbose: bool = False, +) -> None: + """Render a Rich table of scan findings.""" + console = _get_console() + + # Flatten into rows, sorted by severity (critical first) + severity_order = {"critical": 0, "warning": 1, "info": 2} + rows: List[ScanFinding] = [] + for findings in findings_by_file.values(): + rows.extend(findings) + rows.sort(key=lambda f: (severity_order.get(f.severity, 3), f.file, f.line)) + + # Filter out info-level in non-verbose mode + if not verbose: + rows = [r for r in rows if r.severity != "info"] + + if not rows: + return + + if console: + try: + from rich.table import Table + + table = Table( + title=f"{STATUS_SYMBOLS['search']} Content Scan Findings", + show_header=True, + header_style="bold cyan", + ) + table.add_column("Severity", style="bold", width=10) + table.add_column("File", style="white") + table.add_column("Location", style="dim", width=10) + table.add_column("Codepoint", style="bold white", width=10) + table.add_column("Description", style="white") + + sev_styles = { + "critical": "bold red", + "warning": "yellow", + "info": "dim", + } + for f in rows: + table.add_row( + f.severity.upper(), + f.file, + f"{f.line}:{f.column}", + f.codepoint, + f.description, + style=sev_styles.get(f.severity, "white"), + ) + console.print() + console.print(table) + return + except (ImportError, Exception): + pass + + # Fallback: plain text + _rich_echo("") + _rich_echo( + f"{STATUS_SYMBOLS['search']} Content Scan Findings", + color="cyan", + bold=True, + ) + for f in rows: + sev_label = f.severity.upper() + color = "red" if f.severity == "critical" else ( + "yellow" if f.severity == "warning" else "dim" + ) + _rich_echo( + f" {sev_label:<10} {f.file} {f.line}:{f.column} " + f"{f.codepoint} {f.description}", + color=color, + ) + + +def _render_summary( + findings_by_file: Dict[str, List[ScanFinding]], + files_scanned: int, +) -> None: + """Render a summary panel with counts.""" + all_findings: List[ScanFinding] = [] + for findings in findings_by_file.values(): + all_findings.extend(findings) + + counts = ContentScanner.summarize(all_findings) + critical = counts.get("critical", 0) + warning = counts.get("warning", 0) + info = counts.get("info", 0) + affected = len(findings_by_file) + + _rich_echo("") + if critical > 0: + _rich_echo( + f"{STATUS_SYMBOLS['error']} {critical} critical finding(s) in " + f"{affected} file(s) — hidden characters detected", + color="red", + bold=True, + ) + _rich_info(" Critical findings require manual review") + _rich_info(" These characters may embed invisible instructions") + elif warning > 0: + _rich_warning( + f"{STATUS_SYMBOLS['warning']} {warning} warning(s) in " + f"{affected} file(s) — zero-width or hidden characters" + ) + _rich_info(" Run 'apm audit --strip' to remove non-critical characters") + elif info > 0: + _rich_info( + f"{STATUS_SYMBOLS['info']} {info} info-level finding(s) in " + f"{affected} file(s) — unusual whitespace (use --verbose to see)" + ) + else: + _rich_success( + f"{STATUS_SYMBOLS['success']} {files_scanned} file(s) scanned — " + f"no issues found" + ) + + if info > 0 and (critical > 0 or warning > 0): + _rich_info(f" Plus {info} info-level finding(s) (use --verbose to see)") + + _rich_echo(f" {files_scanned} file(s) scanned", color="dim") + + +def _apply_strip( + findings_by_file: Dict[str, List[ScanFinding]], + project_root: Path, +) -> int: + """Strip non-critical characters from affected files. + + Only modifies files that resolve within *project_root* (for lockfile + paths) or that are given as absolute paths (for ``--file`` mode). + Returns number of files modified. + """ + modified = 0 + for rel_path, findings in findings_by_file.items(): + # Skip files with only critical findings (require manual review) + if all(f.severity == "critical" for f in findings): + continue + + abs_path = Path(rel_path) + if not abs_path.is_absolute(): + # Relative path from lockfile: validate within project_root + abs_path = project_root / rel_path + try: + abs_path.resolve().relative_to(project_root.resolve()) + except ValueError: + _rich_warning(f" Skipping {rel_path}: outside project root") + continue + + if not abs_path.exists(): + continue + + try: + original = abs_path.read_text(encoding="utf-8") + cleaned = ContentScanner.strip_non_critical(original) + if cleaned != original: + abs_path.write_text(cleaned, encoding="utf-8") + modified += 1 + _rich_info(f" {STATUS_SYMBOLS['check']} Cleaned: {rel_path}") + except (OSError, UnicodeDecodeError) as exc: + _rich_warning(f" Could not clean {rel_path}: {exc}") + + return modified + + +# ── Command ──────────────────────────────────────────────────────── + + +@click.command(help="Scan installed packages for hidden Unicode characters") +@click.argument("package", required=False) +@click.option( + "--file", + "file_path", + type=click.Path(exists=False), + help="Scan an arbitrary file (not just APM-managed files)", +) +@click.option( + "--strip", + is_flag=True, + help="Strip non-critical hidden characters (zero-width spaces, unusual whitespace)", +) +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Show info-level findings and file details", +) +@click.pass_context +def audit(ctx, package, file_path, strip, verbose): + """Scan deployed prompt files for hidden Unicode characters. + + Detects invisible characters that could embed hidden instructions in + prompt, instruction, and rules files. Critical findings (tag characters, + bidi overrides) require manual review. Warnings (zero-width chars) can + be removed with --strip. + + \b + Examples: + apm audit # Scan all installed packages + apm audit my-package # Scan a specific package + apm audit --file .cursorrules # Scan any file + apm audit --strip # Remove non-critical chars + """ + project_root = Path.cwd() + + if file_path: + # -- File mode: scan a single arbitrary file -- + findings_by_file, files_scanned = _scan_single_file(Path(file_path)) + else: + # -- Package mode: scan from lockfile -- + lockfile_path = get_lockfile_path(project_root) + if not lockfile_path.exists(): + _rich_info( + "No apm.lock.yaml found — nothing to scan. " + "Use --file to scan a specific file." + ) + sys.exit(0) + + if package: + _rich_info(f"Scanning package: {package}") + else: + _rich_info("Scanning all installed packages...") + + findings_by_file, files_scanned = _scan_lockfile_packages( + project_root, package_filter=package, + ) + + if files_scanned == 0: + if package: + _rich_warning( + f"Package '{package}' not found in apm.lock.yaml " + f"or has no deployed files" + ) + else: + _rich_info("No deployed files found in apm.lock.yaml") + sys.exit(0) + + # -- Strip mode -- + if strip and findings_by_file: + has_critical = any( + ContentScanner.has_critical(f) for f in findings_by_file.values() + ) + modified = _apply_strip(findings_by_file, project_root) + if modified > 0: + _rich_success( + f"{STATUS_SYMBOLS['success']} Cleaned {modified} file(s)" + ) + if has_critical: + _rich_warning( + "Critical findings were preserved — they require manual review" + ) + _rich_info(" Inspect flagged files and remove tag/bidi characters") + sys.exit(1) + sys.exit(0) + + # -- Display findings -- + if findings_by_file: + _render_findings_table(findings_by_file, verbose=verbose) + + _render_summary(findings_by_file, files_scanned) + + # -- Exit code -- + # Info-only findings are informational → exit 0 + if not findings_by_file or not _has_actionable_findings(findings_by_file): + sys.exit(0) + + all_findings = [f for ff in findings_by_file.values() for f in ff] + if ContentScanner.has_critical(all_findings): + sys.exit(1) + sys.exit(2) diff --git a/src/apm_cli/commands/compile.py b/src/apm_cli/commands/compile.py index 2325016d7..f3933c8b9 100644 --- a/src/apm_cli/commands/compile.py +++ b/src/apm_cli/commands/compile.py @@ -7,6 +7,7 @@ from ..compilation import AgentsCompiler, CompilationConfig from ..primitives.discovery import discover_primitives +from ..security.content_scanner import ContentScanner from ..utils.console import ( STATUS_SYMBOLS, _rich_echo, @@ -551,6 +552,18 @@ def compile( if not dry_run: # Only rewrite when content materially changes (creation, update, missing constitution case) if c_status in ("CREATED", "UPDATED", "MISSING"): + # Defense-in-depth: scan compiled output before writing + findings = ContentScanner.scan_text( + final_content, filename=str(output_path) + ) + if findings: + _, summary = ContentScanner.classify(findings) + actionable = summary.get("critical", 0) + summary.get("warning", 0) + if actionable: + _rich_warning( + f"Compiled output contains {actionable} hidden character(s) " + f"— run 'apm audit --file {output_path}' to inspect" + ) try: _atomic_write(output_path, final_content) except OSError as e: diff --git a/src/apm_cli/commands/install.py b/src/apm_cli/commands/install.py index ddd09fc96..e05ac57ff 100644 --- a/src/apm_cli/commands/install.py +++ b/src/apm_cli/commands/install.py @@ -508,6 +508,91 @@ def install(ctx, packages, runtime, exclude, only, update, dry_run, force, verbo # --------------------------------------------------------------------------- +def _pre_deploy_security_scan( + install_path: Path, + diagnostics: DiagnosticCollector, + package_name: str = "", + force: bool = False, +) -> bool: + """Scan package source files for hidden characters BEFORE deployment. + + Prompt files are read by IDE agents the moment they land on disk — + unlike JavaScript, there is no gap between "install" and "execute". + This function acts as a gate: critical findings block deployment + entirely (unless ``--force``), while warnings are recorded but + allow deployment to proceed. + + Returns: + True if deployment should proceed, False to block. + """ + from ..security.content_scanner import ContentScanner + + all_findings: list = [] + found_critical = False + # os.walk with followlinks=False prevents traversal into symlinked dirs + import os + for dirpath, _dirnames, filenames in os.walk(install_path, followlinks=False): + for fname in filenames: + f = Path(dirpath) / fname + if f.is_symlink(): + continue + findings = ContentScanner.scan_file(f) + if findings: + all_findings.extend(findings) + if not force and not found_critical: + found_critical = ContentScanner.has_critical(findings) + if found_critical: + break + if found_critical and not force: + break + + if not all_findings: + return True + + has_critical, summary = ContentScanner.classify(all_findings) + crit_count = summary.get("critical", 0) + warn_count = summary.get("warning", 0) + + if has_critical and not force: + diagnostics.security( + message=( + "Blocked — critical hidden characters in source. " + "Use --force to override." + ), + package=package_name, + detail=f"at least {crit_count} critical, {warn_count} warning(s)", + severity="critical", + ) + _rich_error( + f" Blocked: {package_name or 'package'} contains " + f"critical hidden character(s)" + ) + _rich_info(f" └─ Inspect source: {install_path}") + _rich_info(" └─ Use --force to deploy anyway") + return False + + if has_critical: + # --force: deploy but warn loudly + diagnostics.security( + message="Deployed with --force despite critical hidden characters", + package=package_name, + detail=f"{crit_count} critical finding(s)", + severity="critical", + ) + elif warn_count > 0: + diagnostics.security( + message="Hidden characters in source files", + package=package_name, + detail=( + f"{warn_count} warning(s) — " + "run 'apm audit --strip' after install" + ), + severity="warning", + ) + + return True + + def _integrate_package_primitives( package_info, project_root, @@ -524,6 +609,7 @@ def _integrate_package_primitives( force, managed_files, diagnostics, + package_name="", ): """Run the full integration pipeline for a single package. @@ -1214,6 +1300,14 @@ def _collect_descendants(node, visited=None): # Run shared integration pipeline try: + # Pre-deploy security gate + if not _pre_deploy_security_scan( + install_path, diagnostics, + package_name=dep_key, force=force, + ): + package_deployed_files[dep_key] = [] + continue + int_result = _integrate_package_primitives( package_info, project_root, integrate_vscode=integrate_vscode, @@ -1228,6 +1322,7 @@ def _collect_descendants(node, visited=None): force=force, managed_files=managed_files, diagnostics=diagnostics, + package_name=dep_key, ) total_prompts_integrated += int_result["prompts"] total_agents_integrated += int_result["agents"] @@ -1382,6 +1477,14 @@ def _collect_descendants(node, visited=None): if hasattr(cached_package_info, 'package_type') and cached_package_info.package_type: package_types[dep_key] = cached_package_info.package_type.value + # Pre-deploy security gate + if not _pre_deploy_security_scan( + install_path, diagnostics, + package_name=dep_key, force=force, + ): + package_deployed_files[dep_key] = [] + continue + # VSCode + Claude + OpenCode integration (prompts + agents) if integrate_vscode or integrate_claude or integrate_opencode: # Integrate prompts @@ -1725,6 +1828,14 @@ def _collect_descendants(node, visited=None): _rich_info(f" └─ Package type: APM Package (apm.yml)") # Auto-integrate prompts and agents if enabled + # Pre-deploy security gate + if not _pre_deploy_security_scan( + package_info.install_path, diagnostics, + package_name=dep_ref.get_unique_key(), force=force, + ): + package_deployed_files[dep_ref.get_unique_key()] = [] + continue + if integrate_vscode or integrate_claude or integrate_opencode: try: # Integrate prompts + agents (dual-target: .github/ + .claude/) diff --git a/src/apm_cli/compilation/agents_compiler.py b/src/apm_cli/compilation/agents_compiler.py index 97cb3bb9d..b7d7fba1b 100644 --- a/src/apm_cli/compilation/agents_compiler.py +++ b/src/apm_cli/compilation/agents_compiler.py @@ -448,6 +448,7 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol # Write CLAUDE.md files files_written = 0 + from ..security.content_scanner import ContentScanner for claude_path, content in claude_result.content_map.items(): try: # Create directory if needed @@ -467,6 +468,17 @@ def _compile_claude_md(self, config: CompilationConfig, primitives: PrimitiveCol except Exception: pass # Use original content if injection fails + # Defense-in-depth: scan compiled output before writing + findings = ContentScanner.scan_text( + final_content, filename=str(claude_path) + ) + actionable = [f for f in findings if f.severity != "info"] + if actionable: + all_warnings.append( + f"CLAUDE.md contains {len(actionable)} hidden character(s) " + f"— run 'apm audit --file {claude_path}' to inspect" + ) + claude_path.write_text(final_content, encoding='utf-8') files_written += 1 except OSError as e: diff --git a/src/apm_cli/compilation/claude_formatter.py b/src/apm_cli/compilation/claude_formatter.py index 15510bed6..0622d5a3a 100644 --- a/src/apm_cli/compilation/claude_formatter.py +++ b/src/apm_cli/compilation/claude_formatter.py @@ -385,9 +385,20 @@ def generate_commands( files_written = 0 if not dry_run and generated_commands: try: + from ..security.content_scanner import ContentScanner commands_dir.mkdir(parents=True, exist_ok=True) for command_path, content in generated_commands.items(): + # Defense-in-depth: scan compiled command before writing + findings = ContentScanner.scan_text( + content, filename=str(command_path) + ) + actionable = [f for f in findings if f.severity != "info"] + if actionable: + warnings.append( + f"{command_path.name}: {len(actionable)} hidden character(s) " + f"— run 'apm audit --file {command_path}' to inspect" + ) command_path.write_text(content, encoding='utf-8') files_written += 1 diff --git a/src/apm_cli/security/__init__.py b/src/apm_cli/security/__init__.py new file mode 100644 index 000000000..8955c1502 --- /dev/null +++ b/src/apm_cli/security/__init__.py @@ -0,0 +1,5 @@ +"""Security utilities for APM content scanning.""" + +from apm_cli.security.content_scanner import ContentScanner, ScanFinding + +__all__ = ["ContentScanner", "ScanFinding"] diff --git a/src/apm_cli/security/content_scanner.py b/src/apm_cli/security/content_scanner.py new file mode 100644 index 000000000..5ccb7b159 --- /dev/null +++ b/src/apm_cli/security/content_scanner.py @@ -0,0 +1,224 @@ +"""Content scanner for detecting hidden Unicode characters in text files. + +Scans for invisible Unicode characters that could embed hidden instructions +in prompt, instruction, and rules files. These characters are invisible to +humans but LLMs tokenize them individually, meaning models can process +instructions that humans cannot see on screen. + +This module is intentionally dependency-free (no APM internals) so it can +be tested and used independently. +""" + +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +@dataclass(frozen=True) +class ScanFinding: + """A single suspicious character found during scanning.""" + + file: str + line: int + column: int + char: str + codepoint: str # hex, e.g. "U+200B" + severity: str # "critical", "warning", "info" + category: str # e.g. "tag-character", "bidi-override", "zero-width" + description: str + + +# Each entry: (range_start, range_end, severity, category, description) +# range_end is inclusive. +_SUSPICIOUS_RANGES: List[Tuple[int, int, str, str, str]] = [ + # ── Critical: no legitimate use in prompt/instruction files ── + # Unicode tag characters — invisible ASCII mapping + (0xE0001, 0xE007F, "critical", "tag-character", + "Unicode tag character (invisible ASCII mapping)"), + # Bidirectional override characters + (0x202A, 0x202A, "critical", "bidi-override", + "Left-to-right embedding (LRE)"), + (0x202B, 0x202B, "critical", "bidi-override", + "Right-to-left embedding (RLE)"), + (0x202C, 0x202C, "critical", "bidi-override", + "Pop directional formatting (PDF)"), + (0x202D, 0x202D, "critical", "bidi-override", + "Left-to-right override (LRO)"), + (0x202E, 0x202E, "critical", "bidi-override", + "Right-to-left override (RLO)"), + (0x2066, 0x2066, "critical", "bidi-override", + "Left-to-right isolate (LRI)"), + (0x2067, 0x2067, "critical", "bidi-override", + "Right-to-left isolate (RLI)"), + (0x2068, 0x2068, "critical", "bidi-override", + "First strong isolate (FSI)"), + (0x2069, 0x2069, "critical", "bidi-override", + "Pop directional isolate (PDI)"), + # ── Warning: common copy-paste debris but can hide instructions ── + (0x200B, 0x200B, "warning", "zero-width", + "Zero-width space"), + (0x200C, 0x200C, "warning", "zero-width", + "Zero-width non-joiner (ZWNJ)"), + (0x200D, 0x200D, "warning", "zero-width", + "Zero-width joiner (ZWJ)"), + (0x2060, 0x2060, "warning", "zero-width", + "Word joiner"), + (0x00AD, 0x00AD, "warning", "invisible-formatting", + "Soft hyphen"), + # FEFF as mid-file BOM is handled separately in scan logic + # ── Info: unusual whitespace, mostly harmless ── + (0x00A0, 0x00A0, "info", "unusual-whitespace", + "Non-breaking space"), + (0x2000, 0x200A, "info", "unusual-whitespace", + "Unicode whitespace character"), + (0x205F, 0x205F, "info", "unusual-whitespace", + "Medium mathematical space"), + (0x3000, 0x3000, "info", "unusual-whitespace", + "Ideographic space"), + (0x180E, 0x180E, "info", "unusual-whitespace", + "Mongolian vowel separator"), +] + +# Pre-build a lookup for O(1) per-character classification. +# Maps codepoint → (severity, category, description) +_CHAR_LOOKUP: Dict[int, Tuple[str, str, str]] = {} +for _start, _end, _sev, _cat, _desc in _SUSPICIOUS_RANGES: + for _cp in range(_start, _end + 1): + _CHAR_LOOKUP[_cp] = (_sev, _cat, _desc) + + +class ContentScanner: + """Scans text content for hidden or suspicious Unicode characters.""" + + @staticmethod + def scan_text(content: str, filename: str = "") -> List[ScanFinding]: + """Scan a string for suspicious Unicode characters. + + Returns a list of findings, one per suspicious character, with + line/column positions (1-based). + """ + if not content: + return [] + + # Fast path: pure-ASCII content cannot contain any suspicious + # codepoints. str.isascii() runs at C speed (<1 µs for typical + # prompt files) and lets us skip the Python-level character loop + # for the ~90 %+ of files that are plain ASCII. + if content.isascii(): + return [] + + findings: List[ScanFinding] = [] + lines = content.split("\n") + + for line_idx, line_text in enumerate(lines): + for col_idx, ch in enumerate(line_text): + cp = ord(ch) + + # Special case: BOM (U+FEFF) at the very start of the + # file is standard practice; mid-file is suspicious. + if cp == 0xFEFF: + if line_idx == 0 and col_idx == 0: + findings.append(ScanFinding( + file=filename, + line=1, + column=1, + char=repr(ch), + codepoint="U+FEFF", + severity="info", + category="bom", + description="Byte order mark at start of file", + )) + else: + findings.append(ScanFinding( + file=filename, + line=line_idx + 1, + column=col_idx + 1, + char=repr(ch), + codepoint="U+FEFF", + severity="warning", + category="zero-width", + description="Byte order mark in middle of file " + "(possible hidden content)", + )) + continue + + entry = _CHAR_LOOKUP.get(cp) + if entry is not None: + sev, cat, desc = entry + findings.append(ScanFinding( + file=filename, + line=line_idx + 1, + column=col_idx + 1, + char=repr(ch), + codepoint=f"U+{cp:04X}", + severity=sev, + category=cat, + description=desc, + )) + + return findings + + @staticmethod + def scan_file(path: Path) -> List[ScanFinding]: + """Read a file and scan its content. + + Handles encoding errors gracefully — returns an empty list if the + file cannot be decoded as UTF-8 (binary files, etc.). + """ + try: + content = path.read_text(encoding="utf-8") + except (UnicodeDecodeError, OSError): + return [] + return ContentScanner.scan_text(content, filename=str(path)) + + @staticmethod + def has_critical(findings: List[ScanFinding]) -> bool: + """Return True if any finding has critical severity.""" + return any(f.severity == "critical" for f in findings) + + @staticmethod + def summarize(findings: List[ScanFinding]) -> Dict[str, int]: + """Return counts by severity level.""" + counts: Dict[str, int] = {"critical": 0, "warning": 0, "info": 0} + for f in findings: + counts[f.severity] = counts.get(f.severity, 0) + 1 + return counts + + @staticmethod + def classify( + findings: List[ScanFinding], + ) -> Tuple[bool, Dict[str, int]]: + """Combined has_critical + summarize in a single pass. + + Returns (has_critical, severity_counts). + """ + critical = False + counts: Dict[str, int] = {"critical": 0, "warning": 0, "info": 0} + for f in findings: + counts[f.severity] = counts.get(f.severity, 0) + 1 + if f.severity == "critical": + critical = True + return critical, counts + + @staticmethod + def strip_non_critical(content: str) -> str: + """Remove warning and info-level characters from content. + + Critical characters (tag chars, bidi overrides) are preserved — + they require manual review. + """ + result: List[str] = [] + for ch in content: + cp = ord(ch) + # Strip leading BOM (info-level) + if cp == 0xFEFF and not result: + continue # strip leading BOM too (it's info-level) + entry = _CHAR_LOOKUP.get(cp) + if entry is not None: + sev = entry[0] + if sev in ("warning", "info"): + continue # strip it + elif cp == 0xFEFF: + continue # mid-file BOM is warning-level + result.append(ch) + return "".join(result) diff --git a/src/apm_cli/utils/console.py b/src/apm_cli/utils/console.py index ea06b4ee9..5b2db0f92 100644 --- a/src/apm_cli/utils/console.py +++ b/src/apm_cli/utils/console.py @@ -81,7 +81,7 @@ def _rich_echo(message: str, color: str = "white", style: str = None, bold: bool style_str = color if bold: style_str = f"bold {color}" - console.print(message, style=style_str) + console.print(message, style=style_str, highlight=False, markup=False) return except Exception: pass diff --git a/src/apm_cli/utils/diagnostics.py b/src/apm_cli/utils/diagnostics.py index 96ec94411..e217a32d4 100644 --- a/src/apm_cli/utils/diagnostics.py +++ b/src/apm_cli/utils/diagnostics.py @@ -23,8 +23,15 @@ CATEGORY_OVERWRITE = "overwrite" CATEGORY_WARNING = "warning" CATEGORY_ERROR = "error" +CATEGORY_SECURITY = "security" -_CATEGORY_ORDER = [CATEGORY_COLLISION, CATEGORY_OVERWRITE, CATEGORY_WARNING, CATEGORY_ERROR] +_CATEGORY_ORDER = [ + CATEGORY_SECURITY, + CATEGORY_COLLISION, + CATEGORY_OVERWRITE, + CATEGORY_WARNING, + CATEGORY_ERROR, +] @dataclass(frozen=True) @@ -35,6 +42,7 @@ class Diagnostic: category: str package: str = "" detail: str = "" + severity: str = "" # e.g. "critical", "warning", "info" — used by security category class DiagnosticCollector: @@ -101,6 +109,25 @@ def error(self, message: str, package: str = "", detail: str = "") -> None: ) ) + def security( + self, + message: str, + package: str = "", + detail: str = "", + severity: str = "warning", + ) -> None: + """Record a security finding (hidden characters, etc.).""" + with self._lock: + self._diagnostics.append( + Diagnostic( + message=message, + category=CATEGORY_SECURITY, + package=package, + detail=detail, + severity=severity, + ) + ) + # ------------------------------------------------------------------ # Query helpers # ------------------------------------------------------------------ @@ -114,6 +141,19 @@ def has_diagnostics(self) -> bool: def error_count(self) -> int: return sum(1 for d in self._diagnostics if d.category == CATEGORY_ERROR) + @property + def security_count(self) -> int: + """Return number of security findings.""" + return sum(1 for d in self._diagnostics if d.category == CATEGORY_SECURITY) + + @property + def has_critical_security(self) -> bool: + """Return True if any critical-severity security finding exists.""" + return any( + d.category == CATEGORY_SECURITY and d.severity == "critical" + for d in self._diagnostics + ) + def by_category(self) -> Dict[str, List[Diagnostic]]: """Return diagnostics grouped by category, preserving insertion order.""" groups: Dict[str, List[Diagnostic]] = {} @@ -154,7 +194,9 @@ def render_summary(self) -> None: if not items: continue - if cat == CATEGORY_COLLISION: + if cat == CATEGORY_SECURITY: + self._render_security_group(items) + elif cat == CATEGORY_COLLISION: self._render_collision_group(items) elif cat == CATEGORY_OVERWRITE: self._render_overwrite_group(items) @@ -173,6 +215,46 @@ def render_summary(self) -> None: # -- Per-category renderers ------------------------------------ + def _render_security_group(self, items: List[Diagnostic]) -> None: + critical = [d for d in items if d.severity == "critical"] + warnings = [d for d in items if d.severity == "warning"] + info = [d for d in items if d.severity == "info"] + + if critical: + _rich_echo( + f" [!] {len(critical)} critical security finding(s) — " + f"hidden characters detected", + color="red", + bold=True, + ) + _rich_info(" Run 'apm audit' for full details") + if self.verbose: + by_pkg = _group_by_package(critical) + for pkg, diags in by_pkg.items(): + if pkg: + _rich_echo(f" [{pkg}]", color="dim") + for d in diags: + _rich_echo(f" └─ {d.message}", color="red") + + if warnings: + _rich_warning( + f" [!] {len(warnings)} file(s) contain zero-width or hidden characters" + ) + if not self.verbose: + _rich_info(" Run with --verbose to see details") + else: + by_pkg = _group_by_package(warnings) + for pkg, diags in by_pkg.items(): + if pkg: + _rich_echo(f" [{pkg}]", color="dim") + for d in diags: + _rich_echo(f" └─ {d.message}", color="dim") + + if info and self.verbose: + _rich_info( + f" [i] {len(info)} file(s) contain unusual whitespace characters" + ) + def _render_collision_group(self, items: List[Diagnostic]) -> None: count = len(items) noun = "file" if count == 1 else "files" diff --git a/tests/unit/test_audit_command.py b/tests/unit/test_audit_command.py new file mode 100644 index 000000000..8c92c90b6 --- /dev/null +++ b/tests/unit/test_audit_command.py @@ -0,0 +1,364 @@ +"""Tests for the ``apm audit`` command.""" + +import textwrap +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from apm_cli.commands.audit import audit, _scan_single_file, _apply_strip +from apm_cli.security.content_scanner import ContentScanner + + +# ── Fixtures ──────────────────────────────────────────────────────── + + +@pytest.fixture +def runner(): + return CliRunner() + + +@pytest.fixture +def clean_file(tmp_path): + """A file with no suspicious characters.""" + p = tmp_path / "clean.md" + p.write_text("# Clean file\nNo hidden characters here.\n", encoding="utf-8") + return p + + +@pytest.fixture +def warning_file(tmp_path): + """A file containing zero-width spaces (warning-level).""" + p = tmp_path / "warning.md" + p.write_text( + "Hello\u200Bworld\nSecond\u200Dline\n", + encoding="utf-8", + ) + return p + + +@pytest.fixture +def critical_file(tmp_path): + """A file containing tag characters (critical-level).""" + p = tmp_path / "critical.md" + # U+E0001 = LANGUAGE TAG, U+E0068 = TAG LATIN SMALL LETTER H + p.write_text( + "Normal text\U000E0001\U000E0068\U000E0065\U000E006Cmore text\n", + encoding="utf-8", + ) + return p + + +@pytest.fixture +def mixed_file(tmp_path): + """A file with both critical and warning characters.""" + p = tmp_path / "mixed.md" + p.write_text( + "line one\u200B\nline two\U000E0041\n", + encoding="utf-8", + ) + return p + + +@pytest.fixture +def info_only_file(tmp_path): + """A file with only info-level findings (NBSP).""" + p = tmp_path / "info.md" + p.write_text("Hello\u00A0world\n", encoding="utf-8") + return p + + +@pytest.fixture +def lockfile_project(tmp_path): + """A project with apm.lock.yaml and deployed files.""" + lock_content = textwrap.dedent("""\ + lockfile_version: '1' + generated_at: '2025-01-01T00:00:00Z' + dependencies: + - repo_url: https://github.com/test/test-pkg + resolved_ref: main + resolved_commit: abc123 + deployed_files: + - .github/prompts/test.md + - .github/instructions/guide.md + """) + (tmp_path / "apm.lock.yaml").write_text(lock_content, encoding="utf-8") + + (tmp_path / ".github" / "prompts").mkdir(parents=True) + (tmp_path / ".github" / "instructions").mkdir(parents=True) + + (tmp_path / ".github" / "prompts" / "test.md").write_text( + "Clean prompt content\n", encoding="utf-8" + ) + (tmp_path / ".github" / "instructions" / "guide.md").write_text( + "Guide with hidden\u200Bchar\n", encoding="utf-8" + ) + + return tmp_path + + +@pytest.fixture +def lockfile_project_critical(tmp_path): + """A project where deployed files contain critical characters.""" + lock_content = textwrap.dedent("""\ + lockfile_version: '1' + generated_at: '2025-01-01T00:00:00Z' + dependencies: + - repo_url: https://github.com/test/bad-pkg + resolved_ref: main + resolved_commit: def456 + deployed_files: + - .github/prompts/evil.md + """) + (tmp_path / "apm.lock.yaml").write_text(lock_content, encoding="utf-8") + + (tmp_path / ".github" / "prompts").mkdir(parents=True) + (tmp_path / ".github" / "prompts" / "evil.md").write_text( + "Looks normal\U000E0001hidden\n", encoding="utf-8" + ) + + return tmp_path + + +@pytest.fixture +def lockfile_project_with_dir(tmp_path): + """A project with a skill directory entry in deployed_files.""" + lock_content = textwrap.dedent("""\ + lockfile_version: '1' + generated_at: '2025-01-01T00:00:00Z' + dependencies: + - repo_url: https://github.com/test/skill-pkg + resolved_ref: main + resolved_commit: abc123 + deployed_files: + - .github/skills/my-skill/ + """) + (tmp_path / "apm.lock.yaml").write_text(lock_content, encoding="utf-8") + + skill_dir = tmp_path / ".github" / "skills" / "my-skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "skill with hidden\u200Bchar\n", encoding="utf-8" + ) + (skill_dir / "helper.md").write_text("clean helper\n", encoding="utf-8") + + return tmp_path + + +# ── --file mode tests ──────────────────────────────────────────── + + +class TestFileMode: + """Tests for ``apm audit --file ``.""" + + def test_clean_file_exit_zero(self, runner, clean_file): + result = runner.invoke(audit, ["--file", str(clean_file)]) + assert result.exit_code == 0 + assert "no issues found" in result.output.lower() + + def test_warning_file_exit_two(self, runner, warning_file): + result = runner.invoke(audit, ["--file", str(warning_file)]) + assert result.exit_code == 2 + + def test_critical_file_exit_one(self, runner, critical_file): + result = runner.invoke(audit, ["--file", str(critical_file)]) + assert result.exit_code == 1 + + def test_mixed_file_exit_one(self, runner, mixed_file): + """Critical findings take precedence over warnings.""" + result = runner.invoke(audit, ["--file", str(mixed_file)]) + assert result.exit_code == 1 + + def test_nonexistent_file_errors(self, runner, tmp_path): + result = runner.invoke(audit, ["--file", str(tmp_path / "nope.txt")]) + assert result.exit_code == 1 + assert "not found" in result.output.lower() + + def test_directory_errors(self, runner, tmp_path): + result = runner.invoke(audit, ["--file", str(tmp_path)]) + assert result.exit_code == 1 + assert "directory" in result.output.lower() + + def test_verbose_shows_info(self, runner, tmp_path): + """--verbose includes info-level findings.""" + p = tmp_path / "info.md" + p.write_text("Hello\u00A0world\n", encoding="utf-8") # NBSP = info + result = runner.invoke(audit, ["--file", str(p), "--verbose"]) + # Info findings should appear in verbose output + assert "U+00A0" in result.output + + def test_info_only_exit_zero(self, runner, info_only_file): + """Info-only findings are informational — exit 0.""" + result = runner.invoke(audit, ["--file", str(info_only_file)]) + assert result.exit_code == 0 + + +# ── Lockfile mode tests ────────────────────────────────────────── + + +class TestLockfileMode: + """Tests for ``apm audit`` scanning from apm.lock.yaml.""" + + def test_no_lockfile_exit_zero(self, runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = runner.invoke(audit, []) + assert result.exit_code == 0 + assert "nothing to scan" in result.output.lower() + + def test_clean_lockfile_exit_zero(self, runner, lockfile_project, monkeypatch): + # Make both files clean + (lockfile_project / ".github" / "instructions" / "guide.md").write_text( + "Clean guide\n", encoding="utf-8" + ) + monkeypatch.chdir(lockfile_project) + result = runner.invoke(audit, []) + assert result.exit_code == 0 + + def test_warning_findings_exit_two(self, runner, lockfile_project, monkeypatch): + monkeypatch.chdir(lockfile_project) + result = runner.invoke(audit, []) + assert result.exit_code == 2 + + def test_critical_findings_exit_one( + self, runner, lockfile_project_critical, monkeypatch, + ): + monkeypatch.chdir(lockfile_project_critical) + result = runner.invoke(audit, []) + assert result.exit_code == 1 + + def test_package_filter(self, runner, lockfile_project, monkeypatch): + monkeypatch.chdir(lockfile_project) + # Filter by repo URL (the lockfile key) + result = runner.invoke(audit, ["https://github.com/test/test-pkg"]) + # Should still find the warning in guide.md + assert result.exit_code == 2 + + def test_package_filter_not_found(self, runner, lockfile_project, monkeypatch): + monkeypatch.chdir(lockfile_project) + result = runner.invoke(audit, ["nonexistent-pkg"]) + assert result.exit_code == 0 + assert "not found" in result.output.lower() + + def test_dir_entries_scanned_recursively( + self, runner, lockfile_project_with_dir, monkeypatch, + ): + """Skill directories recorded in deployed_files should be scanned.""" + monkeypatch.chdir(lockfile_project_with_dir) + result = runner.invoke(audit, []) + # SKILL.md has a zero-width char → warning → exit 2 + assert result.exit_code == 2 + + def test_path_traversal_rejected(self, runner, tmp_path, monkeypatch): + """Paths with .. should be rejected to prevent lockfile attacks.""" + lock_content = textwrap.dedent("""\ + lockfile_version: '1' + generated_at: '2025-01-01T00:00:00Z' + dependencies: + - repo_url: https://github.com/evil/pkg + resolved_ref: main + resolved_commit: abc123 + deployed_files: + - ../../etc/passwd + """) + (tmp_path / "apm.lock.yaml").write_text(lock_content, encoding="utf-8") + monkeypatch.chdir(tmp_path) + result = runner.invoke(audit, []) + # Should not crash, and should report 0 files scanned + assert result.exit_code == 0 + + +# ── --strip mode tests ─────────────────────────────────────────── + + +class TestStripMode: + """Tests for ``apm audit --strip``.""" + + def test_strip_removes_warnings(self, runner, warning_file): + result = runner.invoke(audit, ["--file", str(warning_file), "--strip"]) + assert result.exit_code == 0 + # File should now be clean + content = warning_file.read_text(encoding="utf-8") + assert "\u200B" not in content + assert "\u200D" not in content + + def test_strip_preserves_critical(self, runner, critical_file): + result = runner.invoke(audit, ["--file", str(critical_file), "--strip"]) + # Should still exit 1 because critical chars remain + assert result.exit_code == 1 + content = critical_file.read_text(encoding="utf-8") + # Critical tag chars should still be present + assert "\U000E0001" in content + + def test_strip_mixed_removes_warnings_keeps_critical(self, runner, mixed_file): + result = runner.invoke(audit, ["--file", str(mixed_file), "--strip"]) + assert result.exit_code == 1 # critical still present + content = mixed_file.read_text(encoding="utf-8") + assert "\u200B" not in content # warning stripped + assert "\U000E0041" in content # critical preserved + + def test_strip_clean_file_noop(self, runner, clean_file): + original = clean_file.read_text(encoding="utf-8") + result = runner.invoke(audit, ["--file", str(clean_file), "--strip"]) + assert result.exit_code == 0 + assert clean_file.read_text(encoding="utf-8") == original + + def test_strip_lockfile_mode(self, runner, lockfile_project, monkeypatch): + monkeypatch.chdir(lockfile_project) + result = runner.invoke(audit, ["--strip"]) + assert result.exit_code == 0 + # The warning char in guide.md should be stripped + guide = lockfile_project / ".github" / "instructions" / "guide.md" + content = guide.read_text(encoding="utf-8") + assert "\u200B" not in content + + +# ── _scan_single_file helper tests ─────────────────────────────── + + +class TestScanSingleFile: + """Direct tests for the _scan_single_file helper.""" + + def test_returns_findings_and_count(self, clean_file): + findings, count = _scan_single_file(clean_file) + assert findings == {} + assert count == 1 + + def test_findings_keyed_by_path(self, warning_file): + findings, count = _scan_single_file(warning_file) + assert count == 1 + assert len(findings) == 1 + key = list(findings.keys())[0] + assert str(warning_file.resolve()) == key + + +# ── _apply_strip helper tests ──────────────────────────────────── + + +class TestApplyStrip: + """Direct tests for the _apply_strip helper.""" + + def test_returns_count_of_modified(self, warning_file): + findings, _ = _scan_single_file(warning_file) + modified = _apply_strip(findings, warning_file.parent) + assert modified == 1 + + def test_skips_critical_only_files(self, critical_file): + findings, _ = _scan_single_file(critical_file) + modified = _apply_strip(findings, critical_file.parent) + # File has only critical findings → should not be modified + assert modified == 0 + + def test_rejects_path_outside_root(self, tmp_path): + """_apply_strip must not write files outside project root.""" + evil_path = tmp_path / "outside" / "evil.md" + evil_path.parent.mkdir(parents=True) + evil_path.write_text("Hello\u200Bworld\n", encoding="utf-8") + + findings = ContentScanner.scan_file(evil_path) + # Use a relative path that tries to escape + findings_by_file = {"../../outside/evil.md": findings} + + project = tmp_path / "project" + project.mkdir() + modified = _apply_strip(findings_by_file, project) + assert modified == 0 diff --git a/tests/unit/test_content_scanner.py b/tests/unit/test_content_scanner.py new file mode 100644 index 000000000..23876d3bc --- /dev/null +++ b/tests/unit/test_content_scanner.py @@ -0,0 +1,313 @@ +"""Tests for the content scanner module.""" + +import tempfile +from pathlib import Path + +import pytest + +from apm_cli.security.content_scanner import ContentScanner, ScanFinding + + +class TestScanText: + """Tests for ContentScanner.scan_text().""" + + def test_clean_text_returns_empty(self): + """Ordinary ASCII+emoji text produces no findings.""" + content = "# My Prompt\n\nDo the thing. 🚀\n" + findings = ContentScanner.scan_text(content) + assert findings == [] + + def test_empty_string_returns_empty(self): + findings = ContentScanner.scan_text("") + assert findings == [] + + def test_whitespace_only_returns_empty(self): + findings = ContentScanner.scan_text(" \n\n\t\t\n") + assert findings == [] + + # ── Critical: tag characters ── + + def test_tag_character_detected_as_critical(self): + """U+E0001 (language tag) must be flagged as critical.""" + content = f"Hello \U000E0001 world" + findings = ContentScanner.scan_text(content, filename="test.md") + assert len(findings) == 1 + assert findings[0].severity == "critical" + assert findings[0].category == "tag-character" + assert findings[0].codepoint == "U+E0001" + assert findings[0].file == "test.md" + + def test_multiple_tag_characters(self): + """Full range of tag chars embedded in text.""" + # Embed a few tag characters that map to invisible ASCII + tag_a = chr(0xE0041) # TAG LATIN CAPITAL LETTER A + tag_b = chr(0xE0042) # TAG LATIN CAPITAL LETTER B + content = f"some{tag_a}text{tag_b}here" + findings = ContentScanner.scan_text(content) + assert len(findings) == 2 + assert all(f.severity == "critical" for f in findings) + assert findings[0].codepoint == "U+E0041" + assert findings[1].codepoint == "U+E0042" + + def test_tag_cancel_detected(self): + """U+E007F (CANCEL TAG) is also critical.""" + content = f"text{chr(0xE007F)}end" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "critical" + assert findings[0].codepoint == "U+E007F" + + # ── Critical: bidi overrides ── + + def test_bidi_lro_detected(self): + """U+202D (LRO) left-to-right override.""" + content = f"normal \u202D overridden" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "critical" + assert findings[0].category == "bidi-override" + + def test_bidi_rlo_detected(self): + """U+202E (RLO) right-to-left override.""" + content = f"normal \u202E reversed" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "critical" + assert findings[0].codepoint == "U+202E" + + def test_bidi_isolates_detected(self): + """U+2066-U+2069 isolates are critical.""" + content = f"a\u2066b\u2067c\u2068d\u2069e" + findings = ContentScanner.scan_text(content) + assert len(findings) == 4 + assert all(f.severity == "critical" for f in findings) + + # ── Warning: zero-width characters ── + + def test_zero_width_space_detected(self): + """U+200B zero-width space.""" + content = f"hello\u200Bworld" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + assert findings[0].category == "zero-width" + + def test_zwj_detected(self): + """U+200D zero-width joiner.""" + content = f"hello\u200Dworld" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + assert findings[0].codepoint == "U+200D" + + def test_zwnj_detected(self): + """U+200C zero-width non-joiner.""" + content = f"hello\u200Cworld" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + + def test_word_joiner_detected(self): + """U+2060 word joiner.""" + content = f"hello\u2060world" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + + def test_soft_hyphen_detected(self): + """U+00AD soft hyphen.""" + content = f"hel\u00ADlo" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + assert findings[0].category == "invisible-formatting" + + # ── Info: unusual whitespace ── + + def test_nbsp_detected_as_info(self): + """U+00A0 non-breaking space.""" + content = f"hello\u00A0world" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "info" + assert findings[0].category == "unusual-whitespace" + + def test_em_space_detected(self): + """U+2003 em space (in the U+2000-U+200A range).""" + content = f"hello\u2003world" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "info" + + def test_ideographic_space(self): + """U+3000 ideographic space.""" + content = f"hello\u3000world" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "info" + + # ── BOM handling ── + + def test_bom_at_start_is_info(self): + """BOM (U+FEFF) at file start is standard — info severity.""" + content = "\uFEFF# My Document" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "info" + assert findings[0].category == "bom" + assert findings[0].line == 1 + assert findings[0].column == 1 + + def test_bom_mid_file_is_warning(self): + """BOM in the middle of a file is suspicious.""" + content = "line one\n\uFEFFline two" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].severity == "warning" + assert findings[0].category == "zero-width" + assert findings[0].line == 2 + + # ── Position accuracy ── + + def test_line_column_accuracy(self): + """Findings report correct 1-based line and column numbers.""" + # Place a zero-width space at line 3, col 6 + content = "line1\nline2\nline3\u200Brest" + findings = ContentScanner.scan_text(content) + assert len(findings) == 1 + assert findings[0].line == 3 + assert findings[0].column == 6 + + def test_multiple_findings_on_same_line(self): + content = f"a\u200Bb\u200Cc" + findings = ContentScanner.scan_text(content) + assert len(findings) == 2 + assert findings[0].column == 2 + assert findings[1].column == 4 + + # ── Mixed content ── + + def test_mixed_severities(self): + """Content with chars from all severity levels.""" + content = f"\u00A0visible\u200Btext\u202Ehidden" + findings = ContentScanner.scan_text(content) + severities = {f.severity for f in findings} + assert severities == {"info", "warning", "critical"} + + def test_normal_unicode_not_flagged(self): + """Legitimate Unicode (CJK, accented chars, emoji) is fine.""" + content = "日本語テスト café résumé 🎉 ñ ü ö" + findings = ContentScanner.scan_text(content) + assert findings == [] + + +class TestScanFile: + """Tests for ContentScanner.scan_file().""" + + def test_scan_clean_file(self, tmp_path): + f = tmp_path / "clean.md" + f.write_text("# Clean file\nNo issues here.", encoding="utf-8") + findings = ContentScanner.scan_file(f) + assert findings == [] + + def test_scan_file_with_findings(self, tmp_path): + f = tmp_path / "suspicious.md" + f.write_text(f"hello\u200Bworld", encoding="utf-8") + findings = ContentScanner.scan_file(f) + assert len(findings) == 1 + assert findings[0].file == str(f) + + def test_binary_file_returns_empty(self, tmp_path): + f = tmp_path / "binary.bin" + f.write_bytes(b"\x80\x81\x82\xff\xfe") + findings = ContentScanner.scan_file(f) + assert findings == [] + + def test_nonexistent_file_returns_empty(self, tmp_path): + f = tmp_path / "does_not_exist.md" + findings = ContentScanner.scan_file(f) + assert findings == [] + + def test_latin1_file_returns_empty(self, tmp_path): + """Non-UTF-8 encoded files should be skipped gracefully.""" + f = tmp_path / "latin1.txt" + f.write_bytes("Stra\xdfe".encode("latin-1")) + findings = ContentScanner.scan_file(f) + assert findings == [] + + def test_bom_plus_critical_detected(self, tmp_path): + """Files with BOM and critical chars should report both.""" + f = tmp_path / "bom_critical.md" + f.write_text("\ufeff" + "tag\U000E0041char\n", encoding="utf-8") + findings = ContentScanner.scan_file(f) + severities = {fnd.severity for fnd in findings} + assert "critical" in severities + assert "info" in severities # Leading BOM is info-level + + +class TestHasCritical: + def test_no_findings(self): + assert ContentScanner.has_critical([]) is False + + def test_only_warnings(self): + findings = [ScanFinding("f", 1, 1, "", "U+200B", "warning", "zw", "")] + assert ContentScanner.has_critical(findings) is False + + def test_with_critical(self): + findings = [ScanFinding("f", 1, 1, "", "U+E0001", "critical", "tag", "")] + assert ContentScanner.has_critical(findings) is True + + +class TestSummarize: + def test_empty(self): + result = ContentScanner.summarize([]) + assert result == {"critical": 0, "warning": 0, "info": 0} + + def test_mixed(self): + findings = [ + ScanFinding("f", 1, 1, "", "", "critical", "", ""), + ScanFinding("f", 1, 2, "", "", "critical", "", ""), + ScanFinding("f", 1, 3, "", "", "warning", "", ""), + ScanFinding("f", 1, 4, "", "", "info", "", ""), + ] + result = ContentScanner.summarize(findings) + assert result == {"critical": 2, "warning": 1, "info": 1} + + +class TestStripNonCritical: + def test_strips_zero_width_chars(self): + content = f"hello\u200Bworld" + result = ContentScanner.strip_non_critical(content) + assert result == "helloworld" + + def test_strips_nbsp(self): + content = f"hello\u00A0world" + result = ContentScanner.strip_non_critical(content) + assert result == "helloworld" + + def test_preserves_critical_chars(self): + """Tag characters and bidi overrides are NOT stripped.""" + tag = chr(0xE0041) + content = f"hello{tag}world" + result = ContentScanner.strip_non_critical(content) + assert tag in result + + def test_strips_leading_bom(self): + content = f"\uFEFF# Title" + result = ContentScanner.strip_non_critical(content) + assert result == "# Title" + + def test_strips_mid_file_bom(self): + content = f"line1\n\uFEFFline2" + result = ContentScanner.strip_non_critical(content) + assert result == "line1\nline2" + + def test_clean_content_unchanged(self): + content = "# Normal content\nWith normal text." + result = ContentScanner.strip_non_critical(content) + assert result == content + + def test_strips_soft_hyphen(self): + content = f"hel\u00ADlo" + result = ContentScanner.strip_non_critical(content) + assert result == "hello" diff --git a/tests/unit/test_install_scanning.py b/tests/unit/test_install_scanning.py new file mode 100644 index 000000000..c64b69aef --- /dev/null +++ b/tests/unit/test_install_scanning.py @@ -0,0 +1,176 @@ +"""Tests for install-time content scanning integration. + +Verifies that ``_pre_deploy_security_scan()`` blocks deployment on +critical findings and allows deployment on warnings/clean. +""" + +from pathlib import Path + +import pytest + +from apm_cli.commands.install import _pre_deploy_security_scan +from apm_cli.security.content_scanner import ContentScanner +from apm_cli.utils.diagnostics import DiagnosticCollector + + +@pytest.fixture +def clean_files(tmp_path): + """Create several clean text files.""" + paths = [] + for name in ("a.md", "b.md", "c.md"): + p = tmp_path / name + p.write_text(f"# {name}\nClean content.\n", encoding="utf-8") + paths.append(p) + return paths + + +@pytest.fixture +def mixed_files(tmp_path): + """Create files with varying severity levels.""" + clean = tmp_path / "clean.md" + clean.write_text("No issues here.\n", encoding="utf-8") + + warning = tmp_path / "warning.md" + warning.write_text("Has zero\u200Bwidth.\n", encoding="utf-8") + + critical = tmp_path / "critical.md" + critical.write_text("Has tag\U000E0041char.\n", encoding="utf-8") + + return [clean, warning, critical] + + +# ── Diagnostics security rendering tests ──────────────────────────── + + +class TestDiagnosticsSecurityRendering: + """Tests for security category rendering in DiagnosticCollector.""" + + def test_render_summary_includes_security(self, mixed_files, capsys): + diag = DiagnosticCollector() + for f in mixed_files: + findings = ContentScanner.scan_file(f) + if findings: + has_crit, summary = ContentScanner.classify(findings) + sev = "critical" if has_crit else "warning" + diag.security( + message=str(f), package="pkg", + detail=f"{len(findings)} finding(s)", severity=sev, + ) + diag.render_summary() + captured = capsys.readouterr() + assert "Diagnostics" in captured.out or "security" in captured.out.lower() + + def test_critical_security_flag(self, tmp_path): + p = tmp_path / "evil.md" + p.write_text("x\U000E0001y\n", encoding="utf-8") + diag = DiagnosticCollector() + findings = ContentScanner.scan_file(p) + diag.security( + message=str(p), package="pkg", + detail=f"{len(findings)} finding(s)", severity="critical", + ) + assert diag.has_critical_security is True + + def test_no_critical_when_only_warnings(self, tmp_path): + p = tmp_path / "warn.md" + p.write_text("x\u200By\n", encoding="utf-8") + diag = DiagnosticCollector() + findings = ContentScanner.scan_file(p) + diag.security( + message=str(p), package="pkg", + detail=f"{len(findings)} finding(s)", severity="warning", + ) + assert diag.has_critical_security is False + + +# ── Pre-deploy security scan tests ─────────────────────────────── + + +class TestPreDeploySecurityScan: + """Tests for _pre_deploy_security_scan() — the pre-deployment gate.""" + + def test_clean_package_allows_deploy(self, tmp_path): + (tmp_path / "prompt.md").write_text("Clean content\n", encoding="utf-8") + diag = DiagnosticCollector() + assert _pre_deploy_security_scan(tmp_path, diag, package_name="pkg") is True + assert diag.security_count == 0 + + def test_critical_chars_block_deploy(self, tmp_path): + (tmp_path / "evil.md").write_text( + "hidden\U000E0001tag\n", encoding="utf-8" + ) + diag = DiagnosticCollector() + result = _pre_deploy_security_scan( + tmp_path, diag, package_name="pkg", force=False, + ) + assert result is False + assert diag.has_critical_security + + def test_critical_chars_with_force_allows_deploy(self, tmp_path): + (tmp_path / "evil.md").write_text( + "hidden\U000E0001tag\n", encoding="utf-8" + ) + diag = DiagnosticCollector() + result = _pre_deploy_security_scan( + tmp_path, diag, package_name="pkg", force=True, + ) + assert result is True + assert diag.has_critical_security # still records the finding + + def test_warnings_allow_deploy(self, tmp_path): + (tmp_path / "warn.md").write_text( + "zero\u200Bwidth\n", encoding="utf-8" + ) + diag = DiagnosticCollector() + result = _pre_deploy_security_scan( + tmp_path, diag, package_name="pkg", + ) + assert result is True + assert diag.security_count == 1 + assert not diag.has_critical_security + + def test_scans_nested_files(self, tmp_path): + """Source files in subdirectories are scanned.""" + sub = tmp_path / "subdir" + sub.mkdir() + (sub / "deep.md").write_text("tag\U000E0041char\n", encoding="utf-8") + diag = DiagnosticCollector() + result = _pre_deploy_security_scan( + tmp_path, diag, package_name="pkg", force=False, + ) + assert result is False + + def test_empty_package_allows_deploy(self, tmp_path): + diag = DiagnosticCollector() + assert _pre_deploy_security_scan(tmp_path, diag) is True + assert diag.security_count == 0 + + def test_package_name_in_diagnostic(self, tmp_path): + (tmp_path / "x.md").write_text("z\u200Bw\n", encoding="utf-8") + diag = DiagnosticCollector() + _pre_deploy_security_scan(tmp_path, diag, package_name="my-pkg") + items = diag.by_category().get("security", []) + assert len(items) == 1 + assert items[0].package == "my-pkg" + + def test_does_not_follow_symlinked_directories(self, tmp_path): + """Symlinked directories should not be traversed.""" + # Create a directory outside the package with a critical file + outside = tmp_path / "outside" + outside.mkdir() + (outside / "evil.md").write_text("tag\U000E0001char\n", encoding="utf-8") + + # Package directory with a symlink pointing outside + pkg = tmp_path / "pkg" + pkg.mkdir() + (pkg / "clean.md").write_text("Clean\n", encoding="utf-8") + try: + (pkg / "escape").symlink_to(outside) + except OSError: + pytest.skip("symlinks not supported on this platform") + + diag = DiagnosticCollector() + result = _pre_deploy_security_scan(pkg, diag, package_name="test") + # Should allow deploy — the evil file is behind a symlink + assert result is True + assert diag.security_count == 0