Skip to content

Commit a29d011

Browse files
committed
q9 by hand implementation using read_parquet pieces
1 parent 7caf9a2 commit a29d011

File tree

10 files changed

+1631
-0
lines changed

10 files changed

+1631
-0
lines changed

cpp/benchmarks/streaming/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,5 @@ install(
3030
DESTINATION bin/benchmarks/librapidsmpf
3131
EXCLUDE_FROM_ALL
3232
)
33+
34+
add_subdirectory(ndsh)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# =================================================================================
2+
# cmake-format: off
3+
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# cmake-format: on
6+
# =================================================================================
7+
8+
if(NOT RAPIDSMPF_HAVE_MPI)
9+
message(FATAL_ERROR "Streaming NDSH benchmarks require MPI support")
10+
endif()
11+
12+
if(NOT RAPIDSMPF_HAVE_STREAMING)
13+
message(FATAL_ERROR "Streaming NDSH benchmarks require streaming support")
14+
endif()
15+
16+
add_library(rapidsmpfndsh concatenate.cpp join.cpp utilities.cpp)
17+
18+
set_target_properties(
19+
rapidsmpfndsh
20+
PROPERTIES BUILD_RPATH "\$ORIGIN"
21+
INSTALL_RPATH "\$ORIGIN"
22+
CXX_STANDARD 20
23+
CXX_STANDARD_REQUIRED ON
24+
CUDA_STANDARD 20
25+
CUDA_STANDARD_REQUIRED ON
26+
POSITION_INDEPENDENT_CODE ON
27+
INTERFACE_POSITION_INDEPENDENT_CODE ON
28+
)
29+
30+
target_compile_options(
31+
rapidsmpfndsh PRIVATE "$<$<COMPILE_LANGUAGE:CXX>:${RAPIDSMPF_CXX_FLAGS}>"
32+
"$<$<COMPILE_LANGUAGE:CUDA>:${RAPIDSMPF_CUDA_FLAGS}>"
33+
)
34+
target_link_libraries(
35+
rapidsmpfndsh
36+
PRIVATE rapidsmpf::rapidsmpf rmm::rmm cudf::cudf libcoro $<TARGET_NAME_IF_EXISTS:ucxx::ucxx>
37+
$<TARGET_NAME_IF_EXISTS:MPI::MPI_C> $<TARGET_NAME_IF_EXISTS:conda_env> maybe_asan
38+
)
39+
40+
add_executable(q09 "q09.cpp")
41+
set_target_properties(
42+
q09
43+
PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$<BUILD_INTERFACE:${RAPIDSMPF_BINARY_DIR}/benchmarks/ndsh>"
44+
CXX_STANDARD 20
45+
CXX_STANDARD_REQUIRED ON
46+
CUDA_STANDARD 20
47+
CUDA_STANDARD_REQUIRED ON
48+
)
49+
target_compile_options(
50+
q09 PRIVATE "$<$<COMPILE_LANGUAGE:CXX>:${RAPIDSMPF_CXX_FLAGS}>"
51+
"$<$<COMPILE_LANGUAGE:CUDA>:${RAPIDSMPF_CUDA_FLAGS}>"
52+
)
53+
target_link_libraries(
54+
q09 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf ucxx::ucxx ucx::ucp
55+
$<TARGET_NAME_IF_EXISTS:MPI::MPI_C> $<TARGET_NAME_IF_EXISTS:conda_env> maybe_asan
56+
)
57+
install(
58+
TARGETS q09
59+
COMPONENT benchmarking
60+
DESTINATION bin/benchmarks/librapidsmpf
61+
EXCLUDE_FROM_ALL
62+
)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "concatenate.hpp"
7+
8+
#include <memory>
9+
#include <ranges>
10+
11+
#include <cudf/concatenate.hpp>
12+
#include <cudf/table/table.hpp>
13+
#include <cudf/table/table_view.hpp>
14+
15+
#include <rapidsmpf/cuda_event.hpp>
16+
#include <rapidsmpf/cuda_stream.hpp>
17+
#include <rapidsmpf/streaming/core/channel.hpp>
18+
#include <rapidsmpf/streaming/core/context.hpp>
19+
#include <rapidsmpf/streaming/core/message.hpp>
20+
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
21+
22+
#include "utilities.hpp"
23+
24+
namespace rapidsmpf::ndsh {
25+
26+
27+
streaming::Node concatenate(
28+
std::shared_ptr<streaming::Context> ctx,
29+
std::shared_ptr<streaming::Channel> ch_in,
30+
std::shared_ptr<streaming::Channel> ch_out,
31+
ConcatOrder order
32+
) {
33+
streaming::ShutdownAtExit c{ch_in, ch_out};
34+
CudaEvent event;
35+
std::vector<streaming::Message> messages;
36+
ctx->comm()->logger().print("Concatenate");
37+
auto concat_stream = ctx->br()->stream_pool().get_stream();
38+
while (true) {
39+
co_await ctx->executor()->schedule();
40+
auto msg = co_await ch_in->receive();
41+
if (msg.empty()) {
42+
break;
43+
}
44+
messages.push_back(std::move(msg));
45+
}
46+
if (messages.size() == 0) {
47+
co_await ch_out->send(
48+
streaming::to_message(
49+
0,
50+
std::make_unique<streaming::TableChunk>(
51+
std::make_unique<cudf::table>(), concat_stream
52+
)
53+
)
54+
);
55+
} else if (messages.size() == 1) {
56+
co_await ch_out->send(std::move(messages[0]));
57+
} else {
58+
std::vector<streaming::TableChunk> chunks;
59+
std::vector<cudf::table_view> views;
60+
if (order == ConcatOrder::LINEARIZE) {
61+
std::ranges::sort(messages, std::less{}, [](auto&& msg) {
62+
return msg.sequence_number();
63+
});
64+
}
65+
chunks.reserve(messages.size());
66+
views.reserve(messages.size());
67+
for (auto&& msg : messages) {
68+
auto chunk = msg.release<streaming::TableChunk>();
69+
chunk = to_device(ctx, std::move(chunk));
70+
cuda_stream_join(concat_stream, chunk.stream(), &event);
71+
views.push_back(chunk.table_view());
72+
chunks.push_back(std::move(chunk));
73+
}
74+
auto result = std::make_unique<streaming::TableChunk>(
75+
cudf::concatenate(views, concat_stream, ctx->br()->device_mr()), concat_stream
76+
);
77+
cuda_stream_join(
78+
chunks | std::views::transform([](auto&& chunk) { return chunk.stream(); }),
79+
std::ranges::single_view(concat_stream),
80+
&event
81+
);
82+
chunks.clear();
83+
co_await ch_out->send(streaming::to_message(0, std::move(result)));
84+
}
85+
co_await ch_out->drain(ctx->executor());
86+
}
87+
88+
} // namespace rapidsmpf::ndsh
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
#include <memory>
8+
9+
#include <rapidsmpf/streaming/core/channel.hpp>
10+
#include <rapidsmpf/streaming/core/context.hpp>
11+
#include <rapidsmpf/streaming/core/node.hpp>
12+
13+
namespace rapidsmpf::ndsh {
14+
15+
enum class ConcatOrder : bool {
16+
DONT_CARE,
17+
LINEARIZE,
18+
};
19+
20+
streaming::Node concatenate(
21+
std::shared_ptr<streaming::Context> ctx,
22+
std::shared_ptr<streaming::Channel> ch_in,
23+
std::shared_ptr<streaming::Channel> ch_out,
24+
ConcatOrder order = ConcatOrder::DONT_CARE
25+
);
26+
27+
} // namespace rapidsmpf::ndsh

0 commit comments

Comments
 (0)