diff --git a/.env.example b/.env.example index 31526784..a9d676e2 100644 --- a/.env.example +++ b/.env.example @@ -25,3 +25,16 @@ RESEND_API_KEY=resend_api_key_here # ── Magic Link ─────────────────────────────────────────────────────── MAGIC_LINK_BASE_URL=http://localhost:8000 + +# ── Security ───────────────────────────────────────────────────────── +# Session cookie Secure flag — must be false for local http://localhost dev. +# For production https deployments, leave this commented out (defaults to true). +# SESSION_COOKIE_SECURE=false + +# NOTE: The application trusts all upstream proxies by default (trusted_hosts=["*"]) +# to support zero-config PaaS deployments. To prevent IP spoofing, your edge proxy +# or WAF *must* be configured to strip inbound X-Forwarded-For headers from clients. +# +# Opt-in: Set TRUSTED_PROXY_IPS to a comma-separated list of your proxy IPs (e.g., 10.0.0.0/8) +# to enable IP-based hijack detection in the application layer. +# TRUSTED_PROXY_IPS= diff --git a/finbot/config.py b/finbot/config.py index df362f5c..220f72b7 100644 --- a/finbot/config.py +++ b/finbot/config.py @@ -53,6 +53,7 @@ class Settings(BaseSettings): # Security Config SECRET_KEY: str = DEFAULT_SECRET_KEY SESSION_SIGNING_KEY: str | None = None # Derived from SECRET_KEY + TRUSTED_PROXY_IPS: str | None = None # Session Config TEMP_SESSION_TIMEOUT: int = 86400 * 7 # 7 days @@ -77,7 +78,7 @@ class Settings(BaseSettings): # Cookie Config SESSION_COOKIE_NAME: str = "finbot_session" - SESSION_COOKIE_SECURE: bool = False # Set to True in production with https + SESSION_COOKIE_SECURE: bool = True # Set to False in .env for local HTTP dev only SESSION_COOKIE_HTTP_ONLY: bool = True # Always HTTP-only for security SESSION_COOKIE_SAMESITE: str = "Lax" diff --git a/finbot/core/auth/session.py b/finbot/core/auth/session.py index ce08f070..49c0e570 100644 --- a/finbot/core/auth/session.py +++ b/finbot/core/auth/session.py @@ -448,12 +448,22 @@ def get_session( # IP address monitoring (logging only, not strict validation) if current_ip and session_context.original_ip: if current_ip != session_context.original_ip: - logger.info( - "IP change detected for session %s: %s -> %s", - session_id[:8], - session_context.original_ip, - current_ip, - ) + if settings.TRUSTED_PROXY_IPS: + logger.info( + "IP change detected for session %s: %s -> %s", + session_id[:8], + session_context.original_ip, + current_ip, + extra={"app.trusted_source": True} + ) + else: + logger.info( + "IP change observed for session %s: %s -> %s", + session_id[:8], + session_context.original_ip, + current_ip, + extra={"app.trusted_source": False} + ) # Tiered fingerprint validation if settings.ENABLE_FINGERPRINT_VALIDATION: diff --git a/finbot/logging_config.py b/finbot/logging_config.py index a804df69..deb46ead 100644 --- a/finbot/logging_config.py +++ b/finbot/logging_config.py @@ -37,7 +37,30 @@ def setup_logging(log_level: str | None = None) -> None: console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(numeric_level) - formatter = logging.Formatter( + class ExtraFormatter(logging.Formatter): + def format(self, record): + # Format the base message + s = super().format(record) + + # Extract standard record attributes so we know what is "extra" + standard_attrs = { + 'name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename', + 'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName', + 'created', 'msecs', 'relativeCreated', 'thread', 'threadName', + 'processName', 'process', 'message', 'asctime', 'taskName' + } + + # Find any extra attributes added via logger.info(..., extra={"key": "val"}) + extras = {k: v for k, v in record.__dict__.items() if k not in standard_attrs} + + if extras: + # Append them in a structured way + extra_str = " ".join(f"{k}={v}" for k, v in extras.items()) + s += f" | {extra_str}" + + return s + + formatter = ExtraFormatter( fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) diff --git a/finbot/main.py b/finbot/main.py index 8cd4f1a2..5e1e191f 100644 --- a/finbot/main.py +++ b/finbot/main.py @@ -149,7 +149,16 @@ async def lifespan(app: FastAPI): ProxyHeadersMiddleware, # pylint: disable=ungrouped-imports ) -app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["*"]) +if settings.TRUSTED_PROXY_IPS: + _trusted_proxies = [h.strip() for h in settings.TRUSTED_PROXY_IPS.split(",")] + app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=_trusted_proxies) +else: + import logging + logging.getLogger("uvicorn").warning( + "TRUSTED_PROXY_IPS is unset. Client IPs are untrusted and IP-based hijack detection is disabled. " + "Ensure your edge proxy strips inbound X-Forwarded-For headers." + ) + app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["*"]) # Register error handlers register_error_handlers(app) diff --git a/tests/unit/auth/test_session_ip_logging.py b/tests/unit/auth/test_session_ip_logging.py new file mode 100644 index 00000000..f50724c5 --- /dev/null +++ b/tests/unit/auth/test_session_ip_logging.py @@ -0,0 +1,42 @@ +import pytest +import logging +from unittest.mock import patch +from finbot.core.auth.session import SessionManager + +@pytest.fixture +def session_manager(): + return SessionManager() + +@pytest.mark.parametrize("trusted_proxy_ips, expected_trusted_source", [ + (None, False), + ("10.0.0.1,127.0.0.1", True) +]) +def test_session_ip_change_logging( + session_manager, + trusted_proxy_ips, + expected_trusted_source, + caplog, + db +): + """ + Assert that IP-change heuristics correctly add structured `trusted_source` flags + based on the TRUSTED_PROXY_IPS configuration. + """ + with patch("finbot.core.auth.session.settings.TRUSTED_PROXY_IPS", trusted_proxy_ips): + with caplog.at_level(logging.INFO): + # Create a session with an initial IP + session_context = session_manager.create_session( + ip_address="192.168.1.1" + ) + + # Retrieve session with a different IP to trigger the heuristic + session_manager.get_session( + session_id=session_context.session_id, + current_ip="192.168.1.2", + _db=db + ) + + # Check log records + log_records = [r for r in caplog.records if "IP change" in r.getMessage()] + assert len(log_records) == 1 + assert getattr(log_records[0], "app.trusted_source") == expected_trusted_source diff --git a/tests/unit/core/test_config_defaults.py b/tests/unit/core/test_config_defaults.py new file mode 100644 index 00000000..8433f184 --- /dev/null +++ b/tests/unit/core/test_config_defaults.py @@ -0,0 +1,24 @@ +import pytest +from unittest.mock import patch +from finbot.config import Settings + +def test_session_cookie_secure_default_when_env_missing(monkeypatch, tmp_path): + """ + Assert that when SESSION_COOKIE_SECURE is completely absent from the environment, + the code-level default firmly evaluates to True. + """ + # Change into an empty temporary directory to ensure no local .env files can be discovered + monkeypatch.chdir(tmp_path) + + # Ensure it's totally removed from the environment + monkeypatch.delenv("SESSION_COOKIE_SECURE", raising=False) + + # Instantiate Settings directly (bypassing any cached singletons). + # Pass _env_file=None to ensure Pydantic doesn't read a stray .env file in the test dir. + test_settings = Settings(_env_file=None) + + # 1. Assert that without a loaded environment, the class-level default evaluates to True + assert Settings().SESSION_COOKIE_SECURE is True + + # 2. Assert that explicitly bypassing the .env file loading yields the same safe default + assert test_settings.SESSION_COOKIE_SECURE is True diff --git a/tests/unit/core/test_proxy_middleware_behavior.py b/tests/unit/core/test_proxy_middleware_behavior.py new file mode 100644 index 00000000..19751342 --- /dev/null +++ b/tests/unit/core/test_proxy_middleware_behavior.py @@ -0,0 +1,45 @@ +import pytest +from fastapi import FastAPI, Request +from fastapi.testclient import TestClient +from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware + +def create_test_app(trusted_hosts): + app = FastAPI() + + @app.get("/ip") + def get_ip(request: Request): + return {"client_ip": request.client.host} + + app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=trusted_hosts) + return app + +@pytest.mark.parametrize("trusted_hosts, peer_ip, x_forwarded_for, expected_ip", [ + # When unset, we pass ["*"], so ANY peer is trusted to provide X-Forwarded-For + (["*"], "192.168.1.100", "8.8.8.8", "8.8.8.8"), + + # When set to specific IPs, only those peers can provide X-Forwarded-For + (["10.0.0.1"], "10.0.0.1", "8.8.8.8", "8.8.8.8"), + + # Negative Branch: A peer NOT in the trusted list tries to spoof IP + (["10.0.0.1"], "192.168.1.100", "8.8.8.8", "192.168.1.100"), + + # Chained-header vulnerability scenario: + # If the edge proxy (10.0.0.1) is trusted but simply *appends* to an attacker-controlled header + # rather than stripping it, uvicorn's right-to-left traversal will pop 10.0.0.1, see 8.8.8.8, + # and resolve client_ip to the spoofed 8.8.8.8. This test pins this exact behavior, + # which is why our documentation explicitly mandates that edge proxies MUST STRIP inbound headers. + (["10.0.0.1"], "10.0.0.1", "8.8.8.8, 10.0.0.1", "8.8.8.8"), +]) +def test_proxy_middleware_spoofing_behavior(trusted_hosts, peer_ip, x_forwarded_for, expected_ip): + """ + Assert that X-Forwarded-For from a non-listed peer doesn't influence the application's + perceived client_ip when strict trusted hosts are configured. + """ + app = create_test_app(trusted_hosts) + + # We must explicitly set the client IP on the TestClient to simulate the physical peer IP + # that Uvicorn would see on the TCP socket. + with TestClient(app, client=(peer_ip, 12345)) as client: + response = client.get("/ip", headers={"X-Forwarded-For": x_forwarded_for}) + assert response.status_code == 200 + assert response.json()["client_ip"] == expected_ip diff --git a/tests/unit/core/test_proxy_middleware_startup.py b/tests/unit/core/test_proxy_middleware_startup.py new file mode 100644 index 00000000..0c5f319c --- /dev/null +++ b/tests/unit/core/test_proxy_middleware_startup.py @@ -0,0 +1,35 @@ +import pytest +import logging +import importlib +from unittest.mock import patch + +def test_proxy_middleware_startup_warning_when_unset(): + """ + Assert that the startup warning fires when TRUSTED_PROXY_IPS is unset. + """ + with patch("finbot.main.settings.TRUSTED_PROXY_IPS", None): + with patch("logging.Logger.warning") as mock_warning: + import finbot.main + importlib.reload(finbot.main) + + # Check if warning was called with the correct message + called_with_message = any( + "TRUSTED_PROXY_IPS is unset" in call_args[0][0] + for call_args in mock_warning.call_args_list + ) + assert called_with_message + +def test_proxy_middleware_startup_no_warning_when_set(): + """ + Assert that the startup warning does NOT fire when TRUSTED_PROXY_IPS is set. + """ + with patch("finbot.main.settings.TRUSTED_PROXY_IPS", "10.0.0.1"): + with patch("logging.Logger.warning") as mock_warning: + import finbot.main + importlib.reload(finbot.main) + + called_with_message = any( + "TRUSTED_PROXY_IPS is unset" in call_args[0][0] + for call_args in mock_warning.call_args_list + ) + assert not called_with_message