Skip to content

perf(orchestrator): reduce event-loop lag#2608

Closed
mikasenghaas wants to merge 39 commits into
mainfrom
perf/r3
Closed

perf(orchestrator): reduce event-loop lag#2608
mikasenghaas wants to merge 39 commits into
mainfrom
perf/r3

Conversation

@mikasenghaas
Copy link
Copy Markdown
Member

@mikasenghaas mikasenghaas commented May 23, 2026

Summary

Reduce orchestrator step-boundary event-loop lag, motivated by router-replay runs (Qwen3-30B-A3B + GLM-5.1) where the per-step routed_experts payload (~25 GB across 256 rollouts) caused 5–40 s loop-lag spikes at step boundaries.

Changes are all in the orchestrator and its post-rollout pipeline. The trainer/inference paths are untouched.

Coupled with PrimeIntellect-ai/verifiers#1453 (perf/r3-serve). The verifiers PR fixes the env-side counterpart of the same lag (env_worker / env_router), which this PR's verifiers submodule bump pulls in. Land or revert together — without the verifiers side the env worker loop saturates first under 256-rollout concurrency and the orch-side gains here don't surface.

What's in here

  • Async sender + to_thread advantages/filters + uvloop (0ed04c879)
  • Shared ProcessPoolExecutor plumbing (62bbd7b5a) — kept opt-in via use_process_pool; tradeoffs documented in the config field.
  • Eager pool init + p99 lag metric (e8ec02c4d)
  • Defer routed_experts concat in interleave_rollout (d0f7b1372) — per-step unpack → concat → repack was O(N²) byte copies (~2.3 GB memcpy / rollout in the worst case). New path keeps a per-sample chunk list and concats once at finalize. O(N) byte work.
  • Per-phase timing + dump_raw_rollouts for offline replay (668c18401)
  • orjson save_rollouts, dedup bool conversion, chunked gather (28c3e6d18):
    • json.dumporjson.dumps. The pure-Python serializer held the GIL for the entire 1.4 s of save in replay (1.36 s lag spike); prod hit a 15 s save_rollouts on one step.
    • Dropped redundant bool() conversion in make_sample/extend_sample; switched the remaining one in prepare_step_tokens to list(map(bool, ...)) (~3× faster).
    • New gather_chunk_size config option chunks the interleave_rollout fanout with await sleep(0) between batches so the loop gets guaranteed slices. Offline bench: chunk=128 cut max loop lag 2.6× under simulated traffic for ~11 % wallclock cost.
  • Pretokenize call-site skip + 10 Hz lag monitor + offline harness (afa3a610c):
    • Pretokenize was a no-op for router-replay (every step has tokens populated), but the 256-way to_thread fanout starved the loop for ~2 s per step. Skip the fanout entirely when nothing needs work. Offline attribution: pretokenize lag drops 2050 ms → 11 ms.
    • EventLoopLagMonitor sampling 1 Hz → 10 Hz (mirrors verifiers); 1 Hz was missing sub-second spikes.
    • scripts/perf/ — offline harness used to find these:
      • replay_orch_step.py and replay_orch_attribution.py replay the step boundary against a raw_rollouts/step_N.pkl. The attribution script runs each phase alone with 1 ms-granularity lag tracking so blockers are attributed by phase, not just by wallclock.
      • --synthetic-traffic N spawns N background loop tasks to mimic prod's oversubscription regime (active rollouts, recv handlers, inference replies).
  • Verifiers submodule bump → 445f379 — pulls in PR 1453 (env_worker / env_router loop fixes). Required for the orch-side gains above to be visible under realistic concurrency.

How to validate / iterate

Enable raw-rollout dumping:

[orchestrator]
dump_raw_rollouts = true
gather_chunk_size = 128

Then run the attribution against a captured step:

uv run python scripts/perf/replay_orch_attribution.py \
  --dump /path/to/raw_rollouts/step_8.pkl \
  --gather-chunk-size 128 --synthetic-traffic 200 --quick

README.md under scripts/perf/ has the full story.

What's NOT in here (deferred / open questions)

  • Killing base64 on the routed_experts wire. Profile showed pybase64.b64decode_as_bytearray is ~73 % of single-thread CPU in interleave_rollout. It releases the GIL so it parallelises and doesn't block the loop directly, but it's the dominant wallclock floor. Moving inference→orch transport to msgpack bin would eliminate the encode + decode. Cross-repo, deferred.
  • Changing TrainingSample.{prompt,completion}_mask from list[bool] to list[int] (or bytes). Would kill the last Python loop in prepare_step_tokens. Interface change, deferred until empirical evidence it matters for loop lag.

