Add Seq Pack to verl in third_party #885
Conversation
Updated the verl submodule from commit 8b33abd84f360473f05e5a750aef36e974340cce to 4424616d7dfe03cc564866dc5e99dfaba1daba2e.

- Includes 615 new commits with various improvements and bug fixes
- Latest commit: [algo] refactor: Rollout Importance Sampling - Separate IS Weights from Rejection Sampling (#3915)
- Key recent changes:
  - Recipe improvements and bug fixes
  - Documentation updates
  - Algorithm refactoring for better performance
  - Installation script updates
  - CI/CD improvements

Tracking commit: 4424616d7dfe03cc564866dc5e99dfaba1daba2e
Code Review
This pull request updates the verl submodule to include several new sequence packing algorithms and refactors some YAML configurations. The new packing algorithms are a great addition, providing a structured way to handle sequence packing with various strategies.
However, there are several critical and high-severity issues that need to be addressed:
- A patch file, flagscale/backends/verl/verl/utils/seqlen_balancing.py.patch, has been added to the repository. This is unconventional and should be removed. The changes should reside within the third_party/verl submodule, which is already being updated in this PR.
- The code within the patch file contains several bugs, including potential None access errors, NameErrors from missing imports or typos, and incorrect logic in a factory function.
I have provided detailed comments on each issue within the patch file. Please review them and apply the suggested fixes in the verl submodule repository, then update the submodule commit hash in this PR, and finally remove the extraneous .patch file.
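To make the first class of bug concrete, here is a minimal, hypothetical reduction of the None-access failure mode (the class below is illustrative and is not part of the patch):

    from typing import Optional

    # A packer that stores an optional min_bin_count and later consumes it as an
    # integer partition count fails as soon as pack() runs without that argument.
    class PackerSketch:
        def __init__(self, bin_capacity: int, min_bin_count: Optional[int] = None):
            self.bin_capacity = bin_capacity
            self.min_bin_count = min_bin_count  # may legitimately be None

        def pack(self, seqlen_list: list[int]) -> list[list[int]]:
            k = self.min_bin_count
            return [[] for _ in range(k)]  # range(None) -> TypeError

    try:
        PackerSketch(bin_capacity=8192).pack([3, 5, 7])
    except TypeError as exc:
        print(f"fails before any packing happens: {exc}")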
diff --git a/verl/utils/seqlen_balancing.py b/verl/utils/seqlen_balancing.py
index bc5588f7..8face3b3 100644
--- a/verl/utils/seqlen_balancing.py
+++ b/verl/utils/seqlen_balancing.py
@@ -15,7 +15,9 @@
 import copy
 import heapq
 from itertools import chain
-
+import enum
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional, Tuple, Type, Union
 import torch
 from torch import distributed as dist

@@ -34,128 +36,573 @@ def calculate_workload(seqlen_list: list[int]):
     return 24576 * seqlen_list + seqlen_list**2


-def karmarkar_karp(seqlen_list: list[int], k_partitions: int, equal_size: bool):
-    # see: https://en.wikipedia.org/wiki/Largest_differencing_method
-    class Set:
-        def __init__(self) -> None:
-            self.sum = 0
-            self.items = []
+class PackingAlgorithm(enum.Enum):
+    """Enum for supported sequence packing algorithms."""
+
+    KARMARKAR_KARP = "karmarkar_karp"
+    GREEDY_PARTITION = "greedy_partition"
+    CONCATENATIVE = "concatenative"
+    FIRST_FIT_DECREASING = "first_fit_decreasing"
+    FIRST_FIT_SHUFFLE = "first_fit_shuffle"
+    MODIFIED_FIRST_FIT_DECREASING = "modified_first_fit_decreasing"
+
+
+class SequencePacker(ABC):
+    def __init__(
+        self,
+        bin_capacity: int,
+        min_bin_count: Optional[int] = None,
+        bin_count_multiple: Optional[int] = None,
+        equal_size: bool = True,
+    ):
+        """Initialize the sequence packer.
+
+        Args:
+            bin_capacity: The maximum capacity of each bin.
+            min_bin_count: Minimum number of bins to create, even if fewer would suffice.
+                If None, no minimum is enforced.
+            bin_count_multiple: The total number of bins must be a multiple of this value.
+                If None, no multiple constraint is enforced.
+            equal_size (bool): If True, ensures that each partition has the same number of items.
+                Requires len(seqlen_list) to be divisible by k_partitions.
+                If False, partitions can have varying numbers of items, focusing
+                only on balancing the sum of sequence lengths.

-        def add(self, idx: int, val: int):
-            self.items.append((idx, val))
-            self.sum += val
+        Raises:
+            ValueError: If min_bin_count or bin_count_multiple are invalid.
+        """
+        self.bin_capacity = bin_capacity
+        self.min_bin_count = min_bin_count
+        self.bin_count_multiple = bin_count_multiple
+        self.metrics = None
+
+        if min_bin_count is not None and min_bin_count < 0:
+            raise ValueError("min_bin_count must be nonnegative")
+        if bin_count_multiple is not None and bin_count_multiple < 1:
+            raise ValueError("bin_count_multiple must be positive")
+
+    @abstractmethod
+    def _pack_implementation(self, sequence_lengths: list[int]) -> list[list[int]]:
+        pass
+
+    def _adjust_bin_count(self, bins: list[list[int]]) -> list[list[int]]:
+        """Adjust the number of bins to meet minimum and multiple constraints."""
+        current_bin_count = len(bins)
+        target_bin_count = current_bin_count
+
+        if self.min_bin_count is not None:
+            target_bin_count = max(target_bin_count, self.min_bin_count)
+
+        if self.bin_count_multiple is not None:
+            remainder = target_bin_count % self.bin_count_multiple
+            if remainder != 0:
+                target_bin_count += self.bin_count_multiple - remainder
+
+        if target_bin_count == current_bin_count:
+            return bins
+
+        # Count total sequences
+        total_sequences = sum(len(bin_contents) for bin_contents in bins)
+        if total_sequences < target_bin_count:
+            raise ValueError(
+                f"Cannot create {target_bin_count} bins with only {total_sequences} sequences. "
+                f"Each bin must contain at least one sequence. "
+                f"Either reduce min_bin_count/bin_count_multiple or provide more sequences."
+            )

-        def merge(self, other):
-            for idx, val in other.items:
+        adjusted_bins = [bin_contents.copy() for bin_contents in bins]
+        additional_bins_needed = target_bin_count - current_bin_count
+        for _ in range(additional_bins_needed):
+            adjusted_bins.append([])
+
+        # Move sequences from existing bins to new bins
+        bin_sizes = [
+            (len(bin_contents), i)
+            for i, bin_contents in enumerate(adjusted_bins[:current_bin_count])
+        ]
+        bin_sizes.sort(reverse=True)  # Sort by size, largest first
+
+        source_bin_idx = 0
+
+        for new_bin_idx in range(current_bin_count, target_bin_count):
+            # Find a bin with at least 2 sequences (so we can move one and leave at least one)
+            while source_bin_idx < len(bin_sizes):
+                bin_size, original_bin_idx = bin_sizes[source_bin_idx]
+                current_size = len(adjusted_bins[original_bin_idx])
+
+                if current_size > 1:
+                    # Move one sequence from this bin to the new bin
+                    sequence_to_move = adjusted_bins[original_bin_idx].pop()
+                    adjusted_bins[new_bin_idx].append(sequence_to_move)
+                    break
+                else:
+                    # This bin only has one sequence, try the next one
+                    source_bin_idx += 1
+            else:
+                # If we get here, we couldn't find any bin with more than 1 sequence
+                # This should not happen given our earlier validation, but let's handle it
+                raise ValueError(
+                    f"Cannot create additional bins: insufficient sequences to redistribute. "
+                    f"Need {additional_bins_needed} additional bins but cannot find enough "
+                    f"bins with multiple sequences to redistribute from."
+                    f"WARNING: Triggering this section of code is a bug. Please report it."
+                )
+
+        return adjusted_bins
+
+    def pack(self, seqlen_list: list[int]) -> list[list[int]]:
+        """Pack sequences into bins.
+
+        Args:
+            seqlen_list: A list of sequence lengths to pack.
+
+        Returns:
+            A list of bins, where each bin is a list of indices into the original
+            seqlen_list list. The number of bins will satisfy min_bin_count
+            and bin_count_multiple constraints if specified.
+        """
+        # Call the implementation
+        bins = self._pack_implementation(seqlen_list)
+
+        # Adjust bin count to meet constraints
+        bins = self._adjust_bin_count(bins)
+
+        return bins
+
+    def _validate_sequence_lengths(self, seqlen_list: list[int]) -> None:
+        """Validate that all sequence lengths are within bin capacity."""
+        for length in seqlen_list:
+            if length > self.bin_capacity:
+                raise ValueError(
+                    f"Sequence length {length} exceeds bin capacity {self.bin_capacity}"
+                )
+
+class KarmarkarKarpPacker(SequencePacker):
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        # see: https://en.wikipedia.org/wiki/Largest_differencing_method
+        k_partitions = self.min_bin_count
+
+        class Set:
+            def __init__(self) -> None:
+                self.sum = 0
+                self.items = []
+
+            def add(self, idx: int, val: int):
                 self.items.append((idx, val))
                 self.sum += val

-        def __lt__(self, other):
-            if self.sum != other.sum:
-                return self.sum < other.sum
-            if len(self.items) != len(other.items):
-                return len(self.items) < len(other.items)
-            return self.items < other.items
-
-    class State:
-        def __init__(self, items: list[tuple[int, int]], k: int) -> None:
-            self.k = k
-            # sets should always be decreasing order
-            self.sets = [Set() for _ in range(k)]
-            assert len(items) in [1, k], f"{len(items)} not in [1, {k}]"
-            for i, (idx, seqlen) in enumerate(items):
-                self.sets[i].add(idx=idx, val=seqlen)
-            self.sets = sorted(self.sets, reverse=True)
-
-        def get_partitions(self):
-            partitions = []
-            for i in range(len(self.sets)):
-                cur_partition = []
-                for idx, _ in self.sets[i].items:
-                    cur_partition.append(idx)
-                partitions.append(cur_partition)
-            return partitions
-
-        def merge(self, other):
-            for i in range(self.k):
-                self.sets[i].merge(other.sets[self.k - 1 - i])
-            self.sets = sorted(self.sets, reverse=True)
-
-        @property
-        def spread(self) -> int:
-            return self.sets[0].sum - self.sets[-1].sum
-
-        def __lt__(self, other):
-            # least heap, let the state with largest spread to be popped first,
-            # if the spread is the same, let the state who has the largest set
-            # to be popped first.
-            if self.spread != other.spread:
-                return self.spread > other.spread
-            return self.sets[0] > other.sets[0]
-
-        def __repr__(self) -> str:
-            repr_str = "["
-            for i in range(self.k):
-                if i > 0:
-                    repr_str += ","
-                repr_str += "{"
-                for j, (_, seqlen) in enumerate(self.sets[i].items):
-                    if j > 0:
+            def merge(self, other):
+                for idx, val in other.items:
+                    self.items.append((idx, val))
+                    self.sum += val
+
+            def __lt__(self, other):
+                if self.sum != other.sum:
+                    return self.sum < other.sum
+                if len(self.items) != len(other.items):
+                    return len(self.items) < len(other.items)
+                return self.items < other.items
+
+        class State:
+            def __init__(self, items: list[tuple[int, int]], k: int) -> None:
+                self.k = k
+                # sets should always be decreasing order
+                self.sets = [Set() for _ in range(k)]
+                assert len(items) in [1, k], f"{len(items)} not in [1, {k}]"
+                for i, (idx, seqlen) in enumerate(items):
+                    self.sets[i].add(idx=idx, val=seqlen)
+                self.sets = sorted(self.sets, reverse=True)
+
+            def get_partitions(self):
+                partitions = []
+                for i in range(len(self.sets)):
+                    cur_partition = []
+                    for idx, _ in self.sets[i].items:
+                        cur_partition.append(idx)
+                    partitions.append(cur_partition)
+                return partitions
+
+            def merge(self, other):
+                for i in range(self.k):
+                    self.sets[i].merge(other.sets[self.k - 1 - i])
+                self.sets = sorted(self.sets, reverse=True)
+
+            @property
+            def spread(self) -> int:
+                return self.sets[0].sum - self.sets[-1].sum
+
+            def __lt__(self, other):
+                # least heap, let the state with largest spread to be popped first,
+                # if the spread is the same, let the state who has the largest set
+                # to be popped first.
+                if self.spread != other.spread:
+                    return self.spread > other.spread
+                return self.sets[0] > other.sets[0]
+
+            def __repr__(self) -> str:
+                repr_str = "["
+                for i in range(self.k):
+                    if i > 0:
                         repr_str += ","
-                repr_str += str(seqlen)
-            repr_str += "}"
-        repr_str += "]"
-        return repr_str
-
-    sorted_seqlen_list = sorted([(seqlen, i) for i, seqlen in enumerate(seqlen_list)])
-    states_pq = []
-    if equal_size:
-        assert len(seqlen_list) % k_partitions == 0, f"{len(seqlen_list)} % {k_partitions} != 0"
-        for offset in range(0, len(sorted_seqlen_list), k_partitions):
-            items = []
-            for i in range(k_partitions):
-                seqlen, idx = sorted_seqlen_list[offset + i]
-                items.append((idx, seqlen))
-            heapq.heappush(states_pq, State(items=items, k=k_partitions))
-    else:
-        for seqlen, idx in sorted_seqlen_list:
-            heapq.heappush(states_pq, State(items=[(idx, seqlen)], k=k_partitions))
-
-    while len(states_pq) > 1:
-        state0 = heapq.heappop(states_pq)
-        state1 = heapq.heappop(states_pq)
-        # merge states
-        state0.merge(state1)
-        heapq.heappush(states_pq, state0)
-
-    final_state = states_pq[0]
-    partitions = final_state.get_partitions()
-    if equal_size:
-        for i, partition in enumerate(partitions):
-            assert len(partition) * k_partitions == len(seqlen_list), (
-                f"{len(partition)} * {k_partitions} != {len(seqlen_list)}"
+                    repr_str += "{"
+                    for j, (_, seqlen) in enumerate(self.sets[i].items):
+                        if j > 0:
+                            repr_str += ","
+                        repr_str += str(seqlen)
+                    repr_str += "}"
+                repr_str += "]"
+                return repr_str
+
+        sorted_seqlen_list = sorted([(seqlen, i) for i, seqlen in enumerate(seqlen_list)])
+        states_pq = []
+        if self.equal_size:
+            assert len(seqlen_list) % k_partitions == 0, f"{len(seqlen_list)} % {k_partitions} != 0"
+            for offset in range(0, len(sorted_seqlen_list), k_partitions):
+                items = []
+                for i in range(k_partitions):
+                    seqlen, idx = sorted_seqlen_list[offset + i]
+                    items.append((idx, seqlen))
+                heapq.heappush(states_pq, State(items=items, k=k_partitions))
+        else:
+            for seqlen, idx in sorted_seqlen_list:
+                heapq.heappush(states_pq, State(items=[(idx, seqlen)], k=k_partitions))
+
+        while len(states_pq) > 1:
+            state0 = heapq.heappop(states_pq)
+            state1 = heapq.heappop(states_pq)
+            # merge states
+            state0.merge(state1)
+            heapq.heappush(states_pq, state0)
+
+        final_state = states_pq[0]
+        partitions = final_state.get_partitions()
+        if self.equal_size:
+            for i, partition in enumerate(partitions):
+                assert len(partition) * k_partitions == len(seqlen_list), (
+                    f"{len(partition)} * {k_partitions} != {len(seqlen_list)}"
+                )
+        return partitions
+
+
+class GreedyPartitionPacker(SequencePacker):
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        k_partitions = self.min_bin_count
+
+        bias = sum(seqlen_list) + 1 if self.equal_size else 0
+        sorted_seqlen = [(seqlen + bias, i) for i, seqlen in enumerate(seqlen_list)]
+        partitions = [[] for _ in range(k_partitions)]
+        partition_sums = [0 for _ in range(k_partitions)]
+        for seqlen, i in sorted_seqlen:
+            min_idx = None
+            for j in range(k_partitions):
+                if min_idx is None or partition_sums[j] < partition_sums[min_idx]:
+                    min_idx = j
+            partitions[min_idx].append(i)
+            partition_sums[min_idx] += seqlen
+        if self.equal_size:
+            for i, partition in enumerate(partitions):
+                assert len(partition) * k_partitions == len(seqlen_list), (
+                    f"{len(partition)} * {k_partitions} != {len(seqlen_list)}"
+                )
+        return partitions
+
+
+class ConcatenativePacker(SequencePacker):
+    """Concatenative packing algorithm.
+
+    This algorithm simply concatenates sequences in order until reaching the bin capacity,
+    then starts a new bin. It doesn't try to optimize the packing in any way.
+
+    Time complexity: O(n) where n is the number of sequences.
+
+    Example:
+        ```python
+        >>> examples = {
+        ...     "sequence_lengths": [4, 1, 3, 2, 1, 3, 4, 5]
+        ... }
+        >>> # If packed with seq_length=5:
+        ... {"bins": [ [0, 1], [2, 3], [4, 5], [6], [7] ]}
+        >>> # If packed with seq_length=8:
+        ... {"bins": [ [0, 1, 2], [3, 4, 5], [6], [7] ]}
+    """
+
+    # Global class variable to limit the number of sequences packed in a unit
+    # -1 disables this limit
+    max_sequences_per_bin = -1  # Useful for debugging and testing
+
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        """Pack sequences using the concatenative algorithm."""
+        # Validate sequence lengths
+        self._validate_sequence_lengths(seqlen_list)
+
+        bins = []  # List of bins, each bin is a list of sequence indices
+        current_bin = []  # Current bin being filled
+        current_length = 0  # Current length of sequences in the bin
+
+        for i, length in enumerate(seqlen_list):
+            # Check if adding this sequence would exceed bin capacity or sequence limit
+            exceeds_capacity = current_length + length > self.bin_capacity
+            exceeds_sequence_limit = (
+                self.max_sequences_per_bin != -1
+                and len(current_bin) >= self.max_sequences_per_bin
             )
-    return partitions
-
-
-def greedy_partition(seqlen_list: list[int], k_partitions: int, equal_size: bool):
-    bias = sum(seqlen_list) + 1 if equal_size else 0
-    sorted_seqlen = [(seqlen + bias, i) for i, seqlen in enumerate(seqlen_list)]
-    partitions = [[] for _ in range(k_partitions)]
-    partition_sums = [0 for _ in range(k_partitions)]
-    for seqlen, i in sorted_seqlen:
-        min_idx = None
-        for j in range(k_partitions):
-            if min_idx is None or partition_sums[j] < partition_sums[min_idx]:
-                min_idx = j
-        partitions[min_idx].append(i)
-        partition_sums[min_idx] += seqlen
-    if equal_size:
-        for i, partition in enumerate(partitions):
-            assert len(partition) * k_partitions == len(seqlen_list), (
-                f"{len(partition)} * {k_partitions} != {len(seqlen_list)}"
+
+            # If adding this sequence would exceed constraints, start a new bin
+            if exceeds_capacity or exceeds_sequence_limit:
+                if current_bin:  # Only add the bin if it's not empty
+                    bins.append(current_bin)
+                current_bin = [i]
+                current_length = length
+            else:
+                # Add the sequence to the current bin
+                current_bin.append(i)
+                current_length += length
+
+        # Add the last bin if it's not empty
+        if current_bin:
+            bins.append(current_bin)
+
+        return bins
+
+
+class FirstFitPacker(SequencePacker):
+    """Base class for First-Fit algorithms.
+
+    First-Fit algorithms place each sequence into the first bin where it fits.
+    If no bin can fit the sequence, a new bin is created.
+
+    This is an abstract base class that provides the common implementation for
+    First-Fit variants. Subclasses must implement the _prepare_sequences method
+    to determine the order in which sequences are processed.
+    """
+
+    def _prepare_sequences(self, seqlen_list: list[int]) -> list[tuple[int, int]]:
+        raise NotImplementedError("Subclasses must implement _prepare_sequences")
+
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        # Prepare sequences for packing (order determined by subclass)
+        indexed_lengths = self._prepare_sequences(sequence_lengths)
+
+        bins = []  # List of bins, each bin is a list of sequence indices
+        bin_remaining = []  # Remaining capacity for each bin
+
+        for length, idx in indexed_lengths:
+            # If the sequence is larger than the bin capacity, it cannot be packed
+            if length > self.bin_capacity:
+                raise ValueError(
+                    f"Sequence length {length} exceeds bin capacity {self.bin_capacity}"
+                )
+
+            # Try to find a bin where the sequence fits
+            bin_found = False
+            for i, remaining in enumerate(bin_remaining):
+                if remaining >= length:
+                    # Add the sequence to this bin
+                    bins[i].append(idx)
+                    bin_remaining[i] -= length
+                    bin_found = True
+                    break
+
+            # If no suitable bin was found, create a new one
+            if not bin_found:
+                bins.append([idx])
+                bin_remaining.append(self.bin_capacity - length)
+
+        return bins
+
+
+class FirstFitDecreasingPacker(FirstFitPacker):
+    """First-Fit Decreasing (FFD) algorithm for sequence packing.
+
+    This algorithm sorts sequences by length in descending order and then
+    places each sequence into the first bin where it fits.
+
+    Time complexity: O(n log n) for sorting + O(n * m) for packing,
+    where n is the number of sequences and m is the number of bins.
+    """
+
+    def _prepare_sequences(self, seqlen_list: list[int]) -> list[tuple[int, int]]:
+        # Create a list of (length, index) pairs
+        indexed_lengths = [(length, i) for i, length in enumerate(seqlen_list)]
+
+        # Sort by length in descending order
+        indexed_lengths.sort(reverse=True)
+
+        return indexed_lengths
+
+
+class FirstFitShufflePacker(FirstFitPacker):
+    """First-Fit Shuffle algorithm for sequence packing.
+
+    This algorithm randomly shuffles the sequences and then places each
+    sequence into the first bin where it fits.
+
+    Time complexity: O(n * m) for packing, where n is the number of sequences
+    and m is the number of bins.
+    """
+
+    def _prepare_sequences(self, seqlen_list: list[int]) -> list[tuple[int, int]]:
+        # Create a list of (length, index) pairs
+        indexed_lengths = [(length, i) for i, length in enumerate(seqlen_list)]
+
+        # Shuffle the sequences
+        random.shuffle(indexed_lengths)
+
+        return indexed_lengths
+
+
+class ModifiedFirstFitDecreasingPacker(SequencePacker):
+    """Modified First-Fit Decreasing (MFFD) algorithm for sequence packing.
+
+    This algorithm implements the Johnson & Garey (1985) Modified First-Fit-Decreasing
+    heuristic. It classifies items into four categories (large, medium, small, tiny)
+    and uses a sophisticated 5-phase packing strategy to achieve better bin utilization
+    than standard First-Fit Decreasing.
+
+    The algorithm phases:
+    1. Classify items by size relative to bin capacity
+    2. Create one bin per large item
+    3. Add medium items to large bins (forward pass)
+    4. Add pairs of small items to bins with medium items (backward pass)
+    5. Greedily fit remaining items
+    6. Apply FFD to any leftovers
+
+    Time complexity: O(n log n) for sorting + O(n * m) for packing,
+    where n is the number of sequences and m is the number of bins.
+    """
+
+    def _classify_items(
+        self, items: list[tuple[int, int]]
+    ) -> tuple[
+        list[tuple[int, int]],
+        list[tuple[int, int]],
+        list[tuple[int, int]],
+        list[tuple[int, int]],
+    ]:
+        """Split items into large / medium / small / tiny classes.
+
+        Follows the classification used by Johnson & Garey:
+            large  : (C/2, C]
+            medium : (C/3, C/2]
+            small  : (C/6, C/3]
+            tiny   : (0  , C/6]
+
+        Args:
+            items: List of (index, size) tuples
+
+        Returns:
+            Tuple of four lists (large, medium, small, tiny) without additional sorting.
+        """
+        large, medium, small, tiny = [], [], [], []
+        for idx, size in items:
+            if size > self.bin_capacity / 2:
+                large.append((idx, size))
+            elif size > self.bin_capacity / 3:
+                medium.append((idx, size))
+            elif size > self.bin_capacity / 6:
+                small.append((idx, size))
+            else:
+                tiny.append((idx, size))
+        return large, medium, small, tiny
+
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        # Validate sequence lengths
+        self._validate_sequence_lengths(seqlen_list)
+
+        items: list[tuple[int, int]] = [(i, l) for i, l in enumerate(seqlen_list)]
+        # Phase-0: classify
+        large, medium, small, tiny = self._classify_items(items)
+
+        # Sort according to the rules of MFFD
+        large.sort(key=lambda x: x[1], reverse=True)  # descending size
+        medium.sort(key=lambda x: x[1], reverse=True)
+        small.sort(key=lambda x: x[1])  # ascending size
+        tiny.sort(key=lambda x: x[1])
+
+        # Phase-1: start one bin per large item
+        bins: list[list[tuple[int, int]]] = [[item] for item in large]
+
+        # Phase-2: try to add one medium item to each large bin (forward pass)
+        for b in bins:
+            remaining = self.bin_capacity - sum(size for _, size in b)
+            for i, (idx, size) in enumerate(medium):
+                if size <= remaining:
+                    b.append(medium.pop(i))
+                    break
+
+        # Phase-3: backward pass – fill with two small items where possible
+        for b in reversed(bins):
+            has_medium = any(
+                self.bin_capacity / 3 < size <= self.bin_capacity / 2 for _, size in b
             )
-    return partitions
+            if has_medium or len(small) < 2:
+                continue
+            remaining = self.bin_capacity - sum(size for _, size in b)
+            if small[0][1] + small[1][1] > remaining:
+                continue
+            first_small = small.pop(0)
+            # pick the *largest* small that fits with first_small (so iterate from end)
+            second_idx = None
+            for j in range(len(small) - 1, -1, -1):
+                if small[j][1] <= remaining - first_small[1]:
+                    second_idx = j
+                    break
+
+            if second_idx is not None:
+                second_small = small.pop(second_idx)
+                b.extend([first_small, second_small])
+
+        # Phase-4: forward greedy fit of remaining items
+        remaining_items = sorted(medium + small + tiny, key=lambda x: x[1], reverse=True)
+        for b in bins:
+            while remaining_items:
+                rem = self.bin_capacity - sum(size for _, size in b)
+                # if even the smallest remaining doesn't fit we break
+                if rem < remaining_items[-1][1]:
+                    break
+                # pick the first (largest) that fits
+                chosen_idx = None
+                for i, (_, size) in enumerate(remaining_items):
+                    if size <= rem:
+                        chosen_idx = i
+                        break
+                if chosen_idx is None:
+                    break
+                b.append(remaining_items.pop(chosen_idx))
+
+        # Phase-5: FFD on leftovers
+        leftovers = remaining_items  # renamed for clarity
+        ffd_bins: list[list[tuple[int, int]]] = []
+        for idx, size in sorted(leftovers, key=lambda x: x[1], reverse=True):
+            placed = False
+            for bin_ffd in ffd_bins:
+                if size <= self.bin_capacity - sum(s for _, s in bin_ffd):
+                    bin_ffd.append((idx, size))
+                    placed = True
+                    break
+            if not placed:
+                ffd_bins.append([(idx, size)])
+        bins.extend(ffd_bins)
+
+        # Convert to list of index lists (discard sizes)
+        return [[idx for idx, _ in b] for b in bins]
+
+
+def get_packer(
+    algorithm: Union[PackingAlgorithm, str],
+    bin_capacity: int,
+    min_bin_count: Optional[int] = None,
+    bin_count_multiple: Optional[int] = None,
+    equal_size: bool = True,
+) -> SequencePacker:
+    """Get a sequence packer based on the specified algorithm."""
+    packers: Dict[PackingAlgorithm, Type[SequencePacker]] = {
+        PackingAlgorithm.KARMARKAR_KARP: KarmarkarKarpPacker,
+        PackingAlgorithm.GREEDY_PARTITION: GreedyPartitionPacker,
+        PackingAlgorithm.CONCATENATIVE: ConcatenativePacker,
+        PackingAlgorithm.FIRST_FIT_DECREASING: FirstFitDecreasingPacker,
+        PackingAlgorithm.FIRST_FIT_SHUFFLE: FirstFitShufflePacker,
+        PackingAlgorithm.MODIFIED_FIRST_FIT_DECREASING: ModifiedFirstFitDecreasingPacker,
+    }
+    if algorithm not in packers:
+        raise ValueError(f"Unknown packing algorithm: {algorithm}")
+    return packers[algorithm](bin_capacity, min_bin_count, bin_count_multiple, equal_size)


 def get_seqlen_balanced_partitions(seqlen_list: list[int], k_partitions: int, equal_size: bool):
This file appears to be a patch file (.patch) that has been added directly to the repository. This is not standard practice and can cause confusion. The code changes for the verl library should be committed to the verl repository itself, and this PR should only contain the updated submodule commit hash for third_party/verl, which it already does. This patch file is likely an artifact of the development process and should be removed from this pull request to keep the repository clean.
+class KarmarkarKarpPacker(SequencePacker):
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        # see: https://en.wikipedia.org/wiki/Largest_differencing_method
+        k_partitions = self.min_bin_count
The number of partitions k_partitions is set to self.min_bin_count, but min_bin_count is an optional parameter and can be None. If it is None, this will likely cause a TypeError when used later (e.g., in range(k) within the State class). Please add a check to handle the case where self.min_bin_count is not provided.
        if self.min_bin_count is None:
            raise ValueError("KarmarkarKarpPacker requires the 'min_bin_count' parameter.")
        k_partitions = self.min_bin_count
+class GreedyPartitionPacker(SequencePacker):
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        k_partitions = self.min_bin_count
Similar to KarmarkarKarpPacker, k_partitions is set to self.min_bin_count without checking if it's None. This can lead to a TypeError if min_bin_count is not provided. Please add a validation check.
        if self.min_bin_count is None:
            raise ValueError("GreedyPartitionPacker requires the 'min_bin_count' parameter.")
        k_partitions = self.min_bin_count
+    def _pack_implementation(self, seqlen_list: list[int]) -> list[list[int]]:
+        # Prepare sequences for packing (order determined by subclass)
+        indexed_lengths = self._prepare_sequences(sequence_lengths)
sequence_lengths is not defined in this scope; the parameter of this method is named seqlen_list, so this line will raise a NameError at runtime. Please use the parameter name consistently.
        indexed_lengths = self._prepare_sequences(seqlen_list)
+        indexed_lengths = [(length, i) for i, length in enumerate(seqlen_list)]
+
+        # Shuffle the sequences
+        random.shuffle(indexed_lengths)
random is used here but never imported in this module, which will raise a NameError at runtime. Please add the missing import at the top of the file.
import random
+    if algorithm not in packers:
+        raise ValueError(f"Unknown packing algorithm: {algorithm}")
+    return packers[algorithm](bin_capacity, min_bin_count, bin_count_multiple, equal_size)
The algorithm parameter can be a string, but the check if algorithm not in packers: will always treat a string as unknown, because the keys of the packers dictionary are PackingAlgorithm enum members. You should first convert the input algorithm to a PackingAlgorithm enum instance; the current implementation will not work correctly for string inputs.
    try:
        algorithm = PackingAlgorithm(algorithm)
    except ValueError:
        raise ValueError(f"Unknown packing algorithm: {algorithm}") from None
    return packers[algorithm](bin_capacity, min_bin_count, bin_count_multiple, equal_size)
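For reference, a minimal usage sketch of the factory once the fixes above are applied (hypothetical values; the import path assumes the patched module lands at verl/utils/seqlen_balancing.py as in the diff):

    from verl.utils.seqlen_balancing import PackingAlgorithm, get_packer

    # String form, converted to the enum internally by the suggested fix
    packer = get_packer("first_fit_decreasing", bin_capacity=8192)
    bins = packer.pack([4096, 2048, 6144, 1024])  # list of bins of sequence indices

    # The enum form keeps working unchanged
    packer = get_packer(PackingAlgorithm.CONCATENATIVE, bin_capacity=8192)
    print(packer.pack([4096, 2048, 6144, 1024]))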