
event-sourcing cleanup + snapshot-based catch-up #2011

Draft
AlexCheema wants to merge 1 commit into main from alexcheema/event-sourcing-snapshots

Conversation

@AlexCheema
Contributor

Summary

  • Snapshot-based catch-up. Joining nodes used to replay the entire event log 1000 events at a time over NACK round-trips (~15 min for 1M events). They now bootstrap from a master-served snapshot of State and replay only the small tail.
  • Event sourcing cleanup. Split events into durable (state-modifying, persisted, ordered) and transient (streaming/notification) — durable consumers no longer react directly to events; they reconcile from state. Necessary precondition for snapshots: anything snapshot-skipped must converge from state alone.
  • Snappier liveness. Master inactivity timeout 30s → 5s, plan tick 10s → 1s, worker neighbour-ping 10s → 2s.

What changed

Stage 1 — split transient events from the durable log

  • New TransientEvent union: ChunkGenerated | InputChunkReceived | TracesCollected | TracesMerged | TaskAcknowledged.
  • New SNAPSHOT_RESPONSES and TRANSIENT_EVENTS topics; new TransientRouter for fire-and-forget pub/sub.
  • Master, Worker, API and RunnerSupervisor learn to demultiplex. Runner→supervisor mp_channel widened to Event | TransientEvent.
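The durable/transient split can be sketched as follows. The union member names come from the PR; the field shapes and the `route` helper are illustrative assumptions, not the actual implementation:

```python
# Sketch of the Event / TransientEvent split. Member names follow the PR;
# payload fields are hypothetical placeholders.
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class ChunkGenerated:  # streaming output, never persisted
    task_id: str
    chunk: bytes


@dataclass(frozen=True)
class TaskAcknowledged:  # notification, never persisted
    task_id: str


# Transient events (also InputChunkReceived, TracesCollected, TracesMerged)
# go fire-and-forget on TRANSIENT_EVENTS; durable events stay on the
# ordered, persisted log with NACK-based recovery.
TransientEvent = Union[ChunkGenerated, TaskAcknowledged]


def route(msg: object) -> str:
    """Demultiplex a widened Event | TransientEvent channel by topic."""
    if isinstance(msg, (ChunkGenerated, TaskAcknowledged)):
        return "TRANSIENT_EVENTS"
    return "EVENTS"
```
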

Stage 2 — reconciliation in place of event reactions

  • custom_model_cards: Mapping[ModelId, ModelCard] promoted to State; new apply handlers.
  • Worker's InstanceDeleted/CustomModelCard* reactions replaced by _reconcile_instance_backoff and _reconcile_custom_cards.
  • API's InstanceDeleted stream-close replaced by _reconcile_streams driven by state.tasks × state.instances.
  • KeyedBackoff.tracked_keys() added to support safe iter-during-mutation.

Stages 3–5 — snapshot transfer protocol

  • New SnapshotChunk wire type with explicit data_b64: str (Pydantic v2's JSON serializer decodes bytes as UTF-8 by default, which fails on zstd-compressed output).
  • New RequestSnapshot command. Master encodes self.state on demand (zstd-compressed model_dump_json), slices into ~512 KiB chunks, publishes on SNAPSHOT_RESPONSES with requester_node_id filtering and SHA-256 verification.
  • SnapshotReceiver reassembles chunks, ignores stale sessions / wrong recipients, validates checksum, decodes State.
  • Worker and API issue RequestSnapshot at startup, apply the result to local state, then call EventRouter.set_buffer_start(idx + 1) so live events drain in order. Falls back to full event-log replay on timeout.
  • OrderedBuffer.fast_forward_to(idx) discards pending events covered by the snapshot.
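The encode/chunk/verify pipeline can be sketched end to end. Field names follow the PR's description; the exact `SnapshotChunk` schema is an assumption, and zlib stands in for zstd to keep the sketch stdlib-only:

```python
# Sketch of the snapshot transfer protocol. zlib replaces zstd here purely
# so the example has no third-party dependency; the shape is the same.
import base64
import hashlib
import json
import zlib
from dataclasses import dataclass

CHUNK = 512 * 1024  # ~512 KiB slices, as in the PR


@dataclass(frozen=True)
class SnapshotChunk:
    session_id: str
    index: int
    total: int
    data_b64: str   # explicit base64: raw bytes would hit Pydantic's UTF-8 path
    sha256: str     # checksum of the full compressed payload
    last_event_applied_idx: int  # receiver fast-forwards to idx + 1


def encode_snapshot(state: dict, session_id: str, last_idx: int) -> list[SnapshotChunk]:
    payload = zlib.compress(json.dumps(state).encode())
    digest = hashlib.sha256(payload).hexdigest()
    parts = [payload[i:i + CHUNK] for i in range(0, len(payload), CHUNK)] or [b""]
    return [SnapshotChunk(session_id, i, len(parts),
                          base64.b64encode(p).decode(), digest, last_idx)
            for i, p in enumerate(parts)]


def reassemble(chunks: list[SnapshotChunk]) -> dict:
    chunks = sorted(chunks, key=lambda c: c.index)
    payload = b"".join(base64.b64decode(c.data_b64) for c in chunks)
    if hashlib.sha256(payload).hexdigest() != chunks[0].sha256:
        raise ValueError("snapshot checksum mismatch")
    return json.loads(zlib.decompress(payload))
```

After `reassemble`, the receiver would apply the decoded state and set its event buffer to start at `last_event_applied_idx + 1`, so live events drain in order behind the snapshot.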

Stage 6 — throughput tuning

  • Soft cap on RequestEventLog response (1000) kept for gossipsub burst protection — comment now reflects that snapshots make this branch a fallback.
  • NACK base/cap dropped 0.5s..10s → 0.05s..1s. With snapshots, NACKs only cover the small post-snapshot tail.
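A plain exponential backoff with the new bounds illustrates the change; the actual backoff curve (and any jitter) in the codebase is an assumption here:

```python
# Hypothetical NACK retry delay with the tightened bounds from this PR.
# Old bounds were base=0.5s, cap=10s; snapshots now carry the bulk, so
# NACKs only cover the short post-snapshot tail and can retry fast.
def nack_delay(attempt: int, base: float = 0.05, cap: float = 1.0) -> float:
    """Exponential backoff clamped to [base, cap]."""
    return min(cap, base * (2 ** attempt))
```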

Liveness

  • Master _plan inactivity timeout 30s → 5s, tick 10s → 1s. Aligned with macmon's 1s emit cadence (5× heartbeat headroom).
  • Worker _poll_connection_updates 10s → 2s.

Bugs fixed along the way

  • Worker.shutdown() crashed if called during _fetch_snapshot before the task group entered (the pre-Stage-5 ordering put the fetch outside async with self._tg). Snapshot fetch now runs as a child task inside the group.
  • Pydantic v2 default bytes→JSON encoding broke zstd payloads. Replaced with explicit base64 round-trip.
  • RunnerSupervisor's synthetic ChunkGenerated on runner crash now flows over the transient channel.

Bench

3 hosts (james master, mike/s13/s14 joiners) over LAN libp2p, 100K events seeded on master:

| Mode | Time to bootstrap | Notes |
| --- | --- | --- |
| with snapshot | ~5 s, consistent across all 3 joiners | 1 chunk, ~7 KB |
| without snapshot (EXO_DISABLE_SNAPSHOT_FETCH=1) | >14 min and not done | 9–69% range across hosts |

At normal cluster scale (1500 events) the join time is also ~5 s — dominated entirely by gossipsub election convergence, not event transport. Drop/rejoin chaos test against the new 5s liveness window: master detects departures in 5–6 s consistently.

Test plan

  • uv run basedpyright 0 errors
  • uv run ruff check clean
  • nix fmt applied
  • uv run pytest 405 pass / 1 skipped
  • Same-host two-process bench: worker bootstrap idx 133 in 34 ms
  • Cross-host three-joiner parallel bench: bootstrap in 4–6 s each
  • 100K-event bench: snapshot ~5 s vs no-snapshot >14 min (incomplete)
  • Drop/rejoin chaos with all four hosts running dashboards: 5–6 s detection, sub-second bootstrap on rejoin

Notes

  • The disk-snapshot-store I'd initially designed got dropped — every node already has State in memory, so the master encodes on demand.
  • Stage 1 versioning skipped on disk (existing events.bin files get rotated on master startup before any read; archives are write-only).
  • The duplicate-nodes UX issue surfaced during chaos testing is pre-existing, from PR #1338 ("make node-ids unique per-session") / issue #1332 ("Detect multiple nodes with the same node ID within a cluster"): get_node_id_keypair returns Keypair.generate() instead of persisting. Out of scope.

🤖 Generated with Claude Code

Joining a long-running cluster used to replay the entire event log
1000 events at a time over NACK round-trips, which took ~15 min for
1M events. New nodes now bootstrap from a master-served snapshot of
State and only replay the small tail.

- Split the Event union into durable Event (state-modifying, persisted,
  ordered) and TransientEvent (per-request streaming/notification:
  ChunkGenerated, InputChunkReceived, TracesCollected, TracesMerged,
  TaskAcknowledged) routed over a separate TRANSIENT_EVENTS topic.
  Transients no longer touch the durable log or NACK machinery.
- Eliminate direct event reactions in event-log consumers. Worker drops
  the InstanceDeleted/CustomModelCard reactions in favour of
  reconciliation loops on state.instances and state.custom_model_cards.
  API drops the InstanceDeleted stream-close reaction in favour of a
  state.tasks x state.instances reconciliation. Promote
  custom_model_cards to State with proper apply handlers.
- Add SnapshotChunk wire type, RequestSnapshot command, and
  SNAPSHOT_RESPONSES topic. Master encodes its in-memory State on
  demand (zstd JSON), slices into ~512 KiB chunks, publishes per
  request. Receiver verifies SHA-256 and reassembles.
- Worker/API request a snapshot at startup, apply, fast-forward the
  EventRouter buffer to last_event_applied_idx + 1, then receive only
  the tail via NACK. Falls back to full replay on timeout.
- Tighter responsiveness: master inactivity timeout 30s -> 5s, plan
  tick 10s -> 1s, worker neighbour ping 10s -> 2s. macmon emits every
  1s so 5s gives 5x heartbeat headroom.
- Bug fixes found while building this: Worker.shutdown crashed if
  called during _fetch_snapshot before the task group entered;
  Pydantic v2 default-encodes bytes as UTF-8 strings which dies on
  zstd output, so SnapshotChunk uses an explicit base64 str field
  with from_data/data helpers.
- Throughput: drop hardcoded 1000-event RequestEventLog cap (replaced
  with safety valve); tighten NACK base/cap from 0.5s..10s to
  0.05s..1s (snapshots carry the bulk now).

Bench (3 hosts, 100K events on master):
  with snapshot:    ~5s consistent across all joiners (8KB transfer)
  without snapshot: still incomplete after 14 min (9-69% per host)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
