Skip to content

Commit 1757548

Browse files
committed
Pointwise Mode
1 parent 859dce2 commit 1757548

File tree

4 files changed

+199
-6
lines changed

4 files changed

+199
-6
lines changed

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,11 @@ def get_evaluation_rows(
343343
# Remove None values
344344
params = {k: v for k, v in params.items() if v is not None}
345345

346-
# Make request to proxy
346+
# Make request to proxy (using pointwise for efficiency)
347347
if self.project_id:
348-
url = f"{self.base_url}/v1/project_id/{self.project_id}/traces"
348+
url = f"{self.base_url}/v1/project_id/{self.project_id}/traces/pointwise"
349349
else:
350-
url = f"{self.base_url}/v1/traces"
350+
url = f"{self.base_url}/v1/traces/pointwise"
351351

352352
headers = {"Authorization": f"Bearer {os.environ.get('FIREWORKS_API_KEY')}"}
353353

@@ -367,7 +367,7 @@ def get_evaluation_rows(
367367
except Exception: # In case e.response.json() fails
368368
error_msg = f"Proxy error: {e.response.text}"
369369

370-
logger.error("Failed to fetch traces from proxy: %s", error_msg)
370+
logger.error("Failed to fetch traces from proxy (HTTP %s): %s", e.response.status_code, error_msg)
371371
return eval_rows
372372
except requests.exceptions.RequestException as e:
373373
# Non-HTTP errors (network issues, timeouts, etc.)

eval_protocol/proxy/README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,26 @@ The `encoded_base_url` is base64-encoded URL string injected into the request bo
166166

167167
### Trace Fetching
168168

169-
#### Fetch Langfuse Traces
169+
#### Fetch All Langfuse Traces
170170
```
171171
GET /traces?tags=rollout_id:abc123
172+
GET /v1/traces?tags=rollout_id:abc123
172173
GET /project_id/{project_id}/traces?tags=rollout_id:abc123
174+
GET /v1/project_id/{project_id}/traces?tags=rollout_id:abc123
173175
```
174176

177+
Waits for all expected insertion_ids to complete before returning all traces.
178+
179+
#### Fetch Latest Langfuse Trace (Pointwise)
180+
```
181+
GET /traces/pointwise?tags=rollout_id:abc123
182+
GET /v1/traces/pointwise?tags=rollout_id:abc123
183+
GET /project_id/{project_id}/traces/pointwise?tags=rollout_id:abc123
184+
GET /v1/project_id/{project_id}/traces/pointwise?tags=rollout_id:abc123
185+
```
186+
187+
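Returns only the latest trace for the rollout, i.e. the trace tagged with the newest `insertion_id` (insertion IDs are UUID v7 and therefore time-ordered). This is much faster for pointwise evaluations, where only the final accumulated result is needed.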
188+
175189
**Required Query Parameters:**
176190
- `tags`: Array of tags (must include at least one `rollout_id:*` tag)
177191

eval_protocol/proxy/proxy_core/app.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from .models import ProxyConfig, LangfuseTracesResponse, TracesParams, ChatParams, ChatRequestHook, TracesRequestHook
1717
from .auth import AuthProvider, NoAuthProvider
1818
from .litellm import handle_chat_completion, proxy_to_litellm
19-
from .langfuse import fetch_langfuse_traces
19+
from .langfuse import fetch_langfuse_traces, pointwise_fetch_langfuse_trace
2020

2121
# Configure logging before any other imports (so all modules inherit this config)
2222
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
@@ -267,6 +267,27 @@ async def get_langfuse_traces(
267267
params=params,
268268
)
269269

270+
@app.get("/traces/pointwise", response_model=LangfuseTracesResponse)
@app.get("/v1/traces/pointwise", response_model=LangfuseTracesResponse)
@app.get("/project_id/{project_id}/traces/pointwise", response_model=LangfuseTracesResponse)
@app.get("/v1/project_id/{project_id}/traces/pointwise", response_model=LangfuseTracesResponse)
async def pointwise_get_langfuse_trace(
    request: Request,
    params: TracesParams = Depends(get_traces_params),
    project_id: Optional[str] = None,
    config: ProxyConfig = Depends(get_config),
    redis_client: redis.Redis = Depends(get_redis),
    _: None = Depends(require_auth),
) -> LangfuseTracesResponse:
    """
    Pointwise trace endpoint: delegate to ``pointwise_fetch_langfuse_trace``.

    Registered under four routes (with and without the ``/v1`` prefix, with
    and without a ``project_id`` segment). The auth dependency is resolved
    only for its side effect; its value is discarded.
    """
    # An explicitly supplied project_id replaces whatever the parsed params held.
    if project_id is not None:
        params.project_id = project_id

    response = await pointwise_fetch_langfuse_trace(
        config=config,
        redis_client=redis_client,
        request=request,
        params=params,
    )
    return response
290+
270291
# Health
271292
@app.get("/health")
272293
async def health():

eval_protocol/proxy/proxy_core/langfuse.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,3 +366,161 @@ async def fetch_langfuse_traces(
366366
raise
367367
except Exception as e:
368368
raise HTTPException(status_code=500, detail=f"Error fetching traces from Langfuse: {str(e)}")
369+
370+
371+
async def pointwise_fetch_langfuse_trace(
    config: ProxyConfig,
    redis_client: redis.Redis,
    request: Request,
    params: TracesParams,
) -> LangfuseTracesResponse:
    """
    Fetch only the latest trace of a rollout from Langfuse.

    The rollout's expected insertion_ids are read from Redis and the greatest
    one is selected. Insertion_ids are UUID v7 (time-ordered), so the greatest
    id is the newest, and its trace carries the accumulated information of the
    pointwise evaluation.

    Args:
        config: Proxy configuration (Langfuse host, per-project keys, default
            project, optional ``preprocess_traces_request`` hook).
        redis_client: Redis client used to look up the rollout's insertion_ids.
        request: Incoming request, forwarded to the preprocess hook.
        params: Trace query parameters; ``tags`` must contain a
            ``rollout_id:`` tag.

    Returns:
        A ``LangfuseTracesResponse`` with ``total_traces=1`` whose ``traces``
        list holds the single latest trace.

    Raises:
        HTTPException: 400 if no ``rollout_id:`` tag is present; 404 if the
            project is unknown or the trace cannot be fetched after
            ``max_retries`` attempts; 500 if Redis holds no insertion_ids for
            the rollout, the Langfuse SDK is missing, or any other error
            occurs.
    """

    # Preprocess traces request
    if config.preprocess_traces_request:
        params = config.preprocess_traces_request(request, params)

    tags = params.tags
    user_id = params.user_id
    session_id = params.session_id
    name = params.name
    environment = params.environment
    version = params.version
    release = params.release
    fields = params.fields
    hours_back = params.hours_back
    from_timestamp = params.from_timestamp
    to_timestamp = params.to_timestamp
    max_retries = params.max_retries

    # Use default project if not specified
    project_id = params.project_id
    if project_id is None:
        project_id = config.default_project_id

    # Validate project_id
    if project_id not in config.langfuse_keys:
        raise HTTPException(
            status_code=404,
            detail=f"Project ID '{project_id}' not found. Available projects: {list(config.langfuse_keys.keys())}",
        )

    # Extract rollout_id from tags for Redis lookup. Without it there is no way
    # to find the latest insertion_id, so fail with a clear client error rather
    # than letting max() blow up on an empty set further down.
    rollout_id = _extract_tag_value(tags, "rollout_id:")
    if not rollout_id:
        raise HTTPException(
            status_code=400,
            detail="Pointwise trace fetch requires a 'rollout_id:<id>' tag",
        )

    try:
        # Imported lazily so a missing SDK is reported as an HTTP error below
        from langfuse import Langfuse

        # Create Langfuse client with the project's keys
        logger.debug(f"Connecting to Langfuse at {config.langfuse_host} for project '{project_id}'")
        project_keys = config.langfuse_keys[project_id]
        langfuse_client = Langfuse(
            public_key=project_keys["public_key"],
            secret_key=project_keys["secret_key"],
            host=config.langfuse_host,
        )

        # Parse datetime strings if provided ("Z" suffix is rewritten so that
        # datetime.fromisoformat accepts it)
        from_ts = None
        to_ts = None
        if from_timestamp:
            from_ts = datetime.fromisoformat(from_timestamp.replace("Z", "+00:00"))
        if to_timestamp:
            to_ts = datetime.fromisoformat(to_timestamp.replace("Z", "+00:00"))

        # Determine time window: explicit from/to takes precedence over hours_back
        if from_ts is None and to_ts is None and hours_back:
            # NOTE(review): datetime.now() is naive local time — confirm this is
            # what the Langfuse query expects.
            to_ts = datetime.now()
            from_ts = to_ts - timedelta(hours=hours_back)

        # Get expected insertion_ids from Redis
        expected_ids: Set[str] = get_insertion_ids(redis_client, rollout_id)
        logger.info(f"Pointwise fetch for rollout_id '{rollout_id}', expecting {len(expected_ids)} insertion_ids")
        if not expected_ids:
            # Status code kept at 500 as before; only the message was corrected
            # (it used to claim an empty trace would be returned).
            missing_msg = f"No expected insertion_ids found in Redis for rollout '{rollout_id}'."
            logger.warning(missing_msg)
            raise HTTPException(status_code=500, detail=missing_msg)

        # Get the latest (last) insertion_id since UUID v7 is time-ordered
        latest_insertion_id = max(expected_ids)  # UUID v7 max = newest
        id_suffix = latest_insertion_id[-5:]  # short form used in log lines
        logger.info(f"Targeting latest insertion_id (last5): {id_suffix} for rollout '{rollout_id}'")

        for retry in range(max_retries):
            # Fetch trace list targeting the latest insertion_id
            traces = await _fetch_trace_list_with_retry(
                langfuse_client,
                page=1,
                limit=1,  # Only need the one trace
                tags=[f"insertion_id:{latest_insertion_id}"],
                user_id=user_id,
                session_id=session_id,
                name=name,
                environment=environment,
                version=version,
                release=release,
                fields=fields,
                from_ts=from_ts,
                to_ts=to_ts,
                max_retries=max_retries,
            )

            if traces and traces.data:
                # Get the trace info
                trace_info = traces.data[0]
                logger.debug(f"Found trace {trace_info.id} for latest insertion_id {id_suffix}")

                # Fetch full trace details
                trace_full = await _fetch_trace_detail_with_retry(
                    langfuse_client,
                    trace_info.id,
                    max_retries,
                )

                if trace_full:
                    trace_dict = _serialize_trace_to_dict(trace_full)
                    logger.info(
                        f"Successfully fetched latest trace for rollout '{rollout_id}', insertion_id (last5): {id_suffix}"
                    )
                    return LangfuseTracesResponse(
                        project_id=project_id,
                        total_traces=1,
                        traces=[TraceResponse(**trace_dict)],
                    )

            # Not found yet: back off exponentially unless this was the last attempt
            if retry < max_retries - 1:
                wait_time = 2**retry
                logger.info(
                    f"Pointwise fetch attempt {retry + 1}/{max_retries} failed for rollout '{rollout_id}', insertion_id (last5): {id_suffix}. Retrying in {wait_time}s..."
                )
                await asyncio.sleep(wait_time)

        # After all retries failed
        logger.error(
            f"Failed to fetch latest trace for rollout '{rollout_id}', insertion_id (last5): {id_suffix} after {max_retries} retries"
        )
        raise HTTPException(
            status_code=404,
            detail=f"Failed to fetch latest trace for rollout '{rollout_id}' after {max_retries} retries",
        )

    except ImportError:
        raise HTTPException(status_code=500, detail="Langfuse SDK not installed. Install with: pip install langfuse")
    except HTTPException:
        # Already a well-formed HTTP error; do not wrap it in a generic 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching latest trace from Langfuse: {str(e)}")

0 commit comments

Comments
 (0)