Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/agent_bom/runtime/detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
DANGEROUS_ARG_PATTERNS,
RESPONSE_BASE64_PATTERN,
RESPONSE_CLOAKING_PATTERNS,
RESPONSE_INJECTION_PATTERNS,
RESPONSE_INVISIBLE_CHARS,
RESPONSE_SVG_PATTERNS,
SUSPICIOUS_SEQUENCES,
Expand Down Expand Up @@ -383,4 +384,104 @@ def check(self, tool_name: str, response_text: str) -> list[Alert]:
)
)

# Prompt injection patterns (cache poisoning / cross-agent injection)
for pattern_name, pattern in RESPONSE_INJECTION_PATTERNS:
matches = pattern.findall(response_text)
if matches:
alerts.append(
Alert(
detector="response_inspector",
severity=AlertSeverity.CRITICAL,
message=f"Prompt injection detected: {pattern_name} in response from {tool_name}",
details={
"tool": tool_name,
"pattern": pattern_name,
"category": "prompt_injection",
"match_count": len(matches),
"preview": matches[0][:120] if matches else "",
},
)
)

return alerts


# ─── Vector DB Injection Detector ────────────────────────────────────────────


class VectorDBInjectionDetector:
    """Detect prompt injection in vector DB / RAG retrieval responses.

    Vector databases are a cache poisoning attack surface: an attacker who
    can write to the vector store (or poison upstream documents) can inject
    instructions that the LLM will execute when the agent retrieves context.

    This detector identifies tool calls that look like vector DB retrievals
    (similarity_search, query, retrieve, search, fetch_context, etc.) and
    applies full prompt injection scanning to their responses.

    See also: ToxicPattern.CACHE_POISON and ToxicPattern.CROSS_AGENT_POISON
    in toxic_combos.py.
    """

    # Tool name patterns that indicate a vector DB / RAG retrieval
    _VECTOR_TOOL_PATTERNS = re.compile(
        r"(?:similarity[_\s]search|semantic[_\s]search|vector[_\s](?:search|query|lookup)|"
        r"retriev(?:e|al)|fetch[_\s](?:context|docs?|chunks?)|rag[_\s](?:query|search)|"
        r"search[_\s](?:docs?|knowledge|embeddings?)|query[_\s](?:index|store|db|database)|"
        r"get[_\s]context|lookup[_\s](?:docs?|knowledge))",
        re.IGNORECASE,
    )

    def __init__(self) -> None:
        # Reused for the full cloaking/SVG/invisible-char suite on vector tools.
        self._inspector = ResponseInspector()

    def is_vector_tool(self, tool_name: str) -> bool:
        """Return True if tool_name looks like a vector DB retrieval tool."""
        return bool(self._VECTOR_TOOL_PATTERNS.search(tool_name))

    def check(self, tool_name: str, response_text: str) -> list[Alert]:
        """Check a tool response for prompt injection (cache poisoning).

        Always runs injection pattern checks regardless of tool name.
        If the tool looks like a vector DB retrieval, also runs the full
        ResponseInspector suite and upgrades severity to CRITICAL.

        Args:
            tool_name: Name of the tool whose response is being inspected.
            response_text: Raw response payload to scan.

        Returns:
            A list of Alert objects; empty when nothing suspicious is found.
        """
        alerts: list[Alert] = []
        # Hoisted out of the loops below: the answer depends only on tool_name,
        # so there is no reason to re-run the regex per matched pattern.
        is_vector = self.is_vector_tool(tool_name)

        # Injection patterns — always check, even for non-vector tools.
        for pattern_name, pattern in RESPONSE_INJECTION_PATTERNS:
            matches = pattern.findall(response_text)
            if not matches:
                continue
            alerts.append(
                Alert(
                    detector="vector_db_injection",
                    severity=AlertSeverity.CRITICAL,
                    message=(
                        f"{'Cache poisoning' if is_vector else 'Content injection'} detected: "
                        f"{pattern_name} in {'vector DB retrieval' if is_vector else 'tool response'} "
                        f"from {tool_name}"
                    ),
                    details={
                        "tool": tool_name,
                        "pattern": pattern_name,
                        "category": "cache_poison" if is_vector else "content_injection",
                        "is_vector_tool": is_vector,
                        "match_count": len(matches),
                        # Guarded by the `continue` above, so matches[0] is safe.
                        "preview": matches[0][:120],
                    },
                )
            )

        # For confirmed vector tools also run full cloaking/SVG/invisible checks
        if is_vector:
            for alert in self._inspector.check(tool_name, response_text):
                # Re-tag detector and upgrade severity: poisoned retrieval
                # context is more dangerous than a one-off tool response.
                alert.detector = "vector_db_injection"
                if alert.severity == AlertSeverity.HIGH:
                    alert.severity = AlertSeverity.CRITICAL
                alert.details["category"] = "cache_poison_" + alert.details.get("category", "unknown")
                alerts.append(alert)

        return alerts
