Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 124 additions & 79 deletions src/api/webhook.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,43 @@
"""Facebook webhook endpoints."""
"""Facebook webhook endpoints.

This module handles incoming Facebook Messenger webhooks with multiple
security layers and dependency injection support for testability.

Security layers (in order):
1. Rate limiting - prevents abuse from single users
2. Input validation - ensures message meets basic requirements
3. Prompt injection detection - blocks malicious manipulation attempts
4. Input sanitization - cleans input for safe processing

The webhook handlers focus on HTTP concerns and security validation,
delegating business logic to the MessageProcessor service.
"""

import logging
import random

from fastapi import APIRouter, BackgroundTasks, Request, Response
from fastapi.responses import PlainTextResponse

from src.config import get_settings
from src.db.repository import (
get_bot_configuration_by_page_id,
get_reference_document,
get_user_profile,
save_message_history,
update_user_profile,
upsert_user_profile,
)
from src.models.agent_models import AgentContext
from src.models.user_models import UserProfileCreate, UserProfileUpdate
from src.services.agent_service import MessengerAgentService
from src.services.facebook_service import get_user_info, send_message
from src.middleware.rate_limiter import RateLimiter, get_rate_limiter
from src.models.user_models import UserProfileUpdate
from src.services.facebook_service import send_message
from src.services.input_sanitizer import (
get_user_friendly_error,
sanitize_user_input,
validate_message,
)
from src.services.message_processor import (
BotConfigNotFoundError,
MessageProcessor,
ReferenceDocNotFoundError,
get_message_processor,
)
from src.services.prompt_guard import PromptInjectionDetector, get_prompt_guard

logger = logging.getLogger(__name__)
router = APIRouter()
Expand Down Expand Up @@ -82,86 +101,112 @@ async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
return {"status": "ok"}


async def process_message(page_id: str, sender_id: str, message_text: str):
"""Process incoming message and send response."""
async def process_message(
page_id: str,
sender_id: str,
message_text: str,
*,
processor: MessageProcessor | None = None,
rate_limiter: RateLimiter | None = None,
prompt_guard: PromptInjectionDetector | None = None,
):
"""Process incoming message and send response.

This function handles security validation and delegates business logic
to the MessageProcessor service. Supports dependency injection for testing.

Args:
page_id: Facebook Page ID that received the message
sender_id: Facebook user ID (PSID) who sent the message
message_text: The message text content
processor: Optional injected message processor (for testing)
rate_limiter: Optional injected rate limiter (for testing)
prompt_guard: Optional injected prompt guard (for testing)

Security checks performed in order:
1. Rate limiting - prevents abuse from single users
2. Input validation - ensures message meets basic requirements
3. Prompt injection detection - blocks malicious manipulation attempts
4. Input sanitization - cleans input for safe processing
"""
try:
bot_config = get_bot_configuration_by_page_id(page_id)
if not bot_config:
logger.error("No bot configuration found for page_id: %s", page_id)
# ======================================================================
# Security Layer 1: Rate Limiting
# ======================================================================
_rate_limiter = rate_limiter or get_rate_limiter()
if not _rate_limiter.check_rate_limit(sender_id):
logger.warning("Rate limit exceeded for user %s", sender_id)
# Optionally send a polite rate limit message
bot_config = get_bot_configuration_by_page_id(page_id)
if bot_config:
await send_message(
page_access_token=bot_config.facebook_page_access_token,
recipient_id=sender_id,
text="You're sending messages too quickly. Please wait a moment before sending another message.",
)
return

