Skip to content

Commit 968d97e

Browse files
authored
Merge pull request #670 from rapidsai/branch-25.04
Forward-merge branch-25.04 into branch-25.06
2 parents 6165e62 + a4170fc commit 968d97e

File tree

4 files changed

+117
-25
lines changed

4 files changed

+117
-25
lines changed

cpp/include/kvikio/parallel_operation.hpp

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
#include <atomic>
1919
#include <cassert>
2020
#include <future>
21+
#include <memory>
2122
#include <numeric>
2223
#include <system_error>
24+
#include <type_traits>
2325
#include <utility>
2426
#include <vector>
2527

@@ -32,6 +34,29 @@ namespace kvikio {
3234

3335
namespace detail {
3436

37+
/**
38+
* @brief Utility function to create a copyable callable from a move-only callable.
39+
*
40+
* The underlying thread pool uses `std::function` (until C++23) or `std::move_only_function`
41+
* (since C++23) as the element type of the task queue. For the former case that currently applies,
42+
* the `std::function` requires its "target" (associated callable) to be copy-constructible. This
43+
* utility function is a workaround for those move-only callables.
44+
*
45+
* @tparam F Callable type. F shall be move-only.
46+
* @param op Callable.
47+
* @return A new callable that satisfies the copy-constructible condition.
48+
*/
49+
template <typename F>
50+
auto make_copyable_lambda(F op)
51+
{
52+
// Create the callable on the heap by moving from op. Use a shared pointer to manage its lifetime.
53+
auto sp = std::make_shared<F>(std::move(op));
54+
55+
// Use the copyable closure as the proxy of the move-only callable.
56+
return
57+
[sp](auto&&... args) -> decltype(auto) { return (*sp)(std::forward<decltype(args)>(args)...); };
58+
}
59+
3560
/**
3661
* @brief Determine the NVTX color and call index. They are used to identify tasks from different
3762
* pread/pwrite calls. Tasks from the same pread/pwrite call are given the same color and call
@@ -50,6 +75,11 @@ inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and
5075
return {nvtx_color, call_idx};
5176
}
5277

78+
/**
79+
* @brief Submit the task callable to the underlying thread pool.
80+
*
81+
* Both the callable and arguments shall satisfy copy-constructible.
82+
*/
5383
template <typename F, typename T>
5484
std::future<std::size_t> submit_task(F op,
5585
T buf,
@@ -59,12 +89,40 @@ std::future<std::size_t> submit_task(F op,
5989
std::uint64_t nvtx_payload = 0ull,
6090
nvtx_color_type nvtx_color = NvtxManager::default_color())
6191
{
92+
static_assert(std::is_invocable_r_v<std::size_t,
93+
decltype(op),
94+
decltype(buf),
95+
decltype(size),
96+
decltype(file_offset),
97+
decltype(devPtr_offset)>);
98+
6299
return defaults::thread_pool().submit_task([=] {
63100
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
64101
return op(buf, size, file_offset, devPtr_offset);
65102
});
66103
}
67104

105+
/**
106+
* @brief Submit the move-only task callable to the underlying thread pool.
107+
*
108+
* @tparam F Callable type. F shall be move-only and have no argument.
109+
* @param op Callable.
110+
* @return A future to be used later to check if the operation has finished its execution.
111+
*/
112+
template <typename F>
113+
std::future<std::size_t> submit_move_only_task(
114+
F op_move_only,
115+
std::uint64_t nvtx_payload = 0ull,
116+
nvtx_color_type nvtx_color = NvtxManager::default_color())
117+
{
118+
static_assert(std::is_invocable_r_v<std::size_t, F>);
119+
auto op_copyable = make_copyable_lambda(std::move(op_move_only));
120+
return defaults::thread_pool().submit_task([=] {
121+
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
122+
return op_copyable();
123+
});
124+
}
125+
68126
} // namespace detail
69127

70128
/**
@@ -90,40 +148,40 @@ std::future<std::size_t> parallel_io(F op,
90148
nvtx_color_type nvtx_color = NvtxManager::default_color())
91149
{
92150
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
151+
static_assert(std::is_invocable_r_v<std::size_t,
152+
decltype(op),
153+
decltype(buf),
154+
decltype(size),
155+
decltype(file_offset),
156+
decltype(devPtr_offset)>);
93157

94158
// Single-task guard
95159
if (task_size >= size || page_size >= size) {
96160
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
97161
}
98162

99-
// We know an upper bound of the total number of tasks
100163
std::vector<std::future<std::size_t>> tasks;
101-
tasks.reserve(size / task_size + 2);
164+
tasks.reserve(size / task_size);
102165

103-
// 1) Submit `task_size` sized tasks
104-
while (size >= task_size) {
166+
// 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
167+
while (size > task_size) {
105168
tasks.push_back(
106169
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
107170
file_offset += task_size;
108171
devPtr_offset += task_size;
109172
size -= task_size;
110173
}
111174

112-
// 2) Submit a task for the remainder
113-
if (size > 0) {
114-
tasks.push_back(
115-
detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color));
116-
}
117-
118-
// Finally, we sum the result of all tasks.
119-
auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
120-
std::size_t ret = 0;
175+
// 2) Submit the last task, which consists of performing the last I/O and waiting the previous
176+
// tasks.
177+
auto last_task = [=, tasks = std::move(tasks)]() mutable -> std::size_t {
178+
auto ret = op(buf, size, file_offset, devPtr_offset);
121179
for (auto& task : tasks) {
122180
ret += task.get();
123181
}
124182
return ret;
125183
};
126-
return std::async(std::launch::deferred, gather_tasks, std::move(tasks));
184+
return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
127185
}
128186

129187
} // namespace kvikio

cpp/include/kvikio/utils.hpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,43 @@ class PushAndPopContext {
152152
std::tuple<void*, std::size_t, std::size_t> get_alloc_info(void const* devPtr,
153153
CUcontext* ctx = nullptr);
154154

155+
/**
156+
* @brief Create a shared state in a future object that is immediately ready.
157+
*
158+
* A partial implementation of the namesake function from the concurrency TS
159+
* (https://en.cppreference.com/w/cpp/experimental/make_ready_future). The cases of
160+
* std::reference_wrapper and void are not implemented.
161+
*
162+
* @tparam T Type of the value provided.
163+
* @param t Object provided.
164+
* @return A future holding a decayed copy of the object provided.
165+
*/
166+
template <typename T>
167+
std::future<std::decay_t<T>> make_ready_future(T&& t)
168+
{
169+
std::promise<std::decay_t<T>> p;
170+
auto fut = p.get_future();
171+
p.set_value(std::forward<T>(t));
172+
return fut;
173+
}
174+
175+
/**
176+
* @brief Check the status of the future object. True indicates that the result is available in the
177+
* future's shared state. False otherwise.
178+
*
179+
* The future shall not be created using `std::async(std::launch::deferred)`. Otherwise, this
180+
* function always returns true.
181+
*
182+
* @tparam T Type of the future.
183+
* @param future Instance of the future.
184+
* @return Boolean answer indicating if the future is ready or not.
185+
*/
155186
template <typename T>
156187
bool is_future_done(T const& future)
157188
{
189+
KVIKIO_EXPECT(future.valid(),
190+
"The future object does not refer to a valid shared state.",
191+
std::invalid_argument);
158192
return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout;
159193
}
160194

cpp/src/defaults.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ defaults::defaults()
103103
}
104104
// Determine the default value of `gds_threshold`
105105
{
106-
ssize_t const env = getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024);
106+
ssize_t const env = getenv_or("KVIKIO_GDS_THRESHOLD", 16 * 1024);
107107
KVIKIO_EXPECT(
108108
env >= 0, "KVIKIO_GDS_THRESHOLD has to be a positive integer", std::invalid_argument);
109109
_gds_threshold = env;

cpp/src/file_handle.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,11 @@ std::future<std::size_t> FileHandle::pread(void* buf,
173173

174174
// Shortcut that circumvent the threadpool and use the POSIX backend directly.
175175
if (size < gds_threshold) {
176-
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
177-
PushAndPopContext c(ctx);
178-
return detail::posix_device_read(_file_direct_off.fd(), buf, size, file_offset, 0);
179-
};
180-
return std::async(std::launch::deferred, task);
176+
PushAndPopContext c(ctx);
177+
auto bytes_read = detail::posix_device_read(_file_direct_off.fd(), buf, size, file_offset, 0);
178+
// Maintain API consistency while making this trivial case synchronous.
179+
// The result in the future is immediately available after the call.
180+
return make_ready_future(bytes_read);
181181
}
182182

183183
// Let's synchronize once instead of in each task.
@@ -225,11 +225,11 @@ std::future<std::size_t> FileHandle::pwrite(void const* buf,
225225

226226
// Shortcut that circumvent the threadpool and use the POSIX backend directly.
227227
if (size < gds_threshold) {
228-
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
229-
PushAndPopContext c(ctx);
230-
return detail::posix_device_write(_file_direct_off.fd(), buf, size, file_offset, 0);
231-
};
232-
return std::async(std::launch::deferred, task);
228+
PushAndPopContext c(ctx);
229+
auto bytes_write = detail::posix_device_write(_file_direct_off.fd(), buf, size, file_offset, 0);
230+
// Maintain API consistency while making this trivial case synchronous.
231+
// The result in the future is immediately available after the call.
232+
return make_ready_future(bytes_write);
233233
}
234234

235235
// Let's synchronize once instead of in each task.

0 commit comments

Comments
 (0)