@@ -236,6 +236,7 @@ async def fetch_langfuse_traces(
236236 expected_ids : Set [str ] = set ()
237237 if rollout_id :
238238 expected_ids = get_insertion_ids (redis_client , rollout_id )
239+ logger .info (f"Fetching traces for rollout_id '{ rollout_id } ', expecting { len (expected_ids )} insertion_ids" )
239240 if not expected_ids :
240241 logger .warning (
241242 f"No expected insertion_ids found in Redis for rollout '{ rollout_id } '. Returning empty traces."
@@ -258,7 +259,9 @@ async def fetch_langfuse_traces(
258259 # Build targeted tags for missing insertion_ids
259260 missing_ids = expected_ids - insertion_ids
260261 fetch_tags = [f"insertion_id:{ id } " for id in missing_ids ]
261- logger .info (f"Retry { retry } : Targeting { len (fetch_tags )} missing insertion_ids" )
262+ logger .info (
263+ f"Retry { retry } : Targeting { len (fetch_tags )} missing insertion_ids for rollout '{ rollout_id } ': { [id [:5 ] for id in sorted (missing_ids )[:10 ]]} { '...' if len (missing_ids ) > 10 else '' } "
264+ )
262265
263266 current_page = 1
264267 collected = 0
@@ -313,6 +316,7 @@ async def fetch_langfuse_traces(
313316 insertion_id = _extract_tag_value (trace_dict .get ("tags" , []), "insertion_id:" )
314317 if insertion_id :
315318 insertion_ids .add (insertion_id )
319+ logger .debug (f"Found insertion_id '{ insertion_id } ' for rollout '{ rollout_id } '" )
316320
317321 except Exception as e :
318322 logger .warning ("Failed to serialize trace %s: %s" , trace_info .id , e )
@@ -331,8 +335,12 @@ async def fetch_langfuse_traces(
331335
332336 # If we have all expected completions or more, return traces. At least once is ok.
333337 if expected_ids <= insertion_ids :
338+ logger .info (
339+ f"Traces complete for rollout '{ rollout_id } ': { len (insertion_ids )} /{ len (expected_ids )} insertion_ids found, returning { len (all_traces )} traces"
340+ )
334341 if sample_size is not None and len (all_traces ) > sample_size :
335342 all_traces = random .sample (all_traces , sample_size )
343+ logger .info (f"Sampled down to { sample_size } traces" )
336344
337345 return LangfuseTracesResponse (
338346 project_id = project_id ,
@@ -343,8 +351,9 @@ async def fetch_langfuse_traces(
343351 # If it doesn't match, wait and do loop again (exponential backoff)
344352 if retry < max_retries - 1 :
345353 wait_time = 2 ** retry
354+ still_missing = expected_ids - insertion_ids
346355 logger .info (
347- f"Attempt { retry + 1 } /{ max_retries } . Found { len (insertion_ids )} /{ len (expected_ids )} expected . Waiting { wait_time } s..."
356+ f"Attempt { retry + 1 } /{ max_retries } . Found { len (insertion_ids )} /{ len (expected_ids )} for rollout ' { rollout_id } '. Still missing: { [ id [: 5 ] for id in sorted ( still_missing )[: 10 ]] } { '...' if len ( still_missing ) > 10 else '' } . Waiting { wait_time } s..."
348357 )
349358 await asyncio .sleep (wait_time )
350359
0 commit comments