Skip to content
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,16 @@ RESEND_API_KEY=resend_api_key_here

# ── Magic Link ───────────────────────────────────────────────────────
MAGIC_LINK_BASE_URL=http://localhost:8000

# ── Security ─────────────────────────────────────────────────────────
# Session cookie Secure flag — must be false for local http://localhost dev.
# For production https deployments, leave this commented out (defaults to true).
# SESSION_COOKIE_SECURE=false

# NOTE: The application trusts all upstream proxies by default (trusted_hosts=["*"])
# to support zero-config PaaS deployments. To prevent IP spoofing, your edge proxy
# or WAF *must* be configured to strip inbound X-Forwarded-For headers from clients.
#
# Opt-in: Set TRUSTED_PROXY_IPS to a comma-separated list of your proxy IPs (e.g., 10.0.0.0/8)
# to enable IP-based hijack detection in the application layer.
# TRUSTED_PROXY_IPS=
3 changes: 2 additions & 1 deletion finbot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Settings(BaseSettings):
# Security Config
SECRET_KEY: str = DEFAULT_SECRET_KEY
SESSION_SIGNING_KEY: str | None = None # Derived from SECRET_KEY
TRUSTED_PROXY_IPS: str | None = None

# Session Config
TEMP_SESSION_TIMEOUT: int = 86400 * 7 # 7 days
Expand All @@ -77,7 +78,7 @@ class Settings(BaseSettings):

# Cookie Config
SESSION_COOKIE_NAME: str = "finbot_session"
SESSION_COOKIE_SECURE: bool = False # Set to True in production with https
SESSION_COOKIE_SECURE: bool = True # Set to False in .env for local HTTP dev only
SESSION_COOKIE_HTTP_ONLY: bool = True # Always HTTP-only for security
SESSION_COOKIE_SAMESITE: str = "Lax"

Expand Down
22 changes: 16 additions & 6 deletions finbot/core/auth/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,22 @@ def get_session(
# IP address monitoring (logging only, not strict validation)
if current_ip and session_context.original_ip:
if current_ip != session_context.original_ip:
logger.info(
"IP change detected for session %s: %s -> %s",
session_id[:8],
session_context.original_ip,
current_ip,
)
if settings.TRUSTED_PROXY_IPS:
logger.info(
"IP change detected for session %s: %s -> %s",
session_id[:8],
session_context.original_ip,
current_ip,
extra={"app.trusted_source": True}
)
else:
logger.info(
"IP change observed for session %s: %s -> %s",
session_id[:8],
session_context.original_ip,
current_ip,
extra={"app.trusted_source": False}
)

# Tiered fingerprint validation
if settings.ENABLE_FINGERPRINT_VALIDATION:
Expand Down
25 changes: 24 additions & 1 deletion finbot/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,30 @@ def setup_logging(log_level: str | None = None) -> None:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(numeric_level)

formatter = logging.Formatter(
class ExtraFormatter(logging.Formatter):
def format(self, record):
# Format the base message
s = super().format(record)

# Extract standard record attributes so we know what is "extra"
standard_attrs = {
'name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
'processName', 'process', 'message', 'asctime', 'taskName'
}

# Find any extra attributes added via logger.info(..., extra={"key": "val"})
extras = {k: v for k, v in record.__dict__.items() if k not in standard_attrs}

if extras:
# Append them in a structured way
extra_str = " ".join(f"{k}={v}" for k, v in extras.items())
s += f" | {extra_str}"

return s

formatter = ExtraFormatter(
fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
Expand Down
11 changes: 10 additions & 1 deletion finbot/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,16 @@ async def lifespan(app: FastAPI):
ProxyHeadersMiddleware, # pylint: disable=ungrouped-imports
)

app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["*"])
if settings.TRUSTED_PROXY_IPS:
_trusted_proxies = [h.strip() for h in settings.TRUSTED_PROXY_IPS.split(",")]
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=_trusted_proxies)
else:
import logging
logging.getLogger("uvicorn").warning(
"TRUSTED_PROXY_IPS is unset. Client IPs are untrusted and IP-based hijack detection is disabled. "
"Ensure your edge proxy strips inbound X-Forwarded-For headers."
)
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["*"])

# Register error handlers
register_error_handlers(app)
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/auth/test_session_ip_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
import logging
from unittest.mock import patch
from finbot.core.auth.session import SessionManager

@pytest.fixture
def session_manager():
return SessionManager()

@pytest.mark.parametrize("trusted_proxy_ips, expected_trusted_source", [
(None, False),
("10.0.0.1,127.0.0.1", True)
])
def test_session_ip_change_logging(
session_manager,
trusted_proxy_ips,
expected_trusted_source,
caplog,
db
):
"""
Assert that IP-change heuristics correctly add structured `trusted_source` flags
based on the TRUSTED_PROXY_IPS configuration.
"""
with patch("finbot.core.auth.session.settings.TRUSTED_PROXY_IPS", trusted_proxy_ips):
with caplog.at_level(logging.INFO):
# Create a session with an initial IP
session_context = session_manager.create_session(
ip_address="192.168.1.1"
)

