diff --git a/README.md b/README.md index b8ba18f..3b4bdaf 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,9 @@ pip install tusk-drift-python-sdk[flask] # FastAPI support pip install tusk-drift-python-sdk[fastapi] + +# Django support +pip install tusk-drift-python-sdk[django] ``` ## Requirements diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index 74d17b5..768d077 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -179,7 +179,7 @@ def _record_request(self, request: HttpRequest, sdk, is_pre_app_start: bool) -> start_time_ns = time.time_ns() - method = request.method + method = request.method or "" path = request.path span_name = f"{method} {path}" @@ -397,7 +397,7 @@ def dict_to_schema_merges(merges_dict): status = SpanStatus(code=StatusCode.OK, message="") # Django-specific: use route template for span name to avoid cardinality explosion - method = request.method + method = request.method or "" route_template = getattr(request, "_drift_route_template", None) if route_template: # Use route template (e.g., "users//") @@ -502,7 +502,7 @@ def dict_to_schema_merges(merges_dict): duration_seconds = duration_ns // 1_000_000_000 duration_nanos = duration_ns % 1_000_000_000 - method = request.method + method = request.method or "" route_template = getattr(request, "_drift_route_template", None) span_name = f"{method} {route_template}" if route_template else f"{method} {request.path}" diff --git a/drift/instrumentation/httpx/e2e-tests/.tusk/config.yaml b/drift/instrumentation/httpx/e2e-tests/.tusk/config.yaml new file mode 100644 index 0000000..ed91a49 --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/.tusk/config.yaml @@ -0,0 +1,27 @@ +version: 1 + +service: + id: "httpx-e2e-test-id" + name: "httpx-e2e-test" + port: 8000 + start: + command: "python src/app.py" + readiness_check: + command: "curl -f http://localhost:8000/health" + timeout: 45s + interval: 5s + +tusk_api: + url: "http://localhost:8000" + +test_execution: + concurrent_limit: 10 + batch_size: 10 + timeout: 30s + +recording: + sampling_rate: 1.0 + export_spans: false + +replay: + enable_telemetry: false diff --git a/drift/instrumentation/httpx/e2e-tests/Dockerfile b/drift/instrumentation/httpx/e2e-tests/Dockerfile new file mode 100644 index 0000000..b9551db --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/Dockerfile @@ -0,0 +1,21 @@ +FROM python-e2e-base:latest + +# Copy SDK source for editable install +COPY . /sdk + +# Copy test files +COPY drift/instrumentation/httpx/e2e-tests /app + +WORKDIR /app + +# Install dependencies (requirements.txt uses -e /sdk for SDK) +RUN pip install -q -r requirements.txt + +# Make entrypoint executable +RUN chmod +x entrypoint.py + +# Create .tusk directories +RUN mkdir -p /app/.tusk/traces /app/.tusk/logs + +# Run entrypoint +ENTRYPOINT ["python", "entrypoint.py"] diff --git a/drift/instrumentation/httpx/e2e-tests/docker-compose.yml b/drift/instrumentation/httpx/e2e-tests/docker-compose.yml new file mode 100644 index 0000000..1d51c3f --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/docker-compose.yml @@ -0,0 +1,17 @@ +services: + app: + build: + context: ../../../.. + dockerfile: drift/instrumentation/httpx/e2e-tests/Dockerfile + args: + - TUSK_CLI_VERSION=${TUSK_CLI_VERSION:-latest} + environment: + - PORT=8000 + - TUSK_ANALYTICS_DISABLED=1 + - PYTHONUNBUFFERED=1 + working_dir: /app + volumes: + # Mount app source for development + - ./src:/app/src + # Mount .tusk folder to persist traces + - ./.tusk:/app/.tusk diff --git a/drift/instrumentation/httpx/e2e-tests/entrypoint.py b/drift/instrumentation/httpx/e2e-tests/entrypoint.py new file mode 100644 index 0000000..87e0710 --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/entrypoint.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +E2E Test Entrypoint for HTTPX Instrumentation + +This script orchestrates the full e2e test lifecycle: +1. Setup: Install dependencies +2. Record: Start app in RECORD mode, execute requests +3. Test: Run Tusk CLI tests +4. Teardown: Cleanup and return exit code +""" + +import sys +from pathlib import Path + +# Add SDK to path for imports +sys.path.insert(0, "/sdk") + +from drift.instrumentation.e2e_common.base_runner import E2ETestRunnerBase + + +class HttpxE2ETestRunner(E2ETestRunnerBase): + """E2E test runner for HTTPX instrumentation.""" + + def __init__(self): + import os + + port = int(os.getenv("PORT", "8000")) + super().__init__(app_port=port) + + +if __name__ == "__main__": + runner = HttpxE2ETestRunner() + exit_code = runner.run() + sys.exit(exit_code) diff --git a/drift/instrumentation/httpx/e2e-tests/requirements.txt b/drift/instrumentation/httpx/e2e-tests/requirements.txt new file mode 100644 index 0000000..9d4518d --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/requirements.txt @@ -0,0 +1,4 @@ +-e /sdk +Flask>=3.1.2 +httpx>=0.28.1 +requests>=2.32.5 diff --git a/drift/instrumentation/httpx/e2e-tests/run.sh b/drift/instrumentation/httpx/e2e-tests/run.sh new file mode 100755 index 0000000..d67bae1 --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +# Exit on error +set -e + +# Accept optional port parameter (default: 8000) +APP_PORT=${1:-8000} +export APP_PORT + +# Generate unique docker compose project name +# Get the instrumentation name (parent directory of e2e-tests) +TEST_NAME="$(basename "$(dirname "$(pwd)")")" +PROJECT_NAME="python-${TEST_NAME}-${APP_PORT}" + +# Colors for output +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Running Python E2E Test: ${TEST_NAME}${NC}" +echo -e "${BLUE}Port: ${APP_PORT}${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up containers...${NC}" + docker compose -p "$PROJECT_NAME" down -v 2>/dev/null || true +} + +# Register cleanup on exit +trap cleanup EXIT + +# Build containers +echo -e "${BLUE}Building containers...${NC}" +docker compose -p "$PROJECT_NAME" build --no-cache + +# Run the test container +echo -e "${BLUE}Starting test...${NC}" +echo "" + +# Run container and capture exit code (always use port 8000 inside container) +# Disable set -e temporarily to capture exit code +set +e +docker compose -p "$PROJECT_NAME" run --rm app +EXIT_CODE=$? +set -e + +echo "" +if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN}Test passed!${NC}" + echo -e "${GREEN}========================================${NC}" +else + echo -e "${RED}========================================${NC}" + echo -e "${RED}Test failed with exit code ${EXIT_CODE}${NC}" + echo -e "${RED}========================================${NC}" +fi + +exit $EXIT_CODE diff --git a/drift/instrumentation/httpx/e2e-tests/src/app.py b/drift/instrumentation/httpx/e2e-tests/src/app.py new file mode 100644 index 0000000..4bcd224 --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/src/app.py @@ -0,0 +1,366 @@ +"""Flask test app for e2e tests - HTTPX instrumentation testing (sync and async).""" + +import asyncio + +import httpx +from flask import Flask, jsonify, request + +from drift import TuskDrift + +# Initialize SDK +sdk = TuskDrift.initialize( + api_key="tusk-test-key", + log_level="debug", +) + +app = Flask(__name__) + + +# ============================================================================= +# Health Check +# ============================================================================= + + +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "healthy"}) + + +# ============================================================================= +# Sync Client Tests (httpx.Client) +# ============================================================================= + + +@app.route("/api/sync/get-json", methods=["GET"]) +def sync_get_json(): + """Test sync GET request returning JSON.""" + try: + with httpx.Client() as client: + response = client.get("https://jsonplaceholder.typicode.com/posts/1") + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/get-with-params", methods=["GET"]) +def sync_get_with_params(): + """Test sync GET request with query parameters.""" + try: + with httpx.Client() as client: + response = client.get( + "https://jsonplaceholder.typicode.com/comments", + params={"postId": 1}, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/get-with-headers", methods=["GET"]) +def sync_get_with_headers(): + """Test sync GET request with custom headers.""" + try: + with httpx.Client() as client: + response = client.get( + "https://jsonplaceholder.typicode.com/posts/1", + headers={ + "X-Custom-Header": "test-value", + "Accept": "application/json", + }, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/post-json", methods=["POST"]) +def sync_post_json(): + """Test sync POST request with JSON body.""" + try: + data = request.get_json() or {} + with httpx.Client() as client: + response = client.post( + "https://jsonplaceholder.typicode.com/posts", + json={ + "title": data.get("title", "Test Title"), + "body": data.get("body", "Test Body"), + "userId": data.get("userId", 1), + }, + ) + return jsonify(response.json()), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/post-form", methods=["POST"]) +def sync_post_form(): + """Test sync POST request with form-encoded data (using content parameter).""" + try: + with httpx.Client() as client: + # Use jsonplaceholder which returns deterministic responses + # Send form data as content with explicit content-type + response = client.post( + "https://jsonplaceholder.typicode.com/posts", + content="title=Form+Title&body=Form+Body&userId=1", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/put-json", methods=["PUT"]) +def sync_put_json(): + """Test sync PUT request with JSON body.""" + try: + data = request.get_json() or {} + with httpx.Client() as client: + response = client.put( + "https://jsonplaceholder.typicode.com/posts/1", + json={ + "id": 1, + "title": data.get("title", "Updated Title"), + "body": data.get("body", "Updated Body"), + "userId": data.get("userId", 1), + }, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/patch-json", methods=["PATCH"]) +def sync_patch_json(): + """Test sync PATCH request with partial JSON body.""" + try: + data = request.get_json() or {} + with httpx.Client() as client: + response = client.patch( + "https://jsonplaceholder.typicode.com/posts/1", + json={"title": data.get("title", "Patched Title")}, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/delete", methods=["DELETE"]) +def sync_delete(): + """Test sync DELETE request.""" + try: + with httpx.Client() as client: + response = client.delete("https://jsonplaceholder.typicode.com/posts/1") + return jsonify({"status": "deleted", "status_code": response.status_code}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/sync/chain", methods=["GET"]) +def sync_chain(): + """Test sync sequential chained requests.""" + try: + with httpx.Client() as client: + # First request: get a user + user_response = client.get("https://jsonplaceholder.typicode.com/users/1") + user = user_response.json() + + # Second request: get posts by that user + posts_response = client.get( + "https://jsonplaceholder.typicode.com/posts", + params={"userId": user["id"]}, + ) + posts = posts_response.json() + + # Third request: get comments on the first post + if posts: + comments_response = client.get(f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments") + comments = comments_response.json() + else: + comments = [] + + return jsonify( + { + "user": user, + "post_count": len(posts), + "first_post_comments": len(comments), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================= +# Async Client Tests (httpx.AsyncClient) +# ============================================================================= + + +@app.route("/api/async/get-json", methods=["GET"]) +def async_get_json(): + """Test async GET request returning JSON.""" + + async def fetch(): + async with httpx.AsyncClient() as client: + response = await client.get("https://jsonplaceholder.typicode.com/posts/2") + return response.json() + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/get-with-params", methods=["GET"]) +def async_get_with_params(): + """Test async GET request with query parameters.""" + + async def fetch(): + async with httpx.AsyncClient() as client: + response = await client.get( + "https://jsonplaceholder.typicode.com/comments", + params={"postId": 2}, + ) + return response.json() + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/post-json", methods=["POST"]) +def async_post_json(): + """Test async POST request with JSON body.""" + + async def fetch(): + data = request.get_json() or {} + async with httpx.AsyncClient() as client: + response = await client.post( + "https://jsonplaceholder.typicode.com/posts", + json={ + "title": data.get("title", "Async Test Title"), + "body": data.get("body", "Async Test Body"), + "userId": data.get("userId", 2), + }, + ) + return response.json() + + try: + result = asyncio.run(fetch()) + return jsonify(result), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/put-json", methods=["PUT"]) +def async_put_json(): + """Test async PUT request with JSON body.""" + + async def fetch(): + data = request.get_json() or {} + async with httpx.AsyncClient() as client: + response = await client.put( + "https://jsonplaceholder.typicode.com/posts/2", + json={ + "id": 2, + "title": data.get("title", "Async Updated Title"), + "body": data.get("body", "Async Updated Body"), + "userId": data.get("userId", 2), + }, + ) + return response.json() + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/delete", methods=["DELETE"]) +def async_delete(): + """Test async DELETE request.""" + + async def fetch(): + async with httpx.AsyncClient() as client: + response = await client.delete("https://jsonplaceholder.typicode.com/posts/2") + return {"status": "deleted", "status_code": response.status_code} + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/parallel", methods=["GET"]) +def async_parallel(): + """Test parallel async requests using asyncio.gather.""" + + async def fetch(): + async with httpx.AsyncClient() as client: + # Run three requests in parallel + posts_task = client.get("https://jsonplaceholder.typicode.com/posts/1") + users_task = client.get("https://jsonplaceholder.typicode.com/users/1") + comments_task = client.get("https://jsonplaceholder.typicode.com/comments/1") + + posts_response, users_response, comments_response = await asyncio.gather( + posts_task, users_task, comments_task + ) + + return { + "post": posts_response.json(), + "user": users_response.json(), + "comment": comments_response.json(), + } + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/async/chain", methods=["GET"]) +def async_chain(): + """Test async sequential chained requests.""" + + async def fetch(): + async with httpx.AsyncClient() as client: + # First request: get a user + user_response = await client.get("https://jsonplaceholder.typicode.com/users/2") + user = user_response.json() + + # Second request: get posts by that user + posts_response = await client.get( + "https://jsonplaceholder.typicode.com/posts", + params={"userId": user["id"]}, + ) + posts = posts_response.json() + + # Third request: get comments on the first post + if posts: + comments_response = await client.get( + f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments" + ) + comments = comments_response.json() + else: + comments = [] + + return { + "user": user, + "post_count": len(posts), + "first_post_comments": len(comments), + } + + try: + result = asyncio.run(fetch()) + return jsonify(result) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + sdk.mark_app_as_ready() + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/httpx/e2e-tests/src/test_requests.py b/drift/instrumentation/httpx/e2e-tests/src/test_requests.py new file mode 100644 index 0000000..26789c5 --- /dev/null +++ b/drift/instrumentation/httpx/e2e-tests/src/test_requests.py @@ -0,0 +1,103 @@ +"""Execute test requests against the Flask app to exercise the HTTPX instrumentation.""" + +import time + +import requests + +BASE_URL = "http://localhost:8000" + + +def make_request(method, endpoint, **kwargs): + """Make HTTP request and log result.""" + url = f"{BASE_URL}{endpoint}" + print(f"-> {method} {endpoint}") + + # Set default timeout if not provided + kwargs.setdefault("timeout", 30) + response = requests.request(method, url, **kwargs) + print(f" Status: {response.status_code}") + time.sleep(0.5) # Small delay between requests + return response + + +if __name__ == "__main__": + print("Starting test request sequence for HTTPX instrumentation...\n") + + # Health check + make_request("GET", "/health") + + # ========================================================================== + # Sync Client Tests + # ========================================================================== + print("\n--- Sync Client Tests ---\n") + + # Basic GET request - JSON response + make_request("GET", "/api/sync/get-json") + + # GET with query parameters + make_request("GET", "/api/sync/get-with-params") + + # GET with custom headers + make_request("GET", "/api/sync/get-with-headers") + + # POST with JSON body + make_request( + "POST", + "/api/sync/post-json", + json={"title": "Sync Test Post", "body": "This is a sync test post body", "userId": 1}, + ) + + # POST with form data + make_request("POST", "/api/sync/post-form") + + # PUT request + make_request( + "PUT", + "/api/sync/put-json", + json={"title": "Sync Updated Post", "body": "This is a sync updated post body", "userId": 1}, + ) + + # PATCH request + make_request("PATCH", "/api/sync/patch-json", json={"title": "Sync Patched Title"}) + + # DELETE request + make_request("DELETE", "/api/sync/delete") + + # Sequential chained requests + make_request("GET", "/api/sync/chain") + + # ========================================================================== + # Async Client Tests + # ========================================================================== + print("\n--- Async Client Tests ---\n") + + # Async GET request - JSON response + make_request("GET", "/api/async/get-json") + + # Async GET with query parameters + make_request("GET", "/api/async/get-with-params") + + # Async POST with JSON body + make_request( + "POST", + "/api/async/post-json", + json={"title": "Async Test Post", "body": "This is an async test post body", "userId": 2}, + ) + + # Async PUT request + make_request( + "PUT", + "/api/async/put-json", + json={"title": "Async Updated Post", "body": "This is an async updated post body", "userId": 2}, + ) + + # Async DELETE request + make_request("DELETE", "/api/async/delete") + + # Parallel async requests + make_request("GET", "/api/async/parallel") + + # Async sequential chained requests + make_request("GET", "/api/async/chain") + + print("\nAll requests completed successfully") diff --git a/drift/instrumentation/httpx/instrumentation.py b/drift/instrumentation/httpx/instrumentation.py index 125c552..9cb1e49 100644 --- a/drift/instrumentation/httpx/instrumentation.py +++ b/drift/instrumentation/httpx/instrumentation.py @@ -5,12 +5,10 @@ import base64 import json import logging -import time from typing import Any from urllib.parse import urlparse -from opentelemetry import context as otel_context -from opentelemetry.trace import Span, Status, set_span_in_context +from opentelemetry.trace import Span, Status from opentelemetry.trace import SpanKind as OTelSpanKind from opentelemetry.trace import StatusCode as OTelStatusCode @@ -37,14 +35,15 @@ def __init__(self, message: str, method: str, url: str): from ...core.data_normalization import create_mock_input_value, remove_none_values from ...core.drift_sdk import TuskDrift from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge +from ...core.mode_utils import handle_record_mode, handle_replay_mode from ...core.tracing import TdSpanAttributes +from ...core.tracing.span_utils import CreateSpanOptions, SpanUtils from ...core.types import ( PackageType, SpanKind, SpanStatus, StatusCode, TuskDriftMode, - calling_library_context, ) from ..base import InstrumentationBase from ..http import HttpSpanData, HttpTransformEngine @@ -107,30 +106,80 @@ def patch(self, module: Any) -> None: else: logger.warning("httpx.AsyncClient not found, skipping async instrumentation") + def _get_default_response(self, httpx_module: Any, url: str) -> Any: + """Return default response for background requests in REPLAY mode. + + Background requests (health checks, metrics, etc.) that happen outside + of any trace context should return a default response instead of failing. + """ + request = httpx_module.Request("GET", url) + response = httpx_module.Response( + status_code=200, + content=b"", + request=request, + ) + logger.debug(f"[HttpxInstrumentation] Returning default response for background request to {url}") + return response + def _patch_sync_client(self, module: Any) -> None: """Patch httpx.Client.request for sync HTTP calls.""" original_request = module.Client.request - instrumentation = self + instrumentation_self = self def patched_request(client_self, method: str, url: Any, **kwargs): """Patched Client.request method.""" - # Convert URL to string if needed url_str = str(url) - sdk = TuskDrift.get_instance() # Pass through if SDK is disabled if sdk.mode == TuskDriftMode.DISABLED: return original_request(client_self, method, url, **kwargs) - # Get tracer and parse URL for span name - tracer = sdk.get_tracer() - parsed_url = urlparse(url_str) - span_name = f"{method.upper()} {parsed_url.path or '/'}" + def original_call(): + return original_request(client_self, method, url, **kwargs) + + # REPLAY mode: Use handle_replay_mode for proper background request handling + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_sync( + sdk, module, method, url_str, **kwargs + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(module, url_str), + is_server_request=False, + ) + + # RECORD mode: Use handle_record_mode for proper is_pre_app_start handling + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_sync( + client_self, method, url_str, is_pre_app_start, original_request, **kwargs + ), + span_kind=OTelSpanKind.CLIENT, + ) + + # Apply patch + module.Client.request = patched_request + logger.info("httpx.Client.request instrumented") + + def _handle_replay_sync( + self, + sdk: TuskDrift, + httpx_module: Any, + method: str, + url: str, + **kwargs, + ) -> Any: + """Handle request in REPLAY mode (sync). + + Creates a span, fetches mock response, and returns it. + Raises RuntimeError if no mock is found. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" - # Start OpenTelemetry span (automatically inherits parent context) - logger.debug(f"[HttpxInstrumentation] Creating span for {method.upper()} {url_str}") - span = tracer.start_span( + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( name=span_name, kind=OTelSpanKind.CLIENT, attributes={ @@ -141,52 +190,93 @@ def patched_request(client_self, method: str, url: Any, **kwargs): TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready, }, + is_pre_app_start=not sdk.app_ready, ) - logger.debug(f"[HttpxInstrumentation] Span created: {span}") + ) - # Make span active - ctx = otel_context.get_current() - ctx_with_span = set_span_in_context(span, ctx) - token = otel_context.attach(ctx_with_span) + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") - try: - # Get span IDs for mock requests - span_context = span.get_span_context() - trace_id = format(span_context.trace_id, "032x") - span_id = format(span_context.span_id, "016x") - - # REPLAY mode: Try to get mock - if sdk.mode == TuskDriftMode.REPLAY: - mock_response = instrumentation._try_get_mock_sync( - sdk, module, method, url_str, trace_id, span_id, **kwargs - ) - if mock_response is not None: - return mock_response + try: + with SpanUtils.with_span(span_info): + # Use IDs from SpanInfo (already formatted) + mock_response = self._try_get_mock_sync( + sdk, + httpx_module, + method, + url, + span_info.trace_id, + span_info.span_id, + **kwargs, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() - # Check drop transforms before making request - if instrumentation._transform_engine and instrumentation._transform_engine.should_drop_outbound_request( - method.upper(), url_str, kwargs.get("headers", {}) + def _handle_record_sync( + self, + client_self: Any, + method: str, + url: str, + is_pre_app_start: bool, + original_request: Any, + **kwargs, + ) -> Any: + """Handle request in RECORD mode (sync). + + Creates a span, makes the real request, and records the response. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "HttpxInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return original_request(client_self, method, url, **kwargs) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms BEFORE making the request + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), url, kwargs.get("headers", {}) ): - span.set_attribute( + # Request should be dropped - mark span and raise exception + span_info.span.set_attribute( TdSpanAttributes.OUTPUT_VALUE, json.dumps({"bodyProcessingError": "dropped"}), ) - span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) - span.end() - + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) raise RequestDroppedByTransform( - f"Outbound request to {url_str} was dropped by transform rule", + f"Outbound request to {url} was dropped by transform rule", method.upper(), - url_str, + url, ) - # RECORD mode or mock not found: Make real request - start_time_ns = time.time_ns() + # Make the real request error = None response = None - # Set calling_library_context to prevent socket instrumentation warnings - calling_lib_token = calling_library_context.set("HttpxInstrumentation") try: response = original_request(client_self, method, url, **kwargs) return response @@ -194,52 +284,82 @@ def patched_request(client_self, method: str, url: Any, **kwargs): error = e raise finally: - calling_library_context.reset(calling_lib_token) # Finalize span with request/response data - (time.time_ns() - start_time_ns) / 1_000_000 - instrumentation._finalize_span( - span, + self._finalize_span( + span_info.span, method, - url_str, + url, response, error, kwargs, ) - finally: - # Detach context and end span - otel_context.detach(token) - logger.debug(f"[HttpxInstrumentation] Ending span for {method.upper()} {url_str}") - span.end() - logger.debug("[HttpxInstrumentation] Span ended") - - # Apply patch - module.Client.request = patched_request - logger.info("httpx.Client.request instrumented") + finally: + span_info.span.end() def _patch_async_client(self, module: Any) -> None: """Patch httpx.AsyncClient.request for async HTTP calls.""" original_request = module.AsyncClient.request - instrumentation = self + instrumentation_self = self async def patched_request(client_self, method: str, url: Any, **kwargs): """Patched AsyncClient.request method.""" - # Convert URL to string if needed url_str = str(url) - sdk = TuskDrift.get_instance() # Pass through if SDK is disabled if sdk.mode == TuskDriftMode.DISABLED: return await original_request(client_self, method, url, **kwargs) - # Get tracer and parse URL for span name - tracer = sdk.get_tracer() - parsed_url = urlparse(url_str) - span_name = f"{method.upper()} {parsed_url.path or '/'}" + async def original_call(): + return await original_request(client_self, method, url, **kwargs) + + # REPLAY mode: Use handle_replay_mode for proper background request handling + # handle_replay_mode returns coroutine which we await + if sdk.mode == TuskDriftMode.REPLAY: + return await handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_async( + sdk, module, method, url_str, **kwargs + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(module, url_str), + is_server_request=False, + ) + + # RECORD mode: Use handle_record_mode for proper is_pre_app_start handling + # handle_record_mode returns coroutine which we await + return await handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_async( + client_self, method, url_str, is_pre_app_start, original_request, **kwargs + ), + span_kind=OTelSpanKind.CLIENT, + ) + + # Apply patch + module.AsyncClient.request = patched_request + logger.info("httpx.AsyncClient.request instrumented") + + async def _handle_replay_async( + self, + sdk: TuskDrift, + httpx_module: Any, + method: str, + url: str, + **kwargs, + ) -> Any: + """Handle request in REPLAY mode (async). + + Creates a span, fetches mock response, and returns it. + Raises RuntimeError if no mock is found. + + Note: Uses sync mock lookup because async httpx in Flask runs via + asyncio.run() which creates nested event loop issues with async SDK calls. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" - # Start OpenTelemetry span (automatically inherits parent context) - logger.debug(f"[HttpxInstrumentation] Creating async span for {method.upper()} {url_str}") - span = tracer.start_span( + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( name=span_name, kind=OTelSpanKind.CLIENT, attributes={ @@ -250,52 +370,94 @@ async def patched_request(client_self, method: str, url: Any, **kwargs): TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready, }, + is_pre_app_start=not sdk.app_ready, ) - logger.debug(f"[HttpxInstrumentation] Async span created: {span}") + ) - # Make span active - ctx = otel_context.get_current() - ctx_with_span = set_span_in_context(span, ctx) - token = otel_context.attach(ctx_with_span) + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") - try: - # Get span IDs for mock requests - span_context = span.get_span_context() - trace_id = format(span_context.trace_id, "032x") - span_id = format(span_context.span_id, "016x") - - # REPLAY mode: Try to get mock - if sdk.mode == TuskDriftMode.REPLAY: - mock_response = await instrumentation._try_get_mock_async( - sdk, module, method, url_str, trace_id, span_id, **kwargs - ) - if mock_response is not None: - return mock_response + try: + with SpanUtils.with_span(span_info): + # Use sync mock lookup to avoid nested event loop issues + # (Flask uses asyncio.run() which doesn't play well with nested async SDK calls) + mock_response = self._try_get_mock_sync( + sdk, + httpx_module, + method, + url, + span_info.trace_id, + span_info.span_id, + **kwargs, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() - # Check drop transforms before making request - if instrumentation._transform_engine and instrumentation._transform_engine.should_drop_outbound_request( - method.upper(), url_str, kwargs.get("headers", {}) + async def _handle_record_async( + self, + client_self: Any, + method: str, + url: str, + is_pre_app_start: bool, + original_request: Any, + **kwargs, + ) -> Any: + """Handle request in RECORD mode (async). + + Creates a span, makes the real request, and records the response. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "HttpxInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return await original_request(client_self, method, url, **kwargs) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms BEFORE making the request + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), url, kwargs.get("headers", {}) ): - span.set_attribute( + # Request should be dropped - mark span and raise exception + span_info.span.set_attribute( TdSpanAttributes.OUTPUT_VALUE, json.dumps({"bodyProcessingError": "dropped"}), ) - span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) - span.end() - + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) raise RequestDroppedByTransform( - f"Outbound request to {url_str} was dropped by transform rule", + f"Outbound request to {url} was dropped by transform rule", method.upper(), - url_str, + url, ) - # RECORD mode or mock not found: Make real request - start_time_ns = time.time_ns() + # Make the real request error = None response = None - # Set calling_library_context to prevent socket instrumentation warnings - calling_lib_token = calling_library_context.set("HttpxInstrumentation") try: response = await original_request(client_self, method, url, **kwargs) return response @@ -303,27 +465,17 @@ async def patched_request(client_self, method: str, url: Any, **kwargs): error = e raise finally: - calling_library_context.reset(calling_lib_token) - # Finalize span with request/response data (async version) - (time.time_ns() - start_time_ns) / 1_000_000 - await instrumentation._finalize_span_async( - span, + # Finalize span with request/response data + await self._finalize_span_async( + span_info.span, method, - url_str, + url, response, error, kwargs, ) - finally: - # Detach context and end span - otel_context.detach(token) - logger.debug(f"[HttpxInstrumentation] Ending async span for {method.upper()} {url_str}") - span.end() - logger.debug("[HttpxInstrumentation] Async span ended") - - # Apply patch - module.AsyncClient.request = patched_request - logger.info("httpx.AsyncClient.request instrumented") + finally: + span_info.span.end() def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: """Encode body data to base64 string. @@ -409,8 +561,8 @@ def _try_get_mock_sync( parsed_url = urlparse(url) # Extract request data - headers = dict(kwargs.get("headers", {})) - params = dict(kwargs.get("params", {})) if kwargs.get("params") else {} + headers = dict(kwargs.get("headers") or {}) + params = dict(kwargs.get("params") or {}) # Handle request body - encode to base64 content = kwargs.get("content") @@ -506,8 +658,8 @@ async def _try_get_mock_async( parsed_url = urlparse(url) # Extract request data - headers = dict(kwargs.get("headers", {})) - params = dict(kwargs.get("params", {})) if kwargs.get("params") else {} + headers = dict(kwargs.get("headers") or {}) + params = dict(kwargs.get("params") or {}) # Handle request body - encode to base64 content = kwargs.get("content") @@ -597,7 +749,16 @@ def _create_mock_response(self, httpx_module: Any, mock_data: dict[str, Any], me """ # Get status code and headers status_code = mock_data.get("statusCode", 200) - headers = mock_data.get("headers", {}) + headers = dict(mock_data.get("headers", {})) + + # Remove content-encoding and transfer-encoding headers since the body + # was already decompressed when recorded (httpx auto-decompresses) + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] # Get body - decode from base64 if needed body = mock_data.get("body", "") @@ -653,8 +814,8 @@ def _finalize_span( parsed_url = urlparse(url) # ===== BUILD INPUT VALUE ===== - headers = dict(request_kwargs.get("headers", {})) - params = dict(request_kwargs.get("params", {})) if request_kwargs.get("params") else {} + headers = dict(request_kwargs.get("headers") or {}) + params = dict(request_kwargs.get("params") or {}) # Get request body and encode to base64 content = request_kwargs.get("content") @@ -831,8 +992,8 @@ async def _finalize_span_async( parsed_url = urlparse(url) # ===== BUILD INPUT VALUE ===== - headers = dict(request_kwargs.get("headers", {})) - params = dict(request_kwargs.get("params", {})) if request_kwargs.get("params") else {} + headers = dict(request_kwargs.get("headers") or {}) + params = dict(request_kwargs.get("params") or {}) # Get request body and encode to base64 content = request_kwargs.get("content") diff --git a/drift/instrumentation/requests/e2e-tests/.tusk/config.yaml b/drift/instrumentation/requests/e2e-tests/.tusk/config.yaml new file mode 100644 index 0000000..959280c --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/.tusk/config.yaml @@ -0,0 +1,27 @@ +version: 1 + +service: + id: "requests-e2e-test-id" + name: "requests-e2e-test" + port: 8000 + start: + command: "python src/app.py" + readiness_check: + command: "curl -f http://localhost:8000/health" + timeout: 45s + interval: 5s + +tusk_api: + url: "http://localhost:8000" + +test_execution: + concurrent_limit: 10 + batch_size: 10 + timeout: 30s + +recording: + sampling_rate: 1.0 + export_spans: false + +replay: + enable_telemetry: false diff --git a/drift/instrumentation/requests/e2e-tests/Dockerfile b/drift/instrumentation/requests/e2e-tests/Dockerfile new file mode 100644 index 0000000..6964984 --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/Dockerfile @@ -0,0 +1,21 @@ +FROM python-e2e-base:latest + +# Copy SDK source for editable install +COPY . /sdk + +# Copy test files +COPY drift/instrumentation/requests/e2e-tests /app + +WORKDIR /app + +# Install dependencies (requirements.txt uses -e /sdk for SDK) +RUN pip install -q -r requirements.txt + +# Make entrypoint executable +RUN chmod +x entrypoint.py + +# Create .tusk directories +RUN mkdir -p /app/.tusk/traces /app/.tusk/logs + +# Run entrypoint +ENTRYPOINT ["python", "entrypoint.py"] diff --git a/drift/instrumentation/requests/e2e-tests/docker-compose.yml b/drift/instrumentation/requests/e2e-tests/docker-compose.yml new file mode 100644 index 0000000..774bb9d --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/docker-compose.yml @@ -0,0 +1,17 @@ +services: + app: + build: + context: ../../../.. + dockerfile: drift/instrumentation/requests/e2e-tests/Dockerfile + args: + - TUSK_CLI_VERSION=${TUSK_CLI_VERSION:-latest} + environment: + - PORT=8000 + - TUSK_ANALYTICS_DISABLED=1 + - PYTHONUNBUFFERED=1 + working_dir: /app + volumes: + # Mount app source for development + - ./src:/app/src + # Mount .tusk folder to persist traces + - ./.tusk:/app/.tusk diff --git a/drift/instrumentation/requests/e2e-tests/entrypoint.py b/drift/instrumentation/requests/e2e-tests/entrypoint.py new file mode 100644 index 0000000..aa617dd --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/entrypoint.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +E2E Test Entrypoint for Requests Instrumentation + +This script orchestrates the full e2e test lifecycle: +1. Setup: Install dependencies +2. Record: Start app in RECORD mode, execute requests +3. Test: Run Tusk CLI tests +4. Teardown: Cleanup and return exit code +""" + +import sys +from pathlib import Path + +# Add SDK to path for imports +sys.path.insert(0, "/sdk") + +from drift.instrumentation.e2e_common.base_runner import E2ETestRunnerBase + + +class RequestsE2ETestRunner(E2ETestRunnerBase): + """E2E test runner for Requests instrumentation.""" + + def __init__(self): + import os + + port = int(os.getenv("PORT", "8000")) + super().__init__(app_port=port) + + +if __name__ == "__main__": + runner = RequestsE2ETestRunner() + exit_code = runner.run() + sys.exit(exit_code) diff --git a/drift/instrumentation/requests/e2e-tests/requirements.txt b/drift/instrumentation/requests/e2e-tests/requirements.txt new file mode 100644 index 0000000..3a6fefb --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/requirements.txt @@ -0,0 +1,3 @@ +-e /sdk +Flask>=3.1.2 +requests>=2.32.5 diff --git a/drift/instrumentation/requests/e2e-tests/run.sh b/drift/instrumentation/requests/e2e-tests/run.sh new file mode 100755 index 0000000..d67bae1 --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +# Exit on error +set -e + +# Accept optional port parameter (default: 8000) +APP_PORT=${1:-8000} +export APP_PORT + +# Generate unique docker compose project name +# Get the instrumentation name (parent directory of e2e-tests) +TEST_NAME="$(basename "$(dirname "$(pwd)")")" +PROJECT_NAME="python-${TEST_NAME}-${APP_PORT}" + +# Colors for output +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Running Python E2E Test: ${TEST_NAME}${NC}" +echo -e "${BLUE}Port: ${APP_PORT}${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up containers...${NC}" + docker compose -p "$PROJECT_NAME" down -v 2>/dev/null || true +} + +# Register cleanup on exit +trap cleanup EXIT + +# Build containers +echo -e "${BLUE}Building containers...${NC}" +docker compose -p "$PROJECT_NAME" build --no-cache + +# Run the test container +echo -e "${BLUE}Starting test...${NC}" +echo "" + +# Run container and capture exit code (always use port 8000 inside container) +# Disable set -e temporarily to capture exit code +set +e +docker compose -p "$PROJECT_NAME" run --rm app +EXIT_CODE=$? +set -e + +echo "" +if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN}Test passed!${NC}" + echo -e "${GREEN}========================================${NC}" +else + echo -e "${RED}========================================${NC}" + echo -e "${RED}Test failed with exit code ${EXIT_CODE}${NC}" + echo -e "${RED}========================================${NC}" +fi + +exit $EXIT_CODE diff --git a/drift/instrumentation/requests/e2e-tests/src/app.py b/drift/instrumentation/requests/e2e-tests/src/app.py new file mode 100644 index 0000000..e67f741 --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/src/app.py @@ -0,0 +1,267 @@ +"""Flask test app for e2e tests - Requests instrumentation testing.""" + +from concurrent.futures import ThreadPoolExecutor + +import requests +from flask import Flask, jsonify, request +from opentelemetry import context as otel_context + +from drift import TuskDrift + +# Initialize SDK +sdk = TuskDrift.initialize( + api_key="tusk-test-key", + log_level="debug", +) + +app = Flask(__name__) + + +def _run_with_context(ctx, fn, *args, **kwargs): + """Helper to run a function with OpenTelemetry context in a thread pool.""" + token = otel_context.attach(ctx) + try: + return fn(*args, **kwargs) + finally: + otel_context.detach(token) + + +# Health check endpoint +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "healthy"}) + + +# GET request - simple JSON response +@app.route("/api/get-json", methods=["GET"]) +def get_json(): + """Test GET request returning JSON.""" + try: + response = requests.get("https://jsonplaceholder.typicode.com/posts/1") + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request with query parameters +@app.route("/api/get-with-params", methods=["GET"]) +def get_with_params(): + """Test GET request with query parameters.""" + try: + response = requests.get( + "https://jsonplaceholder.typicode.com/comments", + params={"postId": 1, "limit": 5}, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request with custom headers +@app.route("/api/get-with-headers", methods=["GET"]) +def get_with_headers(): + """Test GET request with custom headers.""" + try: + response = requests.get( + "https://httpbin.org/headers", + headers={ + "X-Custom-Header": "test-value", + "Accept": "application/json", + }, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with JSON body +@app.route("/api/post-json", methods=["POST"]) +def post_json(): + """Test POST request with JSON body.""" + try: + data = request.get_json() or {} + response = requests.post( + "https://jsonplaceholder.typicode.com/posts", + json={ + "title": data.get("title", "Test Title"), + "body": data.get("body", "Test Body"), + "userId": data.get("userId", 1), + }, + ) + return jsonify(response.json()), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with form data +@app.route("/api/post-form", methods=["POST"]) +def post_form(): + """Test POST request with form-encoded data.""" + try: + response = requests.post( + "https://httpbin.org/post", + data={ + "field1": "value1", + "field2": "value2", + }, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PUT request +@app.route("/api/put-json", methods=["PUT"]) +def put_json(): + """Test PUT request with JSON body.""" + try: + data = request.get_json() or {} + response = requests.put( + "https://jsonplaceholder.typicode.com/posts/1", + json={ + "id": 1, + "title": data.get("title", "Updated Title"), + "body": data.get("body", "Updated Body"), + "userId": data.get("userId", 1), + }, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PATCH request +@app.route("/api/patch-json", methods=["PATCH"]) +def patch_json(): + """Test PATCH request with partial JSON body.""" + try: + data = request.get_json() or {} + response = requests.patch( + "https://jsonplaceholder.typicode.com/posts/1", + json={"title": data.get("title", "Patched Title")}, + ) + return jsonify(response.json()) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# DELETE request +@app.route("/api/delete", methods=["DELETE"]) +def delete_resource(): + """Test DELETE request.""" + try: + response = requests.delete("https://jsonplaceholder.typicode.com/posts/1") + return jsonify({"status": "deleted", "status_code": response.status_code}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Sequential chained requests +@app.route("/api/chain", methods=["GET"]) +def chain_requests(): + """Test sequential chained requests.""" + try: + # First request: get a user + user_response = requests.get("https://jsonplaceholder.typicode.com/users/1") + user = user_response.json() + + # Second request: get posts by that user + posts_response = requests.get( + "https://jsonplaceholder.typicode.com/posts", + params={"userId": user["id"]}, + ) + posts = posts_response.json() + + # Third request: get comments on the first post + if posts: + comments_response = requests.get(f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments") + comments = comments_response.json() + else: + comments = [] + + return jsonify( + { + "user": user, + "post_count": len(posts), + "first_post_comments": len(comments), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Parallel requests with ThreadPoolExecutor +@app.route("/api/parallel", methods=["GET"]) +def parallel_requests(): + """Test parallel requests with context propagation.""" + ctx = otel_context.get_current() + + with ThreadPoolExecutor(max_workers=3) as executor: + # Run three requests in parallel with context propagation + posts_future = executor.submit( + _run_with_context, + ctx, + requests.get, + "https://jsonplaceholder.typicode.com/posts/1", + ) + users_future = executor.submit( + _run_with_context, + ctx, + requests.get, + "https://jsonplaceholder.typicode.com/users/1", + ) + comments_future = executor.submit( + _run_with_context, + ctx, + requests.get, + "https://jsonplaceholder.typicode.com/comments/1", + ) + + posts_response = posts_future.result() + users_response = users_future.result() + comments_response = comments_future.result() + + return jsonify( + { + "post": posts_response.json(), + "user": users_response.json(), + "comment": comments_response.json(), + } + ) + + +# Request with timeout +@app.route("/api/with-timeout", methods=["GET"]) +def with_timeout(): + """Test request with explicit timeout.""" + try: + response = requests.get( + "https://jsonplaceholder.typicode.com/posts/1", + timeout=10, + ) + return jsonify(response.json()) + except requests.Timeout: + return jsonify({"error": "Request timed out"}), 504 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Multiple content types +@app.route("/api/text-response", methods=["GET"]) +def text_response(): + """Test request that returns text/plain.""" + try: + response = requests.get("https://httpbin.org/robots.txt") + return jsonify( + { + "content": response.text, + "content_type": response.headers.get("Content-Type"), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + sdk.mark_app_as_ready() + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/requests/e2e-tests/src/test_requests.py b/drift/instrumentation/requests/e2e-tests/src/test_requests.py new file mode 100644 index 0000000..e2df396 --- /dev/null +++ b/drift/instrumentation/requests/e2e-tests/src/test_requests.py @@ -0,0 +1,73 @@ +"""Execute test requests against the Flask app to exercise the requests instrumentation.""" + +import time + +import requests + +BASE_URL = "http://localhost:8000" + + +def make_request(method, endpoint, **kwargs): + """Make HTTP request and log result.""" + url = f"{BASE_URL}{endpoint}" + print(f"-> {method} {endpoint}") + + # Set default timeout if not provided + kwargs.setdefault("timeout", 30) + response = requests.request(method, url, **kwargs) + print(f" Status: {response.status_code}") + time.sleep(0.5) # Small delay between requests + return response + + +if __name__ == "__main__": + print("Starting test request sequence for requests instrumentation...\n") + + # Health check + make_request("GET", "/health") + + # Basic GET request - JSON response + make_request("GET", "/api/get-json") + + # GET with query parameters + make_request("GET", "/api/get-with-params") + + # GET with custom headers + make_request("GET", "/api/get-with-headers") + + # POST with JSON body + make_request( + "POST", + "/api/post-json", + json={"title": "Test Post", "body": "This is a test post body", "userId": 1}, + ) + + # POST with form data + make_request("POST", "/api/post-form") + + # PUT request + make_request( + "PUT", + "/api/put-json", + json={"title": "Updated Post", "body": "This is an updated post body", "userId": 1}, + ) + + # PATCH request + make_request("PATCH", "/api/patch-json", json={"title": "Patched Title"}) + + # DELETE request + make_request("DELETE", "/api/delete") + + # Sequential chained requests + make_request("GET", "/api/chain") + + # Parallel requests with context propagation + make_request("GET", "/api/parallel") + + # Request with timeout + make_request("GET", "/api/with-timeout") + + # Text response handling + make_request("GET", "/api/text-response") + + print("\nAll requests completed successfully") diff --git a/drift/instrumentation/requests/instrumentation.py b/drift/instrumentation/requests/instrumentation.py index 1d8721c..9b833dc 100644 --- a/drift/instrumentation/requests/instrumentation.py +++ b/drift/instrumentation/requests/instrumentation.py @@ -5,12 +5,10 @@ import base64 import json import logging -import time from typing import Any from urllib.parse import urlparse -from opentelemetry import context as otel_context -from opentelemetry.trace import Span, Status, set_span_in_context +from opentelemetry.trace import Span, Status from opentelemetry.trace import SpanKind as OTelSpanKind from opentelemetry.trace import StatusCode as OTelStatusCode @@ -37,14 +35,15 @@ def __init__(self, message: str, method: str, url: str): from ...core.data_normalization import create_mock_input_value, remove_none_values from ...core.drift_sdk import TuskDrift from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge +from ...core.mode_utils import handle_record_mode, handle_replay_mode from ...core.tracing import TdSpanAttributes +from ...core.tracing.span_utils import CreateSpanOptions, SpanUtils from ...core.types import ( PackageType, SpanKind, SpanStatus, StatusCode, TuskDriftMode, - calling_library_context, ) from ..base import InstrumentationBase from ..http import HttpSpanData, HttpTransformEngine @@ -97,6 +96,7 @@ def patch(self, module: Any) -> None: # Store original method original_request = module.Session.request + instrumentation_self = self def patched_request(session_self, method: str, url: str, **kwargs): """Patched Session.request method.""" @@ -106,14 +106,69 @@ def patched_request(session_self, method: str, url: str, **kwargs): if sdk.mode == TuskDriftMode.DISABLED: return original_request(session_self, method, url, **kwargs) - # Get tracer and parse URL for span name - tracer = sdk.get_tracer() - parsed_url = urlparse(url) - span_name = f"{method.upper()} {parsed_url.path or '/'}" + def original_call(): + return original_request(session_self, method, url, **kwargs) - # Start OpenTelemetry span (automatically inherits parent context) - logger.debug(f"[RequestsInstrumentation] Creating span for {method.upper()} {url}") - span = tracer.start_span( + # REPLAY mode: Use handle_replay_mode for proper background request handling + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay( + sdk, session_self, method, url, original_request, **kwargs + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(url), + is_server_request=False, + ) + + # RECORD mode: Use handle_record_mode for proper is_pre_app_start handling + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record( + sdk, session_self, method, url, is_pre_app_start, original_request, **kwargs + ), + span_kind=OTelSpanKind.CLIENT, + ) + + # Apply patch + module.Session.request = patched_request + logger.info("requests.Session.request instrumented") + + def _get_default_response(self, url: str) -> Any: + """Return default response for background requests in REPLAY mode. + + Background requests (health checks, metrics, etc.) that happen outside + of any trace context should return a default response instead of failing. + """ + import requests + + response = requests.Response() + response.status_code = 200 + response.reason = "OK" + response.url = url + response._content = b"" + response.encoding = "utf-8" + logger.debug(f"[RequestsInstrumentation] Returning default response for background request to {url}") + return response + + def _handle_replay( + self, + sdk: TuskDrift, + session_self: Any, + method: str, + url: str, + original_request: Any, + **kwargs, + ) -> Any: + """Handle request in REPLAY mode. + + Creates a span, fetches mock response, and returns it. + Raises RuntimeError if no mock is found. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( name=span_name, kind=OTelSpanKind.CLIENT, attributes={ @@ -124,57 +179,93 @@ def patched_request(session_self, method: str, url: str, **kwargs): TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready, }, + is_pre_app_start=not sdk.app_ready, ) - logger.debug(f"[RequestsInstrumentation] Span created: {span}") + ) - # Make span active - ctx = otel_context.get_current() - ctx_with_span = set_span_in_context(span, ctx) - token = otel_context.attach(ctx_with_span) + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") - try: - # Get span IDs for mock requests - span_context = span.get_span_context() - trace_id = format(span_context.trace_id, "032x") - span_id = format(span_context.span_id, "016x") - - # REPLAY mode: Try to get mock - if sdk.mode == TuskDriftMode.REPLAY: - mock_response = self._try_get_mock(sdk, method, url, trace_id, span_id, **kwargs) - if mock_response is not None: - return mock_response - - # ===== CHECK DROP TRANSFORMS (matches Node SDK) ===== - # Check BEFORE making the HTTP request to prevent network traffic + try: + with SpanUtils.with_span(span_info): + # Use IDs from SpanInfo (already formatted) + mock_response = self._try_get_mock( + sdk, + method, + url, + span_info.trace_id, + span_info.span_id, + **kwargs, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() + + def _handle_record( + self, + sdk: TuskDrift, + session_self: Any, + method: str, + url: str, + is_pre_app_start: bool, + original_request: Any, + **kwargs, + ) -> Any: + """Handle request in RECORD mode. + + Creates a span, makes the real request, and records the response. + """ + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "RequestsInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return original_request(session_self, method, url, **kwargs) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms BEFORE making the request if self._transform_engine and self._transform_engine.should_drop_outbound_request( method.upper(), url, kwargs.get("headers", {}) ): # Request should be dropped - mark span and raise exception - span.set_attribute( + span_info.span.set_attribute( TdSpanAttributes.OUTPUT_VALUE, - json.dumps( - { - "bodyProcessingError": "dropped", - } - ), + json.dumps({"bodyProcessingError": "dropped"}), ) - span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) - span.end() - - # Raise exception to indicate drop (matches Node SDK behavior) + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) raise RequestDroppedByTransform( f"Outbound request to {url} was dropped by transform rule", method.upper(), url, ) - # RECORD mode or mock not found: Make real request - start_time_ns = time.time_ns() + # Make the real request error = None response = None - # Set calling_library_context to prevent socket instrumentation warnings - calling_lib_token = calling_library_context.set("RequestsInstrumentation") try: response = original_request(session_self, method, url, **kwargs) return response @@ -182,27 +273,17 @@ def patched_request(session_self, method: str, url: str, **kwargs): error = e raise finally: - calling_library_context.reset(calling_lib_token) # Finalize span with request/response data - (time.time_ns() - start_time_ns) / 1_000_000 self._finalize_span( - span, + span_info.span, method, url, response, error, kwargs, ) - finally: - # Detach context and end span - otel_context.detach(token) - logger.debug(f"[RequestsInstrumentation] Ending span for {method.upper()} {url}") - span.end() - logger.debug("[RequestsInstrumentation] Span ended") - - # Apply patch - module.Session.request = patched_request - logger.info("requests.Session.request instrumented") + finally: + span_info.span.end() def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: """Encode body data to base64 string. @@ -380,7 +461,17 @@ def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> Any: response.url = url # Set headers - headers = mock_data.get("headers", {}) + headers = dict(mock_data.get("headers", {})) + + # Remove content-encoding and transfer-encoding headers since the body + # was already decompressed when recorded (requests auto-decompresses) + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] + response.headers.update(headers) # Set body - decode from base64 if needed diff --git a/drift/instrumentation/socket/instrumentation.py b/drift/instrumentation/socket/instrumentation.py index e3a1273..e7948b6 100644 --- a/drift/instrumentation/socket/instrumentation.py +++ b/drift/instrumentation/socket/instrumentation.py @@ -136,9 +136,8 @@ def _handle_socket_call(self, method_name: str, socket_self: Any) -> None: # Detect unpatched dependency: # - Must be in a SERVER span (inbound request context) - # - Must NOT be from an instrumented library (calling_library is set) - # Examples: ProtobufCommunicator, HttpxInstrumentation, RequestsInstrumentation - if span_kind == SpanKind.SERVER and calling_library is None: + # - Must NOT be from ProtobufCommunicator + if span_kind == SpanKind.SERVER and calling_library != "ProtobufCommunicator": self._log_unpatched_dependency(method_name, socket_self) def _log_unpatched_dependency(self, method_name: str, socket_self: Any) -> None: diff --git a/pyproject.toml b/pyproject.toml index dc0cee4..50a1fdb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ [project.optional-dependencies] flask = ["Flask>=3.1.2"] fastapi = ["fastapi>=0.115.6", "uvicorn>=0.34.2", "starlette<0.42.0"] +django = ["Django>=5.0"] dev = [ "Flask>=3.1.2", "fastapi>=0.115.6",