@@ -49,14 +49,21 @@ def insert(self, sequence_number: int, chunk: TableChunk) -> None:
         sequence_number: int
             The sequence number of the chunk to insert.
         chunk: TableChunk
-            The table chunk to insert.
+            The table chunk to insert. Need not be GPU-resident; if spilled,
+            it will be made available internally.
         """
+        chunk = chunk.make_available_and_spill(
+            self.context.br(), allow_overbooking=True
+        )
         self.allgather.insert(
             sequence_number,
+            # TODO: Avoid unnecessary copies.
+            # See https://github.com/rapidsai/rapidsmpf/issues/933
             PackedData.from_cudf_packed_columns(
                 pack(
                     chunk.table_view(),
                     chunk.stream,
+                    mr=self.context.br().device_mr,
                 ),
                 chunk.stream,
                 self.context.br(),
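The new docstring and the make_available_and_spill() call above establish a contract: insert() now accepts spilled chunks and is itself responsible for bringing them back to device memory before touching table_view(). A minimal, runnable sketch of that contract with toy types (not the rapidsmpf API):

from dataclasses import dataclass

@dataclass
class ToyChunk:
    payload: bytes            # stand-in for packed table data
    on_device: bool = False   # False models a spilled (host-resident) chunk

    def make_available(self) -> "ToyChunk":
        # In rapidsmpf this would go through the buffer resource (possibly
        # overbooking); here we only flip the residency flag.
        return ToyChunk(self.payload, on_device=True)

def insert(chunk: ToyChunk, store: list[bytes]) -> None:
    # The caller no longer has to guarantee device residency.
    if not chunk.on_device:
        chunk = chunk.make_available()
    store.append(chunk.payload)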
@@ -9,7 +9,6 @@

 from rapidsmpf.communicator.single import new_communicator as single_comm
 from rapidsmpf.config import Options, get_environment_variables
-from rapidsmpf.memory.buffer import MemoryType
 from rapidsmpf.streaming.core.actor import define_actor
 from rapidsmpf.streaming.core.context import Context
 from rapidsmpf.streaming.core.message import Message
@@ -222,7 +221,7 @@ async def _local_aggregation(
             ir_context=ir_context,
         )
         chunk = _enforce_schema(chunk, decomposed.piecewise_ir.schema)
-        total_size += chunk.data_alloc_size(MemoryType.DEVICE)
+        total_size += chunk.data_alloc_size()
         evaluated_chunks.append(chunk)
         if total_size > target_partition_size and len(evaluated_chunks) > 1:
             evaluated_chunks = [
@@ -233,7 +232,7 @@ async def _local_aggregation(
                     ir_context=ir_context,
                 )
             ]
-            total_size = evaluated_chunks[0].data_alloc_size(MemoryType.DEVICE)
+            total_size = evaluated_chunks[0].data_alloc_size()
             if total_size > target_partition_size and allow_early_exit:
                 break

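The two hunks above implement size-thresholded batching: chunk sizes are accumulated via data_alloc_size(), and once the running total passes target_partition_size the buffered chunks are concatenated. A simplified, runnable restatement of that rule, with plain integers standing in for chunks (the real code keeps the concatenated chunk in the buffer, resets the total to its new allocation size, and may exit early):

def batch_by_size(sizes: list[int], target: int) -> list[list[int]]:
    """Group sizes greedily; flush a batch once its total passes ``target``."""
    batches: list[list[int]] = []
    current: list[int] = []
    total = 0
    for size in sizes:
        current.append(size)
        total += size
        if total > target and len(current) > 1:
            batches.append(current)
            current, total = [], 0
    if current:
        batches.append(current)
    return batches

# batch_by_size([10, 20, 40, 5], target=25) -> [[10, 20], [40, 5]]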
@@ -577,7 +576,7 @@ async def _choose_strategy(
     -------
     The output count.
     """
-    aggregated_size = aggregated.data_alloc_size(MemoryType.DEVICE)
+    aggregated_size = aggregated.data_alloc_size()
     local_estimated_size = (aggregated_size // max(1, chunks_received)) * local_count

     if skip_global_comm:
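The _choose_strategy hunk scales the average size of the chunks received so far up to the full local chunk count; max(1, ...) guards against division by zero before anything has arrived. Restated on its own as an illustrative helper:

def estimate_local_size(aggregated_size: int, chunks_received: int, local_count: int) -> int:
    # Average observed chunk size, scaled to the number of local chunks.
    return (aggregated_size // max(1, chunks_received)) * local_count

# e.g. 3 chunks totalling 300 MiB observed, 12 local chunks expected -> 1200 MiB
assert estimate_local_size(300, 3, 12) == 1200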
22 changes: 8 additions & 14 deletions python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
@@ -8,7 +8,6 @@
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, Literal

-from rapidsmpf.memory.buffer import MemoryType
 from rapidsmpf.memory.memory_reservation import opaque_memory_usage
 from rapidsmpf.streaming.core.actor import define_actor
 from rapidsmpf.streaming.core.memory_reserve_or_wait import (
@@ -177,16 +176,8 @@ async def _collect_small_side_for_broadcast(
     chunks: list[TableChunk] = []
     while (msg := await ch.recv(context)) is not None:
         chunks.append(TableChunk.from_message(msg))
-        size += chunks[-1].data_alloc_size(MemoryType.DEVICE)
-    # TODO: We only need to spill the chunks here, because
-    # we don't track row-count metadata yet.
-    chunks, _ = await make_table_chunks_available_or_wait(
-        context,
-        chunks,
-        reserve_extra=0,
-        net_memory_delta=0,
-    )
-    row_count = sum(c.table_view().num_rows() for c in chunks)
+        size += chunks[-1].data_alloc_size()
+    row_count = sum(c.shape[0] for c in chunks)

     if (can_concatenate := row_count < CUDF_ROW_LIMIT) and concat_size_limit:
         can_concatenate = size <= concat_size_limit
@@ -222,6 +213,9 @@ async def _collect_small_side_for_broadcast(
             )
         ]
     else:
+        chunks, _ = await make_table_chunks_available_or_wait(
+            context, chunks, reserve_extra=0, net_memory_delta=0
+        )
         dfs = [chunk_to_frame(c, ir) for c in chunks]

     return dfs, size
@@ -244,7 +238,7 @@ async def _broadcast_join_large_chunk(
 ) -> None:
     """Join one large-side chunk with the small DataFrame(s) and send the result."""
     large_df = chunk_to_frame(large_chunk, large_child)
-    large_chunk_size = large_chunk.data_alloc_size(MemoryType.DEVICE)
+    large_chunk_size = large_chunk.data_alloc_size()

     dfs_to_join = small_dfs
     if not dfs_to_join:
@@ -835,8 +829,8 @@ async def _sample_chunks(
             context.br(), allow_overbooking=True
         )
         sampled_chunks[msg.sequence_number] = chunk
-        total_size += chunk.data_alloc_size(MemoryType.DEVICE)
-        total_rows += chunk.table_view().num_rows()
+        total_size += chunk.data_alloc_size()
+        total_rows += chunk.shape[0]
         if total_size >= max_sample_bytes:
             break
     if sampled_chunks:
6 changes: 5 additions & 1 deletion python/pylibcudf/pylibcudf/contiguous_split.pyi
@@ -14,7 +14,11 @@ class PackedColumns:
         self, stream: Stream | None = None
     ) -> tuple[memoryview[bytes], gpumemoryview]: ...

-def pack(input: Table, stream: Stream | None = None) -> PackedColumns: ...
+def pack(
+    input: Table,
+    stream: Stream | None = None,
+    mr: DeviceMemoryResource | None = None,
+) -> PackedColumns: ...
 def unpack(input: PackedColumns, stream: Stream | None = None) -> Table: ...
 def unpack_from_memoryviews(
     metadata: memoryview[bytes],
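A minimal usage sketch of the updated stub, assuming only what the signature above shows: the new optional mr argument selects the RMM memory resource that backs the single contiguous device allocation pack produces, and stream keeps its existing default.

import pyarrow as pa
import pylibcudf as plc
import rmm

tbl = plc.interop.from_arrow(pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}))
mr = rmm.mr.get_current_device_resource()

# stream is left at its default; mr is the keyword added in this change.
packed = plc.contiguous_split.pack(tbl, mr=mr)
roundtrip = plc.contiguous_split.unpack(packed)
assert roundtrip.num_rows() == tbl.num_rows()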