user_profile = get_user_profile(sender_id, page_id)
if not user_profile:
logger.info("New user %s, fetching profile from Facebook", sender_id)
fb_user_info = await get_user_info(
page_access_token=bot_config.facebook_page_access_token,
user_id=sender_id,
# ======================================================================
# Security Layer 2: Input Validation
# ======================================================================
validation_result = validate_message(message_text)
if not validation_result.is_valid:
logger.warning(
"Invalid message from %s: %s",
sender_id,
validation_result.error_code,
)
if fb_user_info:
new_profile = UserProfileCreate(
sender_id=sender_id,
page_id=page_id,
first_name=fb_user_info.first_name,
last_name=fb_user_info.last_name,
profile_pic=fb_user_info.profile_pic,
locale=fb_user_info.locale,
timezone=fb_user_info.timezone,
)
upsert_user_profile(new_profile)
user_profile = get_user_profile(sender_id, page_id)

ref_doc = get_reference_document(bot_config.reference_doc_id)
if not ref_doc:
logger.error("No reference document found: %s", bot_config.reference_doc_id)
# Send user-friendly error if appropriate
error_msg = get_user_friendly_error(validation_result.error_code)
if error_msg:
bot_config = get_bot_configuration_by_page_id(page_id)
if bot_config:
await send_message(
page_access_token=bot_config.facebook_page_access_token,
recipient_id=sender_id,
text=error_msg,
)
return

recent_messages: list[str] = []
user_name = user_profile.get("first_name") if user_profile else None
user_location = user_profile.get("location_title") if user_profile else None

context = AgentContext(
bot_config_id=bot_config.id,
reference_doc_id=bot_config.reference_doc_id,
reference_doc=ref_doc["content"],
tone=bot_config.tone,
recent_messages=recent_messages,
tenant_id=getattr(bot_config, "tenant_id", None),
user_name=user_name,
user_location=user_location,
)
# ======================================================================
# Security Layer 3: Prompt Injection Detection
# ======================================================================
_prompt_guard = prompt_guard or get_prompt_guard()
injection_result = _prompt_guard.check(message_text)

agent_service = MessengerAgentService()
response = await agent_service.respond(context, message_text)
if injection_result.is_suspicious and injection_result.risk_level == "high":
Comment on lines +171 to +177
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prompt injection detection runs on the raw message_text, i.e. before sanitization. Because sanitize_user_input removes control characters and normalizes whitespace, running the guard check before sanitization can allow trivial bypasses (e.g., inserting control characters, as in "ignore\x00 previous\x00 instructions"). Consider sanitizing first, running prompt_guard.check() on the sanitized text, and passing that same sanitized text into the processor.

Copilot uses AI. Check for mistakes.
logger.warning(
"Blocked high-risk prompt injection from %s: %s",
sender_id,
injection_result.matched_pattern,
)
# Don't process, but don't reveal why to avoid helping attackers
return

response_text = response.message
if user_name and response.confidence > 0.8:
if not response_text.startswith(user_name) and random.random() < 0.2:
response_text = f"Hi {user_name}! {response_text}"
# Log medium-risk patterns but allow processing
if injection_result.is_suspicious and injection_result.risk_level == "medium":
logger.info(
"Medium-risk pattern detected from %s: %s (proceeding)",
sender_id,
injection_result.matched_pattern,
)

await send_message(
page_access_token=bot_config.facebook_page_access_token,
recipient_id=sender_id,
text=response_text,
)
# ======================================================================
# Security Layer 4: Input Sanitization
# ======================================================================
sanitized_message = sanitize_user_input(message_text)

save_message_history(
bot_id=bot_config.id,
sender_id=sender_id,
message_text=message_text,
response_text=response_text,
confidence=response.confidence,
requires_escalation=response.requires_escalation,
user_profile_id=user_profile["id"] if user_profile else None,
)
# ======================================================================
# Main Processing - Delegate to MessageProcessor
# ======================================================================
_processor = processor or get_message_processor()

logger.info(
"Processed message for page %s: user_name=%s, location=%s, confidence=%s, escalation=%s",
page_id,
user_name or "unknown",
user_location or "unknown",
response.confidence,
response.requires_escalation,
)
try:
await _processor.process(page_id, sender_id, sanitized_message)
except BotConfigNotFoundError:
logger.error("No bot configuration found for page_id: %s", page_id)
except ReferenceDocNotFoundError as e:
logger.error("Reference document not found: %s", e)

except Exception as e:
logger.error("Error processing message: %s", e, exc_info=True)
Expand Down
51 changes: 49 additions & 2 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from src.constants import (
BROWSER_JS_REFETCH_TIMEOUT_SECONDS,
BROWSER_PAGE_LOAD_TIMEOUT_SECONDS,
DEFAULT_EMBEDDING_DIMENSIONS,
DEFAULT_HTTP_TIMEOUT_SECONDS,
DEFAULT_SEARCH_RESULT_LIMIT,
FACEBOOK_API_TIMEOUT_SECONDS,
MAX_MESSAGES_PER_USER_PER_MINUTE,
RATE_LIMIT_WINDOW_SECONDS,
)


class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
Expand Down Expand Up @@ -56,11 +67,11 @@ class Settings(BaseSettings):
description="Embedding model via PAIG (e.g. gateway/openai:text-embedding-3-small)",
)
embedding_dimensions: int = Field(
default=1536,
default=DEFAULT_EMBEDDING_DIMENSIONS,
description="Embedding vector dimension (matches text-embedding-3-small)",
)
search_result_limit: int = Field(
default=5,
default=DEFAULT_SEARCH_RESULT_LIMIT,
description="Max number of chunks to return from page search",
)