S1ro1 and others added 30 commits May 13, 2026 17:41
…wth (#2527)

numpy/pandas allocate array data via malloc() (outside Python's allocator),
so gc.collect() alone doesn't reclaim RSS after per-step DataFrames are freed.
glibc retains freed pages in its internal pool, causing ~+6 MB/step monotonic
RssAnon growth on the orchestrator process.

malloc_trim(0) forces glibc to return freed heap pages to the OS, producing
a stable sawtooth pattern (peak during rollout generation, drops after trim)
with no upward trend.

Verified on alphabet-sort (512 rollouts/step, 20 steps, no wandb):
- Without fix: 941 MB → 973 MB by step 3 (+6 MB/step, unbounded)
- With fix:    941 MB → 942 MB by step 5 (flat, sawtooth only)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…ut error (#2590)

* feat(scheduler): train on partial groups instead of dropping on rollout error

When an individual-scoring env returns N rollouts for a single example and one
errors out, scrap only the failed rollout — keep the survivors and ship the
group through as soon as every dispatched rollout has come back (success or
failure). Group-scoring envs still drop the whole group on any failure because
their per-rollout scores are computed against the now-missing rollouts.

To make variable-size groups round-trip through advantage computation, group
rollouts by (env_name, example_id) instead of positional slicing, and bucket
groups by size so each advantage_fn call still sees a uniform 2D rewards
tensor. Singleton groups produce zero advantage and get filtered out by the
existing zero-advantage filter — no special-casing.

Closes #2585.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(advantage): make advantage_fn per-group

Drop the bucket-by-size workaround in compute_advantages by changing the
advantage_fn contract: AdvantageInputs.rollouts is now a single group
(list[RolloutOutput]) and AdvantageOutputs.advantages is 1D. The framework
calls advantage_fn once per group, which works cleanly for variable-size
groups (partial-group training).

BREAKING: second change to this public API in three weeks. Custom advantage
functions must drop the outer list dim. Migration documented in CHANGELOG.md
and docs/bring-your-own-algorithms.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(advantage): return list[float] from advantage_fn

Drops the torch tensor from the public AdvantageOutputs contract; internal
math stays in torch and converts via .tolist() at the boundary. Same partial-
group support, simpler downstream consumers (no more .tolist() / no shape
gymnastics in custom advantages).

BREAKING (folds into the per-group change in the previous commit): custom
advantage functions must return AdvantageOutputs(advantages=[...]) rather
than a tensor. CHANGELOG entry and docs example updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(scheduler): log each rollout failure inline, drop GroupState.last_failure_reason

The reason is now logged at the moment the failure is observed (one warning
per failed rollout) instead of being stashed on the group and replayed at
finalization. Removes the per-group field entirely and avoids the "first vs
latest wins" semantic question that came up in review - each log line carries
its own actual reason. Finalization warnings only carry counts now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(scheduler): drop verbose comment on GroupState.failed_rollouts

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…2589)

* feat(orchestrator): per-env state_columns for extra rollout fields

Adds `state_columns: list[str] = []` to `EnvConfig` so each env can
persist additional `State` fields into the saved JSONL rollouts on top
of the always-saved `trajectory` and `sampling_args`. Merged at the
call site (required first, deduped).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: drop seen set from state_columns dedup

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each DP rank was dividing its summed token loss by its own local
loss_scale, then FSDP averaged the resulting gradients. Because ranks
process different sequence lengths, that mean is not the true per-token
mean over the global batch — ranks with fewer loss tokens get implicitly
upweighted.

Mirror the SFT trainer fix (src/prime_rl/trainer/sft/train.py:416-427):
all-reduce the local token count across dp_cp, divide by that global
denominator on every rank, and multiply grads by fsdp_gradient_divide_factor
after the microbatch loop so FSDP's per-rank averaging is undone and the
final gradient is the per-token mean over the global batch.

Closes #2358. Adapted from #2359, which first diagnosed the bias and
proposed the all-reduce-then-rescale approach.

Co-authored-by: irfanjamil <irfanjamil9@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* add per-env trainer metrics

* require env names for trainer batches

* address per-env metric review feedback

* reuse tensor stats for env metrics

* fix sft trajectory env-name fixture

* address trainer metric naming comments

* fix: reuse trainer ratio tensors for env metrics

* fix: derive dppo mask from shared ratio

* address PR review: drop precomputed loss inputs, use {all,env} keys

- compute importance-ratio / mismatch_kl inside the loss functions instead of
  passing them in via LossInputs (per Mika)
- compute mismatch_kl inline in train.py only for per-env logging
- rename trainer aggregate keys to entropy/all and mismatch_kl/all to match the
  orchestrator {all,env}/{mean,std,max} convention; drop the leftover bare
  entropy/* and mismatch_kl/* keys
- drop the overly defensive env_names length check in DataLoader

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* guard mismatch_kl logging behind sft_loss flag

SFT batches don't have meaningful inference_logprobs (sft_loss_fn ignores
them), so computing and logging mismatch_kl for those microbatches is wasted
work and produces misleading numbers. Skip the inline mismatch_kl compute and
the per-env / debug-log emissions when sft_loss is True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* address review: simplify probs_diff and move mismatch_kl/all to trainer loop

* reserve env_name='all' for aggregate metric keys

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat: add trainer token jsonl export

* chore: use docstrings for token export config
samsja and others added 9 commits May 22, 2026 03:59
Strip routed_experts.data into a raw-bytes sidecar file with shape/dtype
metadata kept in msgpack; stream-encode per-sample with asyncio.sleep(0)
between samples; perform all disk I/O via asyncio.to_thread.

Microbench (1.21 GB batch, 128 examples, 1.16 GB routed_experts):
  baseline: per-call 1.94s, event-loop max lag 3.88s
  perf/r3:  per-call 1.51s, event-loop max lag 6.3ms (~600x)

The sender's `send` method is now async (TrainingBatchSender base class
updated accordingly). The ZMQ subclass becomes `async def` as well; its
body is unchanged but the call site must `await`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uvloop

- await training_batch_sender.send(training_batch) since sender is async (O1+O2)
- compute_advantages and apply_filters wrapped in asyncio.to_thread (O3+O4):
  both iterate ~2k rollouts in pure Python and release the GIL on every
  bytecode tick, so threading actually helps unlike the C-extension encode
- main() installs uvloop (O7) — lower scheduler overhead matters when the
  orchestrator is juggling many concurrent rollouts + the HTTP client to
  inference; measurably reduces tail latency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The residual orch event-loop spikes (5-30s at step boundaries) survive
even after wrapping process_rollout / pretokenize / save in asyncio.to_thread:
interleave_rollout does enough Python-level work (turn iteration, list slicing,
dict construction) that 256 concurrent thread invocations contend for the GIL
with the main asyncio loop, starving it.

Add a shared ProcessPoolExecutor in orchestrator/utils.py:
- lazy-initialized on first use, so disabled config paths pay zero cost
- spawn context (no fork + CUDA/FSDP state interaction)
- atexit shutdown
- single instance shared across step boundaries

orchestrator.py: when config.use_process_pool is true, route process_rollout
through the pool via loop.run_in_executor. _process_rollout_worker is a new
module-level function (not nested) so it pickles. The existing threaded path
remains the default; gated entirely behind use_process_pool=False.

Tradeoff: ~per-call pickle/IPC cost. For non-router-replay rollouts the
payload is small and the IPC is essentially free, so this is a clear win.
For router replay (~9-25 MB routed_experts per rollout) the IPC cost may
dominate; users should measure before enabling.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Eagerly call get_process_pool() at the start of orchestrate() when
  config.use_process_pool=True so the spawn cost (worker fork + heavy import
  graph) lands at startup instead of the first step-boundary gather.
- Add event_loop_lag/p99 to the metrics dict and the busy-loop warning. p90
  on its own undersells the long tail we observe at step boundaries; p99 is
  the metric we actually compare across runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
extend_sample's previous implementation unpacked the accumulated
routed_experts bytes, mutated one boundary entry, np.concatenated the new
step's routings, then re-packed to bytes — every extension. For an N-turn
rollout reaching seq_len 60k, the accumulated buffer reached ~23 MB and was
fully read+written N times, giving O(N²) byte work (~2.3 GB of memcpy per
rollout). At 256 rollouts/step this dominated the orch's step-boundary
event-loop stalls more than the encode path we'd already fixed.

Defer the concat: track per-sample `chunks: list[np.ndarray]` during the
extension loop and finalize once at the end. The "boundary token" entry
that vLLM omits for each request is appended as its own one-entry chunk
between consecutive steps' contributions, so the final concat is a
straight join — no destructive writes to prior chunks.

Verified byte-exact against the old implementation on a 50-step extension
chain. Per-rollout work drops from O(N²) → O(N): ~100× less memcpy in the
realistic regime.

This makes use_process_pool unnecessary for the residual orch lag: each
rollout's process_rollout cost drops below the IPC pickle cost, so the
threaded path is strictly better.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ine replay

- Per-phase wall-clock instrumentation around every step-boundary op
  (compute_advantages, apply_filters, save_rollouts, pretokenize,
  process_rollout_gather, result_collection_loop, training_batch_send,
  pandas_dataframes, metrics_aggregation, wandb_log). Single info line
  per step: "Step N phase breakdown: <name>=<sec>, ..." sorted desc by
  cost so the biggest chunk is always first. Replaces guesswork about
  what's blocking the loop with attributable data.
- OrchestratorConfig.dump_raw_rollouts (default off): pickle the raw
  list[vf.RolloutOutput] returned by scheduler.generate_batch into
  <output_dir>/raw_rollouts/step_N.pkl. Files are multi-GB under router
  replay (~6 GB at batch_size=256 × 9-25 MB per rollout), so keep it
  debug-only. Paired with scripts/perf_r3/replay_orch_step.py which
  loads the dump and runs the full post-process pipeline with per-phase
  timing — fast iteration loop for orch perf work without SLURM.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three independent wins from the offline-replay attribution against
v5 step_8.pkl (256 rollouts, max 78 turns, 25 GB routed_experts):

- save_rollouts: json.dump -> orjson.dumps + bytes write. The pure-Python
  serializer held the GIL for the entire 1.4 s save phase in replay
  (1.36 s loop-lag spike), and prod hit a 15 s save_rollouts on one
  step. orjson serializes in C and releases the GIL: save_rollouts drops
  to 0.04 s wallclock with ~1 ms loop lag.

- prepare/make/extend mask handling: [bool(i) for i in mask] was the
  remaining per-turn GIL-held Python loop. (a) make_sample/extend_sample
  were re-converting an already-bool-ified list and got swapped to
  list(...) / direct extend (~27x faster on a 15K-element mask), (b)
  prepare_step_tokens uses list(map(bool, ...)) instead of the listcomp,
  which delegates to a C-implemented map iterator (~3x faster on the
  same list). Together these drop gather wallclock 5-10% on long-tail
  rollouts.

- gather_chunk_size config: chunk the interleave_rollout to_thread
  gather and await sleep(0) between batches so the main asyncio loop
  gets guaranteed slices instead of being starved by 256 simultaneous
  GIL-contending worker threads. Offline bench against step_8.pkl with
  200 synthetic background tasks (to simulate prod's 1024 active
  rollouts + inference replies + env-router traffic): chunk=128 cut max
  loop lag 2.6x (387 ms -> 149 ms) for ~11% wallclock cost. chunk<=64
  didn't reduce lag further and cost much more wallclock. Default None
  preserves legacy behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two prod fixes plus the offline replay harness used to find them.

Pretokenize call-site skip (orchestrator.py + trajectories.py):
  Under router-replay every trajectory step has `tokens` already populated by
  the inference server, so pretokenize_rollout_trajectory is a no-op for
  every step. The orch still fans out 256 to_thread copies of this no-op per
  step, and the GIL contention from 256 simultaneous worker threads starves
  the event loop for ~2s per step.

  Fix: any(step["tokens"] is None for r in train_rollouts for step in ...) on
  the loop thread once, and skip the fanout entirely when nothing needs work.
  Added a matching all(step["tokens"] is not None) fast path inside the
  function so per-thread cost stays microseconds when the fanout *is* needed
  for a mixed batch.

  Offline attribution showed pretokenize loop lag drop 2050ms -> 11ms with
  this change. It was the second largest contributor after the gather itself.

EventLoopLagMonitor sampling 1Hz -> 10Hz (event_loop_lag.py):
  Mirrors what verifiers' env_router / env_worker use. 1Hz misses sub-second
  spikes entirely and gives one max-lag sample per second, which made it
  impossible to tell whether earlier fixes (orjson, chunked gather, etc) had
  actually moved the needle on prod loop lag. Same warning thresholds.

scripts/perf/ — offline harness this work was built on:
  - lag_monitor.py: 1ms-granularity event-loop lag sampler
  - replay_orch_step.py: replays the full step boundary against a saved
    raw_rollouts/step_N.pkl with per-phase wallclock + lag
  - replay_orch_attribution.py: per-phase-alone with optional synthetic
    background traffic (--synthetic-traffic N) to reproduce prod's
    loop-oversubscription regime
  - replay.py: encode/sender microbench (older, kept for completeness)
  - README.md: usage + "what to look for" section

  Pair with `orchestrator.dump_raw_rollouts = true` to capture inputs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mikasenghaas mikasenghaas changed the title perf(orchestrator): reduce step-boundary event-loop lag perf(orchestrator): reduce event-loop lag May 23, 2026
@mikasenghaas
Copy link
Copy Markdown
Member Author

Superseded by #2609 — same 8 perf commits cherry-picked cleanly onto current main, plus the verifiers submodule bump to PR 1453's head. Closing this one to avoid confusion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants