Skip to content

Commit 756c575

Browse files
authored
TableChunk.copy(): allow wiggle room (#672)
Allow some wiggle room based on number of columns. Also: - use `estimated_memory_usage()` - add missing python bindings for `TableChunk.copy()` Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Niranda Perera (https://github.com/nirandaperera) URL: #672
1 parent 1df992a commit 756c575

File tree

6 files changed

+74
-29
lines changed

6 files changed

+74
-29
lines changed

cpp/include/rapidsmpf/streaming/cudf/table_chunk.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ class TableChunk {
7575
* the TableChunk is destroyed.
7676
*
7777
* @param table_view Device-resident table view.
78-
* @param device_alloc_size Number of bytes allocated in device memory.
7978
* @param stream CUDA stream on which the table was created.
8079
* @param owner Object owning the memory backing @p table_view. This object will be
8180
* destroyed last when the TableChunk is destroyed or spilled.
@@ -91,7 +90,6 @@ class TableChunk {
9190
*/
9291
TableChunk(
9392
cudf::table_view table_view,
94-
std::size_t device_alloc_size,
9593
rmm::cuda_stream_view stream,
9694
OwningWrapper&& owner,
9795
ExclusiveView exclusive_view

cpp/src/streaming/cudf/table_chunk.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@
55

66
#include <memory>
77

8-
#include <rmm/aligned.hpp>
9-
108
#include <rapidsmpf/buffer/buffer.hpp>
9+
#include <rapidsmpf/integrations/cudf/utils.hpp>
1110
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
1211

1312
namespace rapidsmpf::streaming {
@@ -24,7 +23,6 @@ TableChunk::TableChunk(std::unique_ptr<cudf::table> table, rmm::cuda_stream_view
2423

2524
TableChunk::TableChunk(
2625
cudf::table_view table_view,
27-
std::size_t device_alloc_size,
2826
rmm::cuda_stream_view stream,
2927
OwningWrapper&& owner,
3028
ExclusiveView exclusive_view
@@ -33,7 +31,8 @@ TableChunk::TableChunk(
3331
table_view_{table_view},
3432
stream_{stream},
3533
is_spillable_{static_cast<bool>(exclusive_view)} {
36-
data_alloc_size_[static_cast<std::size_t>(MemoryType::DEVICE)] = device_alloc_size;
34+
data_alloc_size_[static_cast<std::size_t>(MemoryType::DEVICE)] =
35+
estimated_memory_usage(table_view, stream_);
3736
make_available_cost_ = 0;
3837
}
3938

@@ -182,11 +181,11 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
182181
// Handle the case where `cudf::pack` allocates slightly more than the
183182
// input size. This can occur because cudf uses aligned allocations,
184183
// which may exceed the requested size. To accommodate this, we
185-
// slightly increase the reservation if the packed data fits within
186-
// the aligned size.
184+
// allow some wiggle room.
187185
if (packed_data->data->size > reservation.size()) {
188-
auto const aligned_size = rmm::align_up(reservation.size(), 1024);
189-
if (packed_data->data->size <= aligned_size) {
186+
auto const wiggle_room =
187+
1024 * static_cast<std::size_t>(table_view().num_columns());
188+
if (packed_data->data->size <= reservation.size() + wiggle_room) {
190189
reservation =
191190
br->reserve(
192191
MemoryType::HOST, packed_data->data->size, true

cpp/tests/streaming/test_table_chunk.cpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,7 @@ TEST_F(StreamingTableChunk, TableChunkOwner) {
5959
};
6060
auto make_chunk = [&](TableChunk::ExclusiveView exclusive_view) {
6161
return TableChunk{
62-
expect,
63-
expect.alloc_size(),
64-
stream,
65-
OwningWrapper(new int, deleter),
66-
exclusive_view
62+
expect, stream, OwningWrapper(new int, deleter), exclusive_view
6763
};
6864
};
6965
auto check_chunk = [&](TableChunk const& chunk, bool is_spillable) {
@@ -360,11 +356,7 @@ TEST_F(StreamingTableChunk, ToMessageNotSpillable) {
360356

361357
auto deleter = [](void* p) { delete static_cast<int*>(p); };
362358
auto chunk = std::make_unique<TableChunk>(
363-
expect,
364-
expect.alloc_size(),
365-
stream,
366-
OwningWrapper(new int, deleter),
367-
TableChunk::ExclusiveView::NO
359+
expect, stream, OwningWrapper(new int, deleter), TableChunk::ExclusiveView::NO
368360
);
369361

370362
Message m = to_message(seq, std::move(chunk));

python/rapidsmpf/rapidsmpf/streaming/cudf/table_chunk.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ from rmm.librmm.cuda_stream_view cimport cuda_stream_view
1010
from rmm.pylibrmm.stream cimport Stream
1111

1212
from rapidsmpf.buffer.buffer cimport MemoryType
13+
from rapidsmpf.buffer.resource cimport cpp_MemoryReservation
1314

1415

1516
cdef extern from "<rapidsmpf/streaming/cudf/table_chunk.hpp>" nogil:
@@ -20,6 +21,7 @@ cdef extern from "<rapidsmpf/streaming/cudf/table_chunk.hpp>" nogil:
2021
size_t make_available_cost() noexcept
2122
cpp_table_view table_view() except +
2223
bool_t is_spillable() noexcept
24+
cpp_TableChunk copy(cpp_MemoryReservation& reservation) except +
2325

2426
cdef class TableChunk:
2527
cdef unique_ptr[cpp_TableChunk] _handle

python/rapidsmpf/rapidsmpf/streaming/cudf/table_chunk.pyx

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33

44
from cpython.object cimport PyObject
55
from cython.operator cimport dereference as deref
6-
from libc.stddef cimport size_t
76
from libc.stdint cimport uint64_t
87
from libcpp.memory cimport unique_ptr
98
from libcpp.utility cimport move
10-
from pylibcudf.column cimport Column
119
from pylibcudf.libcudf.table.table_view cimport table_view as cpp_table_view
1210
from pylibcudf.table cimport Table
1311

@@ -38,7 +36,6 @@ cdef extern from * nogil:
3836
3937
std::unique_ptr<rapidsmpf::streaming::TableChunk> cpp_from_table_view_with_owner(
4038
cudf::table_view view,
41-
std::size_t device_alloc_size,
4239
rmm::cuda_stream_view stream,
4340
PyObject *owner,
4441
void(*py_deleter)(void *),
@@ -49,7 +46,6 @@ cdef extern from * nogil:
4946
Py_XINCREF(owner);
5047
return std::make_unique<rapidsmpf::streaming::TableChunk>(
5148
view,
52-
device_alloc_size,
5349
stream,
5450
rapidsmpf::OwningWrapper(owner, py_deleter),
5551
exclusive_view ?
@@ -66,6 +62,15 @@ cdef extern from * nogil:
6662
table->make_available(*reservation)
6763
);
6864
}
65+
66+
std::unique_ptr<rapidsmpf::streaming::TableChunk> cpp_table_copy(
67+
std::unique_ptr<rapidsmpf::streaming::TableChunk> const& table,
68+
rapidsmpf::MemoryReservation* reservation
69+
) {
70+
return std::make_unique<rapidsmpf::streaming::TableChunk>(
71+
table->copy(*reservation)
72+
);
73+
}
6974
}
7075
"""
7176
unique_ptr[cpp_TableChunk] cpp_release_table_chunk_from_message(
@@ -75,6 +80,9 @@ cdef extern from * nogil:
7580
unique_ptr[cpp_TableChunk] cpp_table_make_available(
7681
unique_ptr[cpp_TableChunk], cpp_MemoryReservation*
7782
) except +
83+
unique_ptr[cpp_TableChunk] cpp_table_copy(
84+
unique_ptr[cpp_TableChunk], cpp_MemoryReservation*
85+
) except +
7886

7987
cdef class TableChunk:
8088
"""
@@ -161,15 +169,10 @@ cdef class TableChunk:
161169
ensure the stream remains valid for the lifetime of the streaming pipeline.
162170
"""
163171
cdef cuda_stream_view _stream = stream.view()
164-
cdef size_t device_alloc_size = 0
165-
for col in table.columns():
166-
device_alloc_size += (<Column?>col).device_buffer_size()
167-
168172
cdef cpp_table_view view = table.view()
169173
return TableChunk.from_handle(
170174
cpp_from_table_view_with_owner(
171175
view,
172-
device_alloc_size,
173176
_stream,
174177
<PyObject *>table,
175178
py_deleter,
@@ -429,3 +432,29 @@ cdef class TableChunk:
429432
True if the table chunk can be spilled, otherwise, False.
430433
"""
431434
return deref(self.handle_ptr()).is_spillable()
435+
436+
def copy(self, MemoryReservation reservation not None):
437+
"""
438+
Create a deep copy of this table chunk.
439+
440+
All buffers are allocated for the new table chunk using the provided
441+
memory reservation, which also determines the target memory type of
442+
the copy.
443+
444+
Parameters
445+
----------
446+
reservation
447+
Memory reservation to consume for allocating the buffers of the
448+
new table chunk.
449+
450+
Returns
451+
-------
452+
TableChunk
453+
A new table chunk containing a deep copy of this chunk's data and
454+
metadata.
455+
"""
456+
cdef unique_ptr[cpp_TableChunk] ret
457+
cdef cpp_MemoryReservation* res = reservation._handle.get()
458+
with nogil:
459+
ret = cpp_table_copy(self._handle, res)
460+
return TableChunk.from_handle(move(ret))

python/rapidsmpf/rapidsmpf/tests/streaming/test_table_chunk.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,31 @@ def test_roundtrip(context: Context, stream: Stream, *, exclusive_view: bool) ->
106106
assert_eq(expect, table_chunk6.table_view())
107107

108108

109+
def test_copy_roundtrip(context: Context, stream: Stream) -> None:
110+
for nrows, ncols in [(1, 1), (1000, 100), (1, 1000)]:
111+
expect = cudf_to_pylibcudf_table(
112+
cudf.DataFrame(
113+
{
114+
f"{name}": cupy.random.random(nrows, dtype=cupy.float32)
115+
for name in range(ncols)
116+
}
117+
)
118+
)
119+
120+
tbl1 = TableChunk.from_pylibcudf_table(expect, stream, exclusive_view=True)
121+
res, _ = context.br().reserve(
122+
MemoryType.HOST,
123+
tbl1.data_alloc_size(MemoryType.DEVICE),
124+
allow_overbooking=True,
125+
)
126+
tbl2 = tbl1.copy(res)
127+
res, _ = context.br().reserve(
128+
MemoryType.DEVICE, tbl2.make_available_cost(), allow_overbooking=True
129+
)
130+
tbl3 = tbl2.make_available(res)
131+
assert_eq(expect, tbl3.table_view())
132+
133+
109134
def test_spillable_messages(context: Context, stream: Stream) -> None:
110135
seq = 42
111136
df1 = random_table(1024)

0 commit comments

Comments
 (0)