62 changes: 62 additions & 0 deletions src/agent_bom/runtime/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,68 @@
RESPONSE_BASE64_PATTERN = re.compile(r"(?:^|[^A-Za-z0-9+/])([A-Za-z0-9+/]{60,}={0,2})(?:$|[^A-Za-z0-9+/])")


# ─── Prompt injection patterns in tool responses ──────────────────────────────

# Patterns that indicate a tool response (e.g. from a vector DB retrieval or
# RAG context fetch) is attempting to inject instructions into the LLM.
# Used by ResponseInspector to detect cache poisoning and cross-agent injection.
# Each entry is a (human-readable name, compiled pattern) pair; the name is
# surfaced in alert messages, the pattern is searched against raw response text.
RESPONSE_INJECTION_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
    # Role / persona overrides
    (
        "Role override",
        re.compile(
            r"\b(?:ignore|disregard|forget|override)\b.{0,40}\b(?:instructions?|system\s+prompt|previous|above|rules?|constraints?)\b",
            re.IGNORECASE,
        ),
    ),
    (
        "System prompt injection",
        re.compile(
            r"<(?:system|assistant|user|im_start|im_end)[>\s]",
            re.IGNORECASE,
        ),
    ),
    (
        "Jailbreak trigger",
        re.compile(
            # "DAN" is kept case-sensitive via the scoped (?-i:...) group so
            # the common given name "dan"/"Dan" in ordinary retrieved text
            # does not raise a CRITICAL false positive.
            r"\b(?:(?-i:DAN)|jailbreak|do\s+anything\s+now|developer\s+mode|god\s+mode|unrestricted\s+mode|sudo\s+mode)\b",
            re.IGNORECASE,
        ),
    ),
    # Instruction injection
    (
        "Instruction injection",
        re.compile(
            r"\b(?:new\s+instruction|additional\s+instruction|important\s+instruction|secret\s+instruction|hidden\s+instruction)\b",
            re.IGNORECASE,
        ),
    ),
    (
        "Task hijack",
        re.compile(
            r"\b(?:instead(?:\s+of)?|actually|your\s+real\s+task|your\s+actual\s+(?:goal|purpose|job)|from\s+now\s+on)\b.{0,60}\b(?:you\s+(?:must|should|will|are\s+to)|please|task)\b",
            re.IGNORECASE,
        ),
    ),
    # Exfiltration instructions embedded in content
    (
        "Exfil instruction",
        re.compile(
            r"\b(?:send|post|forward|transmit|upload|exfiltrate)\b.{0,60}\b(?:this\s+(?:conversation|context|data|prompt)|user\s+data|api\s+key|token|secret)\b",
            re.IGNORECASE,
        ),
    ),
    # Prompt delimiter attacks
    (
        "Prompt delimiter attack",
        re.compile(
            r"(?:###\s*(?:SYSTEM|INSTRUCTION|CONTEXT)|---\s*(?:SYSTEM|NEW\s+PROMPT)|={3,}\s*(?:SYSTEM|INSTRUCTION))",
            re.IGNORECASE,
        ),
    ),
]