# Retrieve session with a different IP to trigger the heuristic
session_manager.get_session(
session_id=session_context.session_id,
current_ip="192.168.1.2",
_db=db
)

# Check log records
log_records = [r for r in caplog.records if "IP change" in r.getMessage()]
assert len(log_records) == 1
assert getattr(log_records[0], "app.trusted_source") == expected_trusted_source
24 changes: 24 additions & 0 deletions tests/unit/core/test_config_defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
from unittest.mock import patch
from finbot.config import Settings

def test_session_cookie_secure_default_when_env_missing(monkeypatch, tmp_path):
"""
Assert that when SESSION_COOKIE_SECURE is completely absent from the environment,
the code-level default firmly evaluates to True.
"""
# Change into an empty temporary directory to ensure no local .env files can be discovered
monkeypatch.chdir(tmp_path)

# Ensure it's totally removed from the environment
monkeypatch.delenv("SESSION_COOKIE_SECURE", raising=False)

# Instantiate Settings directly (bypassing any cached singletons).
# Pass _env_file=None to ensure Pydantic doesn't read a stray .env file in the test dir.
test_settings = Settings(_env_file=None)

# 1. Assert that without a loaded environment, the class-level default evaluates to True
assert Settings().SESSION_COOKIE_SECURE is True

# 2. Assert that explicitly bypassing the .env file loading yields the same safe default
assert test_settings.SESSION_COOKIE_SECURE is True
45 changes: 45 additions & 0 deletions tests/unit/core/test_proxy_middleware_behavior.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware

def create_test_app(trusted_hosts):
app = FastAPI()

@app.get("/ip")
def get_ip(request: Request):
return {"client_ip": request.client.host}

app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=trusted_hosts)
return app

@pytest.mark.parametrize("trusted_hosts, peer_ip, x_forwarded_for, expected_ip", [
# When unset, we pass ["*"], so ANY peer is trusted to provide X-Forwarded-For
(["*"], "192.168.1.100", "8.8.8.8", "8.8.8.8"),

# When set to specific IPs, only those peers can provide X-Forwarded-For
(["10.0.0.1"], "10.0.0.1", "8.8.8.8", "8.8.8.8"),

# Negative Branch: A peer NOT in the trusted list tries to spoof IP
(["10.0.0.1"], "192.168.1.100", "8.8.8.8", "192.168.1.100"),

# Chained-header vulnerability scenario:
# If the edge proxy (10.0.0.1) is trusted but simply *appends* to an attacker-controlled header
# rather than stripping it, uvicorn's right-to-left traversal will pop 10.0.0.1, see 8.8.8.8,
# and resolve client_ip to the spoofed 8.8.8.8. This test pins this exact behavior,
# which is why our documentation explicitly mandates that edge proxies MUST STRIP inbound headers.
(["10.0.0.1"], "10.0.0.1", "8.8.8.8, 10.0.0.1", "8.8.8.8"),
])
def test_proxy_middleware_spoofing_behavior(trusted_hosts, peer_ip, x_forwarded_for, expected_ip):
"""
Assert that X-Forwarded-For from a non-listed peer doesn't influence the application's
perceived client_ip when strict trusted hosts are configured.
"""
app = create_test_app(trusted_hosts)

# We must explicitly set the client IP on the TestClient to simulate the physical peer IP
# that Uvicorn would see on the TCP socket.
with TestClient(app, client=(peer_ip, 12345)) as client:
response = client.get("/ip", headers={"X-Forwarded-For": x_forwarded_for})
assert response.status_code == 200
assert response.json()["client_ip"] == expected_ip
35 changes: 35 additions & 0 deletions tests/unit/core/test_proxy_middleware_startup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest
import logging
import importlib
from unittest.mock import patch

def test_proxy_middleware_startup_warning_when_unset():
"""
Assert that the startup warning fires when TRUSTED_PROXY_IPS is unset.
"""
with patch("finbot.main.settings.TRUSTED_PROXY_IPS", None):
with patch("logging.Logger.warning") as mock_warning:
import finbot.main
importlib.reload(finbot.main)

# Check if warning was called with the correct message
called_with_message = any(
"TRUSTED_PROXY_IPS is unset" in call_args[0][0]
for call_args in mock_warning.call_args_list
)
assert called_with_message

def test_proxy_middleware_startup_no_warning_when_set():
"""
Assert that the startup warning does NOT fire when TRUSTED_PROXY_IPS is set.
"""
with patch("finbot.main.settings.TRUSTED_PROXY_IPS", "10.0.0.1"):
with patch("logging.Logger.warning") as mock_warning:
import finbot.main
importlib.reload(finbot.main)

called_with_message = any(
"TRUSTED_PROXY_IPS is unset" in call_args[0][0]
for call_args in mock_warning.call_args_list
)
assert not called_with_message