Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/utilities/batched_memcpy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ namespace CUDF_EXPORT cudf {
namespace detail {

/**
* @brief A helper function that copies a vector of vectors from source to destination addresses in
* a batched manner.
* @brief Helper to batched memcpy specified numbers of bytes from source device iterators to
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor improvement to docstring here

* destination device iterators
*
* @tparam SrcIterator **[inferred]** The type of device-accessible source addresses iterator
* @tparam DstIterator **[inferred]** The type of device-accessible destination address iterator
Expand Down
41 changes: 20 additions & 21 deletions cpp/include/cudf/detail/utilities/batched_memset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,51 +33,50 @@ namespace CUDF_EXPORT cudf {
namespace detail {

/**
* @brief A helper function that takes in a vector of device spans and memsets them to the
* value provided using batches sent to the GPU.
* @brief Helper to batched memset a host span of device spans to the provided value
*
* @param bufs Vector with device spans of data
* @param host_buffers Host span of device spans of data
* @param value Value to memset all device spans to
* @param _stream Stream used for device memory operations and kernel launches
* @param stream Stream used for device memory operations and kernel launches
*
* @return The data in device spans all set to value
*/
template <typename T>
void batched_memset(std::vector<cudf::device_span<T>> const& bufs,
void batched_memset(cudf::host_span<cudf::device_span<T> const> host_buffers,
T const value,
rmm::cuda_stream_view stream)
{
// define task and bytes parameters
auto const num_bufs = bufs.size();
// Copy buffer spans into device memory and then get sizes
auto buffers = cudf::detail::make_device_uvector_async(
host_buffers, stream, cudf::get_current_device_resource_ref());

// copy bufs into device memory and then get sizes
auto gpu_bufs =
cudf::detail::make_device_uvector_async(bufs, stream, cudf::get_current_device_resource_ref());

// get a vector with the sizes of all buffers
// Vector of sizes of all buffer spans
auto sizes = thrust::make_transform_iterator(
thrust::counting_iterator<std::size_t>(0),
cuda::proclaim_return_type<std::size_t>(
[gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].size(); }));
thrust::counting_iterator<size_t>(0),
cuda::proclaim_return_type<size_t>(
[buffers = buffers.data()] __device__(size_t i) { return buffers[i].size(); }));

// get an iterator with a constant value to memset
// Constant iterator to the value to memset
auto iter_in = thrust::make_constant_iterator(thrust::make_constant_iterator(value));

// get an iterator pointing to each device span
// Iterator to each device span pointer
auto iter_out = thrust::make_transform_iterator(
thrust::counting_iterator<std::size_t>(0),
thrust::counting_iterator<size_t>(0),
cuda::proclaim_return_type<T*>(
[gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].data(); }));
[buffers = buffers.data()] __device__(size_t i) { return buffers[i].data(); }));

size_t temp_storage_bytes = 0;
auto const num_buffers = host_buffers.size();

cub::DeviceCopy::Batched(nullptr, temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
cub::DeviceCopy::Batched(
nullptr, temp_storage_bytes, iter_in, iter_out, sizes, num_buffers, stream);

// Allocate temporary storage
rmm::device_buffer d_temp_storage(
temp_storage_bytes, stream, cudf::get_current_device_resource_ref());

cub::DeviceCopy::Batched(
d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_buffers, stream);
}

} // namespace detail
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// if we have any list columns that need further processing.
bool has_lists = false;
// Casting to std::byte since data buffer pointer is void *
std::vector<cudf::device_span<std::byte>> memset_bufs;
std::vector<cudf::device_span<cuda::std::byte>> memset_bufs;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using cuda::std::byte for proper device compatibility

// Validity Buffer is a uint32_t pointer
std::vector<cudf::device_span<cudf::bitmask_type>> nullmask_bufs;

Expand Down Expand Up @@ -1589,7 +1589,7 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
std::overflow_error);
out_buf.create_with_mask(
out_buf_size, cudf::mask_state::UNINITIALIZED, false, _stream, _mr);
memset_bufs.emplace_back(static_cast<std::byte*>(out_buf.data()), out_buf.data_size());
memset_bufs.emplace_back(static_cast<cuda::std::byte*>(out_buf.data()), out_buf.data_size());
nullmask_bufs.emplace_back(
out_buf.null_mask(),
cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(cudf::bitmask_type)) /
Expand Down Expand Up @@ -1704,7 +1704,7 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(
buffer_size, cudf::mask_state::UNINITIALIZED, false, _stream, _mr);
memset_bufs.emplace_back(static_cast<std::byte*>(out_buf.data()), out_buf.data_size());
memset_bufs.emplace_back(static_cast<cuda::std::byte*>(out_buf.data()), out_buf.data_size());
nullmask_bufs.emplace_back(
out_buf.null_mask(),
cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(cudf::bitmask_type)) /
Expand All @@ -1714,9 +1714,10 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
}
}