# ─── Suspicious tool call sequences ──────────────────────────────────────────

# (sequence_name, [tool_name_patterns], description)
Expand Down
162 changes: 152 additions & 10 deletions src/agent_bom/toxic_combos.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class ToxicPattern(str, Enum):
MULTI_AGENT_CVE = "multi_agent_cve"
KEV_WITH_CREDS = "kev_with_credentials"
TRANSITIVE_CRITICAL = "transitive_critical"
CACHE_POISON = "cache_poison"
CROSS_AGENT_POISON = "cross_agent_poison"


@dataclass
Expand All @@ -51,17 +53,20 @@ def detect_toxic_combinations(
"""
combos: list[ToxicCombination] = []

if not report.blast_radii:
return combos

combos.extend(_detect_cred_blast(report.blast_radii))
combos.extend(_detect_kev_with_creds(report.blast_radii))
combos.extend(_detect_execute_exploit(report.blast_radii))
combos.extend(_detect_multi_agent_cve(report.blast_radii))
combos.extend(_detect_transitive_critical(report.blast_radii))

if report.blast_radii:
combos.extend(_detect_cred_blast(report.blast_radii))
combos.extend(_detect_kev_with_creds(report.blast_radii))
combos.extend(_detect_execute_exploit(report.blast_radii))
combos.extend(_detect_multi_agent_cve(report.blast_radii))
combos.extend(_detect_transitive_critical(report.blast_radii))
# Cache poison can be detected from tool names alone — no context required
combos.extend(_detect_cache_poison(report.blast_radii, context_graph_data or {}))
if context_graph_data:
combos.extend(_detect_lateral_chain(report.blast_radii, context_graph_data))

# Context-graph-based detectors run even without blast_radii (structural risk)
if context_graph_data:
combos.extend(_detect_lateral_chain(report.blast_radii, context_graph_data))
combos.extend(_detect_cross_agent_poison(report.blast_radii, context_graph_data))

# Deduplicate by (pattern, title)
seen: set[tuple[str, str]] = set()
Expand Down Expand Up @@ -313,6 +318,143 @@ def _detect_lateral_chain(
return results


def _detect_cross_agent_poison(
blast_radii: list[BlastRadius],
context_graph_data: dict,
) -> list[ToxicCombination]:
"""Detect cross-agent injection: one agent can write to a shared resource read by another.

Attack pattern: Agent A has a write-capable tool on a shared MCP server.
Agent B has a read/retrieval tool on the same server. Agent A can poison
the shared context that Agent B will later consume.
"""
shared_servers = context_graph_data.get("shared_servers", [])
if not shared_servers:
return []

results = []
for server_info in shared_servers:
server_name = server_info.get("name", "") if isinstance(server_info, dict) else str(server_info)
agents = server_info.get("agents", []) if isinstance(server_info, dict) else []
tools = server_info.get("tools", []) if isinstance(server_info, dict) else []

if len(agents) < 2:
continue

# Check for write + read tool pair on the same shared server
write_tools = [
t
for t in tools
if any(kw in str(t).lower() for kw in ("write", "insert", "store", "save", "create", "add", "index", "upsert", "embed"))
]
read_tools = [
t
for t in tools
if any(kw in str(t).lower() for kw in ("read", "search", "query", "retrieve", "fetch", "get", "lookup", "similarity"))
]

if not (write_tools and read_tools):
continue

agent_names = ", ".join(str(a) for a in agents[:4])
write_names = ", ".join(str(t) for t in write_tools[:2])
read_names = ", ".join(str(t) for t in read_tools[:2])

results.append(
ToxicCombination(
pattern=ToxicPattern.CROSS_AGENT_POISON,
severity="high",
title=f"Cross-Agent Poison: shared server '{server_name}' has write+read tool pair",
description=(
f"Server '{server_name}' is shared by {len(agents)} agents ({agent_names}) and "
f"exposes both write tools ({write_names}) and read/retrieval tools ({read_names}). "
f"An agent or external attacker that can invoke write tools can poison the shared "
f"context consumed by other agents via read tools."
),
components=[
{"type": "server", "id": server_name, "label": "shared"},
*[{"type": "agent", "id": str(a), "label": "affected"} for a in agents[:4]],
*[{"type": "tool", "id": str(t), "label": "write"} for t in write_tools[:2]],
*[{"type": "tool", "id": str(t), "label": "read"} for t in read_tools[:2]],
],
risk_score=8.0,
remediation=(
f"Restrict write access to '{server_name}' to trusted agents only. "
f"Add input validation and content scanning on write tools. "
f"Consider separate servers per agent to eliminate the shared surface."
),
)
)
return results


def _detect_cache_poison(
blast_radii: list[BlastRadius],
context_graph_data: dict,
) -> list[ToxicCombination]:
"""Detect cache poisoning: CVE in a package + vector DB / RAG retrieval tool exposure.

When a vulnerable package backs an MCP server that exposes retrieval tools
(similarity search, RAG query), an attacker can exploit the CVE to inject
malicious content into the vector store, poisoning the LLM's retrieved context.
"""
vector_servers = context_graph_data.get("vector_db_servers", [])
vector_server_names: set[str] = {(s.get("name", "") if isinstance(s, dict) else str(s)) for s in vector_servers}

# Also infer from tool names if vector_db_servers not populated
results = []
for br in blast_radii:
if br.vulnerability.severity.value not in ("critical", "high"):
continue

# Check if any exposed tool looks like a vector/RAG retrieval tool
retrieval_tools = [
t
for t in br.exposed_tools
if any(
kw in (t.name + " " + (t.description or "")).lower()
for kw in ("similarity", "semantic", "retriev", "embedding", "vector", "rag", "context", "knowledge")
)
]
# Or check if the affected server is a known vector DB server
vector_affected = [s for s in br.affected_servers if s.name in vector_server_names]

if not retrieval_tools and not vector_affected:
continue

tool_names = ", ".join(t.name for t in retrieval_tools[:3])
server_names = ", ".join(s.name for s in vector_affected[:2])
target_label = tool_names or server_names

results.append(
ToxicCombination(
pattern=ToxicPattern.CACHE_POISON,
severity="critical",
title=f"Cache Poison: {br.vulnerability.id} + RAG/vector retrieval ({target_label})",
description=(
f"{br.vulnerability.id} ({br.vulnerability.severity.value}) in {br.package.name}@{br.package.version} "
f"backs a server with RAG/vector retrieval tools ({target_label}). "
f"An attacker exploiting this CVE could inject malicious instructions into the "
f"vector store, poisoning LLM context on every retrieval query."
),
components=[
{"type": "cve", "id": br.vulnerability.id, "label": br.vulnerability.severity.value},
{"type": "package", "id": f"{br.package.name}@{br.package.version}", "label": "vector backend"},
*[{"type": "tool", "id": t.name, "label": "retrieval"} for t in retrieval_tools[:3]],
*[{"type": "server", "id": s.name, "label": "vector_db"} for s in vector_affected[:2]],
],
risk_score=min(br.risk_score * 1.5, 10.0) if br.risk_score else 9.5,
remediation=(
f"Patch {br.package.name} to {br.vulnerability.fixed_version or 'latest'}. "
f"Add content scanning on vector store writes. "
f"Enable authentication on vector DB endpoints. "
f"Implement retrieval output filtering before passing to LLM."
),
)
)
return results


# ---------------------------------------------------------------------------
# Prioritization
# ---------------------------------------------------------------------------
Expand Down
Loading