Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ JINA_API_KEY=
# =============================================
# --- tracing ---
# OTEL endpoint (Optional), ref https://arize.com/docs/phoenix if use Phoenix
TRACING_PROVIDER=phoenix
PHOENIX_ENDPOINT=
PHOENIX_PROJECT_NAME=youtu_agent
# additional key if use Phoenix's cloud service. https://app.phoenix.arize.com/s/_space_name_/settings/general
Expand Down
12 changes: 12 additions & 0 deletions .env.full
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,23 @@ UTU_DISABLE_TOOL_CACHE=
# =============================================
# we use phoenix by default, ref https://arize.com/docs/phoenix
# OTEL endpoint, e.g. http://localhost:6006/v1/traces.
TRACING_PROVIDER=phoenix
PHOENIX_ENDPOINT=
PHOENIX_PROJECT_NAME=
# additional key if use Phoenix's cloud service. https://app.phoenix.arize.com/s/_space_name_/settings/general
PHOENIX_API_KEY=

# --- Langfuse configuration ---
# ref https://langfuse.com/docs/opentelemetry/get-started
# Langfuse base URL, e.g. https://cloud.langfuse.com or https://your-selfhosted-langfuse.com
LANGFUSE_BASE_URL=https://cloud.langfuse.com
# Langfuse API keys (required for Langfuse tracing)
# Get from: https://cloud.langfuse.com or your self-hosted instance
LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET_KEY=
# Optional: project name for Langfuse
LANGFUSE_PROJECT_NAME=

# database url (Optional), e.g., SQLite, PostgreSQL. DB is used to store tracing and evaluation data
UTU_DB_URL=sqlite:///test.db
# log level
Expand Down
62 changes: 62 additions & 0 deletions tests/tracing/test_langfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from utu.tracing.langfuse_utils import LangfuseUtils

langfuse_utils = LangfuseUtils()


def test_get_otlp_endpoint():
"""Test that the OTLP endpoint is correctly formatted."""
endpoint = langfuse_utils.get_otlp_endpoint()
print(f"OTLP Endpoint: {endpoint}")
assert endpoint.endswith("/api/public/otel/v1/traces"), f"Unexpected endpoint: {endpoint}"


def test_get_otlp_headers():
"""Test that the OTLP headers contain Authorization."""
headers = langfuse_utils.get_otlp_headers()
print(f"OTLP Headers: {headers}")
assert "Authorization" in headers, "Authorization header is missing"
assert headers["Authorization"].startswith("Basic "), "Authorization should use Basic auth"


def test_get_trace_url():
"""Test that the trace URL is correctly formatted."""
trace_id = "test-trace-id-12345"
url = langfuse_utils.get_trace_url(trace_id)
print(f"Trace URL: {url}")
assert url is not None, "Trace URL should not be None"
assert trace_id in url, f"Trace ID should be in URL: {url}"
assert "/trace/" in url, f"URL should contain '/trace/': {url}"


def test_create_span_exporter():
"""Test that the span exporter is created correctly."""
exporter = langfuse_utils.create_span_exporter()
print(f"Span Exporter: {exporter}")
assert exporter is not None, "Span exporter should not be None"
assert hasattr(exporter, "_endpoint"), "Exporter should have _endpoint attribute"
assert exporter._endpoint.endswith("/api/public/otel/v1/traces"), (
f"Unexpected exporter endpoint: {exporter._endpoint}"
)


if __name__ == "__main__":
print("Testing LangfuseUtils...")
print("-" * 50)

test_get_otlp_endpoint()
print("✓ test_get_otlp_endpoint passed")
print("-" * 50)

test_get_otlp_headers()
print("✓ test_get_otlp_headers passed")
print("-" * 50)

test_get_trace_url()
print("✓ test_get_trace_url passed")
print("-" * 50)

test_create_span_exporter()
print("✓ test_create_span_exporter passed")
print("-" * 50)

print("\nAll tests passed!")
10 changes: 9 additions & 1 deletion utu/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from .langfuse_utils import LangfuseUtils
from .phoenix_utils import PhoenixUtils
from .setup import setup_db_tracing, setup_otel_tracing, setup_tracing

__all__ = ["setup_otel_tracing", "setup_db_tracing", "setup_tracing"]
__all__ = [
"setup_otel_tracing",
"setup_db_tracing",
"setup_tracing",
"PhoenixUtils",
"LangfuseUtils",
]
79 changes: 79 additions & 0 deletions utu/tracing/langfuse_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter


class LangfuseUtils:
"""Utilities for Langfuse tracing integration.

Langfuse supports OpenTelemetry protocol for trace ingestion.
Ref: https://langfuse.com/docs/opentelemetry/get-started
"""

def __init__(
self,
base_url: str = None,
public_key: str = None,
secret_key: str = None,
project_name: str = None,
):
self.base_url = base_url or os.getenv("LANGFUSE_BASE_URL", "https://cloud.langfuse.com")
self.public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
self.secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
self.project_name = project_name or os.getenv("LANGFUSE_PROJECT_NAME", "")

if not self.public_key or not self.secret_key:
raise ValueError("LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY must be set")

print(f"Using Langfuse base url: {self.base_url} with project name: {self.project_name}")

def get_otlp_endpoint(self) -> str:
"""Get the OTLP endpoint for Langfuse.

Returns:
The OTLP HTTP endpoint URL for sending traces.
"""
# Langfuse OTLP endpoint path: /api/public/otel/v1/traces
# Ref: https://langfuse.com/docs/opentelemetry/get-started
return f"{self.base_url.rstrip('/')}/api/public/otel/v1/traces"

def get_otlp_headers(self) -> dict[str, str]:
"""Get the OTLP headers for authentication.

Langfuse uses Basic authentication with public_key:secret_key.

Returns:
Dictionary containing the Authorization header.
"""
import base64

credentials = base64.b64encode(f"{self.public_key}:{self.secret_key}".encode()).decode()
headers = {"Authorization": f"Basic {credentials}"}
if self.project_name:
headers["X-Langfuse-Project-Name"] = self.project_name
return headers

def create_span_exporter(self) -> OTLPSpanExporter:
"""Create an OTLP span exporter configured for Langfuse.

Returns:
Configured OTLPSpanExporter instance.
"""
return OTLPSpanExporter(
endpoint=self.get_otlp_endpoint(),
headers=self.get_otlp_headers(),
)

def get_trace_url(self, trace_id: str) -> str | None:
"""Get the URL to view a trace in Langfuse UI.

Args:
trace_id: The trace ID to look up.

Returns:
URL to the trace in Langfuse UI, or None if not available.
"""
if not trace_id:
return None
# Langfuse trace URL format
return f"{self.base_url.rstrip('/')}/trace/{trace_id}"