[data] feat: add MultiSourceDataset for weighted sampling#522

Draft
hjshi84 wants to merge 10 commits into main from junhao/feature-multisource-dataset

@hjshi84 hjshi84 commented Feb 28, 2026

What does this PR do?

This PR includes multiple features and fixes merged from the feature branch:

  1. [data] MultiSourceDataset: Add multi-source weighted sampling dataset with checkpoint support, including exhausted state save/restore
  2. [data] DynamicBatchingSizeDataset: Add stateful multi-worker dynamic batching

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include misc, ci, config, docs, data, dist, omni, logging, model, optim, ckpt, release, task, perf, ops, parallel
    • If this PR involves multiple modules, separate them with , like [ci, data, model]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][parallel, model] feat: dynamic batching

Test

  • Unit tests for MultiSourceDataset covering state_dict structure, exhausted state save/restore, elastic source add/remove, and backward compatibility
  • E2E tests for interleave datasets with checkpoint/resume
  • Tests for the dynamic batching dataset

API and Usage Example

Demonstrate how the API changes if any, and provide usage example(s) if possible.

  from veomni.data.multisource_dataset import MultiSourceDataset

  # Multi-source weighted sampling
  # (ds1 and ds2 stand for two already-constructed source datasets)
  dataset = MultiSourceDataset(
      datasets=[ds1, ds2],
      weights=[0.5, 0.5],
      stopping_strategy="all_exhausted",  # or "first_exhausted", "never_exhausted"
      level="token",  # token-level weighting
  )

  # Checkpoint support with exhausted state
  state = dataset.state_dict()
  dataset.load_state_dict(state)
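
The PR description does not show what `state_dict()` returns, so the following is only a toy sketch of the save/restore idea, not the `MultiSourceDataset` implementation. The class name `ToyResumableSources` and the keys `positions`, `exhausted`, and `turn` are hypothetical, and simple round-robin reading stands in for weighted sampling. It shows that recording per-source read positions together with exhausted flags is enough to resume iteration exactly where it stopped.

```python
from typing import Any, Dict, List, Sequence


class ToyResumableSources:
    """Round-robin reader over in-memory lists with a checkpointable state.

    Hypothetical illustration only; the state layout is an assumption.
    """

    def __init__(self, sources: Sequence[Sequence[Any]]) -> None:
        self.sources: List[List[Any]] = [list(s) for s in sources]
        self.positions = [0] * len(self.sources)      # next index to read per source
        self.exhausted = [False] * len(self.sources)  # True once a source ran dry
        self.turn = 0                                 # which source is read next

    def __iter__(self) -> "ToyResumableSources":
        return self

    def __next__(self) -> Any:
        # Try each source at most once per call, skipping exhausted ones.
        for _ in range(len(self.sources)):
            idx = self.turn
            self.turn = (self.turn + 1) % len(self.sources)
            if self.positions[idx] < len(self.sources[idx]):
                item = self.sources[idx][self.positions[idx]]
                self.positions[idx] += 1
                return item
            self.exhausted[idx] = True
        raise StopIteration

    def state_dict(self) -> Dict[str, Any]:
        # Copy the lists so later iteration does not mutate the saved state.
        return {
            "positions": list(self.positions),
            "exhausted": list(self.exhausted),
            "turn": self.turn,
        }

    def load_state_dict(self, state: Dict[str, Any]) -> None:
        self.positions = list(state["positions"])
        self.exhausted = list(state["exhausted"])
        self.turn = state["turn"]


# Usage: consume a few items, checkpoint, then resume in a fresh object.
reader = ToyResumableSources([[1, 2, 3], [10, 20]])
head = [next(reader) for _ in range(3)]
saved = reader.state_dict()

resumed = ToyResumableSources([[1, 2, 3], [10, 20]])
resumed.load_state_dict(saved)
tail = list(resumed)
```

Concatenating `head` and `tail` reproduces an uninterrupted pass over the same sources, which is the property a checkpoint/resume test would typically assert.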

Design & Code Changes

  1. MultiSourceDataset: New iterable dataset for weighted sampling from multiple sources with:
    • Three stopping strategies: first_exhausted, all_exhausted, never_exhausted
    • Token-level or sample-level weighting
    • Elastic source add/remove during checkpoint resume
    • Backward-compatible checkpoint loading
  2. DynamicBatchingSizeDataset: Stateful dynamic batching with multi-worker support
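
To make the three stopping strategies concrete, here is a minimal sample-level sketch. It is not the code in this PR: the function `weighted_interleave` is hypothetical, and the semantics are assumptions borrowed from common interleaving conventions (`first_exhausted` stops when any source runs out, `all_exhausted` restarts finished sources until every source has run out at least once, `never_exhausted` restarts sources indefinitely). Token-level weighting is not modelled.

```python
import itertools
import random
from typing import Any, Iterator, Sequence

STRATEGIES = ("first_exhausted", "all_exhausted", "never_exhausted")


def weighted_interleave(
    sources: Sequence[Sequence[Any]],
    weights: Sequence[float],
    stopping_strategy: str = "first_exhausted",
    seed: int = 0,
) -> Iterator[Any]:
    """Yield items by picking a source at random, proportionally to `weights`.

    Sources must be re-iterable (e.g. lists) because exhausted sources are
    restarted under the "all_exhausted" and "never_exhausted" strategies.
    """
    if stopping_strategy not in STRATEGIES:
        raise ValueError(f"unknown stopping_strategy: {stopping_strategy!r}")
    if len(sources) != len(weights):
        raise ValueError("sources and weights must have the same length")

    rng = random.Random(seed)
    iterators = [iter(s) for s in sources]
    exhausted = [False] * len(sources)

    while True:
        idx = rng.choices(range(len(sources)), weights=weights, k=1)[0]
        try:
            item = next(iterators[idx])
        except StopIteration:
            exhausted[idx] = True
            if stopping_strategy == "first_exhausted":
                return
            if stopping_strategy == "all_exhausted" and all(exhausted):
                return
            # Restart the finished source and keep sampling from it.
            iterators[idx] = iter(sources[idx])
            try:
                item = next(iterators[idx])
            except StopIteration:
                raise ValueError(f"source {idx} is empty") from None
        yield item


# Usage: the infinite strategy has to be bounded by the caller.
short_run = list(weighted_interleave([[1, 2, 3], [10, 20]], [0.5, 0.5]))
endless = weighted_interleave([[1, 2, 3], [10, 20]], [0.5, 0.5], "never_exhausted")
first_fifty = list(itertools.islice(endless, 50))
```

In this sketch a source with zero weight would never be drawn, so `all_exhausted` would not terminate for it; a real implementation would need to handle that case explicitly.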

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

CLAassistant commented Feb 28, 2026

CLA assistant check
All committers have signed the CLA.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces MultiSourceDataset for weighted sampling from multiple data sources and enhances DynamicBatchingSizeDataset with stateful multi-worker support, both with robust checkpointing capabilities. The changes are well-tested and significantly improve data handling flexibility. My main feedback concerns the use of a broad exception in the MoE merging script, which could mask underlying issues. I've suggested a more specific exception handling approach to improve the script's robustness.
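
The code in scripts/moe_ckpt_merge/moe_merge.py is not shown on this page, so the following is only a generic sketch of the point about broad exception handling. The helper `load_index` and its file format are hypothetical. It catches only the failure that is expected and recoverable (a missing file) and lets everything else, such as malformed JSON, propagate instead of being swallowed by a bare `except Exception`.

```python
import json
from typing import Any, Dict, Optional


def load_index(path: str) -> Optional[Dict[str, Any]]:
    """Return the parsed JSON index at `path`, or None if the file is absent.

    Hypothetical helper: only FileNotFoundError is treated as recoverable.
    A corrupt file raises json.JSONDecodeError so the problem stays visible.
    """
    try:
        with open(path, encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return None
```

With a blanket `except Exception`, a corrupt index would be indistinguishable from a missing one, which is the kind of masked failure the review comment warns about.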

Comment thread scripts/moe_ckpt_merge/moe_merge.py
LiuzcEECS force-pushed the junhao/feature-multisource-dataset branch from c6d3fc0 to cca36ef on February 28, 2026 03:44
Comment thread tests/data/test_multisource_dataset.py Outdated
Comment thread tests/data/test_multisource_dataset.py Outdated
LiuzcEECS force-pushed the junhao/feature-multisource-dataset branch from 2863b77 to 1268cc2 on March 23, 2026 22:20
3 participants