Skip to content

Commit 5bd42c3

Browse files
authored
Merge pull request #62 from bdice/streams
Add stream support.
2 parents 887d60d + a627cd6 commit 5bd42c3

File tree

12 files changed

+123
-49
lines changed

12 files changed

+123
-49
lines changed

velox/experimental/cudf/connectors/parquet/ParquetDataSink.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,9 @@ void ParquetDataSink::appendData(RowVectorPtr input) {
157157
checkRunning();
158158

159159
// Convert the input RowVectorPtr to cudf::table
160-
auto cudfInput = with_arrow::to_cudf_table(input, input->pool());
160+
auto stream = cudfGlobalStreamPool().get_stream();
161+
auto cudfInput = with_arrow::to_cudf_table(input, input->pool(), stream);
162+
stream.synchronize();
161163
VELOX_CHECK_NOT_NULL(
162164
cudfInput, "Failed to convert input RowVectorPtr to cudf::table");
163165

velox/experimental/cudf/connectors/parquet/ParquetDataSource.cpp

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -115,12 +115,13 @@ std::optional<RowVectorPtr> ParquetDataSource::next(
115115
}
116116

117117
if (readTables.size()) {
118-
auto readTable = concatenateTables(std::move(readTables));
118+
auto stream = cudf::get_default_stream();
119+
auto readTable = concatenateTables(std::move(readTables), stream);
119120
if (cudfTable_) {
120121
// Concatenate the current view ahead of the read table.
121122
auto tableViews = std::vector<cudf::table_view>{
122123
currentCudfTableView_, readTable->view()};
123-
cudfTable_ = cudf::concatenate(tableViews, cudf::get_default_stream());
124+
cudfTable_ = cudf::concatenate(tableViews, stream);
124125
} else {
125126
cudfTable_ = std::move(readTable);
126127
}
@@ -135,8 +136,10 @@ std::optional<RowVectorPtr> ParquetDataSource::next(
135136
// If the current table view has <= size rows, this is the last chunk.
136137
if (currentCudfTableView_.num_rows() <= size) {
137138
// Convert the current table view to RowVectorPtr.
138-
output =
139-
with_arrow::to_velox_column(currentCudfTableView_, pool_, columnNames);
139+
auto stream = cudf::get_default_stream();
140+
output = with_arrow::to_velox_column(
141+
currentCudfTableView_, pool_, columnNames, stream);
142+
stream.synchronize();
140143
// Reset internal tables
141144
resetCudfTableAndView();
142145
} else {
@@ -149,7 +152,10 @@ std::optional<RowVectorPtr> ParquetDataSource::next(
149152
tableSplits[0].num_rows(),
150153
"cudf::split yielded incorrect partitions");
151154
// Convert the first split view to RowVectorPtr.
152-
output = with_arrow::to_velox_column(tableSplits[0], pool_, columnNames);
155+
auto stream = cudf::get_default_stream();
156+
output =
157+
with_arrow::to_velox_column(tableSplits[0], pool_, columnNames, stream);
158+
stream.synchronize();
153159
// Set the current view to the second split view.
154160
currentCudfTableView_ = tableSplits[1];
155161
}

velox/experimental/cudf/exec/CudfConversion.cpp

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -111,9 +111,14 @@ RowVectorPtr CudfFromVelox::getOutput() {
111111
return nullptr;
112112
}
113113

114+
// Get a stream from the global stream pool
115+
auto stream = cudfGlobalStreamPool().get_stream();
116+
114117
// Convert RowVector to cudf table
115-
auto tbl = with_arrow::to_cudf_table(input, input->pool());
116-
cudf::get_default_stream().synchronize();
118+
auto tbl = with_arrow::to_cudf_table(input, input->pool(), stream);
119+
120+
stream.synchronize();
121+
117122
VELOX_CHECK_NOT_NULL(tbl);
118123

119124
if (cudfDebugEnabled()) {
@@ -126,7 +131,7 @@ RowVectorPtr CudfFromVelox::getOutput() {
126131
// Return a CudfVector that owns the cudf table
127132
auto const size = tbl->num_rows();
128133
return std::make_shared<CudfVector>(
129-
input->pool(), outputType_, size, std::move(tbl));
134+
input->pool(), outputType_, size, std::move(tbl), stream);
130135
}
131136

132137
void CudfFromVelox::close() {
@@ -162,6 +167,7 @@ RowVectorPtr CudfToVelox::getOutput() {
162167
return nullptr;
163168
}
164169

170+
auto stream = inputs_.front()->stream();
165171
std::unique_ptr<cudf::table> tbl = inputs_.front()->release();
166172
inputs_.pop_front();
167173

@@ -172,12 +178,12 @@ RowVectorPtr CudfToVelox::getOutput() {
172178
std::cout << "CudfToVelox table number of rows: " << tbl->num_rows()
173179
<< std::endl;
174180
}
175-
176-
cudf::get_default_stream().synchronize();
177181
if (tbl->num_rows() == 0) {
178182
return nullptr;
179183
}
180-
RowVectorPtr output = with_arrow::to_velox_column(tbl->view(), pool(), "");
184+
RowVectorPtr output =
185+
with_arrow::to_velox_column(tbl->view(), pool(), "", stream);
186+
stream.synchronize();
181187
finished_ = noMoreInput_ && inputs_.empty();
182188
output->setType(outputType_);
183189
return output;

velox/experimental/cudf/exec/CudfFilterProject.cpp

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -258,6 +258,7 @@ RowVectorPtr CudfFilterProject::getOutput() {
258258

259259
auto cudf_input = std::dynamic_pointer_cast<CudfVector>(input_);
260260
VELOX_CHECK_NOT_NULL(cudf_input);
261+
auto stream = cudf_input->stream();
261262
auto input_table_columns = cudf_input->release()->release();
262263
// add ast unsupported precomputed columns to input_table
263264
// Works only directly on column in input table, not intermediate columns
@@ -267,13 +268,13 @@ RowVectorPtr CudfFilterProject::getOutput() {
267268
auto new_column = cudf::datetime::extract_datetime_component(
268269
input_table_columns[dependent_column_index]->view(),
269270
cudf::datetime::datetime_component::YEAR,
270-
cudf::get_default_stream(),
271+
stream,
271272
cudf::get_current_device_resource_ref());
272273
input_table_columns.emplace_back(std::move(new_column));
273274
} else if (ins_name == "length") {
274275
auto new_column = cudf::strings::count_characters(
275276
input_table_columns[dependent_column_index]->view(),
276-
cudf::get_default_stream(),
277+
stream,
277278
cudf::get_current_device_resource_ref());
278279
input_table_columns.emplace_back(std::move(new_column));
279280
} else {
@@ -289,7 +290,7 @@ RowVectorPtr CudfFilterProject::getOutput() {
289290
auto col = cudf::compute_column(
290291
cudf_table_view,
291292
tree.back(),
292-
cudf::get_default_stream(),
293+
stream,
293294
cudf::get_current_device_resource_ref());
294295
columns.emplace_back(std::move(col));
295296
}
@@ -308,6 +309,7 @@ RowVectorPtr CudfFilterProject::getOutput() {
308309
}
309310

310311
auto output_table = std::make_unique<cudf::table>(std::move(output_columns));
312+
stream.synchronize();
311313
auto const num_columns = output_table->num_columns();
312314
auto const size = output_table->num_rows();
313315
if (cudfDebugEnabled()) {
@@ -316,7 +318,7 @@ RowVectorPtr CudfFilterProject::getOutput() {
316318
}
317319

318320
auto cudf_output = std::make_shared<CudfVector>(
319-
input_->pool(), outputType_, size, std::move(output_table));
321+
input_->pool(), outputType_, size, std::move(output_table), stream);
320322
input_.reset();
321323
if (num_columns == 0 or size == 0) {
322324
return nullptr;

velox/experimental/cudf/exec/CudfHashJoin.cpp

Lines changed: 18 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -151,14 +151,22 @@ void CudfHashJoinBuild::noMoreInput() {
151151
};
152152

153153
auto cudf_tables = std::vector<std::unique_ptr<cudf::table>>(inputs_.size());
154+
auto input_streams = std::vector<rmm::cuda_stream_view>(inputs_.size());
154155
for (int i = 0; i < inputs_.size(); i++) {
155156
VELOX_CHECK_NOT_NULL(inputs_[i]);
157+
input_streams[i] = inputs_[i]->stream();
156158
cudf_tables[i] = inputs_[i]->release();
157159
}
158-
auto tbl = concatenateTables(std::move(cudf_tables));
160+
auto stream = cudfGlobalStreamPool().get_stream();
161+
cudf::detail::join_streams(input_streams, stream);
162+
auto tbl = concatenateTables(std::move(cudf_tables), stream);
163+
164+
// Release input data after synchronizing
165+
stream.synchronize();
166+
input_streams.clear();
167+
cudf_tables.clear();
159168

160169
// Release input data
161-
cudf::get_default_stream().synchronize();
162170
inputs_.clear();
163171

164172
VELOX_CHECK_NOT_NULL(tbl);
@@ -246,6 +254,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() {
246254
}
247255
auto cudf_input = std::dynamic_pointer_cast<CudfVector>(input_);
248256
VELOX_CHECK_NOT_NULL(cudf_input);
257+
auto stream = cudf_input->stream();
249258
auto tbl = cudf_input->release();
250259
if (cudfDebugEnabled()) {
251260
std::cout << "Probe table number of columns: " << tbl->num_columns()
@@ -307,8 +316,8 @@ RowVectorPtr CudfHashJoinProbe::getOutput() {
307316
hb.get(),
308317
hashObject_.has_value());
309318
}
310-
auto const [left_join_indices, right_join_indices] =
311-
hb->inner_join(tbl->view().select(probe_key_indices));
319+
auto const [left_join_indices, right_join_indices] = hb->inner_join(
320+
tbl->view().select(probe_key_indices), std::nullopt, stream);
312321
auto left_indices_span =
313322
cudf::device_span<cudf::size_type const>{*left_join_indices};
314323
auto right_indices_span =
@@ -361,8 +370,10 @@ RowVectorPtr CudfHashJoinProbe::getOutput() {
361370
auto left_indices_col = cudf::column_view{left_indices_span};
362371
auto right_indices_col = cudf::column_view{right_indices_span};
363372
auto constexpr oob_policy = cudf::out_of_bounds_policy::DONT_CHECK;
364-
auto left_result = cudf::gather(left_input, left_indices_col, oob_policy);
365-
auto right_result = cudf::gather(right_input, right_indices_col, oob_policy);
373+
auto left_result =
374+
cudf::gather(left_input, left_indices_col, oob_policy, stream);
375+
auto right_result =
376+
cudf::gather(right_input, right_indices_col, oob_policy, stream);
366377

367378
if (cudfDebugEnabled()) {
368379
std::cout << "Left result number of columns: " << left_result->num_columns()
@@ -391,7 +402,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() {
391402
return nullptr;
392403
}
393404
return std::make_shared<CudfVector>(
394-
pool(), outputType, size, std::move(cudf_output));
405+
pool(), outputType, size, std::move(cudf_output), stream);
395406
}
396407

397408
exec::BlockingReason CudfHashJoinProbe::isBlocked(ContinueFuture* future) {

velox/experimental/cudf/exec/CudfOrderBy.cpp

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -86,11 +86,20 @@ void CudfOrderBy::noMoreInput() {
8686
return;
8787
}
8888
auto cudf_tables = std::vector<std::unique_ptr<cudf::table>>(inputs_.size());
89+
auto input_streams = std::vector<rmm::cuda_stream_view>(inputs_.size());
8990
for (int i = 0; i < inputs_.size(); i++) {
9091
VELOX_CHECK_NOT_NULL(inputs_[i]);
92+
input_streams[i] = inputs_[i]->stream();
9193
cudf_tables[i] = inputs_[i]->release();
9294
}
93-
auto tbl = concatenateTables(std::move(cudf_tables));
95+
auto stream = cudfGlobalStreamPool().get_stream();
96+
cudf::detail::join_streams(input_streams, stream);
97+
auto tbl = concatenateTables(std::move(cudf_tables), stream);
98+
99+
// Release input data after synchronizing
100+
stream.synchronize();
101+
input_streams.clear();
102+
cudf_tables.clear();
94103

95104
// Release input data
96105
inputs_.clear();
@@ -105,10 +114,11 @@ void CudfOrderBy::noMoreInput() {
105114

106115
auto keys = tbl->view().select(sort_keys_);
107116
auto values = tbl->view();
108-
auto result = cudf::sort_by_key(values, keys, column_order_, null_order_);
117+
auto result =
118+
cudf::sort_by_key(values, keys, column_order_, null_order_, stream);
109119
auto const size = result->num_rows();
110120
outputTable_ = std::make_shared<CudfVector>(
111-
pool(), outputType_, size, std::move(result));
121+
pool(), outputType_, size, std::move(result), stream);
112122
}
113123

114124
RowVectorPtr CudfOrderBy::getOutput() {

velox/experimental/cudf/exec/Utilities.cpp

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,10 @@
1818
#include <memory>
1919
#include <string_view>
2020

21+
#include "velox/experimental/cudf/exec/Utilities.h"
22+
2123
#include <common/base/Exceptions.h>
24+
2225
#include <rmm/mr/device/arena_memory_resource.hpp>
2326
#include <rmm/mr/device/cuda_async_memory_resource.hpp>
2427
#include <rmm/mr/device/cuda_memory_resource.hpp>
@@ -28,12 +31,11 @@
2831
#include <rmm/mr/device/pool_memory_resource.hpp>
2932

3033
#include <cudf/concatenate.hpp>
34+
#include <cudf/detail/utilities/stream_pool.hpp>
3135
#include <cudf/utilities/default_stream.hpp>
3236
#include <cudf/utilities/error.hpp>
3337
#include <cudf/utilities/memory_resource.hpp>
3438

35-
#include "velox/experimental/cudf/exec/Utilities.h"
36-
3739
namespace facebook::velox::cudf_velox {
3840

3941
namespace {
@@ -84,13 +86,18 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(
8486
"\nExpecting: cuda, pool, async, arena, managed, or managed_pool");
8587
}
8688

89+
// Returns the global CUDA stream pool used by cudf.
//
// Forwards to cudf::detail::global_cuda_stream_pool(); the returned reference
// is whatever that call yields, with no additional state kept here.
cudf::detail::cuda_stream_pool& cudfGlobalStreamPool() {
  return cudf::detail::global_cuda_stream_pool();
}
92+
8793
// Reports whether debug output is enabled through the VELOX_CUDF_DEBUG
// environment variable.
//
// Returns true only when the variable is set and its leading integer parses
// to a nonzero value. An unset, empty, or non-numeric value (e.g. "true")
// yields false. std::strtol is used rather than std::stoi because stoi throws
// std::invalid_argument / std::out_of_range on such values, which would turn a
// mistyped diagnostic toggle into a hard failure.
bool cudfDebugEnabled() {
  const char* env_cudf_debug = std::getenv("VELOX_CUDF_DEBUG");
  if (env_cudf_debug == nullptr) {
    return false;
  }
  // With no parsable digits strtol returns 0, which maps to "disabled"; an
  // out-of-range value saturates to a nonzero limit, which maps to "enabled".
  return std::strtol(env_cudf_debug, nullptr, 10) != 0;
}
9197

9298
std::unique_ptr<cudf::table> concatenateTables(
93-
std::vector<std::unique_ptr<cudf::table>> tables) {
99+
std::vector<std::unique_ptr<cudf::table>> tables,
100+
rmm::cuda_stream_view stream) {
94101
// Check for empty vector
95102
VELOX_CHECK_GT(tables.size(), 0);
96103

@@ -105,9 +112,7 @@ std::unique_ptr<cudf::table> concatenateTables(
105112
std::back_inserter(tableViews),
106113
[&](auto const& tbl) { return tbl->view(); });
107114
return cudf::concatenate(
108-
tableViews,
109-
cudf::get_default_stream(),
110-
cudf::get_current_device_resource_ref());
115+
tableViews, stream, cudf::get_current_device_resource_ref());
111116
}
112117

113118
} // namespace facebook::velox::cudf_velox

velox/experimental/cudf/exec/Utilities.h

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,18 +19,32 @@
1919
#include <memory>
2020
#include <string_view>
2121

22+
#include <cudf/detail/utilities/stream_pool.hpp>
2223
#include <cudf/table/table.hpp>
2324
#include <rmm/mr/device/device_memory_resource.hpp>
2425

2526
namespace facebook::velox::cudf_velox {
2627

28+
/**
29+
* @brief Creates a memory resource based on the given mode.
30+
*/
2731
[[nodiscard]] std::shared_ptr<rmm::mr::device_memory_resource>
2832
create_memory_resource(std::string_view mode);
2933

34+
/**
35+
* @brief Returns the global CUDA stream pool used by cudf.
36+
*/
37+
[[nodiscard]] cudf::detail::cuda_stream_pool& cudfGlobalStreamPool();
38+
39+
/**
40+
* @brief Returns true if the VELOX_CUDF_DEBUG environment variable is set to a
41+
* nonzero value.
42+
*/
3043
bool cudfDebugEnabled();
3144

3245
// Concatenate a vector of cuDF tables into a single table
3346
std::unique_ptr<cudf::table> concatenateTables(
34-
std::vector<std::unique_ptr<cudf::table>> tables);
47+
std::vector<std::unique_ptr<cudf::table>> tables,
48+
rmm::cuda_stream_view stream);
3549

3650
} // namespace facebook::velox::cudf_velox

0 commit comments

Comments
 (0)