diff --git a/README.md b/README.md index 8f5997c6d..dd15e69e9 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,16 @@ strix --target api.your-app.com --instruction "Focus on business logic flaws and strix --target api.your-app.com --instruction-file ./instruction.md ``` +### MCP Server (AI Agent Integration) + +Use Strix as an MCP server to integrate with AI coding agents like Claude Code, Cursor, and Windsurf: + +```bash +pip install strix-mcp +``` + +See [`strix-mcp/README.md`](strix-mcp/README.md) for setup instructions and the full tool coverage map. + ### Headless Mode Run Strix programmatically without interactive UI using the `-n/--non-interactive` flag—perfect for servers and automated jobs. The CLI prints real-time vulnerability findings, and the final report before exiting. Exits with non-zero code when vulnerabilities are found. diff --git a/docs/superpowers/plans/2026-03-17-recon-phase.md b/docs/superpowers/plans/2026-03-17-recon-phase.md new file mode 100644 index 000000000..31bc07edd --- /dev/null +++ b/docs/superpowers/plans/2026-03-17-recon-phase.md @@ -0,0 +1,1083 @@ +# Recon Phase Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a Phase 0 reconnaissance phase to Strix's scan flow so Claude automatically discovers attack surface before vulnerability testing. + +**Architecture:** Four coordinated changes — (1) add `"recon"` note category + `nuclei_scan` and `download_sourcemaps` MCP tools in `tools.py`, (2) add recon agent templates + `phase` field to `generate_plan()` in `stack_detector.py`, (3) create 6 recon knowledge modules in `strix/skills/reconnaissance/`, (4) update `methodology.md` with Phase 0 instructions. 
+ +**Tech Stack:** Python 3, FastMCP, Docker sandbox (Kali Linux), pytest + +**Spec:** `docs/superpowers/specs/2026-03-17-recon-phase-design.md` + +**Test command:** `cd strix-mcp && python -m pytest tests/ -v --tb=short -o "addopts=" --ignore=tests/test_integration.py` + +--- + +### Task 1: Add "recon" note category + +**Files:** +- Modify: `strix-mcp/src/strix_mcp/tools.py:1021` +- Test: `strix-mcp/tests/test_tools.py` + +- [ ] **Step 1: Move `_VALID_NOTE_CATEGORIES` to module scope (without adding "recon" yet)** + +**Important:** `_VALID_NOTE_CATEGORIES` is currently defined at line 1021 *inside* `register_tools()` as a local variable. It cannot be imported by tests. Move it to module scope first. + +In `strix-mcp/src/strix_mcp/tools.py`: + +1. Add at module scope (after `_SEVERITY_ORDER` at line 136, before `_normalize_severity`): + +```python +VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan"] +``` + +2. Delete the local `_VALID_NOTE_CATEGORIES` at line 1021 (inside `register_tools()`) + +3. 
Update all references from `_VALID_NOTE_CATEGORIES` to `VALID_NOTE_CATEGORIES` inside `register_tools()` (the `create_note` function at ~line 1043) + +- [ ] **Step 2: Write the failing test** + +In `strix-mcp/tests/test_tools.py`, add at the end of the file: + +```python +class TestReconNoteCategory: + def test_recon_is_valid_category(self): + """The 'recon' category should be accepted by the notes system.""" + from strix_mcp.tools import VALID_NOTE_CATEGORIES + assert "recon" in VALID_NOTE_CATEGORIES +``` + +- [ ] **Step 3: Run test to verify it fails** + +Run: `cd strix-mcp && python -m pytest tests/test_tools.py::TestReconNoteCategory -v --tb=short -o "addopts="` +Expected: FAIL with `assert 'recon' in ['general', 'findings', 'methodology', 'questions', 'plan']` + +- [ ] **Step 4: Add "recon" to the list** + +In `strix-mcp/src/strix_mcp/tools.py`, change the module-scope constant: + +```python +VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "recon"] +``` + +Also update the docstring for `create_note` to include `recon`: + +```python + category: general | findings | methodology | questions | plan | recon +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd strix-mcp && python -m pytest tests/test_tools.py::TestReconNoteCategory -v --tb=short -o "addopts="` +Expected: PASS + +- [ ] **Step 6: Run full test suite** + +Run: `cd strix-mcp && python -m pytest tests/ -v --tb=short -o "addopts=" --ignore=tests/test_integration.py` +Expected: All tests pass + +- [ ] **Step 7: Commit** + +```bash +git add strix-mcp/src/strix_mcp/tools.py strix-mcp/tests/test_tools.py +git commit -m "feat(mcp): add 'recon' to valid note categories" +``` + +--- + +### Task 2: Add `phase` field to `generate_plan()` and recon templates + +**Files:** +- Modify: `strix-mcp/src/strix_mcp/stack_detector.py:54-345` +- Test: `strix-mcp/tests/test_stack_detector.py` + +- [ ] **Step 1: Write failing tests for recon templates** + +In 
`strix-mcp/tests/test_stack_detector.py`, add a new test class at the end: + +```python +class TestReconPhase: + def test_web_app_plan_includes_recon_agents(self): + """Web app targets should get phase-0 recon agents.""" + stack = detect_stack(EMPTY_SIGNALS) + plan = generate_plan(stack) + recon_agents = [e for e in plan if e.get("phase") == 0] + assert len(recon_agents) >= 2, f"Expected >=2 recon agents, got {len(recon_agents)}" + # Should have surface discovery and infrastructure + tasks = [a["task"].lower() for a in recon_agents] + assert any("directory" in t or "ffuf" in t or "surface" in t for t in tasks) + assert any("nmap" in t or "nuclei" in t or "infrastructure" in t for t in tasks) + + def test_domain_plan_includes_subdomain_enum(self): + """Domain targets should get subdomain enumeration agent.""" + stack = detect_stack(EMPTY_SIGNALS) + stack["target_types"] = ["domain"] + plan = generate_plan(stack) + recon_agents = [e for e in plan if e.get("phase") == 0] + tasks = [a["task"].lower() for a in recon_agents] + assert any("subdomain" in t for t in tasks), f"No subdomain agent in: {tasks}" + + def test_web_app_no_subdomain_enum(self): + """Web app targets (no domain type) should NOT get subdomain enumeration.""" + stack = detect_stack(EMPTY_SIGNALS) + # No target_types set — pure web_app + plan = generate_plan(stack) + recon_agents = [e for e in plan if e.get("phase") == 0] + tasks = [a["task"].lower() for a in recon_agents] + assert not any("subdomain" in t for t in tasks), f"Unexpected subdomain agent in: {tasks}" + + def test_all_plan_entries_have_phase(self): + """Every plan entry must have a 'phase' field (0 or 1).""" + stack = detect_stack(EMPTY_SIGNALS) + plan = generate_plan(stack) + for entry in plan: + assert "phase" in entry, f"Entry missing 'phase': {entry}" + assert entry["phase"] in (0, 1), f"Invalid phase: {entry['phase']}" + + def test_vuln_agents_have_phase_1(self): + """Existing vulnerability agents should have phase 1.""" + stack = 
detect_stack(EMPTY_SIGNALS) + plan = generate_plan(stack) + vuln_agents = [e for e in plan if e.get("phase") == 1] + assert len(vuln_agents) >= 3, "Should have at least 3 phase-1 vuln agents" + + def test_recon_modules_not_filtered_by_module_rules(self): + """Recon agent modules should survive even though they're not in MODULE_RULES.""" + stack = detect_stack(EMPTY_SIGNALS) + plan = generate_plan(stack) + recon_agents = [e for e in plan if e.get("phase") == 0] + for agent in recon_agents: + assert len(agent["modules"]) > 0, f"Recon agent has no modules: {agent}" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd strix-mcp && python -m pytest tests/test_stack_detector.py::TestReconPhase -v --tb=short -o "addopts="` +Expected: FAIL (no `phase` field, no recon agents) + +- [ ] **Step 3: Add RECON_TEMPLATES and update generate_plan()** + +In `strix-mcp/src/strix_mcp/stack_detector.py`, add `_RECON_TEMPLATES` after `_AGENT_TEMPLATES` (after line 202): + +```python +# --------------------------------------------------------------------------- +# Recon agent templates (Phase 0 — run before vulnerability agents) +# --------------------------------------------------------------------------- +_RECON_TEMPLATES: list[dict[str, Any]] = [ + { + "id": "recon_surface_discovery", + "task": ( + "Map the attack surface: run directory brute-forcing with ffuf against " + "the target using common and stack-specific wordlists. Check all discovered " + "JS bundles for source maps using download_sourcemaps. Query Wayback Machine " + "for historical endpoints. Write all results as structured recon notes." + ), + "modules": ["directory_bruteforce", "source_map_discovery"], + "triggers": ["web_app", "domain"], + "confidence": "high", + }, + { + "id": "recon_infrastructure", + "task": ( + "Infrastructure reconnaissance: run nmap port scan against the target " + "to discover non-standard ports and services. 
Run nuclei_scan with default " + "templates for quick vulnerability wins. Write all results as structured " + "recon notes. Nuclei findings are auto-filed as vulnerability reports." + ), + "modules": ["port_scanning", "nuclei_scanning"], + "triggers": ["web_app", "domain"], + "confidence": "high", + }, + { + "id": "recon_subdomain_enum", + "task": ( + "Enumerate subdomains using subfinder and certificate transparency logs. " + "Validate live hosts with httpx. Check for subdomain takeover on dangling " + "CNAMEs. Cross-reference with scope rules before any testing. Write all " + "results as structured recon notes." + ), + "modules": ["subdomain_enumeration"], + "triggers": ["domain"], + "confidence": "high", + }, +] +``` + +Then modify `generate_plan()` (starting at line 316) to process recon templates first and add `phase` to all entries: + +```python + plan: list[dict[str, Any]] = [] + + # --- Phase 0: Recon agents (bypass MODULE_RULES filtering) --- + for template in _RECON_TEMPLATES: + if not any(t in active_triggers for t in template["triggers"]): + continue + plan.append({ + "task": template["task"], + "modules": list(template["modules"]), # include as-is, no filtering + "priority": "high", + "confidence": template["confidence"], + "phase": 0, + }) + + # --- Phase 1: Vulnerability agents (existing logic) --- + for template in _AGENT_TEMPLATES: + # Include template only if any of its triggers are active + if not any(t in active_triggers for t in template["triggers"]): + continue + + # Filter modules to only those in recommended set + filtered_modules = [m for m in template["modules"] if m in recommended_modules] + if not filtered_modules: + continue + + # Determine confidence + if template.get("signal_strength") == "specific": + probe_dependent = any(t in _PROBE_CONFIRMED_TRIGGERS for t in template["triggers"]) + if probe_dependent and probes_were_stale: + confidence = "low" + else: + confidence = "high" + else: + confidence = "medium" + + plan.append({ + 
"task": template["task"], + "modules": filtered_modules, + "priority": template["priority"], + "confidence": confidence, + "phase": 1, + }) + + return plan +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd strix-mcp && python -m pytest tests/test_stack_detector.py::TestReconPhase -v --tb=short -o "addopts="` +Expected: All 6 tests PASS + +- [ ] **Step 5: Fix existing regression: `test_generic_triggers_are_medium_confidence`** + +The existing test at `test_stack_detector.py:258` asserts ALL plan entries have `confidence == "medium"` for an empty stack. After our change, phase-0 recon agents with `confidence: "high"` will break this test. Fix it to only check phase-1 agents: + +In `strix-mcp/tests/test_stack_detector.py`, change the `test_generic_triggers_are_medium_confidence` method: + +```python + def test_generic_triggers_are_medium_confidence(self): + """Phase-1 templates triggered only by 'always' or 'web_app' (generic) should be medium confidence.""" + # Empty stack — only 'always' and 'web_app' triggers fire + stack = detect_stack(EMPTY_SIGNALS) + plan = generate_plan(stack) + # Only check phase-1 (vuln) agents — phase-0 recon agents have high confidence by design + vuln_agents = [e for e in plan if e.get("phase") == 1] + for entry in vuln_agents: + assert entry["confidence"] == "medium", f"Expected medium for generic trigger: {entry}" +``` + +- [ ] **Step 6: Run full test suite to check for regressions** + +Run: `cd strix-mcp && python -m pytest tests/ -v --tb=short -o "addopts=" --ignore=tests/test_integration.py` +Expected: All tests pass. 
+ +- [ ] **Step 7: Commit** + +```bash +git add strix-mcp/src/strix_mcp/stack_detector.py strix-mcp/tests/test_stack_detector.py +git commit -m "feat(mcp): add recon templates and phase field to generate_plan" +``` + +--- + +### Task 3: Implement `nuclei_scan` tool + +**Files:** +- Modify: `strix-mcp/src/strix_mcp/tools.py` +- Test: `strix-mcp/tests/test_tools.py` + +**Testing note:** The `nuclei_scan` and `download_sourcemaps` MCP tool functions are async closures registered inside `register_tools()` that depend on a live `SandboxManager` with Docker. They cannot be unit-tested without mocking the entire sandbox proxy layer. We test the **helper functions** (`parse_nuclei_jsonl`, `build_nuclei_command`, etc.) which contain all the parsing/logic, and rely on **integration tests** (Task 7 + Docker-based tests) to validate the tool end-to-end. This matches the existing pattern — no other proxied tools in `tools.py` have unit tests for the tool function itself. + +- [ ] **Step 1: Write failing tests** + +In `strix-mcp/tests/test_tools.py`, add: + +```python +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + + +class TestNucleiScan: + """Tests for the nuclei_scan MCP tool logic.""" + + def _make_jsonl(self, findings: list[dict]) -> str: + """Build JSONL string from a list of finding dicts.""" + return "\n".join(json.dumps(f) for f in findings) + + def test_parse_nuclei_jsonl(self): + """parse_nuclei_jsonl should extract template-id, matched-at, severity, and info.""" + from strix_mcp.tools import parse_nuclei_jsonl + + jsonl = self._make_jsonl([ + { + "template-id": "git-config", + "matched-at": "https://target.com/.git/config", + "severity": "medium", + "info": {"name": "Git Config File", "description": "Exposed git config"}, + }, + { + "template-id": "exposed-env", + "matched-at": "https://target.com/.env", + "severity": "high", + "info": {"name": "Exposed .env", "description": "Environment file exposed"}, + }, + ]) + findings = 
parse_nuclei_jsonl(jsonl) + assert len(findings) == 2 + assert findings[0]["template_id"] == "git-config" + assert findings[0]["url"] == "https://target.com/.git/config" + assert findings[0]["severity"] == "medium" + assert findings[0]["name"] == "Git Config File" + + def test_parse_nuclei_jsonl_skips_bad_lines(self): + """Malformed JSONL lines should be skipped, not crash.""" + from strix_mcp.tools import parse_nuclei_jsonl + + jsonl = 'not valid json\n{"template-id": "ok", "matched-at": "https://x.com", "severity": "low", "info": {"name": "OK", "description": "ok"}}\n{broken' + findings = parse_nuclei_jsonl(jsonl) + assert len(findings) == 1 + assert findings[0]["template_id"] == "ok" + + def test_parse_nuclei_jsonl_empty(self): + """Empty JSONL should return empty list.""" + from strix_mcp.tools import parse_nuclei_jsonl + + assert parse_nuclei_jsonl("") == [] + assert parse_nuclei_jsonl(" \n ") == [] + + def test_build_nuclei_command(self): + """build_nuclei_command should produce correct CLI command.""" + from strix_mcp.tools import build_nuclei_command + + cmd = build_nuclei_command( + target="https://example.com", + severity="critical,high", + rate_limit=50, + templates=["cves", "exposures"], + output_file="/tmp/results.jsonl", + ) + assert "nuclei" in cmd + assert "-u https://example.com" in cmd + assert "-severity critical,high" in cmd + assert "-rate-limit 50" in cmd + assert "-t cves" in cmd + assert "-t exposures" in cmd + assert "-jsonl" in cmd + assert "-o /tmp/results.jsonl" in cmd + + def test_build_nuclei_command_no_templates(self): + """Without templates, command should not include -t flags.""" + from strix_mcp.tools import build_nuclei_command + + cmd = build_nuclei_command( + target="https://example.com", + severity="critical,high,medium", + rate_limit=100, + templates=None, + output_file="/tmp/results.jsonl", + ) + assert "-t " not in cmd +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd strix-mcp && python -m pytest 
tests/test_tools.py::TestNucleiScan -v --tb=short -o "addopts="` +Expected: FAIL with `ImportError: cannot import name 'parse_nuclei_jsonl'` + +- [ ] **Step 3: Implement helper functions** + +In `strix-mcp/src/strix_mcp/tools.py`, add after the `_normalize_severity` function (after line 142), before `_deduplicate_reports`: + +```python +# --- Nuclei JSONL parsing --- + +def parse_nuclei_jsonl(jsonl: str) -> list[dict[str, Any]]: + """Parse nuclei JSONL output into structured findings. + + Each valid line becomes a dict with keys: template_id, url, severity, name, description. + Malformed lines are silently skipped. + """ + findings: list[dict[str, Any]] = [] + for line in jsonl.strip().splitlines(): + line = line.strip() + if not line: + continue + try: + data = json.loads(line) + except json.JSONDecodeError: + continue + info = data.get("info", {}) + findings.append({ + "template_id": data.get("template-id", "unknown"), + "url": data.get("matched-at", ""), + "severity": data.get("severity", "info"), + "name": info.get("name", ""), + "description": info.get("description", ""), + }) + return findings + + +def build_nuclei_command( + target: str, + severity: str, + rate_limit: int, + templates: list[str] | None, + output_file: str, +) -> str: + """Build a nuclei CLI command string.""" + parts = [ + "nuclei", + f"-u {target}", + f"-severity {severity}", + f"-rate-limit {rate_limit}", + "-jsonl", + f"-o {output_file}", + "-silent", + ] + if templates: + for t in templates: + parts.append(f"-t {t}") + return " ".join(parts) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd strix-mcp && python -m pytest tests/test_tools.py::TestNucleiScan -v --tb=short -o "addopts="` +Expected: All 5 tests PASS + +- [ ] **Step 5: Implement the `nuclei_scan` MCP tool** + +In `strix-mcp/src/strix_mcp/tools.py`, inside `register_tools()`, add after the `suggest_chains` tool (after line ~630) and before the `# --- Proxied Tools ---` comment (line 632). 
This groups recon tools with the other non-proxied scan coordination tools: + +```python + # --- Recon Tools --- + + @mcp.tool() + async def nuclei_scan( + target: str, + templates: list[str] | None = None, + severity: str = "critical,high,medium", + rate_limit: int = 100, + timeout: int = 600, + agent_id: str | None = None, + ) -> str: + """Run nuclei vulnerability scanner against a target. + + Launches nuclei in the sandbox, parses structured output, + and auto-files confirmed findings as vulnerability reports. + + target: URL or host to scan + templates: template categories (e.g. ["cves", "exposures"]). Defaults to all. + severity: comma-separated severity filter (default "critical,high,medium") + rate_limit: max requests per second (default 100) + timeout: max seconds to wait for completion (default 600) + agent_id: subagent identifier from dispatch_agent (omit for coordinator)""" + scan = sandbox.active_scan + if scan is None: + return json.dumps({"error": "No active scan. Call start_scan first."}) + + output_file = f"/tmp/nuclei_{uuid.uuid4().hex[:8]}.jsonl" + cmd = build_nuclei_command( + target=target, + severity=severity, + rate_limit=rate_limit, + templates=templates, + output_file=output_file, + ) + + # Launch nuclei in background + bg_cmd = f"nohup {cmd} > /dev/null 2>&1 & echo $!" 
+ launch_result = await sandbox.proxy_tool("terminal_execute", { + "command": bg_cmd, + "timeout": 10, + **({"agent_id": agent_id} if agent_id else {}), + }) + pid = "" + if isinstance(launch_result, dict): + output = launch_result.get("output", "") + pid = output.strip().splitlines()[-1].strip() if output.strip() else "" + + # Poll for completion + import asyncio + elapsed = 0 + poll_interval = 15 + timed_out = False + while elapsed < timeout: + await asyncio.sleep(poll_interval) + elapsed += poll_interval + check = await sandbox.proxy_tool("terminal_execute", { + "command": f"kill -0 {pid} 2>/dev/null && echo running || echo done", + "timeout": 5, + **({"agent_id": agent_id} if agent_id else {}), + }) + status = "" + if isinstance(check, dict): + status = check.get("output", "").strip() + if "done" in status: + break + else: + timed_out = True + + # Read results file + read_result = await sandbox.proxy_tool("terminal_execute", { + "command": f"cat {output_file} 2>/dev/null || echo ''", + "timeout": 10, + **({"agent_id": agent_id} if agent_id else {}), + }) + jsonl_output = "" + if isinstance(read_result, dict): + jsonl_output = read_result.get("output", "") + + # Parse findings + findings = parse_nuclei_jsonl(jsonl_output) + + # Auto-file via tracer (requires active tracer) + tracer = get_global_tracer() + if tracer is None: + return json.dumps({ + "error": "No tracer active — nuclei findings cannot be filed. 
Ensure start_scan was called.", + "total_findings": len(findings), + "findings": [ + {"template_id": f["template_id"], "severity": f["severity"], "url": f["url"]} + for f in findings + ], + }) + + filed = 0 + skipped = 0 + for f in findings: + title = f"{f['name']} — {f['url']}" + existing = tracer.get_existing_vulnerabilities() + normalized = _normalize_title(title) + if _find_duplicate(normalized, existing) is not None: + skipped += 1 + continue + tracer.add_vulnerability_report( + title=title, + severity=_normalize_severity(f["severity"]), + description=f"**Nuclei template:** {f['template_id']}\n\n{f['description']}", + endpoint=f["url"], + ) + filed += 1 + + severity_breakdown: dict[str, int] = {} + for f in findings: + sev = _normalize_severity(f["severity"]) + severity_breakdown[sev] = severity_breakdown.get(sev, 0) + 1 + + return json.dumps({ + "target": target, + "templates_used": templates or ["all"], + "total_findings": len(findings), + "auto_filed": filed, + "skipped_duplicates": skipped, + "timed_out": timed_out, + "severity_breakdown": severity_breakdown, + "findings": [ + {"template_id": f["template_id"], "severity": f["severity"], "url": f["url"]} + for f in findings + ], + }) +``` + +- [ ] **Step 6: Run full test suite** + +Run: `cd strix-mcp && python -m pytest tests/ -v --tb=short -o "addopts=" --ignore=tests/test_integration.py` +Expected: All tests pass + +- [ ] **Step 7: Commit** + +```bash +git add strix-mcp/src/strix_mcp/tools.py strix-mcp/tests/test_tools.py +git commit -m "feat(mcp): add nuclei_scan tool with auto-report filing" +``` + +--- + +### Task 4: Implement `download_sourcemaps` tool + +**Files:** +- Modify: `strix-mcp/src/strix_mcp/tools.py` +- Test: `strix-mcp/tests/test_tools.py` + +**Testing note:** Same as Task 3 — helpers are unit-tested, the tool function requires Docker for integration testing. 
+
+**Implementation note:** The `download_sourcemaps` tool builds a Python script as a string and executes it via `python_action` in the sandbox. This avoids 30-60+ proxy round trips but makes the code harder to read. Regex patterns and the target URL are injected via `repr()` + `.replace()` to avoid escaping issues inside nested string literals. If debugging this at runtime, the easiest approach is to print the `script` variable before execution to inspect the generated code.
+
+- [ ] **Step 1: Write failing tests**
+
+In `strix-mcp/tests/test_tools.py`, add:
+
+```python
+class TestSourcemapHelpers:
+    def test_extract_script_urls(self):
+        """extract_script_urls should find all script src attributes."""
+        from strix_mcp.tools import extract_script_urls
+
+        html = '''
+        <script src="/assets/main.js"></script>
+        <script src="https://cdn.example.com/lib.js"></script>
+        <script src="assets/vendor.js"></script>
+        <script>var inline = true;</script>
+        '''
+        urls = extract_script_urls(html, "https://example.com")
+        assert "https://example.com/assets/main.js" in urls
+        assert "https://cdn.example.com/lib.js" in urls
+        assert "https://example.com/assets/vendor.js" in urls
+        assert len(urls) == 3
+
+    def test_extract_script_urls_empty(self):
+        """No script tags should return empty list."""
+        from strix_mcp.tools import extract_script_urls
+
+        assert extract_script_urls("
hi", "https://x.com") == [] + + def test_extract_sourcemap_url(self): + """extract_sourcemap_url should find sourceMappingURL comment.""" + from strix_mcp.tools import extract_sourcemap_url + + js = "var x=1;\n//# sourceMappingURL=main.js.map" + assert extract_sourcemap_url(js) == "main.js.map" + + def test_extract_sourcemap_url_at_syntax(self): + """Should also find //@ sourceMappingURL syntax.""" + from strix_mcp.tools import extract_sourcemap_url + + js = "var x=1;\n//@ sourceMappingURL=old.js.map" + assert extract_sourcemap_url(js) == "old.js.map" + + def test_extract_sourcemap_url_not_found(self): + """No sourceMappingURL should return None.""" + from strix_mcp.tools import extract_sourcemap_url + + assert extract_sourcemap_url("var x=1;") is None + + def test_scan_for_notable_patterns(self): + """scan_for_notable should find API_KEY and SECRET patterns.""" + from strix_mcp.tools import scan_for_notable + + sources = { + "src/config.ts": "const API_KEY = 'abc123';\nconst name = 'test';", + "src/auth.ts": "const SECRET = 'mysecret';", + "src/utils.ts": "function add(a, b) { return a + b; }", + } + notable = scan_for_notable(sources) + assert any("config.ts" in n and "API_KEY" in n for n in notable) + assert any("auth.ts" in n and "SECRET" in n for n in notable) + assert not any("utils.ts" in n for n in notable) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd strix-mcp && python -m pytest tests/test_tools.py::TestSourcemapHelpers -v --tb=short -o "addopts="` +Expected: FAIL with `ImportError` + +- [ ] **Step 3: Implement helper functions** + +In `strix-mcp/src/strix_mcp/tools.py`, add after `build_nuclei_command` (before `_deduplicate_reports`): + +```python +# --- Source map discovery helpers --- + +import re as _re +from urllib.parse import urljoin as _urljoin + + +def extract_script_urls(html: str, base_url: str) -> list[str]: + """Extract absolute URLs of ', html, re.DOTALL | re.IGNORECASE, + ) + inline_js = "\n".join(s for s in 
inline_scripts if len(s) > 50) + if inline_js: + # Analyze inline scripts as a virtual bundle + _analyze_bundle( + inline_js, "(inline)", patterns, framework_signals, findings, + ) + else: + findings["errors"].append(f"Failed to fetch {target_url}: HTTP {resp.status_code}") + except Exception as e: + findings["errors"].append(f"Failed to fetch {target_url}: {e}") + + # Deduplicate URLs + seen_urls: set[str] = set() + unique_js_urls: list[str] = [] + for url in js_urls: + if url not in seen_urls: + seen_urls.add(url) + unique_js_urls.append(url) + + # Fetch and analyze each bundle + for js_url in unique_js_urls: + try: + resp = await client.get(js_url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + }) + if resp.status_code != 200: + findings["errors"].append(f"HTTP {resp.status_code} for {js_url}") + continue + + content = resp.text + if len(content) > max_bundle_size: + findings["bundles_skipped"] += 1 + continue + + findings["bundles_analyzed"] += 1 + _analyze_bundle( + content, js_url, patterns, framework_signals, findings, + ) + + except Exception as e: + findings["errors"].append(f"Failed to fetch {js_url}: {e}") + + # Deduplicate all list fields + for key in [ + "api_endpoints", "collection_names", "environment_variables", + "secrets", "oauth_ids", "internal_hostnames", "websocket_urls", + "route_definitions", "cspt_sinks", "postmessage_listeners", + "internal_packages", "interesting_strings", + ]: + findings[key] = sorted(set(findings[key])) + + findings["total_findings"] = sum( + len(findings[k]) for k in [ + "api_endpoints", "collection_names", "environment_variables", + "secrets", "oauth_ids", "internal_hostnames", "websocket_urls", + "route_definitions", "cspt_sinks", "postmessage_listeners", + "internal_packages", + ] + ) + + return json.dumps(findings) + + # --- Smart API Surface Discovery (MCP-side, direct HTTP) --- + + @mcp.tool() + async def discover_api( + target_url: str, + extra_paths: list[str] | None 
= None, + extra_headers: dict[str, str] | None = None, + ) -> str: + """Smart API surface discovery. Probes a target with multiple content-types, + detects GraphQL/gRPC-web services, checks for OpenAPI specs, and identifies + responsive API paths. No sandbox required. + + Goes beyond path fuzzing — detects what kind of API the target speaks + and returns the information needed to test it. + + target_url: base URL to probe (e.g. "https://api.example.com") + extra_paths: additional paths to probe beyond the defaults + extra_headers: additional headers to include in all probes (e.g. app-specific version headers) + + Use during reconnaissance when the target returns generic responses to curl + (e.g. SPA shells, empty 200s) to discover the actual API surface.""" + import httpx + + base = target_url.rstrip("/") + base_headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + **(extra_headers or {}), + } + + results: dict[str, Any] = { + "target_url": target_url, + "graphql": None, + "grpc_web": None, + "openapi_spec": None, + "responsive_paths": [], + "content_type_probes": [], + "errors": [], + } + + # --- Paths to probe --- + api_paths = [ + "/api", "/api/v1", "/api/v2", "/api/v3", + "/v1", "/v2", "/v3", + "/rest", "/rest/v1", + "/graphql", "/api/graphql", "/gql", "/query", + "/health", "/healthz", "/ready", "/status", + "/.well-known/openapi.json", "/.well-known/openapi.yaml", + ] + if extra_paths: + api_paths.extend(extra_paths) + + # --- OpenAPI/Swagger spec locations --- + spec_paths = [ + "/openapi.json", "/openapi.yaml", "/swagger.json", "/swagger.yaml", + "/api-docs", "/api-docs.json", "/api/swagger.json", + "/docs/openapi.json", "/v1/openapi.json", "/api/v1/openapi.json", + "/swagger/v1/swagger.json", "/.well-known/openapi.json", + ] + + # --- GraphQL detection paths --- + graphql_paths = ["/graphql", "/api/graphql", "/gql", "/query", "/api/query"] + + # --- Content-types to probe --- + content_types = [ + 
("application/json", '{"query":"test"}'), + ("application/x-www-form-urlencoded", "query=test"), + ("application/grpc-web+proto", b"\x00\x00\x00\x00\x05\x0a\x03foo"), + ("application/grpc-web-text", "AAAABQ=="), + ("multipart/form-data; boundary=strix", "--strix\r\nContent-Disposition: form-data; name=\"test\"\r\n\r\nvalue\r\n--strix--"), + ("application/x-protobuf", b"\x0a\x04test"), + ] + + async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client: + + # --- Phase 1: GraphQL detection --- + graphql_introspection = '{"query":"{ __schema { types { name } } }"}' + for gql_path in graphql_paths: + try: + resp = await client.post( + f"{base}{gql_path}", + headers={**base_headers, "Content-Type": "application/json"}, + content=graphql_introspection, + ) + if resp.status_code == 200: + body = resp.text + if "__schema" in body or '"types"' in body or '"data"' in body: + try: + data = resp.json() + except Exception: + data = {} + type_names = [] + schema = data.get("data", {}).get("__schema", {}) + if schema: + type_names = [t.get("name", "") for t in schema.get("types", [])[:20]] + results["graphql"] = { + "path": gql_path, + "introspection": "enabled" if schema else "partial", + "types": type_names, + } + break + # Check if GraphQL but introspection disabled + elif resp.status_code in (400, 405): + body = resp.text + if "graphql" in body.lower() or "must provide" in body.lower() or "query" in body.lower(): + results["graphql"] = { + "path": gql_path, + "introspection": "disabled", + "hint": body[:200], + } + break + except Exception: + pass + + # --- Phase 2: gRPC-web detection --- + grpc_paths = ["/", "/api", "/grpc", "/service"] + for grpc_path in grpc_paths: + try: + resp = await client.post( + f"{base}{grpc_path}", + headers={ + **base_headers, + "Content-Type": "application/grpc-web+proto", + "X-Grpc-Web": "1", + }, + content=b"\x00\x00\x00\x00\x00", + ) + # gRPC services typically return specific headers or status codes + grpc_status = 
resp.headers.get("grpc-status") + content_type = resp.headers.get("content-type", "") + if grpc_status is not None or "grpc" in content_type.lower(): + results["grpc_web"] = { + "path": grpc_path, + "grpc_status": grpc_status, + "content_type": content_type, + } + break + # Some WAFs block gRPC specifically + if resp.status_code in (403, 406) and "grpc" in resp.text.lower(): + results["grpc_web"] = { + "path": grpc_path, + "status": "blocked_by_waf", + "hint": resp.text[:200], + } + break + except Exception: + pass + + # --- Phase 3: OpenAPI/Swagger spec discovery --- + for spec_path in spec_paths: + try: + resp = await client.get( + f"{base}{spec_path}", + headers=base_headers, + ) + if resp.status_code == 200: + body = resp.text[:500] + if any(marker in body for marker in ['"openapi"', '"swagger"', "openapi:", "swagger:"]): + try: + spec_data = resp.json() + endpoints = [] + for path, methods in spec_data.get("paths", {}).items(): + for method in methods: + if method.upper() in ("GET", "POST", "PUT", "DELETE", "PATCH"): + endpoints.append(f"{method.upper()} {path}") + results["openapi_spec"] = { + "url": f"{base}{spec_path}", + "title": spec_data.get("info", {}).get("title", ""), + "version": spec_data.get("info", {}).get("version", ""), + "endpoint_count": len(endpoints), + "endpoints": endpoints[:50], + } + except Exception: + results["openapi_spec"] = { + "url": f"{base}{spec_path}", + "format": "yaml_or_unparseable", + } + break + except Exception: + pass + + # --- Phase 4: Path probing with multiple content-types (concurrent) --- + sem = asyncio.Semaphore(5) # max 5 concurrent path probes + + async def _probe_path(path: str) -> dict[str, Any] | None: + async with sem: + url = f"{base}{path}" + path_results: dict[str, Any] = {"path": path, "responses": {}} + interesting = False + + try: + resp = await client.get(url, headers=base_headers) + path_results["responses"]["GET"] = { + "status": resp.status_code, + "content_type": resp.headers.get("content-type", 
""), + "body_length": len(resp.text), + } + if resp.status_code not in (404, 405, 502, 503): + interesting = True + except Exception: + pass + + for ct, body in content_types: + try: + resp = await client.post( + url, + headers={**base_headers, "Content-Type": ct}, + content=body if isinstance(body, bytes) else body.encode(), + ) + ct_key = ct.split(";")[0] + path_results["responses"][f"POST_{ct_key}"] = { + "status": resp.status_code, + "content_type": resp.headers.get("content-type", ""), + "body_length": len(resp.text), + } + if resp.status_code not in (404, 405, 502, 503): + interesting = True + except Exception: + pass + + return path_results if interesting else None + + probe_results = await asyncio.gather(*[_probe_path(p) for p in api_paths]) + results["responsive_paths"] = [r for r in probe_results if r is not None] + + # --- Phase 5: Content-type differential on base URL --- + # Probes the root URL specifically — api_paths may not include "/" and + # some SPAs only respond differently at the root. 
+ for ct, body in content_types: + try: + resp = await client.post( + base, + headers={**base_headers, "Content-Type": ct if "boundary" not in ct else ct}, + content=body if isinstance(body, bytes) else body.encode(), + ) + ct_key = ct.split(";")[0] + results["content_type_probes"].append({ + "content_type": ct_key, + "status": resp.status_code, + "response_content_type": resp.headers.get("content-type", ""), + "body_length": len(resp.text), + }) + except Exception as e: + results["content_type_probes"].append({ + "content_type": ct.split(";")[0], + "error": str(e), + }) + + # --- Summary --- + results["summary"] = { + "has_graphql": results["graphql"] is not None, + "has_grpc_web": results["grpc_web"] is not None, + "has_openapi_spec": results["openapi_spec"] is not None, + "responsive_path_count": len(results["responsive_paths"]), + } + + return json.dumps(results) + + # --- Cross-Tool Chain Reasoning (MCP-side) --- + + @mcp.tool() + async def reason_chains( + firebase_results: dict[str, Any] | None = None, + js_analysis: dict[str, Any] | None = None, + services: dict[str, Any] | None = None, + session_comparison: dict[str, Any] | None = None, + api_discovery: dict[str, Any] | None = None, + ) -> str: + """Reason about vulnerability chains by correlating findings across + multiple recon tools. Pass the JSON results from firebase_audit, + analyze_js_bundles, discover_services, compare_sessions, and/or + discover_api. Also reads existing vulnerability reports from the + current scan. + + Returns chain hypotheses — each with evidence (what you found), + chain description (what attack this enables), missing links (what's + needed to prove it), and a concrete next action. + + Call after running recon tools to discover higher-order attack paths + that no single tool would surface alone. 
+ + firebase_results: output from firebase_audit + js_analysis: output from analyze_js_bundles + services: output from discover_services + session_comparison: output from compare_sessions + api_discovery: output from discover_api""" + from .chaining import reason_cross_tool_chains + + # Collect existing vuln reports if scan is active + tracer = get_global_tracer() + vuln_reports = tracer.get_existing_vulnerabilities() if tracer else [] + + chains = reason_cross_tool_chains( + firebase_results=firebase_results, + js_analysis=js_analysis, + services=services, + session_comparison=session_comparison, + api_discovery=api_discovery, + vuln_reports=vuln_reports, + ) + + # Sort by severity + severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} + chains.sort(key=lambda c: severity_order.get(c.get("severity", "low"), 99)) + + return json.dumps({ + "total_chains": len(chains), + "chains": chains, + }) + + # --- CMS & Third-Party Service Discovery (MCP-side, direct HTTP + DNS) --- + + @mcp.tool() + async def discover_services( + target_url: str, + check_dns: bool = True, + ) -> str: + """Discover third-party services and CMS platforms used by the target. + Scans page source and JS bundles for service identifiers, then probes + each discovered service to check if its API is publicly accessible. + No sandbox required. + + Detects: Sanity CMS, Firebase, Supabase, Stripe, Algolia, Sentry, + Segment, LaunchDarkly, Intercom, Mixpanel, Google Analytics, Amplitude, + Contentful, Prismic, Strapi, Auth0, Okta, AWS Cognito. 
+ + target_url: URL to scan for third-party service identifiers + check_dns: whether to lookup DNS TXT records for service verification strings (default true) + + Use during reconnaissance to find hidden attack surface in third-party integrations.""" + import httpx + + service_patterns: dict[str, list[tuple[re.Pattern[str], int]]] = { + "sanity": [ + (re.compile(r'''projectId["':\s]+["']([a-z0-9]{8,12})["']'''), 1), + (re.compile(r'''cdn\.sanity\.io/[^"']*?([a-z0-9]{8,12})'''), 1), + ], + "firebase": [ + (re.compile(r'''["']([a-z0-9\-]+)\.firebaseapp\.com["']'''), 1), + (re.compile(r'''["']([a-z0-9\-]+)\.firebaseio\.com["']'''), 1), + ], + "supabase": [ + (re.compile(r'''["']([a-z]{20})\.supabase\.co["']'''), 1), + (re.compile(r'''supabaseUrl["':\s]+["'](https://[a-z]+\.supabase\.co)["']'''), 1), + ], + "stripe": [ + (re.compile(r'''["'](pk_(?:live|test)_[A-Za-z0-9]{20,})["']'''), 1), + ], + "algolia": [ + (re.compile(r'''(?:appId|applicationId|application_id)["':\s]+["']([A-Z0-9]{10})["']''', re.IGNORECASE), 1), + ], + "sentry": [ + (re.compile(r'''["'](https://[a-f0-9]+@[a-z0-9]+\.ingest\.sentry\.io/\d+)["']'''), 1), + ], + "segment": [ + (re.compile(r'''(?:writeKey|write_key)["':\s]+["']([A-Za-z0-9]{20,})["']'''), 1), + (re.compile(r'''analytics\.load\(["']([A-Za-z0-9]{20,})["']\)'''), 1), + ], + "intercom": [ + (re.compile(r'''intercomSettings.*?app_id["':\s]+["']([a-z0-9]{8})["']''', re.IGNORECASE), 1), + ], + "mixpanel": [ + (re.compile(r'''mixpanel\.init\(["']([a-f0-9]{32})["']'''), 1), + ], + "google_analytics": [ + (re.compile(r'''["'](G-[A-Z0-9]{10,})["']'''), 1), + (re.compile(r'''["'](UA-\d{6,}-\d{1,})["']'''), 1), + (re.compile(r'''["'](GTM-[A-Z0-9]{6,})["']'''), 1), + ], + "auth0": [ + (re.compile(r'''["']([a-zA-Z0-9]+\.(?:us|eu|au|jp)\.auth0\.com)["']'''), 1), + ], + "contentful": [ + (re.compile(r'''cdn\.contentful\.com/spaces/([a-z0-9]{12})'''), 1), + ], + } + + results: dict[str, Any] = { + "target_url": target_url, + "discovered_services": {}, + 
"dns_txt_records": [], + "probes": {}, + "errors": [], + } + + # Phase 1: Fetch page and config endpoints + page_content = "" + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + try: + resp = await client.get(target_url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + }) + if resp.status_code == 200: + page_content = resp.text + except Exception as e: + results["errors"].append(f"Failed to fetch {target_url}: {e}") + + for config_path in ["/__/firebase/init.json", "/env.js", "/config.js"]: + try: + resp = await client.get( + f"{target_url.rstrip('/')}{config_path}", + headers={"User-Agent": "Mozilla/5.0"}, + ) + if resp.status_code == 200 and len(resp.text) > 10: + page_content += "\n" + resp.text + except Exception: + pass + + # Phase 2: Pattern matching + for service_name, patterns_list in service_patterns.items(): + for pattern, group_idx in patterns_list: + for m in pattern.finditer(page_content): + val = m.group(group_idx) + if service_name not in results["discovered_services"]: + results["discovered_services"][service_name] = [] + if val not in results["discovered_services"][service_name]: + results["discovered_services"][service_name].append(val) + + # Phase 3: Probe discovered services + discovered = results["discovered_services"] + + for project_id in discovered.get("sanity", []): + try: + query = '*[_type != ""][0...5]{_type, _id}' + resp = await client.get( + f"https://{project_id}.api.sanity.io/v2021-10-21/data/query/production", + params={"query": query}, + ) + if resp.status_code == 200: + data = resp.json() + doc_types = sorted({ + doc["_type"] for doc in data.get("result", []) if doc.get("_type") + }) + results["probes"][f"sanity_{project_id}"] = { + "status": "accessible", + "document_types": doc_types, + "sample_count": len(data.get("result", [])), + } + else: + results["probes"][f"sanity_{project_id}"] = {"status": "denied"} + except Exception as e: + 
results["probes"][f"sanity_{project_id}"] = {"status": f"error: {e}"} + + for key in discovered.get("stripe", []): + if key.startswith("pk_"): + results["probes"][f"stripe_{key[:15]}"] = { + "status": "publishable_key_exposed", + "key_type": "live" if "pk_live" in key else "test", + } + + for dsn in discovered.get("sentry", []): + if "ingest.sentry.io" in dsn: + results["probes"]["sentry_dsn"] = { + "status": "dsn_exposed", + "dsn": dsn, + } + + # Phase 4: DNS TXT records + if check_dns: + from urllib.parse import urlparse + hostname = urlparse(target_url).hostname or "" + parts = hostname.split(".") + domains = [hostname] + if len(parts) > 2: + domains.append(".".join(parts[-2:])) + + for domain in domains: + try: + proc = await asyncio.create_subprocess_exec( + "dig", "+short", "TXT", domain, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + if stdout: + for line in stdout.decode().strip().splitlines(): + txt = line.strip().replace('" "', '').strip('"') + if txt: + results["dns_txt_records"].append({"domain": domain, "record": txt}) + except FileNotFoundError: + results["errors"].append("DNS TXT lookup skipped: 'dig' not found on system") + break + except Exception: + pass + + results["total_services"] = len(results["discovered_services"]) + results["total_probes"] = len(results["probes"]) + + return json.dumps(results) + + # --- K8s Service Enumeration Wordlist Generator --- + + # Service registry: maps service name -> default ports + K8S_SERVICES: dict[str, list[int]] = { + # K8s core + "kubernetes": [443, 6443], + "kube-dns": [53], + "metrics-server": [443], + "coredns": [53], + # Monitoring + "grafana": [3000], + "prometheus": [9090], + "alertmanager": [9093], + "victoria-metrics": [8428], + "thanos": [9090, 10901], + "loki": [3100], + "tempo": [3200], + # GitOps + "argocd-server": [443, 8080], + # Security + "vault": [8200], + "cert-manager": [9402], + # Service mesh 
+ "istiod": [15010, 15012], + "istio-ingressgateway": [443, 80], + # Auth + "keycloak": [8080, 8443], + "hydra": [4444, 4445], + "dex": [5556], + "oauth2-proxy": [4180], + # Data + "redis": [6379], + "rabbitmq": [5672, 15672], + "kafka": [9092], + "elasticsearch": [9200], + "nats": [4222], + # AWS EKS + "aws-load-balancer-controller": [9443], + "external-dns": [7979], + "ebs-csi-controller": [9808], + "cluster-autoscaler": [8085], + } + _TARGET_DEFAULT_PORTS = [443, 8080, 5432, 3000] + + @mcp.tool() + async def k8s_enumerate( + target_name: str | None = None, + namespaces: list[str] | None = None, + ports: list[int] | None = None, + scheme: str = "https", + max_urls: int = 500, + ) -> str: + """Generate a K8s service enumeration wordlist for SSRF probing. + No sandbox required. + + Returns service URLs to test via SSRF. Each service is mapped to its + known default ports (not a cartesian product), keeping the list compact. + + target_name: company/product name for generating custom service names (e.g. 
"neon") + namespaces: custom namespaces (default: common K8s namespaces) + ports: ADDITIONAL ports to scan on top of each service's defaults + scheme: URL scheme (default "https") + max_urls: maximum URLs to return (default 500) + + Usage: get the URL list, then use python_action to spray them through + your SSRF vector and observe which ones resolve.""" + + # Build service -> ports mapping (start from registry defaults) + service_ports: dict[str, list[int]] = { + svc: list(svc_ports) for svc, svc_ports in K8S_SERVICES.items() + } + + # Target-specific services with default ports + if target_name: + name = target_name.lower().strip() + for suffix in ["-api", "-proxy", "-auth", "-control-plane", "-storage", "-compute"]: + service_ports[f"{name}{suffix}"] = list(_TARGET_DEFAULT_PORTS) + + # Append user-supplied additional ports to every service + if ports: + for svc in service_ports: + for p in ports: + if p not in service_ports[svc]: + service_ports[svc].append(p) + + # Namespaces + default_namespaces = [ + "default", "kube-system", "monitoring", "argocd", + "vault", "cert-manager", "istio-system", + ] + if target_name: + default_namespaces.append(target_name.lower().strip()) + ns_list = namespaces or default_namespaces + + # Namespace affinity: map services to their likely namespaces + # Only generate URLs for plausible service→namespace combinations + ns_affinity: dict[str, list[str]] = { + "default": ["kubernetes"], + "kube-system": ["kube-dns", "coredns", "metrics-server", "aws-load-balancer-controller", + "external-dns", "ebs-csi-controller", "cluster-autoscaler"], + "monitoring": ["grafana", "prometheus", "alertmanager", "victoria-metrics", + "thanos", "loki", "tempo"], + "argocd": ["argocd-server"], + "vault": ["vault"], + "cert-manager": ["cert-manager"], + "istio-system": ["istiod", "istio-ingressgateway", "envoy", "linkerd-controller"], + } + # Target-specific services go to target namespace + if target_name: + name = target_name.lower().strip() + 
ns_affinity[name] = [f"{name}{s}" for s in ["-api", "-proxy", "-auth", "-control-plane", "-storage", "-compute"]] + + # Services not in any affinity map go to all namespaces + mapped_services = set() + for svcs in ns_affinity.values(): + mapped_services.update(svcs) + unmapped = [s for s in service_ports if s not in mapped_services] + + # Generate URLs — use affinity when available, fallback to default+kube-system for unmapped + by_namespace: dict[str, list[str]] = {ns: [] for ns in ns_list} + total = 0 + for ns in ns_list: + affinity_svcs = ns_affinity.get(ns, []) + for svc in affinity_svcs: + if svc in service_ports: + for port in service_ports[svc]: + by_namespace[ns].append(f"{scheme}://{svc}.{ns}.svc.cluster.local:{port}") + total += 1 + # Unmapped services only go to default and kube-system + if ns in ("default", "kube-system"): + for svc in unmapped: + for port in service_ports[svc]: + by_namespace[ns].append(f"{scheme}://{svc}.{ns}.svc.cluster.local:{port}") + total += 1 + + # Remove empty namespaces + by_namespace = {ns: urls for ns, urls in by_namespace.items() if urls} + + # Short-form names (service only) + short_forms: list[str] = [f"{scheme}://{svc}" for svc in service_ports] + + # Cap output — distribute evenly across namespaces + omitted = 0 + if max_urls <= 0: + by_namespace = {ns: [] for ns in by_namespace} + omitted = total + total = 0 + elif total > max_urls: + per_ns = max(max_urls // len(by_namespace), 1) + new_total = 0 + for ns in by_namespace: + if len(by_namespace[ns]) > per_ns: + omitted += len(by_namespace[ns]) - per_ns + by_namespace[ns] = by_namespace[ns][:per_ns] + new_total += len(by_namespace[ns]) + total = new_total + + result: dict[str, Any] = { + "total_urls": total, + "services": list(service_ports.keys()), + "namespaces": ns_list, + "urls_by_namespace": by_namespace, + "short_forms": short_forms, + "usage_hint": ( + "Spray these URLs through your SSRF vector. 
Compare responses to a " + "baseline (known-bad hostname) to identify which services resolve. " + "Short forms work when K8s DNS search domains are configured." + ), + } + if omitted: + result["omitted_urls"] = omitted + result["note"] = f"{omitted} URLs omitted due to max_urls={max_urls} cap." + + return json.dumps(result) + + # --- Blind SSRF Oracle Builder --- + + @mcp.tool() + async def ssrf_oracle( + ssrf_url: str, + ssrf_param: str = "url", + ssrf_method: str = "POST", + ssrf_headers: dict[str, str] | None = None, + ssrf_body_template: str | None = None, + agent_id: str | None = None, + ) -> str: + """Calibrate a blind SSRF oracle by testing response differentials. + Requires an active sandbox. + + Given a confirmed blind SSRF endpoint, tests with known-good and known-bad + targets to build an oracle (timing, status codes) that can distinguish + successful from failed internal requests. + + ssrf_url: the vulnerable endpoint URL (e.g. webhook config endpoint) + ssrf_param: parameter name that accepts the target URL (default "url") + ssrf_method: HTTP method (default POST) + ssrf_headers: additional headers for the SSRF request + ssrf_body_template: request body template with {TARGET_URL} placeholder + agent_id: subagent identifier from dispatch_agent + + NOTE on retry oracle: This tool detects timing and status differentials + from the SSRF config endpoint response. For webhook-style SSRFs where the + real oracle is in delivery retries, you need a 2-phase approach: + (1) set webhook URL to a redirect → interactsh/webhook.site + (2) trigger the event that fires the webhook + (3) count incoming requests at the receiver + Use python_action for this — this tool handles the config-response oracle. + + Returns: oracle calibration data — baseline responses, timing differentials, + and recommended exploitation approach.""" + + scan = sandbox.active_scan + if scan is None: + return json.dumps({"error": "No active scan. 
Call start_scan first."}) + + extra_headers = ssrf_headers or {} + method = ssrf_method.upper() + + # Helper to send one SSRF probe through the sandbox proxy + async def _send_probe(target_url: str) -> dict[str, Any]: + """Send a single probe through the SSRF vector and measure response.""" + if ssrf_body_template: + body_str = ssrf_body_template.replace("{TARGET_URL}", target_url) + try: + body = json.loads(body_str) + except (json.JSONDecodeError, ValueError): + body = body_str + else: + body = {ssrf_param: target_url} + + req_kwargs: dict[str, Any] = { + "url": ssrf_url, + "method": method, + "headers": { + "Content-Type": "application/json", + **extra_headers, + }, + } + + if isinstance(body, dict): + req_kwargs["body"] = json.dumps(body) + else: + req_kwargs["body"] = str(body) + + if agent_id: + req_kwargs["agent_id"] = agent_id + + t0 = time.monotonic() + try: + resp = await sandbox.proxy_tool("send_request", req_kwargs) + elapsed_ms = round((time.monotonic() - t0) * 1000) + status = resp.get("status_code", resp.get("response", {}).get("status_code", 0)) + body_text = resp.get("body", resp.get("response", {}).get("body", "")) + body_len = len(body_text) if isinstance(body_text, str) else 0 + return { + "status_code": status, + "elapsed_ms": elapsed_ms, + "body_length": body_len, + "body_preview": body_text[:300] if isinstance(body_text, str) else "", + "error": None, + } + except Exception as exc: + elapsed_ms = round((time.monotonic() - t0) * 1000) + return { + "status_code": 0, + "elapsed_ms": elapsed_ms, + "body_length": 0, + "body_preview": "", + "error": str(exc), + } + + # --- Phase 1: Baseline calibration --- + probe_targets = { + "reachable": "https://httpbin.org/status/200", + "unreachable": "https://192.0.2.1/", + "dns_fail": "https://this-domain-does-not-exist-strix-test.invalid/", + } + + baseline: dict[str, Any] = {} + for label, target in probe_targets.items(): + baseline[label] = await _send_probe(target) + + # --- Phase 2: Retry oracle 
detection --- + retry_oracle: dict[str, Any] = {"detected": False} + + # Probe with status 500 to see if SSRF retries + probe_500 = await _send_probe("https://httpbin.org/status/500") + probe_200 = await _send_probe("https://httpbin.org/status/200") + + # If 500 takes significantly longer than 200, the server may be retrying + if probe_500["elapsed_ms"] > probe_200["elapsed_ms"] * 2 + 500: + retry_oracle["detected"] = True + retry_oracle["evidence"] = ( + f"500 target took {probe_500['elapsed_ms']}ms vs " + f"{probe_200['elapsed_ms']}ms for 200 target — " + f"likely retrying on failure" + ) + retry_oracle["timing_500_ms"] = probe_500["elapsed_ms"] + retry_oracle["timing_200_ms"] = probe_200["elapsed_ms"] + + # --- Phase 3: Timing oracle detection --- + timing_oracle: dict[str, Any] = {"detected": False} + + probe_fast = baseline["reachable"] + probe_slow = await _send_probe("https://httpbin.org/delay/3") + probe_dead = baseline["unreachable"] + + fast_ms = probe_fast["elapsed_ms"] + slow_ms = probe_slow["elapsed_ms"] + dead_ms = probe_dead["elapsed_ms"] + + # Timing oracle exists if slow target causes slower SSRF response + if slow_ms > fast_ms * 1.5 + 1000: + timing_oracle["detected"] = True + timing_oracle["evidence"] = ( + f"Response time correlates with target: fast={fast_ms}ms, " + f"slow={slow_ms}ms, unreachable={dead_ms}ms" + ) + timing_oracle["fast_ms"] = fast_ms + timing_oracle["slow_ms"] = slow_ms + timing_oracle["unreachable_ms"] = dead_ms + + # --- Phase 4: Status differential detection --- + status_oracle: dict[str, Any] = {"detected": False} + + statuses = { + probe_targets["reachable"]: baseline["reachable"]["status_code"], + probe_targets["unreachable"]: baseline["unreachable"]["status_code"], + probe_targets["dns_fail"]: baseline["dns_fail"]["status_code"], + } + unique_statuses = set(statuses.values()) + if len(unique_statuses) > 1 and 0 not in unique_statuses: + status_oracle["detected"] = True + status_oracle["evidence"] = ( + f"Different status 
codes for different targets: {statuses}" + ) + status_oracle["status_map"] = statuses + + # --- Phase 5: Body differential detection --- + body_oracle: dict[str, Any] = {"detected": False} + body_lengths = { + "reachable": baseline["reachable"]["body_length"], + "unreachable": baseline["unreachable"]["body_length"], + "dns_fail": baseline["dns_fail"]["body_length"], + } + unique_lengths = set(body_lengths.values()) + if len(unique_lengths) > 1: + body_oracle["detected"] = True + body_oracle["evidence"] = f"Different body sizes: {body_lengths}" + body_oracle["body_lengths"] = body_lengths + + # --- Build recommended approach --- + oracles_detected = [] + if retry_oracle["detected"]: + oracles_detected.append("retry") + if timing_oracle["detected"]: + oracles_detected.append("timing") + if status_oracle["detected"]: + oracles_detected.append("status_differential") + if body_oracle["detected"]: + oracles_detected.append("body_differential") + + if not oracles_detected: + recommended = ( + "No clear oracle detected. Try: (1) use a webhook/callback URL " + "(e.g. webhook.site) as target to count callbacks for retry detection, " + "(2) increase timing thresholds with longer delays, " + "(3) test with error-triggering internal targets." + ) + elif "status_differential" in oracles_detected: + recommended = ( + "Use status code differential for port scanning — different status " + "codes reveal whether internal targets respond. Most reliable oracle." + ) + elif "retry" in oracles_detected: + recommended = ( + "Use retry oracle for port scanning — probe internal IPs and count " + "callbacks (via webhook.site) to determine if service is running. " + "500/error responses trigger retries; 200 responses do not." + ) + elif "timing" in oracles_detected: + recommended = ( + "Use timing oracle for service discovery — response time correlates " + "with target response time. Compare fast (responding service) vs " + "slow (non-responding IP) to identify live services." 
    # --- HTTP Request Smuggling Detection (MCP-side, direct HTTP) ---

    @mcp.tool()
    async def test_request_smuggling(
        target_url: str,
        timeout: int = 10,
    ) -> str:
        """Test for HTTP request smuggling vulnerabilities by probing for parser
        discrepancies between front-end proxies and back-end servers. No sandbox required.

        Tests CL.TE, TE.CL, TE.TE, and TE.0 variants. Also detects proxy/CDN
        stack via fingerprinting headers.

        target_url: base URL to test (e.g. "https://example.com")
        timeout: seconds to wait per probe (default 10, higher values detect timing-based smuggling)

        Use during reconnaissance when the target is behind a CDN or reverse proxy.
        Load the 'request_smuggling' skill for detailed exploitation guidance."""
        import httpx

        base = target_url.rstrip("/")
        # "probes" holds CL.TE / TE.CL / TE.0 results in that order;
        # TE.TE obfuscation variants are reported separately.
        results: dict[str, Any] = {
            "target_url": target_url,
            "proxy_stack": {},
            "baseline": {},
            "probes": [],
            "te_obfuscation_results": [],
            "summary": {"potential_vulnerabilities": 0, "tested_variants": 0},
            "note": (
                "httpx may normalize Content-Length and Transfer-Encoding headers. "
                "Results marked 'potential' should be confirmed with raw socket probes."
            ),
        }

        # CDN/proxy signature headers to look for
        cdn_signatures: dict[str, str] = {
            "cf-ray": "cloudflare",
            "x-amz-cf-id": "cloudfront",
            "x-akamai-transformed": "akamai",
            "x-fastly-request-id": "fastly",
            "x-varnish": "varnish",
        }
        proxy_headers = [
            "server", "via", "x-served-by", "x-cache", "x-cache-hits",
            "cf-ray", "x-amz-cf-id", "x-akamai-transformed",
            "x-fastly-request-id", "x-varnish",
        ]

        # HTTP/1.1 is forced: smuggling desync requires h1 framing; over h2
        # these headers are meaningless.
        async with httpx.AsyncClient(
            timeout=timeout,
            follow_redirects=False,
            http1=True,
            http2=False,
        ) as client:

            # --- Phase 1: Baseline + proxy fingerprinting ---
            try:
                t0 = time.monotonic()
                baseline_resp = await client.get(
                    base,
                    headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"},
                )
                baseline_time_ms = round((time.monotonic() - t0) * 1000)

                results["baseline"] = {
                    "status": baseline_resp.status_code,
                    "response_time_ms": baseline_time_ms,
                }

                # Collect proxy stack info
                proxy_stack: dict[str, str] = {}
                detected_cdn: str | None = None
                for hdr in proxy_headers:
                    val = baseline_resp.headers.get(hdr)
                    if val:
                        proxy_stack[hdr] = val
                        if hdr in cdn_signatures:
                            detected_cdn = cdn_signatures[hdr]
                if detected_cdn:
                    proxy_stack["cdn"] = detected_cdn
                results["proxy_stack"] = proxy_stack

            except Exception as e:
                # Without a baseline no probe can be classified — bail out early.
                results["baseline"] = {"error": str(e)}
                return json.dumps(results)

            baseline_status = baseline_resp.status_code
            baseline_ms = baseline_time_ms

            # Helper: send a probe and classify its response relative to the
            # baseline. Classification: "potential" on timeout / large slowdown
            # / changed-to-error status; otherwise "not_vulnerable"; "error"
            # when the request itself failed client-side.
            async def _probe(
                variant: str,
                headers: dict[str, str],
                body: bytes,
            ) -> dict[str, Any]:
                probe_result: dict[str, Any] = {
                    "variant": variant,
                    "status": "not_vulnerable",
                    "evidence": "",
                }
                try:
                    t0 = time.monotonic()
                    resp = await client.post(
                        base,
                        headers={
                            "User-Agent": "Mozilla/5.0",
                            **headers,
                        },
                        content=body,
                    )
                    elapsed_ms = round((time.monotonic() - t0) * 1000)

                    # Detect anomalies
                    status_changed = resp.status_code != baseline_status
                    is_error = resp.status_code in (400, 500, 501, 502)
                    # >5x baseline plus 2s of slack counts as a hang
                    is_slow = elapsed_ms > (baseline_ms * 5 + 2000)

                    if is_slow:
                        probe_result["status"] = "potential"
                        probe_result["evidence"] = (
                            f"response timeout ({elapsed_ms}ms vs {baseline_ms}ms baseline)"
                        )
                    elif is_error and not status_changed:
                        probe_result["evidence"] = (
                            f"error status {resp.status_code} (same as baseline)"
                        )
                    elif status_changed and is_error:
                        probe_result["status"] = "potential"
                        probe_result["evidence"] = (
                            f"status changed to {resp.status_code} "
                            f"(baseline {baseline_status})"
                        )
                    else:
                        probe_result["evidence"] = (
                            f"normal {resp.status_code} response in {elapsed_ms}ms"
                        )

                    probe_result["response_status"] = resp.status_code
                    probe_result["response_time_ms"] = elapsed_ms

                except httpx.ReadTimeout:
                    probe_result["status"] = "potential"
                    probe_result["evidence"] = (
                        f"read timeout ({timeout}s) — back-end may be waiting for more data"
                    )
                except Exception as e:
                    probe_result["status"] = "error"
                    probe_result["evidence"] = str(e)

                return probe_result

            # --- Phase 2: CL.TE probe ---
            # Front-end uses Content-Length, back-end uses Transfer-Encoding.
            # CL says 4 bytes, but TE body is longer — leftover poisons next request.
            clte_body = b"1\r\nZ\r\n0\r\n\r\n"
            clte_result = await _probe(
                "CL.TE",
                {
                    "Content-Length": "4",
                    "Transfer-Encoding": "chunked",
                },
                clte_body,
            )
            results["probes"].append(clte_result)

            # --- Phase 3: TE.CL probe ---
            # Front-end uses Transfer-Encoding, back-end uses Content-Length.
            # TE ends at chunk 0, but CL includes extra bytes.
            tecl_body = b"0\r\n\r\nSMUGGLED"
            tecl_result = await _probe(
                "TE.CL",
                {
                    "Content-Length": "50",
                    "Transfer-Encoding": "chunked",
                },
                tecl_body,
            )
            results["probes"].append(tecl_result)

            # --- Phase 4: TE.TE obfuscation variants ---
            # NOTE: dual TE header probes may not work as intended — httpx
            # normalizes header names to lowercase, merging duplicate keys.
            # Results for dual_te_* variants should be confirmed with raw sockets.
            # NOTE(review): the "space_before_colon" entry uses an invalid
            # header name ("Transfer-Encoding "); httpx/h11 likely rejects it
            # client-side, so that variant will surface as status "error" —
            # confirm with a raw socket probe.
            te_obfuscations: list[tuple[str, dict[str, str]]] = [
                ("xchunked", {"Transfer-Encoding": "xchunked"}),
                ("space_before_colon", {"Transfer-Encoding ": "chunked"}),
                ("tab_after_colon", {"Transfer-Encoding": "\tchunked"}),
                ("dual_te_chunked_x", {"Transfer-Encoding": "chunked", "Transfer-encoding": "x"}),
                ("dual_te_chunked_cow", {"Transfer-Encoding": "chunked", "Transfer-encoding": "cow"}),
            ]

            for label, te_headers in te_obfuscations:
                te_result = await _probe(
                    f"TE.TE ({label})",
                    {
                        "Content-Length": "4",
                        **te_headers,
                    },
                    b"1\r\nZ\r\n0\r\n\r\n",
                )
                results["te_obfuscation_results"].append(te_result)

            # --- Phase 5: TE.0 probe ---
            # Send TE:chunked with CL:0 but include chunked data — if front-end
            # strips TE and uses CL:0, the chunked data stays in the pipeline.
            te0_result = await _probe(
                "TE.0",
                {
                    "Transfer-Encoding": "chunked",
                    "Content-Length": "0",
                },
                b"1\r\nZ\r\n0\r\n\r\n",
            )
            results["probes"].append(te0_result)

        # --- Summary ---
        all_probes = results["probes"] + results["te_obfuscation_results"]
        results["summary"]["tested_variants"] = len(all_probes)
        results["summary"]["potential_vulnerabilities"] = sum(
            1 for p in all_probes if p["status"] == "potential"
        )

        return json.dumps(results)
("X-Rewrite-URL", "/canary-path", "body"), + ("X-HTTP-Method-Override", "POST", "behavior"), + ("X-Forwarded-Port", "1337", "body"), + ("X-Custom-IP-Authorization", "127.0.0.1", "body"), + ] + + # Cache deception extensions and parser tricks + deception_extensions = [".css", ".js", ".png", ".svg", "/style.css", "/x.js"] + parser_tricks = [";.css", "%0A.css", "%00.css"] + deception_paths = ["/account", "/profile", "/settings", "/dashboard", "/me"] + + async with httpx.AsyncClient( + timeout=15, + follow_redirects=False, + http1=True, + http2=False, + ) as client: + + ua_headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + } + + # --- Phase 1: Cache detection --- + # Send two identical requests and compare caching headers + try: + resp1 = await client.get(base + test_paths[0], headers=ua_headers) + resp2 = await client.get(base + test_paths[0], headers=ua_headers) + + cache_type: str | None = None + cache_detected = False + + for hdr, cdn_name in cache_indicators.items(): + val1 = resp1.headers.get(hdr) + val2 = resp2.headers.get(hdr) + + if val2: + # Check for cache HIT indicators + if hdr == "x-cache" and "hit" in val2.lower(): + cache_detected = True + elif hdr == "cf-cache-status" and val2.upper() in ("HIT", "DYNAMIC", "REVALIDATED"): + cache_detected = True + cache_type = "cloudflare" + elif hdr == "age": + try: + if int(val2) > 0: + cache_detected = True + except ValueError: + pass + elif hdr == "x-cache-hits": + try: + if int(val2) > 0: + cache_detected = True + except ValueError: + pass + elif hdr == "x-varnish": + # Two IDs in x-varnish means cached + if len(val2.split()) >= 2: + cache_detected = True + cache_type = "varnish" + + if cdn_name and not cache_type: + cache_type = cdn_name + + # Also detect cache from Cache-Control / Pragma + cc = resp2.headers.get("cache-control", "") + if "public" in cc or ("max-age=" in cc and "max-age=0" not in cc and "no-cache" not in cc): + cache_detected = True + + 
results["cache_detected"] = cache_detected + results["cache_type"] = cache_type + + except Exception as e: + results["cache_deception"].append({"error": f"Cache detection failed: {e}"}) + + # --- Phase 2: Unkeyed header testing --- + probe_count = 0 + for header_name, canary_value, reflection_type in unkeyed_probes: + for path in test_paths: + probe_count += 1 + entry: dict[str, Any] = { + "header": header_name, + "path": path, + "reflected": False, + "cached": False, + "severity": None, + "reflection_location": None, + } + + # Use a cache buster so each probe is independent + cache_buster = f"cb={uuid.uuid4().hex[:8]}" + sep = "&" if "?" in path else "?" + probe_url = f"{base}{path}{sep}{cache_buster}" + + try: + resp = await client.get( + probe_url, + headers={ + **ua_headers, + header_name: canary_value, + }, + ) + + body = resp.text + location = resp.headers.get("location", "") + set_cookie = resp.headers.get("set-cookie", "") + + # Check reflection + reflected = False + reflection_loc = None + + if canary_value in body: + reflected = True + reflection_loc = "body" + elif canary_value in location: + reflected = True + reflection_loc = "location_header" + elif canary_value in set_cookie: + reflected = True + reflection_loc = "set_cookie" + elif header_name == "X-Forwarded-Scheme" and resp.status_code in (301, 302): + # Redirect often means the scheme header was processed + if "https" in location or canary_value in location: + reflected = True + reflection_loc = "redirect" + elif header_name == "X-Forwarded-Proto" and resp.status_code in (301, 302): + reflected = True + reflection_loc = "redirect" + + entry["reflected"] = reflected + entry["reflection_location"] = reflection_loc + + # Check if cached + is_cached = False + x_cache = resp.headers.get("x-cache", "") + cf_status = resp.headers.get("cf-cache-status", "") + age = resp.headers.get("age", "") + + if "hit" in x_cache.lower(): + is_cached = True + elif cf_status.upper() in ("HIT", "REVALIDATED"): + 
is_cached = True + elif age: + try: + is_cached = int(age) > 0 + except ValueError: + pass + + entry["cached"] = is_cached + + if reflected and is_cached: + entry["severity"] = "high" + elif reflected: + entry["severity"] = "medium" + + except Exception as e: + entry["error"] = str(e) + + # Only record interesting results (reflected or errors) + if entry.get("reflected") or entry.get("error"): + results["unkeyed_headers"].append(entry) + + # --- Phase 3: Cache deception testing --- + for path in deception_paths: + # First get the baseline for this path + try: + baseline_resp = await client.get( + f"{base}{path}", + headers=ua_headers, + ) + baseline_status = baseline_resp.status_code + baseline_length = len(baseline_resp.text) + # Skip if path returns 404 — nothing to deceive + if baseline_status == 404: + continue + except Exception: + continue + + for ext in deception_extensions + parser_tricks: + probe_count += 1 + deception_url = f"{base}{path}{ext}" + + deception_entry: dict[str, Any] = { + "path": f"{path}{ext}", + "returns_dynamic_content": False, + "cached": False, + "severity": None, + } + + try: + resp = await client.get(deception_url, headers=ua_headers) + + # Check if it returns content similar to the original path + resp_length = len(resp.text) + is_dynamic = ( + resp.status_code == baseline_status + and resp.status_code != 404 + and resp_length > 100 + and abs(resp_length - baseline_length) / max(baseline_length, 1) < 0.5 + ) + + deception_entry["returns_dynamic_content"] = is_dynamic + deception_entry["response_status"] = resp.status_code + + # Check caching + is_cached = False + cc = resp.headers.get("cache-control", "") + x_cache = resp.headers.get("x-cache", "") + cf_status = resp.headers.get("cf-cache-status", "") + age = resp.headers.get("age", "") + + if "hit" in x_cache.lower(): + is_cached = True + elif cf_status.upper() in ("HIT", "REVALIDATED"): + is_cached = True + elif age: + try: + is_cached = int(age) > 0 + except ValueError: + pass + 
elif "public" in cc or ("max-age=" in cc and "max-age=0" not in cc and "no-cache" not in cc): + is_cached = True + + deception_entry["cached"] = is_cached + + if is_dynamic and is_cached: + deception_entry["severity"] = "high" + elif is_dynamic: + deception_entry["severity"] = "low" + + except Exception as e: + deception_entry["error"] = str(e) + + # Only record interesting results + if deception_entry.get("returns_dynamic_content") or deception_entry.get("error"): + results["cache_deception"].append(deception_entry) + + # --- Summary --- + results["summary"]["poisoning_vectors"] = sum( + 1 for h in results["unkeyed_headers"] + if h.get("reflected") and h.get("cached") + ) + results["summary"]["deception_vectors"] = sum( + 1 for d in results["cache_deception"] + if d.get("returns_dynamic_content") and d.get("cached") + ) + results["summary"]["total_probes"] = probe_count + + return json.dumps(results) diff --git a/strix-mcp/src/strix_mcp/tools_helpers.py b/strix-mcp/src/strix_mcp/tools_helpers.py new file mode 100644 index 000000000..a6ab40bed --- /dev/null +++ b/strix-mcp/src/strix_mcp/tools_helpers.py @@ -0,0 +1,357 @@ +"""Module-level helper functions and constants extracted from tools.py.""" +from __future__ import annotations + +import json +import re +from typing import Any +from urllib.parse import urljoin + +# --- Title normalization for deduplication --- + +_TITLE_SYNONYMS: dict[str, str] = { + "content-security-policy": "csp", + "content security policy": "csp", + "cross-site request forgery": "csrf", + "cross site request forgery": "csrf", + "cross-site scripting": "xss", + "cross site scripting": "xss", + "server-side request forgery": "ssrf", + "server side request forgery": "ssrf", + "sql injection": "sqli", + "nosql injection": "nosqli", + "xml external entity": "xxe", + "remote code execution": "rce", + "insecure direct object reference": "idor", + "broken access control": "bac", + "missing x-frame-options": "x-frame-options missing", + 
"x-content-type-options missing": "x-content-type-options missing", + "strict-transport-security missing": "hsts missing", + "missing hsts": "hsts missing", + "missing strict-transport-security": "hsts missing", +} + + +def _normalize_title(title: str) -> str: + """Normalize a vulnerability title for deduplication.""" + t = title.lower().strip() + t = " ".join(t.split()) + for synonym, canonical in sorted( + _TITLE_SYNONYMS.items(), key=lambda x: -len(x[0]) + ): + t = t.replace(synonym, canonical) + return t + + +def _find_duplicate( + normalized_title: str, reports: list[dict[str, Any]] +) -> int | None: + """Find index of an existing report with the same normalized title.""" + for i, report in enumerate(reports): + if _normalize_title(report["title"]) == normalized_title: + return i + return None + + +# --- OWASP Top 10 (2021) categorization --- + +_OWASP_KEYWORDS: list[tuple[str, list[str]]] = [ + ("A01 Broken Access Control", [ + "idor", "bac", "broken access", "insecure direct object", + "privilege escalation", "path traversal", "directory traversal", + "forced browsing", "cors", "missing access control", + "open redirect", "unauthorized access", "access control", + "subdomain takeover", + ]), + ("A02 Cryptographic Failures", [ + "weak cipher", "weak encryption", "cleartext", "plain text password", + "insecure tls", "ssl", "certificate", "weak hash", + ]), + ("A03 Injection", [ + "sqli", "sql injection", "nosql injection", "xss", "cross-site scripting", + "command injection", "xxe", "xml external entity", "ldap injection", + "xpath injection", "template injection", "ssti", "crlf injection", + "header injection", "rce", "remote code execution", "code injection", + "prototype pollution", + ]), + ("A04 Insecure Design", [ + "business logic", "race condition", "mass assignment", + "insecure design", "missing rate limit", + ]), + ("A05 Security Misconfiguration", [ + "misconfiguration", "missing csp", "csp", "missing header", + "x-frame-options", "x-content-type", 
"hsts", "strict-transport", + "server information", "debug mode", "default credential", + "directory listing", "stack trace", "verbose error", + "sentry", "source map", "security header", + "information disclosure", "exposed env", "actuator exposed", + "swagger exposed", "phpinfo", "server version", + ]), + ("A06 Vulnerable and Outdated Components", [ + "outdated", "vulnerable component", "known vulnerability", + "cve-", "end of life", + ]), + ("A07 Identification and Authentication Failures", [ + "jwt", "authentication", "session", "credential", "password", + "brute force", "session fixation", "token", "oauth", "2fa", "mfa", + ]), + ("A08 Software and Data Integrity Failures", [ + "deserialization", "integrity", "unsigned", "untrusted data", + "ci/cd", "auto-update", + ]), + ("A09 Security Logging and Monitoring Failures", [ + "logging", "monitoring", "audit", "insufficient logging", + ]), + ("A10 Server-Side Request Forgery", [ + "ssrf", "server-side request forgery", + ]), +] + + +def _categorize_owasp(title: str) -> str: + """Map a vulnerability title to an OWASP Top 10 (2021) category.""" + title_lower = title.lower() + for category, keywords in _OWASP_KEYWORDS: + if any(kw in title_lower for kw in keywords): + return category + return "Other" + + +_SEVERITY_ORDER = ["info", "low", "medium", "high", "critical"] + +VALID_NOTE_CATEGORIES = ["general", "findings", "methodology", "questions", "plan", "recon"] + + +def _normalize_severity(severity: str) -> str: + """Normalize severity to a known value, defaulting to 'info'.""" + normed = severity.lower().strip() if severity else "info" + return normed if normed in _SEVERITY_ORDER else "info" + + +# --- Nuclei JSONL parsing --- + +def parse_nuclei_jsonl(jsonl: str) -> list[dict[str, Any]]: + """Parse nuclei JSONL output into structured findings. + + Each valid line becomes a dict with keys: template_id, url, severity, name, description. + Malformed lines are silently skipped. 
+ """ + findings: list[dict[str, Any]] = [] + for line in jsonl.strip().splitlines(): + line = line.strip() + if not line: + continue + try: + data = json.loads(line) + except json.JSONDecodeError: + continue + info = data.get("info", {}) + findings.append({ + "template_id": data.get("template-id", "unknown"), + "url": data.get("matched-at", ""), + "severity": data.get("severity", "info"), + "name": info.get("name", ""), + "description": info.get("description", ""), + }) + return findings + + +def build_nuclei_command( + target: str, + severity: str, + rate_limit: int, + templates: list[str] | None, + output_file: str, +) -> str: + """Build a nuclei CLI command string.""" + parts = [ + "nuclei", + f"-u {target}", + f"-severity {severity}", + f"-rate-limit {rate_limit}", + "-jsonl", + f"-o {output_file}", + "-stats", # show progress stats on stderr + "-stats-interval 10", # every 10 seconds + "-no-httpx", # skip httpx probe (target already known live) + "-env-vars=false", # bypass system proxy for direct scanning + ] + if templates: + for t in templates: + parts.append(f"-t {t}") + else: + # Default: use focused template tags instead of loading all 2000+ + # These cover the highest-value checks without the full scan overhead + parts.append("-tags exposure,misconfig,cve,takeover,default-login,token") + return " ".join(parts) + + +# --- Source map discovery helpers --- + + +def extract_script_urls(html: str, base_url: str) -> list[str]: + """Extract absolute URLs of