Expand All @@ -87,6 +98,42 @@ class Settings(BaseSettings):
default=None, description="Pydantic Logfire token for AI observability"
)

# ==========================================================================
# Timeout Configuration
# ==========================================================================
# All timeouts can be overridden via environment variables.
# Defaults are sourced from src/constants.py.

scraper_timeout_seconds: float = Field(
default=DEFAULT_HTTP_TIMEOUT_SECONDS,
description="HTTP timeout for scraper requests (seconds)",
)
facebook_api_timeout_seconds: float = Field(
default=FACEBOOK_API_TIMEOUT_SECONDS,
description="Timeout for Facebook Graph API calls (seconds)",
)
browser_page_load_timeout_seconds: float = Field(
default=BROWSER_PAGE_LOAD_TIMEOUT_SECONDS,
description="Timeout for browser page loads (seconds)",
)
browser_js_refetch_timeout_seconds: float = Field(
default=BROWSER_JS_REFETCH_TIMEOUT_SECONDS,
description="Extended timeout for JS-rendered page refetch (seconds)",
)

# ==========================================================================
# Rate Limiting Configuration (from GUARDRAILS.md)
# ==========================================================================

rate_limit_max_messages: int = Field(
default=MAX_MESSAGES_PER_USER_PER_MINUTE,
description="Max messages per user per rate limit window",
)
rate_limit_window_seconds: int = Field(
default=RATE_LIMIT_WINDOW_SECONDS,
description="Rate limit sliding window duration in seconds",
)
Comment on lines +101 to +135
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Validate that timeout and rate-limit values are positive.
Environment overrides could set zero or negative values, which would break the timeouts or the rate limiter. Consider adding gt=0 constraints.

♻️ Add positive-value constraints
     scraper_timeout_seconds: float = Field(
         default=DEFAULT_HTTP_TIMEOUT_SECONDS,
+        gt=0,
         description="HTTP timeout for scraper requests (seconds)",
     )
     facebook_api_timeout_seconds: float = Field(
         default=FACEBOOK_API_TIMEOUT_SECONDS,
+        gt=0,
         description="Timeout for Facebook Graph API calls (seconds)",
     )
     browser_page_load_timeout_seconds: float = Field(
         default=BROWSER_PAGE_LOAD_TIMEOUT_SECONDS,
+        gt=0,
         description="Timeout for browser page loads (seconds)",
     )
     browser_js_refetch_timeout_seconds: float = Field(
         default=BROWSER_JS_REFETCH_TIMEOUT_SECONDS,
+        gt=0,
         description="Extended timeout for JS-rendered page refetch (seconds)",
     )
@@
     rate_limit_max_messages: int = Field(
         default=MAX_MESSAGES_PER_USER_PER_MINUTE,
+        gt=0,
         description="Max messages per user per rate limit window",
     )
     rate_limit_window_seconds: int = Field(
         default=RATE_LIMIT_WINDOW_SECONDS,
+        gt=0,
         description="Rate limit sliding window duration in seconds",
     )
🤖 Prompt for AI Agents
In `@src/config.py` around lines 101 - 135, Add validation constraints to ensure
timeouts and rate-limit values are positive by adding gt=0 to the Field
declarations for scraper_timeout_seconds, facebook_api_timeout_seconds,
browser_page_load_timeout_seconds, browser_js_refetch_timeout_seconds (float
fields) and for rate_limit_max_messages and rate_limit_window_seconds (int
fields); update the Field(...) calls to include gt=0 so environment overrides
that are zero or negative will be rejected by the model validation.



@lru_cache()
def get_settings() -> Settings:
Expand Down
Loading
Loading