cudf::detail::batched_memset(memset_bufs, static_cast<std::byte>(0), _stream);
cudf::detail::batched_memset<cuda::std::byte>(
memset_bufs, static_cast<cuda::std::byte>(0), _stream);
// Need to set null mask bufs to all high bits
cudf::detail::batched_memset(
cudf::detail::batched_memset<cudf::bitmask_type>(
nullmask_bufs, std::numeric_limits<cudf::bitmask_type>::max(), _stream);
}

Expand Down
48 changes: 25 additions & 23 deletions cpp/tests/utilities_tests/batched_memset_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct MultiBufferTestIntegral : public cudf::test::BaseFixture {};

TEST(MultiBufferTestIntegral, BasicTest1)
{
std::vector<size_t> const BUF_SIZES{
std::vector<size_t> const buffer_sizes{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style

50000, 4, 1000, 0, 250000, 1, 100, 8000, 0, 1, 100, 1000, 10000, 100000, 0, 1, 100000};

// Device init
Expand All @@ -46,52 +46,54 @@ TEST(MultiBufferTestIntegral, BasicTest1)

// Creating base vector for data and setting it to all 0xFF
std::vector<std::vector<uint64_t>> expected;
std::transform(BUF_SIZES.begin(), BUF_SIZES.end(), std::back_inserter(expected), [](auto size) {
return std::vector<uint64_t>(size + 2000, std::numeric_limits<uint64_t>::max());
});
std::transform(
buffer_sizes.begin(), buffer_sizes.end(), std::back_inserter(expected), [](auto size) {
return std::vector<uint64_t>(size + 2000, std::numeric_limits<uint64_t>::max());
});

// set buffer region to other value
std::for_each(thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), BUF_SIZES.begin())),
thrust::make_zip_iterator(thrust::make_tuple(expected.end(), BUF_SIZES.end())),
[](auto elem) {
std::fill_n(
thrust::get<0>(elem).begin() + 1000, thrust::get<1>(elem), 0xEEEEEEEEEEEEEEEE);
});
std::for_each(
thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), buffer_sizes.begin())),
thrust::make_zip_iterator(thrust::make_tuple(expected.end(), buffer_sizes.end())),
[](auto elem) {
std::fill_n(thrust::get<0>(elem).begin() + 1000, thrust::get<1>(elem), 0xEEEEEEEEEEEEEEEE);
});

// Copy host vector data to device
std::vector<rmm::device_uvector<uint64_t>> device_bufs;
std::vector<rmm::device_uvector<uint64_t>> device_buffers;
std::transform(expected.begin(),
expected.end(),
std::back_inserter(device_bufs),
std::back_inserter(device_buffers),
[stream, mr](auto const& vec) {
return cudf::detail::make_device_uvector_async(vec, stream, mr);
});

// Initialize device buffers for memset
std::vector<cudf::device_span<uint64_t>> memset_bufs;
auto buffers =
cudf::detail::make_host_vector<cudf::device_span<uint64_t>>(device_buffers.size(), stream);
std::transform(
thrust::make_zip_iterator(thrust::make_tuple(device_bufs.begin(), BUF_SIZES.begin())),
thrust::make_zip_iterator(thrust::make_tuple(device_bufs.end(), BUF_SIZES.end())),
std::back_inserter(memset_bufs),
thrust::make_zip_iterator(thrust::make_tuple(device_buffers.begin(), buffer_sizes.begin())),
thrust::make_zip_iterator(thrust::make_tuple(device_buffers.end(), buffer_sizes.end())),
buffers.begin(),
[](auto const& elem) {
return cudf::device_span<uint64_t>(thrust::get<0>(elem).data() + 1000, thrust::get<1>(elem));
});

// Function Call
cudf::detail::batched_memset(memset_bufs, uint64_t{0}, stream);
// Function call
cudf::detail::batched_memset<uint64_t>(buffers, uint64_t{0}, stream);

// Set all buffer regions to 0 for expected comparison
std::for_each(
thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), BUF_SIZES.begin())),
thrust::make_zip_iterator(thrust::make_tuple(expected.end(), BUF_SIZES.end())),
thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), buffer_sizes.begin())),
thrust::make_zip_iterator(thrust::make_tuple(expected.end(), buffer_sizes.end())),
[](auto elem) { std::fill_n(thrust::get<0>(elem).begin() + 1000, thrust::get<1>(elem), 0UL); });

// Compare to see that only given buffers are zeroed out
std::for_each(
thrust::make_zip_iterator(thrust::make_tuple(device_bufs.begin(), expected.begin())),
thrust::make_zip_iterator(thrust::make_tuple(device_bufs.end(), expected.end())),
thrust::make_zip_iterator(thrust::make_tuple(device_buffers.begin(), expected.begin())),
thrust::make_zip_iterator(thrust::make_tuple(device_buffers.end(), expected.end())),
[stream](auto const& elem) {
auto after_memset = cudf::detail::make_std_vector_async(thrust::get<0>(elem), stream);
auto const after_memset = cudf::detail::make_host_vector(thrust::get<0>(elem), stream);
EXPECT_TRUE(
std::equal(thrust::get<1>(elem).begin(), thrust::get<1>(elem).end(), after_memset.begin()));
});
Expand Down
Loading