diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/allgather.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/allgather.py index a019d3d535b..4e2fa5bbc7f 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/allgather.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/allgather.py @@ -49,14 +49,21 @@ def insert(self, sequence_number: int, chunk: TableChunk) -> None: sequence_number: int The sequence number of the chunk to insert. chunk: TableChunk - The table chunk to insert. + The table chunk to insert. Need not be GPU-resident; if spilled, + it will be made available internally. """ + chunk = chunk.make_available_and_spill( + self.context.br(), allow_overbooking=True + ) self.allgather.insert( sequence_number, + # TODO: Avoid unnecessary copies. + # See https://github.com/rapidsai/rapidsmpf/issues/933 PackedData.from_cudf_packed_columns( pack( chunk.table_view(), chunk.stream, + mr=self.context.br().device_mr, ), chunk.stream, self.context.br(), diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py index a5cf7366f95..feaf2f92e07 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py @@ -9,7 +9,6 @@ from rapidsmpf.communicator.single import new_communicator as single_comm from rapidsmpf.config import Options, get_environment_variables -from rapidsmpf.memory.buffer import MemoryType from rapidsmpf.streaming.core.actor import define_actor from rapidsmpf.streaming.core.context import Context from rapidsmpf.streaming.core.message import Message @@ -222,7 +221,7 @@ async def _local_aggregation( ir_context=ir_context, ) chunk = _enforce_schema(chunk, decomposed.piecewise_ir.schema) - total_size += chunk.data_alloc_size(MemoryType.DEVICE) + total_size += chunk.data_alloc_size() evaluated_chunks.append(chunk) if total_size > target_partition_size and len(evaluated_chunks) > 1: evaluated_chunks = [ @@ -233,7 +232,7 @@ async def _local_aggregation( ir_context=ir_context, ) ] - total_size = evaluated_chunks[0].data_alloc_size(MemoryType.DEVICE) + total_size = evaluated_chunks[0].data_alloc_size() if total_size > target_partition_size and allow_early_exit: break @@ -577,7 +576,7 @@ async def _choose_strategy( ------- The output count. """ - aggregated_size = aggregated.data_alloc_size(MemoryType.DEVICE) + aggregated_size = aggregated.data_alloc_size() local_estimated_size = (aggregated_size // max(1, chunks_received)) * local_count if skip_global_comm: diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py index ed461890ac3..731eafea015 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py @@ -8,7 +8,6 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Literal -from rapidsmpf.memory.buffer import MemoryType from rapidsmpf.memory.memory_reservation import opaque_memory_usage from rapidsmpf.streaming.core.actor import define_actor from rapidsmpf.streaming.core.memory_reserve_or_wait import ( @@ -177,16 +176,8 @@ async def _collect_small_side_for_broadcast( chunks: list[TableChunk] = [] while (msg := await ch.recv(context)) is not None: chunks.append(TableChunk.from_message(msg)) - size += chunks[-1].data_alloc_size(MemoryType.DEVICE) - # TODO: We only need to spill the chunks here, because - # we don't track row-count metadata yet. - chunks, _ = await make_table_chunks_available_or_wait( - context, - chunks, - reserve_extra=0, - net_memory_delta=0, - ) - row_count = sum(c.table_view().num_rows() for c in chunks) + size += chunks[-1].data_alloc_size() + row_count = sum(c.shape[0] for c in chunks) if (can_concatenate := row_count < CUDF_ROW_LIMIT) and concat_size_limit: can_concatenate = size <= concat_size_limit @@ -222,6 +213,9 @@ async def _collect_small_side_for_broadcast( ) ] else: + chunks, _ = await make_table_chunks_available_or_wait( + context, chunks, reserve_extra=0, net_memory_delta=0 + ) dfs = [chunk_to_frame(c, ir) for c in chunks] return dfs, size @@ -244,7 +238,7 @@ async def _broadcast_join_large_chunk( ) -> None: """Join one large-side chunk with the small DataFrame(s) and send the result.""" large_df = chunk_to_frame(large_chunk, large_child) - large_chunk_size = large_chunk.data_alloc_size(MemoryType.DEVICE) + large_chunk_size = large_chunk.data_alloc_size() dfs_to_join = small_dfs if not dfs_to_join: @@ -835,8 +829,8 @@ async def _sample_chunks( context.br(), allow_overbooking=True ) sampled_chunks[msg.sequence_number] = chunk - total_size += chunk.data_alloc_size(MemoryType.DEVICE) - total_rows += chunk.table_view().num_rows() + total_size += chunk.data_alloc_size() + total_rows += chunk.shape[0] if total_size >= max_sample_bytes: break if sampled_chunks: diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyi b/python/pylibcudf/pylibcudf/contiguous_split.pyi index ba1e6421259..df241c079ae 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyi +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyi @@ -14,7 +14,11 @@ class PackedColumns: self, stream: Stream | None = None ) -> tuple[memoryview[bytes], gpumemoryview]: ... -def pack(input: Table, stream: Stream | None = None) -> PackedColumns: ... +def pack( + input: Table, + stream: Stream | None = None, + mr: DeviceMemoryResource | None = None, +) -> PackedColumns: ... def unpack(input: PackedColumns, stream: Stream | None = None) -> Table: ... def unpack_from_memoryviews( metadata: memoryview[bytes],