From d051bf857841b2feb4123760ce811d1ebea618b7 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 02:08:22 +0000 Subject: [PATCH 01/14] Add API to count number of deleted rows across deletion vector(s) --- .../cudf/io/experimental/deletion_vectors.hpp | 17 ++ .../parquet/experimental/deletion_vectors.cu | 228 ++++++++++++++++-- cpp/tests/io/parquet_deletion_vectors_test.cu | 228 ++++++++++++++++++ 3 files changed, 449 insertions(+), 24 deletions(-) diff --git a/cpp/include/cudf/io/experimental/deletion_vectors.hpp b/cpp/include/cudf/io/experimental/deletion_vectors.hpp index 8624c0d364e..d65fdfc34a7 100644 --- a/cpp/include/cudf/io/experimental/deletion_vectors.hpp +++ b/cpp/include/cudf/io/experimental/deletion_vectors.hpp @@ -178,6 +178,23 @@ table_with_metadata read_parquet( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); +/** + * @brief Computes the number of rows deleted by the serialized 64-bit roaring bitmap deletion + * vectors + * + * When the total number of rows exceeds @p chunk_max_rows, the computation is split into + * chunks of at most @p chunk_max_rows rows each, allowing datasets larger than 2 billion rows. + * + * @param deletion_vector_info Information about the deletion vectors and the index column + * @param chunk_max_rows Maximum number of rows per chunk. Defaults to `size_type` max + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Number of rows deleted by the specified 64-bit roaring bitmap deletion vectors + */ +size_t compute_num_deleted_rows( + deletion_vector_info const& deletion_vector_info, + size_t chunk_max_rows = static_cast(std::numeric_limits::max()), + rmm::cuda_stream_view stream = cudf::get_default_stream()); + /** @} */ // end of group } // namespace io::parquet::experimental diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index 79b285c5d9d..b260c60ff10 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -18,7 +18,9 @@ #include #include +#include #include +#include #include #include #include @@ -162,8 +164,8 @@ std::unique_ptr compute_row_index_column( auto d_row_group_span_offsets = cudf::detail::make_device_uvector_async( row_group_span_offsets, stream, cudf::get_current_device_resource_ref()); auto in_iter = - thrust::make_zip_iterator(d_row_group_offsets.begin(), thrust::counting_iterator(0)); - auto out_iter = thrust::make_zip_iterator(row_indices_iter, row_group_keys.begin()); + cuda::make_zip_iterator(d_row_group_offsets.begin(), cuda::counting_iterator(0)); + auto out_iter = cuda::make_zip_iterator(row_indices_iter, row_group_keys.begin()); thrust::scatter(rmm::exec_policy_nosync(stream), in_iter, in_iter + num_row_groups, @@ -333,28 +335,102 @@ std::unique_ptr compute_row_mask_column( } /** - * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion - * vectors + * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors + * + * @note This function synchronizes the stream before returning the row mask column * * @param row_index_column View of the row index column - * @param deletion_vectors Queue of roaring bitmap wrappers - * @param deletion_vector_row_counts Queue of number of rows in each deletion vector - * @param start_row Starting row index of the current table chunk + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param rows_per_deletion_vector Host span of number of rows per deletion vector * @param stream CUDA stream for kernel launches and data transfers * @param mr Device memory resource to allocate device memory for the row mask column * * @return Unique pointer to the row mask column */ -std::unique_ptr compute_partial_row_mask_column( +size_t compute_deleted_num_rows( cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream) +{ + auto const num_rows = row_index_column.size(); + auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); + auto row_mask = + rmm::device_buffer(num_rows * sizeof(bool), stream, cudf::get_current_device_resource_ref()); + + auto row_mask_iter = static_cast(row_mask.data()); + + if (num_deletion_vectors == 1) { + CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector"); + + deletion_vector_refs.front().get().contains( + row_index_column.begin(), row_index_column.end(), row_mask_iter, stream); + } else { + auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); + deletion_vector_row_offsets[0] = 0; + std::inclusive_scan(rows_per_deletion_vector.begin(), + rows_per_deletion_vector.end(), + deletion_vector_row_offsets.begin() + 1); + + CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector(s)"); + + // Fork the stream if the number of deletion vectors is greater than the threshold + constexpr auto stream_fork_threshold = 8; + if (num_deletion_vectors >= stream_fork_threshold) { + auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + row_mask_iter + deletion_vector_row_offsets[dv_idx], + streams[dv_idx]); + }); + cudf::detail::join_streams(streams, stream); + } else { + // Otherwise, launch the queries on the same stream + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + row_mask_iter + deletion_vector_row_offsets[dv_idx], + stream); + }); + stream.synchronize(); + } + } + + return static_cast( + thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); +} + +/** + * @brief Consumes deletion vectors and their row counts from the provided queues for a given + * number of rows + * + * Constructs roaring bitmaps from the queued deletion vector data and returns the constructed + * bitmaps (for lifetime management), their references, and per-deletion-vector row counts. + * + * @param num_rows Number of rows to consume from the queues + * @param deletion_vectors Queue of roaring bitmap wrappers + * @param deletion_vector_row_counts Queue of number of rows in each deletion vector + * @param stream CUDA stream for kernel launches and data transfers + * + * @return Tuple of (consumed roaring bitmap impls, deletion vector refs, row counts) + */ +auto consume_deletion_vectors( + size_type num_rows, std::queue& deletion_vectors, std::queue& deletion_vector_row_counts, - size_t start_row, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) + rmm::cuda_stream_view stream) { - auto const num_rows = row_index_column.size(); - std::vector row_counts; std::vector deletion_vectors_impls; std::vector> deletion_vector_refs; @@ -367,7 +443,6 @@ std::unique_ptr compute_partial_row_mask_column( std::to_string(deletion_vector_row_counts.size()) + " " + std::to_string(deletion_vectors.size())); - // Compute how many rows can be queried from the current roaring bitmap auto const row_count = std::min(num_rows - rows_filled, deletion_vector_row_counts.front()); row_counts.emplace_back(row_count); @@ -377,13 +452,9 @@ std::unique_ptr compute_partial_row_mask_column( CUDF_EXPECTS(deletion_vector.roaring_bitmap != nullptr, "Failed to construct roaring_bitmap"); deletion_vector_refs.emplace_back(std::ref(*(deletion_vector.roaring_bitmap))); - // If we still have remaining rows to query from the current roaring bitmap, update its row - // count if (std::cmp_less(row_count, deletion_vector_row_counts.front())) { deletion_vector_row_counts.front() = deletion_vector_row_counts.front() - row_count; } else { - // Else if the deletion vector is fully queried, move it to the temporary vector and pop it - // from the queue deletion_vectors_impls.emplace_back(std::move(deletion_vectors.front())); deletion_vectors.pop(); deletion_vector_row_counts.pop(); @@ -392,8 +463,53 @@ std::unique_ptr compute_partial_row_mask_column( rows_filled += row_count; } - // Compute the row index column with the computed row group row offsets and counts - return compute_row_mask_column(row_index_column, deletion_vector_refs, row_counts, stream, mr); + return std::make_tuple( + std::move(deletion_vectors_impls), std::move(deletion_vector_refs), std::move(row_counts)); +} + +/** + * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion + * vectors + * + * @param row_index_column View of the row index column + * @param deletion_vectors Queue of roaring bitmap wrappers + * @param deletion_vector_row_counts Queue of number of rows in each deletion vector + * @param stream CUDA stream for kernel launches and data transfers + * @param mr Device memory resource to allocate device memory for the row mask column + * + * @return Unique pointer to the row mask column + */ +std::unique_ptr compute_partial_row_mask_column( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto [impls, refs, row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_row_mask_column(row_index_column, refs, row_counts, stream, mr); +} + +/** + * @brief Computes the number of deleted rows for a chunk of the row index column + * + * @param row_index_column View of the row index column for the current chunk + * @param deletion_vectors Queue of roaring bitmap wrappers + * @param deletion_vector_row_counts Queue of number of rows in each deletion vector + * @param stream CUDA stream for kernel launches and data transfers + * + * @return Number of rows deleted by the applicable deletion vectors in this chunk + */ +size_t compute_partial_deleted_num_rows( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + auto [impls, refs, row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_deleted_num_rows(row_index_column, refs, row_counts, stream); } } // namespace @@ -441,7 +557,7 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // Push row group offsets and counts to the internal queues if (not row_group_offsets.empty()) { - auto iter = thrust::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); + auto iter = cuda::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); std::for_each(iter, iter + row_group_offsets.size(), [&](auto const& elem) { _row_group_row_offsets.push(cuda::std::get<0>(elem)); _row_group_row_counts.push(cuda::std::get<1>(elem)); @@ -450,8 +566,8 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // Push deletion vector data spans and row counts to the internal queues if (not serialized_roaring_bitmaps.empty()) { - auto iter = thrust::make_zip_iterator(serialized_roaring_bitmaps.begin(), - deletion_vector_row_counts.begin()); + auto iter = cuda::make_zip_iterator(serialized_roaring_bitmaps.begin(), + deletion_vector_row_counts.begin()); std::for_each(iter, iter + serialized_roaring_bitmaps.size(), [&](auto const& elem) { _deletion_vectors.emplace(cuda::std::get<0>(elem)); _deletion_vector_row_counts.push(cuda::std::get<1>(elem)); @@ -519,7 +635,6 @@ table_with_metadata chunked_parquet_reader::read_chunk() auto row_mask = compute_partial_row_mask_column(table_with_index->get_column(0).view(), _deletion_vectors, _deletion_vector_row_counts, - _start_row, _stream, cudf::get_current_device_resource_ref()); return table_with_metadata{ @@ -611,4 +726,69 @@ table_with_metadata read_parquet(parquet_reader_options const& options, std::move(metadata)}; } +/** + * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows + */ +size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, + size_t chunk_max_rows, + rmm::cuda_stream_view stream) +{ + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; + auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; + auto const& row_group_offsets = deletion_vector_info.row_group_offsets; + auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; + + if (serialized_roaring_bitmaps.empty()) { return 0; } + + auto const num_rows = + std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); + + CUDF_EXPECTS(chunk_max_rows > 0 and + std::cmp_less_equal(chunk_max_rows, std::numeric_limits::max()), + "chunk_max_rows must be in range (0, size_type max]"); + + auto const temp_mr = rmm::mr::get_current_device_resource_ref(); + auto const is_unspecified_rg_data = row_group_offsets.empty(); + + // Build queues of row group offsets and counts + std::queue rg_offsets_queue; + std::queue rg_counts_queue; + for (size_t i = 0; i < row_group_offsets.size(); ++i) { + rg_offsets_queue.push(row_group_offsets[i]); + rg_counts_queue.push(row_group_num_rows[i]); + } + + // Build queue of roaring bitmap impls and their row counts + std::queue dv_queue; + std::queue dv_row_counts_queue; + for (size_t i = 0; i < serialized_roaring_bitmaps.size(); ++i) { + dv_queue.emplace(serialized_roaring_bitmaps[i]); + dv_row_counts_queue.push(deletion_vector_row_counts[i]); + } + + size_t total_deleted = 0; + size_t rows_remaining = num_rows; + size_t start_row = 0; + + while (rows_remaining > 0) { + auto const chunk_rows = static_cast(std::min(rows_remaining, chunk_max_rows)); + + auto row_index_column = compute_partial_row_index_column(rg_offsets_queue, + rg_counts_queue, + start_row, + chunk_rows, + is_unspecified_rg_data, + stream, + temp_mr); + + total_deleted += compute_partial_deleted_num_rows( + row_index_column->view(), dv_queue, dv_row_counts_queue, stream); + + start_row += chunk_rows; + rows_remaining -= chunk_rows; + } + + return total_deleted; +} + } // namespace cudf::io::parquet::experimental diff --git a/cpp/tests/io/parquet_deletion_vectors_test.cu b/cpp/tests/io/parquet_deletion_vectors_test.cu index 376e0e00563..65d914ebd3f 100644 --- a/cpp/tests/io/parquet_deletion_vectors_test.cu +++ b/cpp/tests/io/parquet_deletion_vectors_test.cu @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -634,3 +635,230 @@ TEST_F(ParquetDeletionVectorsTest, CustomRowIndexColumn) mr); } } + +// Tests for compute_num_deleted_rows + +TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsEmpty) +{ + auto const stream = cudf::get_default_stream(); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{}; + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result, 0); +} + +TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsNoRowIndex) +{ + auto constexpr num_rows = 50'000; + auto constexpr deletion_probability = 0.5; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + auto row_indices = thrust::host_vector(num_rows); + std::iota(row_indices.begin(), row_indices.end(), size_t{0}); + + auto [deletion_vector, expected_row_mask_column] = build_deletion_vector_and_expected_row_mask( + num_rows, deletion_probability, row_indices, stream, mr); + + auto const expected_row_mask = cudf::detail::make_host_vector( + cudf::device_span(expected_row_mask_column->view().data(), num_rows), stream); + auto const expected_deleted = + std::count(expected_row_mask.begin(), expected_row_mask.end(), false); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = {deletion_vector}, + .deletion_vector_row_counts = {num_rows}, + .row_group_offsets = {}, + .row_group_num_rows = {num_rows}}; + + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result, static_cast(expected_deleted)); +} + +TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) +{ + auto constexpr num_rows = 25'000; + auto constexpr num_row_groups = 5; + auto constexpr deletion_probability = 0.4; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Arbitrary row group offsets + auto row_group_offsets = std::vector(num_row_groups); + row_group_offsets[0] = static_cast(std::llround(1e9)); + std::transform( + thrust::counting_iterator(1), + thrust::counting_iterator(num_row_groups), + row_group_offsets.begin() + 1, + [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); + + // Split num_rows into num_row_groups spans + auto row_group_splits = std::vector(num_row_groups - 1); + { + std::mt19937 engine{0xf00d}; + std::uniform_int_distribution dist{1, num_rows}; + std::generate(row_group_splits.begin(), row_group_splits.end(), [&]() { return dist(engine); }); + std::sort(row_group_splits.begin(), row_group_splits.end()); + } + + auto row_group_num_rows = std::vector{}; + { + row_group_num_rows.reserve(num_row_groups); + auto previous_split = cudf::size_type{0}; + std::transform(row_group_splits.begin(), + row_group_splits.end(), + std::back_inserter(row_group_num_rows), + [&](auto current_split) { + auto current_split_size = current_split - previous_split; + previous_split = current_split; + return current_split_size; + }); + row_group_num_rows.push_back(num_rows - row_group_splits.back()); + } + + auto expected_row_indices = + build_expected_row_indices(row_group_offsets, row_group_num_rows, num_rows); + + auto [deletion_vector, expected_row_mask_column] = build_deletion_vector_and_expected_row_mask( + num_rows, deletion_probability, expected_row_indices, stream, mr); + + auto const expected_row_mask = cudf::detail::make_host_vector( + cudf::device_span(expected_row_mask_column->view().data(), num_rows), stream); + auto const expected_deleted = + std::count(expected_row_mask.begin(), expected_row_mask.end(), false); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = {deletion_vector}, + .deletion_vector_row_counts = {num_rows}, + .row_group_offsets = row_group_offsets, + .row_group_num_rows = row_group_num_rows}; + + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result, static_cast(expected_deleted)); +} + +TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) +{ + auto constexpr num_rows_per_dv = 10'000; + auto constexpr num_deletion_vectors = 5; + auto constexpr total_rows = num_rows_per_dv * num_deletion_vectors; + auto constexpr deletion_probability = 0.3; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Each deletion vector covers a separate row group with a distinct offset + auto row_group_offsets = std::vector(num_deletion_vectors); + auto row_group_num_rows = std::vector(num_deletion_vectors, num_rows_per_dv); + for (int i = 0; i < num_deletion_vectors; ++i) { + row_group_offsets[i] = static_cast(i) * 100'000; + } + + auto expected_row_indices = + build_expected_row_indices(row_group_offsets, row_group_num_rows, total_rows); + + size_t total_expected_deleted = 0; + auto serialized_bitmaps = std::vector>{}; + auto bitmap_spans = std::vector>{}; + auto dv_row_counts = std::vector{}; + + for (int i = 0; i < num_deletion_vectors; ++i) { + auto span_start = i * num_rows_per_dv; + auto local_indices = + cudf::host_span(expected_row_indices.data() + span_start, num_rows_per_dv); + + auto [dv, mask_col] = build_deletion_vector_and_expected_row_mask( + num_rows_per_dv, deletion_probability, local_indices, stream, mr); + + auto const host_mask = cudf::detail::make_host_vector( + cudf::device_span(mask_col->view().data(), num_rows_per_dv), stream); + total_expected_deleted += std::count(host_mask.begin(), host_mask.end(), false); + + serialized_bitmaps.emplace_back(std::move(dv)); + dv_row_counts.push_back(num_rows_per_dv); + } + + bitmap_spans.reserve(num_deletion_vectors); + for (auto const& bm : serialized_bitmaps) { + bitmap_spans.emplace_back(bm); + } + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = bitmap_spans, + .deletion_vector_row_counts = dv_row_counts, + .row_group_offsets = row_group_offsets, + .row_group_num_rows = row_group_num_rows}; + + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result, total_expected_deleted); +} + +TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsChunked) +{ + auto constexpr num_rows_per_dv = 10'000; + auto constexpr num_deletion_vectors = 5; + auto constexpr total_rows = num_rows_per_dv * num_deletion_vectors; + auto constexpr deletion_probability = 0.35; + // Force the chunking path by using a small chunk size + auto constexpr chunk_max_rows = size_t{15'000}; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // No custom row indices — row_group_offsets is empty, row_group_num_rows provides the counts + auto row_group_num_rows = std::vector(num_deletion_vectors, num_rows_per_dv); + + // Build row indices as a simple 0..total_rows sequence (no custom offsets) + auto row_indices = thrust::host_vector(total_rows); + std::iota(row_indices.begin(), row_indices.end(), size_t{0}); + + size_t total_expected_deleted = 0; + auto serialized_bitmaps = std::vector>{}; + auto bitmap_spans = std::vector>{}; + auto dv_row_counts = std::vector{}; + + for (int i = 0; i < num_deletion_vectors; ++i) { + auto span_start = i * num_rows_per_dv; + auto local_indices = + cudf::host_span(row_indices.data() + span_start, num_rows_per_dv); + + auto [dv, mask_col] = build_deletion_vector_and_expected_row_mask( + num_rows_per_dv, deletion_probability, local_indices, stream, mr); + + auto const host_mask = cudf::detail::make_host_vector( + cudf::device_span(mask_col->view().data(), num_rows_per_dv), stream); + total_expected_deleted += std::count(host_mask.begin(), host_mask.end(), false); + + serialized_bitmaps.emplace_back(std::move(dv)); + dv_row_counts.push_back(num_rows_per_dv); + } + + bitmap_spans.reserve(num_deletion_vectors); + for (auto const& bm : serialized_bitmaps) { + bitmap_spans.emplace_back(bm); + } + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = bitmap_spans, + .deletion_vector_row_counts = dv_row_counts, + .row_group_offsets = {}, + .row_group_num_rows = row_group_num_rows}; + + // With default chunk_max_rows (small-path): verify baseline + auto const result_default = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result_default, total_expected_deleted); + + // With small chunk_max_rows: forces the chunking path and splits across deletion vector + // boundaries (chunk_max_rows=15000 with 5 DVs of 10000 rows each = 50000 total rows, + // so we get chunks of 15000, 15000, 15000, 5000 rows) + auto const result_chunked = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_max_rows, stream); + EXPECT_EQ(result_chunked, total_expected_deleted); +} From a70ba27b066c1864d1b83a4bd7c5f3b2ae523131 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 02:20:12 +0000 Subject: [PATCH 02/14] Initial cleanup --- .../cudf/io/experimental/deletion_vectors.hpp | 4 +- .../parquet/experimental/deletion_vectors.cu | 6 +- cpp/tests/io/parquet_deletion_vectors_test.cu | 116 +++++------------- 3 files changed, 37 insertions(+), 89 deletions(-) diff --git a/cpp/include/cudf/io/experimental/deletion_vectors.hpp b/cpp/include/cudf/io/experimental/deletion_vectors.hpp index d65fdfc34a7..da23f2412d8 100644 --- a/cpp/include/cudf/io/experimental/deletion_vectors.hpp +++ b/cpp/include/cudf/io/experimental/deletion_vectors.hpp @@ -192,8 +192,8 @@ table_with_metadata read_parquet( */ size_t compute_num_deleted_rows( deletion_vector_info const& deletion_vector_info, - size_t chunk_max_rows = static_cast(std::numeric_limits::max()), - rmm::cuda_stream_view stream = cudf::get_default_stream()); + cudf::size_type chunk_max_rows = std::numeric_limits::max(), + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** @} */ // end of group diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index b260c60ff10..f514bc7fb8d 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -730,7 +730,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options, * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows */ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, - size_t chunk_max_rows, + cudf::size_type chunk_max_rows, rmm::cuda_stream_view stream) { auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; @@ -744,7 +744,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); CUDF_EXPECTS(chunk_max_rows > 0 and - std::cmp_less_equal(chunk_max_rows, std::numeric_limits::max()), + std::cmp_less_equal(chunk_max_rows, std::numeric_limits::max()), "chunk_max_rows must be in range (0, size_type max]"); auto const temp_mr = rmm::mr::get_current_device_resource_ref(); @@ -771,7 +771,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info size_t start_row = 0; while (rows_remaining > 0) { - auto const chunk_rows = static_cast(std::min(rows_remaining, chunk_max_rows)); + auto const chunk_rows = std::min(rows_remaining, chunk_max_rows); auto row_index_column = compute_partial_row_index_column(rg_offsets_queue, rg_counts_queue, diff --git a/cpp/tests/io/parquet_deletion_vectors_test.cu b/cpp/tests/io/parquet_deletion_vectors_test.cu index 65d914ebd3f..f3a43a1ef65 100644 --- a/cpp/tests/io/parquet_deletion_vectors_test.cu +++ b/cpp/tests/io/parquet_deletion_vectors_test.cu @@ -17,7 +17,8 @@ #include #include -#include +#include +#include #include #include @@ -88,14 +89,13 @@ auto build_expected_row_indices(cudf::host_span row_group_offsets, expected_row_indices.begin()); // Inclusive scan to compute the rest of the row indices - std::for_each( - thrust::counting_iterator(0), thrust::counting_iterator(num_row_groups), [&](auto i) { - auto start_row_index = row_group_span_offsets[i]; - auto end_row_index = row_group_span_offsets[i + 1]; - std::inclusive_scan(expected_row_indices.begin() + start_row_index, - expected_row_indices.begin() + end_row_index, - expected_row_indices.begin() + start_row_index); - }); + std::for_each(cuda::counting_iterator(0), cuda::counting_iterator(num_row_groups), [&](auto i) { + auto start_row_index = row_group_span_offsets[i]; + auto end_row_index = row_group_span_offsets[i + 1]; + std::inclusive_scan(expected_row_indices.begin() + start_row_index, + expected_row_indices.begin() + end_row_index, + expected_row_indices.begin() + start_row_index); + }); return expected_row_indices; } @@ -154,8 +154,8 @@ auto build_deletion_vector_and_expected_row_mask(cudf::size_type num_rows, auto roaring64_context = roaring::api::roaring64_bulk_context_t{.high_bytes = {0, 0, 0, 0, 0, 0}, .leaf = nullptr}; - std::for_each(thrust::counting_iterator(0), - thrust::counting_iterator(num_rows), + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_rows), [&](auto row_idx) { // Insert provided host row index if the row is deleted in the row mask if (not expected_row_mask[row_idx]) { @@ -201,8 +201,8 @@ std::unique_ptr build_expected_table( auto index_and_columns = std::vector{}; index_and_columns.reserve(input_table_view.num_columns() + 1); index_and_columns.push_back(expected_row_index_column); - std::transform(thrust::counting_iterator(0), - thrust::counting_iterator(input_table_view.num_columns()), + std::transform(cuda::counting_iterator(0), + cuda::counting_iterator(input_table_view.num_columns()), std::back_inserter(index_and_columns), [&](auto col_idx) { return input_table_view.column(col_idx); }); return cudf::apply_boolean_mask(cudf::table_view{index_and_columns}, row_mask_column, stream, mr); @@ -382,7 +382,7 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) roaring::api::roaring64_bulk_context_t{.high_bytes = {0, 0, 0, 0, 0, 0}, .leaf = nullptr}; std::for_each( - thrust::counting_iterator(0), thrust::counting_iterator(num_keys), [&](auto key) { + cuda::counting_iterator(0), cuda::counting_iterator(num_keys), [&](auto key) { if (is_even[key]) { roaring::api::roaring64_bitmap_add_bulk(roaring64_bitmap, &roaring64_context, key); } @@ -405,7 +405,7 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) auto roaring_context = roaring::api::roaring_bulk_context_t{}; std::for_each( - thrust::counting_iterator(0), thrust::counting_iterator(num_keys), [&](auto key) { + cuda::counting_iterator(0), cuda::counting_iterator(num_keys), [&](auto key) { if (is_even[key]) { roaring::api::roaring_bitmap_add_bulk(roaring_bitmap, &roaring_context, key); } @@ -435,16 +435,16 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) // Query the roaring bitmap auto contained = rmm::device_uvector(num_keys, stream, mr); - roaring_bitmap.contains_async(thrust::counting_iterator(0), - thrust::counting_iterator(num_keys), + roaring_bitmap.contains_async(cuda::counting_iterator(0), + cuda::counting_iterator(num_keys), contained.data(), stream); auto results = cudf::detail::make_host_vector_async(contained, stream); // Validate stream.synchronize(); - EXPECT_TRUE(std::all_of(thrust::counting_iterator(0), - thrust::counting_iterator(num_keys), + EXPECT_TRUE(std::all_of(cuda::counting_iterator(0), + cuda::counting_iterator(num_keys), [&](auto key) { return results[key] == is_even[key]; })); } @@ -545,8 +545,8 @@ TEST_F(ParquetDeletionVectorsTest, CustomRowIndexColumn) auto row_group_offsets = std::vector(num_row_groups); row_group_offsets[0] = static_cast(std::llround(1e9)); std::transform( - thrust::counting_iterator(1), - thrust::counting_iterator(num_row_groups), + cuda::counting_iterator(1), + cuda::counting_iterator(num_row_groups), row_group_offsets.begin() + 1, [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); @@ -691,8 +691,8 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) auto row_group_offsets = std::vector(num_row_groups); row_group_offsets[0] = static_cast(std::llround(1e9)); std::transform( - thrust::counting_iterator(1), - thrust::counting_iterator(num_row_groups), + cuda::counting_iterator(1), + cuda::counting_iterator(num_row_groups), row_group_offsets.begin() + 1, [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); @@ -740,6 +740,12 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( deletion_vector_info, std::numeric_limits::max(), stream); EXPECT_EQ(result, static_cast(expected_deleted)); + + auto const result_chunked = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, + static_cast(std::llround(row_group_splits.front() * 1.2)), + stream); + EXPECT_EQ(result_chunked, static_cast(expected_deleted)); } TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) @@ -794,70 +800,12 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) .row_group_offsets = row_group_offsets, .row_group_num_rows = row_group_num_rows}; - auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, std::numeric_limits::max(), stream); - EXPECT_EQ(result, total_expected_deleted); -} - -TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsChunked) -{ - auto constexpr num_rows_per_dv = 10'000; - auto constexpr num_deletion_vectors = 5; - auto constexpr total_rows = num_rows_per_dv * num_deletion_vectors; - auto constexpr deletion_probability = 0.35; - // Force the chunking path by using a small chunk size - auto constexpr chunk_max_rows = size_t{15'000}; - - auto const stream = cudf::get_default_stream(); - auto const mr = cudf::get_current_device_resource_ref(); - - // No custom row indices — row_group_offsets is empty, row_group_num_rows provides the counts - auto row_group_num_rows = std::vector(num_deletion_vectors, num_rows_per_dv); - - // Build row indices as a simple 0..total_rows sequence (no custom offsets) - auto row_indices = thrust::host_vector(total_rows); - std::iota(row_indices.begin(), row_indices.end(), size_t{0}); - - size_t total_expected_deleted = 0; - auto serialized_bitmaps = std::vector>{}; - auto bitmap_spans = std::vector>{}; - auto dv_row_counts = std::vector{}; - - for (int i = 0; i < num_deletion_vectors; ++i) { - auto span_start = i * num_rows_per_dv; - auto local_indices = - cudf::host_span(row_indices.data() + span_start, num_rows_per_dv); - - auto [dv, mask_col] = build_deletion_vector_and_expected_row_mask( - num_rows_per_dv, deletion_probability, local_indices, stream, mr); - - auto const host_mask = cudf::detail::make_host_vector( - cudf::device_span(mask_col->view().data(), num_rows_per_dv), stream); - total_expected_deleted += std::count(host_mask.begin(), host_mask.end(), false); - - serialized_bitmaps.emplace_back(std::move(dv)); - dv_row_counts.push_back(num_rows_per_dv); - } - - bitmap_spans.reserve(num_deletion_vectors); - for (auto const& bm : serialized_bitmaps) { - bitmap_spans.emplace_back(bm); - } - - auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ - .serialized_roaring_bitmaps = bitmap_spans, - .deletion_vector_row_counts = dv_row_counts, - .row_group_offsets = {}, - .row_group_num_rows = row_group_num_rows}; - - // With default chunk_max_rows (small-path): verify baseline + auto chunk_max_rows = std::numeric_limits::max(); auto const result_default = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, std::numeric_limits::max(), stream); + deletion_vector_info, chunk_max_rows, stream); EXPECT_EQ(result_default, total_expected_deleted); - // With small chunk_max_rows: forces the chunking path and splits across deletion vector - // boundaries (chunk_max_rows=15000 with 5 DVs of 10000 rows each = 50000 total rows, - // so we get chunks of 15000, 15000, 15000, 5000 rows) + chunk_max_rows = static_cast(std::llround(num_rows_per_dv * 1.5)); auto const result_chunked = cudf::io::parquet::experimental::compute_num_deleted_rows( deletion_vector_info, chunk_max_rows, stream); EXPECT_EQ(result_chunked, total_expected_deleted); From 965efd62ea8c7b4fd205e96e2ef4f47863c38abb Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 02:25:47 +0000 Subject: [PATCH 03/14] More cleanup --- .../parquet/experimental/deletion_vectors.cu | 125 +++++++----------- 1 file changed, 49 insertions(+), 76 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index f514bc7fb8d..e470ba50451 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -252,41 +252,36 @@ std::unique_ptr compute_partial_row_index_column( } /** - * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors + * @brief Queries deletion vectors against a row index column, writing results to the given output + * iterator * - * @note This function synchronizes the stream before returning the row mask column + * @note This function synchronizes the stream when multiple deletion vectors are present * + * @tparam OutputIterator Type of the output iterator for contains results * @param row_index_column View of the row index column * @param deletion_vector_refs Host span of cuco roaring bitmap references * @param rows_per_deletion_vector Host span of number of rows per deletion vector + * @param output Output iterator to write per-row boolean contains results * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column - * - * @return Unique pointer to the row mask column */ -std::unique_ptr compute_row_mask_column( +template +void query_deletion_vectors( cudf::column_view const& row_index_column, cudf::host_span const> deletion_vector_refs, cudf::host_span rows_per_deletion_vector, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) + OutputIterator output, + rmm::cuda_stream_view stream) { auto const num_rows = row_index_column.size(); auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); - auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); - - // Iterator to negate and store the output value from `contains_async` - auto row_mask_iter = thrust::make_transform_output_iterator( - static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); if (num_deletion_vectors == 1) { + CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector"); deletion_vector_refs.front().get().contains( - row_index_column.begin(), row_index_column.end(), row_mask_iter, stream); - return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, - num_rows, - std::move(row_mask), - rmm::device_buffer{0, stream, mr}, - 0); + row_index_column.begin(), row_index_column.end(), output, stream); + return; } auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); @@ -299,7 +294,6 @@ std::unique_ptr compute_row_mask_column( "Encountered a mismatch in the number of rows in the row index column and the " "number of rows in the deletion vector(s)"); - // Fork the stream if the number of deletion vectors is greater than the threshold constexpr auto stream_fork_threshold = 8; if (num_deletion_vectors >= stream_fork_threshold) { auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); @@ -309,23 +303,50 @@ std::unique_ptr compute_row_mask_column( deletion_vector_refs[dv_idx].get().contains_async( row_index_column.begin() + deletion_vector_row_offsets[dv_idx], row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], + output + deletion_vector_row_offsets[dv_idx], streams[dv_idx]); }); cudf::detail::join_streams(streams, stream); } else { - // Otherwise, launch the queries on the same stream std::for_each(thrust::counting_iterator(0), thrust::counting_iterator(num_deletion_vectors), [&](auto const dv_idx) { deletion_vector_refs[dv_idx].get().contains_async( row_index_column.begin() + deletion_vector_row_offsets[dv_idx], row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], + output + deletion_vector_row_offsets[dv_idx], stream); }); stream.synchronize(); } +} + +/** + * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors + * + * @param row_index_column View of the row index column + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param rows_per_deletion_vector Host span of number of rows per deletion vector + * @param stream CUDA stream for kernel launches and data transfers + * @param mr Device memory resource to allocate device memory for the row mask column + * + * @return Unique pointer to the row mask column + */ +std::unique_ptr compute_row_mask_column( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const num_rows = row_index_column.size(); + auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); + + auto row_mask_iter = thrust::make_transform_output_iterator( + static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); + + query_deletion_vectors( + row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, num_rows, @@ -335,17 +356,14 @@ std::unique_ptr compute_row_mask_column( } /** - * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors - * - * @note This function synchronizes the stream before returning the row mask column + * @brief Computes the number of deleted rows from the row index column and deletion vectors * * @param row_index_column View of the row index column * @param deletion_vector_refs Host span of cuco roaring bitmap references * @param rows_per_deletion_vector Host span of number of rows per deletion vector * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column * - * @return Unique pointer to the row mask column + * @return Number of rows deleted by the specified deletion vectors */ size_t compute_deleted_num_rows( cudf::column_view const& row_index_column, @@ -353,59 +371,14 @@ size_t compute_deleted_num_rows( cudf::host_span rows_per_deletion_vector, rmm::cuda_stream_view stream) { - auto const num_rows = row_index_column.size(); - auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); + auto const num_rows = row_index_column.size(); auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, cudf::get_current_device_resource_ref()); auto row_mask_iter = static_cast(row_mask.data()); - if (num_deletion_vectors == 1) { - CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector"); - - deletion_vector_refs.front().get().contains( - row_index_column.begin(), row_index_column.end(), row_mask_iter, stream); - } else { - auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); - deletion_vector_row_offsets[0] = 0; - std::inclusive_scan(rows_per_deletion_vector.begin(), - rows_per_deletion_vector.end(), - deletion_vector_row_offsets.begin() + 1); - - CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector(s)"); - - // Fork the stream if the number of deletion vectors is greater than the threshold - constexpr auto stream_fork_threshold = 8; - if (num_deletion_vectors >= stream_fork_threshold) { - auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); - std::for_each(cuda::counting_iterator(0), - cuda::counting_iterator(num_deletion_vectors), - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], - streams[dv_idx]); - }); - cudf::detail::join_streams(streams, stream); - } else { - // Otherwise, launch the queries on the same stream - std::for_each(cuda::counting_iterator(0), - cuda::counting_iterator(num_deletion_vectors), - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], - stream); - }); - stream.synchronize(); - } - } + query_deletion_vectors( + row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); return static_cast( thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); From 4d628a5392ecafd439053d5eda6100fdf514981f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 02:28:21 +0000 Subject: [PATCH 04/14] More clean up --- cpp/src/io/parquet/experimental/deletion_vectors.cu | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index e470ba50451..5f2d69a09cc 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -297,8 +296,8 @@ void query_deletion_vectors( constexpr auto stream_fork_threshold = 8; if (num_deletion_vectors >= stream_fork_threshold) { auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); - std::for_each(thrust::counting_iterator(0), - thrust::counting_iterator(num_deletion_vectors), + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), [&](auto const dv_idx) { deletion_vector_refs[dv_idx].get().contains_async( row_index_column.begin() + deletion_vector_row_offsets[dv_idx], @@ -308,8 +307,8 @@ void query_deletion_vectors( }); cudf::detail::join_streams(streams, stream); } else { - std::for_each(thrust::counting_iterator(0), - thrust::counting_iterator(num_deletion_vectors), + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), [&](auto const dv_idx) { deletion_vector_refs[dv_idx].get().contains_async( row_index_column.begin() + deletion_vector_row_offsets[dv_idx], @@ -342,7 +341,7 @@ std::unique_ptr compute_row_mask_column( auto const num_rows = row_index_column.size(); auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); - auto row_mask_iter = thrust::make_transform_output_iterator( + auto row_mask_iter = cuda::make_transform_output_iterator( static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); query_deletion_vectors( From b17a3d46388b238c27335a446c50aaa7124bcc16 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:47:39 +0000 Subject: [PATCH 05/14] Cleanup --- .../cudf/io/experimental/deletion_vectors.hpp | 7 +- .../parquet/experimental/deletion_vectors.cu | 82 ++++++++++++------- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/cpp/include/cudf/io/experimental/deletion_vectors.hpp b/cpp/include/cudf/io/experimental/deletion_vectors.hpp index da23f2412d8..f4b2d4d73f1 100644 --- a/cpp/include/cudf/io/experimental/deletion_vectors.hpp +++ b/cpp/include/cudf/io/experimental/deletion_vectors.hpp @@ -182,17 +182,14 @@ table_with_metadata read_parquet( * @brief Computes the number of rows deleted by the serialized 64-bit roaring bitmap deletion * vectors * - * When the total number of rows exceeds @p chunk_max_rows, the computation is split into - * chunks of at most @p chunk_max_rows rows each, allowing datasets larger than 2 billion rows. - * * @param deletion_vector_info Information about the deletion vectors and the index column - * @param chunk_max_rows Maximum number of rows per chunk. Defaults to `size_type` max + * @param max_chunk_rows Maximum number of rows to process at a time * @param stream CUDA stream used for device memory operations and kernel launches * @return Number of rows deleted by the specified 64-bit roaring bitmap deletion vectors */ size_t compute_num_deleted_rows( deletion_vector_info const& deletion_vector_info, - cudf::size_type chunk_max_rows = std::numeric_limits::max(), + cudf::size_type max_chunk_rows = std::numeric_limits::max(), rmm::cuda_stream_view stream = cudf::get_default_stream()); /** @} */ // end of group diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index 5f2d69a09cc..c814ddc060b 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -214,7 +214,7 @@ std::unique_ptr compute_partial_row_index_column( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // Build a simple row index column if the row group offsets and counts are unspecified + // Build a simple row index column if the row group data are unspecified if (is_unspecified_row_group_data) { return compute_row_index_column({}, {}, start_row, num_rows, stream, mr); } @@ -355,19 +355,19 @@ std::unique_ptr compute_row_mask_column( } /** - * @brief Computes the number of deleted rows from the row index column and deletion vectors + * @brief Computes the number of rows deleted by the deletion vectors * * @param row_index_column View of the row index column * @param deletion_vector_refs Host span of cuco roaring bitmap references - * @param rows_per_deletion_vector Host span of number of rows per deletion vector + * @param rows_per_dv Host span of number of rows per deletion vector * @param stream CUDA stream for kernel launches and data transfers * * @return Number of rows deleted by the specified deletion vectors */ -size_t compute_deleted_num_rows( +size_t compute_deleted_row_count( cudf::column_view const& row_index_column, cudf::host_span const> deletion_vector_refs, - cudf::host_span rows_per_deletion_vector, + cudf::host_span deletion_vector_row_counts, rmm::cuda_stream_view stream) { auto const num_rows = row_index_column.size(); @@ -377,7 +377,7 @@ size_t compute_deleted_num_rows( auto row_mask_iter = static_cast(row_mask.data()); query_deletion_vectors( - row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); + row_index_column, deletion_vector_refs, deletion_vector_row_counts, row_mask_iter, stream); return static_cast( thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); @@ -473,7 +473,7 @@ std::unique_ptr compute_partial_row_mask_column( * * @return Number of rows deleted by the applicable deletion vectors in this chunk */ -size_t compute_partial_deleted_num_rows( +size_t compute_partial_deleted_row_count( cudf::column_view const& row_index_column, std::queue& deletion_vectors, std::queue& deletion_vector_row_counts, @@ -481,7 +481,7 @@ size_t compute_partial_deleted_num_rows( { auto [impls, refs, row_counts] = consume_deletion_vectors( row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); - return compute_deleted_num_rows(row_index_column, refs, row_counts, stream); + return compute_deleted_row_count(row_index_column, refs, row_counts, stream); } } // namespace @@ -702,7 +702,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options, * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows */ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, - cudf::size_type chunk_max_rows, + cudf::size_type max_chunk_rows, rmm::cuda_stream_view stream) { auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; @@ -710,17 +710,36 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info auto const& row_group_offsets = deletion_vector_info.row_group_offsets; auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; + // Return early if no deletion vectors are present if (serialized_roaring_bitmaps.empty()) { return 0; } - auto const num_rows = - std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); - - CUDF_EXPECTS(chunk_max_rows > 0 and - std::cmp_less_equal(chunk_max_rows, std::numeric_limits::max()), - "chunk_max_rows must be in range (0, size_type max]"); + CUDF_EXPECTS( + row_group_offsets.size() == row_group_num_rows.size(), + "Encountered a mismatch in the number of row group offsets and row group row counts"); - auto const temp_mr = rmm::mr::get_current_device_resource_ref(); - auto const is_unspecified_rg_data = row_group_offsets.empty(); + // Validate max_chunk_rows + CUDF_EXPECTS(max_chunk_rows > 0 and + std::cmp_less_equal(max_chunk_rows, std::numeric_limits::max()), + "max_chunk_rows must be in range (0, size_type max]"); + + // Check if row group data are unspecified + auto const is_row_group_data_unspecified = row_group_offsets.empty(); + + // Total number of rows + auto const num_rows = [&]() { + auto const rows_in_dvs = std::accumulate( + deletion_vector_row_counts.begin(), deletion_vector_row_counts.end(), size_t{0}); + + // Validate number of rows across deletion vectors and row groups + if (not is_row_group_data_unspecified) { + auto const rows_in_rgs = + std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); + CUDF_EXPECTS(std::cmp_equal(rows_in_dvs, rows_in_rgs), + "Encountered a mismatch in the number of rows across deletion vectors and the " + "number of rows across row groups"); + } + return rows_in_dvs; + }(); // Build queues of row group offsets and counts std::queue rg_offsets_queue; @@ -732,35 +751,36 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info // Build queue of roaring bitmap impls and their row counts std::queue dv_queue; - std::queue dv_row_counts_queue; + std::queue dv_row_counts_queue; for (size_t i = 0; i < serialized_roaring_bitmaps.size(); ++i) { dv_queue.emplace(serialized_roaring_bitmaps[i]); dv_row_counts_queue.push(deletion_vector_row_counts[i]); } - size_t total_deleted = 0; + size_t rows_deleted = 0; size_t rows_remaining = num_rows; size_t start_row = 0; while (rows_remaining > 0) { - auto const chunk_rows = std::min(rows_remaining, chunk_max_rows); - - auto row_index_column = compute_partial_row_index_column(rg_offsets_queue, - rg_counts_queue, - start_row, - chunk_rows, - is_unspecified_rg_data, - stream, - temp_mr); - - total_deleted += compute_partial_deleted_num_rows( + auto const chunk_rows = std::min(rows_remaining, max_chunk_rows); + + auto row_index_column = + compute_partial_row_index_column(rg_offsets_queue, + rg_counts_queue, + start_row, + chunk_rows, + is_row_group_data_unspecified, + stream, + rmm::mr::get_current_device_resource_ref()); + + rows_deleted += compute_partial_deleted_row_count( row_index_column->view(), dv_queue, dv_row_counts_queue, stream); start_row += chunk_rows; rows_remaining -= chunk_rows; } - return total_deleted; + return rows_deleted; } } // namespace cudf::io::parquet::experimental From 9d2671d7427e7badde6bb0a6acfbabad858a84d3 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:07:27 +0000 Subject: [PATCH 06/14] Cleanup --- cpp/CMakeLists.txt | 1 + .../parquet/experimental/deletion_vectors.cu | 496 +----------------- .../experimental/deletion_vectors_helpers.cu | 354 +++++++++++++ .../experimental/deletion_vectors_helpers.hpp | 182 +++++++ cpp/tests/io/parquet_deletion_vectors_test.cu | 69 ++- 5 files changed, 581 insertions(+), 521 deletions(-) create mode 100644 cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu create mode 100644 cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 428878e9456..1c7d4b01fc2 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,6 +530,7 @@ add_library( src/io/parquet/decode_preprocess.cu src/io/parquet/experimental/dictionary_page_filter.cu src/io/parquet/experimental/deletion_vectors.cu + src/io/parquet/experimental/deletion_vectors_helpers.cu src/io/parquet/experimental/hybrid_scan.cpp src/io/parquet/experimental/hybrid_scan_chunking.cu src/io/parquet/experimental/hybrid_scan_helpers.cpp diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index c814ddc060b..aa1bd126cf1 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -3,489 +3,22 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include -#include +#include "deletion_vectors_helpers.hpp" + #include #include -#include #include #include -#include -#include #include -#include -#include -#include #include #include -#include -#include -#include -#include #include -#include namespace cudf::io::parquet::experimental { -// Type alias for the cuco 64-bit roaring bitmap -using roaring_bitmap_type = - cuco::experimental::roaring_bitmap>; - -/** - * @brief Opaque wrapper class for cuco's 64-bit roaring bitmap - */ -struct chunked_parquet_reader::roaring_bitmap_impl { - std::unique_ptr roaring_bitmap; - cudf::host_span const roaring_bitmap_data; - - explicit roaring_bitmap_impl( - cudf::host_span const& serialized_roaring_bitmap) - : roaring_bitmap_data{serialized_roaring_bitmap} - { - } - - roaring_bitmap_impl(roaring_bitmap_impl&&) = default; - roaring_bitmap_impl(roaring_bitmap_impl const&) = delete; - - void construct_roaring_bitmap(rmm::mr::polymorphic_allocator const& allocator, - rmm::cuda_stream_view stream) - { - if (roaring_bitmap == nullptr) { - CUDF_EXPECTS(not roaring_bitmap_data.empty(), - "Encountered empty data while constructing roaring bitmap"); - roaring_bitmap = std::make_unique( - static_cast(roaring_bitmap_data.data()), allocator, stream); - } - } -}; - -namespace { - -/** - * @brief Prepends the index column information to the table metadata - * - * @param[in,out] metadata Table metadata - */ -void prepend_index_column_to_table_metadata(table_metadata& metadata) -{ - auto updated_schema_info = std::vector{}; - updated_schema_info.reserve(metadata.schema_info.size() + 1); - updated_schema_info.emplace_back("index"); - updated_schema_info.insert(updated_schema_info.end(), - std::make_move_iterator(metadata.schema_info.begin()), - std::make_move_iterator(metadata.schema_info.end())); - metadata.schema_info = std::move(updated_schema_info); -} - -/** - * @brief Prepends the index column to the table columns - * - * @param table Input table - * @param row_index_column row index column to prepend - * - * @return A table with the index column prepended to the table columns - */ -std::unique_ptr prepend_index_column_to_table( - std::unique_ptr&& table, std::unique_ptr&& row_index_column) -{ - auto index_and_table_columns = std::vector>{}; - index_and_table_columns.reserve(table->num_columns() + 1); - index_and_table_columns.push_back(std::move(row_index_column)); - auto table_columns = table->release(); - index_and_table_columns.insert(index_and_table_columns.end(), - std::make_move_iterator(table_columns.begin()), - std::make_move_iterator(table_columns.end())); - return std::make_unique(std::move(index_and_table_columns)); -} - -/** - * @brief Computes a row index column from the specified row group row offsets and counts - * - * @param row_group_offsets Host span of row offsets of each row group - * @param row_group_num_rows Host span of number of rows in each row group - * @param start_row Starting row index if the row group offsets and counts are empty - * @param num_rows Number of rows in the table - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row index column - * - * @return UINT64 column containing row indices - */ -std::unique_ptr compute_row_index_column( - cudf::host_span row_group_offsets, - cudf::host_span row_group_num_rows, - std::optional start_row, - size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto const num_row_groups = static_cast(row_group_num_rows.size()); - - if (row_group_offsets.empty()) { - auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); - auto row_indices_iter = static_cast(row_indices.data()); - thrust::sequence(rmm::exec_policy_nosync(stream), - row_indices_iter, - row_indices_iter + num_rows, - start_row.value_or(size_t{0})); - return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, - num_rows, - std::move(row_indices), - rmm::device_buffer{0, stream, mr}, - 0); - } - - // Convert number of rows in each row group to row group span offsets. Here, a span is the range - // of (table) rows that belong to a row group - auto row_group_span_offsets = - cudf::detail::make_host_vector(num_row_groups + 1, stream); - row_group_span_offsets[0] = 0; - std::inclusive_scan( - row_group_num_rows.begin(), row_group_num_rows.end(), row_group_span_offsets.begin() + 1); - - CUDF_EXPECTS(row_group_span_offsets.back() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the row group(s)"); - - auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); - auto row_indices_iter = static_cast(row_indices.data()); - thrust::fill(rmm::exec_policy_nosync(stream), row_indices_iter, row_indices_iter + num_rows, 1); - - auto row_group_keys = rmm::device_uvector(num_rows, stream); - thrust::fill(rmm::exec_policy_nosync(stream), row_group_keys.begin(), row_group_keys.end(), 0); - - // Scatter row group offsets and row group indices (or span indices) to their corresponding - // row group span offsets - auto d_row_group_offsets = cudf::detail::make_device_uvector_async( - row_group_offsets, stream, cudf::get_current_device_resource_ref()); - auto d_row_group_span_offsets = cudf::detail::make_device_uvector_async( - row_group_span_offsets, stream, cudf::get_current_device_resource_ref()); - auto in_iter = - cuda::make_zip_iterator(d_row_group_offsets.begin(), cuda::counting_iterator(0)); - auto out_iter = cuda::make_zip_iterator(row_indices_iter, row_group_keys.begin()); - thrust::scatter(rmm::exec_policy_nosync(stream), - in_iter, - in_iter + num_row_groups, - d_row_group_span_offsets.begin(), - out_iter); - - // Fill in the the rest of the row group span indices - thrust::inclusive_scan(rmm::exec_policy_nosync(stream), - row_group_keys.begin(), - row_group_keys.end(), - row_group_keys.begin(), - cuda::maximum()); - - // Segmented inclusive scan to compute the rest of the row indices - thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), - row_group_keys.begin(), - row_group_keys.end(), - row_indices_iter, - row_indices_iter); - - return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, - num_rows, - std::move(row_indices), - rmm::device_buffer{0, stream, mr}, - 0); -} - -/** - * @brief Computes a chunk of the row index column from the specified row group offsets and counts - * - * @param row_group_offsets Queue of row offsets of eachrow group - * @param row_group_num_rows Queue of number of rows in each row group - * @param start_row Starting row index of the current table chunk - * @param num_rows Total number of rows in the current table chunk - * @param is_unspecified_row_group_data Whether the row group offsets and counts are unspecified - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row index column - * - * @return UINT64 column containing row indices - */ -std::unique_ptr compute_partial_row_index_column( - std::queue& row_group_offsets, - std::queue& row_group_num_rows, - size_t start_row, - size_type num_rows, - bool is_unspecified_row_group_data, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - // Build a simple row index column if the row group data are unspecified - if (is_unspecified_row_group_data) { - return compute_row_index_column({}, {}, start_row, num_rows, stream, mr); - } - - // Compute current table chunk's vectors of row group row offsets and counts from the input queues - std::vector row_offsets; - std::vector row_counts; - size_type rows_filled = 0; - while (std::cmp_less(rows_filled, num_rows)) { - CUDF_EXPECTS( - not(row_group_offsets.empty() or row_group_num_rows.empty()), - "Unable to compute the row index column from the specified row group offsets and counts"); - - // Compute how many rows can be extracted from the current row group - auto const row_count = std::min(num_rows - rows_filled, row_group_num_rows.front()); - row_counts.emplace_back(row_count); - row_offsets.emplace_back(row_group_offsets.front()); - - // If we still have remaining rows in the current row group, update its offset and row count - if (std::cmp_less(row_count, row_group_num_rows.front())) { - row_group_offsets.front() = row_group_offsets.front() + row_count; - row_group_num_rows.front() = row_group_num_rows.front() - row_count; - } else { - // Else if the row group is fully consumed, pop it from the queues - row_group_offsets.pop(); - row_group_num_rows.pop(); - } - - rows_filled += row_count; - } - - // Compute the row index column with the computed row group row offsets and counts - return compute_row_index_column(row_offsets, row_counts, std::nullopt, num_rows, stream, mr); -} - -/** - * @brief Queries deletion vectors against a row index column, writing results to the given output - * iterator - * - * @note This function synchronizes the stream when multiple deletion vectors are present - * - * @tparam OutputIterator Type of the output iterator for contains results - * @param row_index_column View of the row index column - * @param deletion_vector_refs Host span of cuco roaring bitmap references - * @param rows_per_deletion_vector Host span of number of rows per deletion vector - * @param output Output iterator to write per-row boolean contains results - * @param stream CUDA stream for kernel launches and data transfers - */ -template -void query_deletion_vectors( - cudf::column_view const& row_index_column, - cudf::host_span const> deletion_vector_refs, - cudf::host_span rows_per_deletion_vector, - OutputIterator output, - rmm::cuda_stream_view stream) -{ - auto const num_rows = row_index_column.size(); - auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); - - if (num_deletion_vectors == 1) { - CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector"); - deletion_vector_refs.front().get().contains( - row_index_column.begin(), row_index_column.end(), output, stream); - return; - } - - auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); - deletion_vector_row_offsets[0] = 0; - std::inclusive_scan(rows_per_deletion_vector.begin(), - rows_per_deletion_vector.end(), - deletion_vector_row_offsets.begin() + 1); - - CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector(s)"); - - constexpr auto stream_fork_threshold = 8; - if (num_deletion_vectors >= stream_fork_threshold) { - auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); - std::for_each(cuda::counting_iterator(0), - cuda::counting_iterator(num_deletion_vectors), - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - output + deletion_vector_row_offsets[dv_idx], - streams[dv_idx]); - }); - cudf::detail::join_streams(streams, stream); - } else { - std::for_each(cuda::counting_iterator(0), - cuda::counting_iterator(num_deletion_vectors), - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - output + deletion_vector_row_offsets[dv_idx], - stream); - }); - stream.synchronize(); - } -} - -/** - * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors - * - * @param row_index_column View of the row index column - * @param deletion_vector_refs Host span of cuco roaring bitmap references - * @param rows_per_deletion_vector Host span of number of rows per deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column - * - * @return Unique pointer to the row mask column - */ -std::unique_ptr compute_row_mask_column( - cudf::column_view const& row_index_column, - cudf::host_span const> deletion_vector_refs, - cudf::host_span rows_per_deletion_vector, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto const num_rows = row_index_column.size(); - auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); - - auto row_mask_iter = cuda::make_transform_output_iterator( - static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); - - query_deletion_vectors( - row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); - - return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, - num_rows, - std::move(row_mask), - rmm::device_buffer{0, stream, mr}, - 0); -} - -/** - * @brief Computes the number of rows deleted by the deletion vectors - * - * @param row_index_column View of the row index column - * @param deletion_vector_refs Host span of cuco roaring bitmap references - * @param rows_per_dv Host span of number of rows per deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * - * @return Number of rows deleted by the specified deletion vectors - */ -size_t compute_deleted_row_count( - cudf::column_view const& row_index_column, - cudf::host_span const> deletion_vector_refs, - cudf::host_span deletion_vector_row_counts, - rmm::cuda_stream_view stream) -{ - auto const num_rows = row_index_column.size(); - auto row_mask = - rmm::device_buffer(num_rows * sizeof(bool), stream, cudf::get_current_device_resource_ref()); - - auto row_mask_iter = static_cast(row_mask.data()); - - query_deletion_vectors( - row_index_column, deletion_vector_refs, deletion_vector_row_counts, row_mask_iter, stream); - - return static_cast( - thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); -} - -/** - * @brief Consumes deletion vectors and their row counts from the provided queues for a given - * number of rows - * - * Constructs roaring bitmaps from the queued deletion vector data and returns the constructed - * bitmaps (for lifetime management), their references, and per-deletion-vector row counts. - * - * @param num_rows Number of rows to consume from the queues - * @param deletion_vectors Queue of roaring bitmap wrappers - * @param deletion_vector_row_counts Queue of number of rows in each deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * - * @return Tuple of (consumed roaring bitmap impls, deletion vector refs, row counts) - */ -auto consume_deletion_vectors( - size_type num_rows, - std::queue& deletion_vectors, - std::queue& deletion_vector_row_counts, - rmm::cuda_stream_view stream) -{ - std::vector row_counts; - std::vector deletion_vectors_impls; - std::vector> deletion_vector_refs; - size_type rows_filled = 0; - - while (std::cmp_less(rows_filled, num_rows)) { - CUDF_EXPECTS( - not(deletion_vector_row_counts.empty() or deletion_vectors.empty()), - "Encountered insufficient number of deletion vector row counts or deletion vectors: " + - std::to_string(deletion_vector_row_counts.size()) + " " + - std::to_string(deletion_vectors.size())); - - auto const row_count = - std::min(num_rows - rows_filled, deletion_vector_row_counts.front()); - row_counts.emplace_back(row_count); - - auto& deletion_vector = deletion_vectors.front(); - deletion_vector.construct_roaring_bitmap(rmm::mr::polymorphic_allocator{}, stream); - CUDF_EXPECTS(deletion_vector.roaring_bitmap != nullptr, "Failed to construct roaring_bitmap"); - deletion_vector_refs.emplace_back(std::ref(*(deletion_vector.roaring_bitmap))); - - if (std::cmp_less(row_count, deletion_vector_row_counts.front())) { - deletion_vector_row_counts.front() = deletion_vector_row_counts.front() - row_count; - } else { - deletion_vectors_impls.emplace_back(std::move(deletion_vectors.front())); - deletion_vectors.pop(); - deletion_vector_row_counts.pop(); - } - - rows_filled += row_count; - } - - return std::make_tuple( - std::move(deletion_vectors_impls), std::move(deletion_vector_refs), std::move(row_counts)); -} - -/** - * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion - * vectors - * - * @param row_index_column View of the row index column - * @param deletion_vectors Queue of roaring bitmap wrappers - * @param deletion_vector_row_counts Queue of number of rows in each deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column - * - * @return Unique pointer to the row mask column - */ -std::unique_ptr compute_partial_row_mask_column( - cudf::column_view const& row_index_column, - std::queue& deletion_vectors, - std::queue& deletion_vector_row_counts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto [impls, refs, row_counts] = consume_deletion_vectors( - row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); - return compute_row_mask_column(row_index_column, refs, row_counts, stream, mr); -} - -/** - * @brief Computes the number of deleted rows for a chunk of the row index column - * - * @param row_index_column View of the row index column for the current chunk - * @param deletion_vectors Queue of roaring bitmap wrappers - * @param deletion_vector_row_counts Queue of number of rows in each deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * - * @return Number of rows deleted by the applicable deletion vectors in this chunk - */ -size_t compute_partial_deleted_row_count( - cudf::column_view const& row_index_column, - std::queue& deletion_vectors, - std::queue& deletion_vector_row_counts, - rmm::cuda_stream_view stream) -{ - auto [impls, refs, row_counts] = consume_deletion_vectors( - row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); - return compute_deleted_row_count(row_index_column, refs, row_counts, stream); -} - -} // namespace - /** * @copydoc * cudf::io::parquet::experimental::chunked_parquet_reader::chunked_parquet_reader @@ -504,7 +37,7 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // be applying the deletion vector to produce the output table _table_mr{deletion_vector_info.serialized_roaring_bitmaps.empty() ? mr - : rmm::mr::get_current_device_resource_ref()} + : cudf::get_current_device_resource_ref()} { auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; @@ -643,9 +176,8 @@ table_with_metadata read_parquet(parquet_reader_options const& options, // Use default mr to read parquet table and build row index column if we will be applying the // deletion vector to produce a new table later auto const table_mr = - serialized_roaring_bitmaps.empty() ? mr : rmm::mr::get_current_device_resource_ref(); + serialized_roaring_bitmaps.empty() ? mr : cudf::get_current_device_resource_ref(); - // Read the parquet table auto [table, metadata] = cudf::io::read_parquet(options, stream, table_mr); auto const num_rows = table->num_rows(); @@ -663,10 +195,8 @@ table_with_metadata read_parquet(parquet_reader_options const& options, auto table_with_index = prepend_index_column_to_table(std::move(table), std::move(row_index_column)); - // Also prepend the row index column's metadata to the table schema prepend_index_column_to_table_metadata(metadata); - // Return early if roaring64 data is empty if (serialized_roaring_bitmaps.empty()) { return table_with_metadata{std::move(table_with_index), std::move(metadata)}; } @@ -685,7 +215,6 @@ table_with_metadata read_parquet(parquet_reader_options const& options, stream); return std::ref(deletion_vectors.back()); }); - // Compute the row mask column from the deletion vectors auto row_mask = compute_row_mask_column(table_with_index->get_column(0).view(), deletion_vectors_refs, deletion_vector_row_counts, @@ -710,27 +239,24 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info auto const& row_group_offsets = deletion_vector_info.row_group_offsets; auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; - // Return early if no deletion vectors are present if (serialized_roaring_bitmaps.empty()) { return 0; } + // Validate input + CUDF_EXPECTS(std::cmp_equal(serialized_roaring_bitmaps.size(), deletion_vector_row_counts.size()), + "Encountered a mismatch in the number of deletion vector data spans and the number " + "of rows per deletion vector"); CUDF_EXPECTS( row_group_offsets.size() == row_group_num_rows.size(), "Encountered a mismatch in the number of row group offsets and row group row counts"); - - // Validate max_chunk_rows CUDF_EXPECTS(max_chunk_rows > 0 and std::cmp_less_equal(max_chunk_rows, std::numeric_limits::max()), "max_chunk_rows must be in range (0, size_type max]"); - // Check if row group data are unspecified auto const is_row_group_data_unspecified = row_group_offsets.empty(); - // Total number of rows auto const num_rows = [&]() { auto const rows_in_dvs = std::accumulate( deletion_vector_row_counts.begin(), deletion_vector_row_counts.end(), size_t{0}); - - // Validate number of rows across deletion vectors and row groups if (not is_row_group_data_unspecified) { auto const rows_in_rgs = std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); @@ -741,7 +267,6 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info return rows_in_dvs; }(); - // Build queues of row group offsets and counts std::queue rg_offsets_queue; std::queue rg_counts_queue; for (size_t i = 0; i < row_group_offsets.size(); ++i) { @@ -749,7 +274,6 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info rg_counts_queue.push(row_group_num_rows[i]); } - // Build queue of roaring bitmap impls and their row counts std::queue dv_queue; std::queue dv_row_counts_queue; for (size_t i = 0; i < serialized_roaring_bitmaps.size(); ++i) { @@ -762,7 +286,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info size_t start_row = 0; while (rows_remaining > 0) { - auto const chunk_rows = std::min(rows_remaining, max_chunk_rows); + auto const chunk_rows = std::min(rows_remaining, max_chunk_rows); auto row_index_column = compute_partial_row_index_column(rg_offsets_queue, @@ -771,7 +295,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info chunk_rows, is_row_group_data_unspecified, stream, - rmm::mr::get_current_device_resource_ref()); + cudf::get_current_device_resource_ref()); rows_deleted += compute_partial_deleted_row_count( row_index_column->view(), dv_queue, dv_row_counts_queue, stream); diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu new file mode 100644 index 00000000000..473f70cc7a4 --- /dev/null +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu @@ -0,0 +1,354 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "deletion_vectors_helpers.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace cudf::io::parquet::experimental { + +void prepend_index_column_to_table_metadata(table_metadata& metadata) +{ + auto updated_schema_info = std::vector{}; + updated_schema_info.reserve(metadata.schema_info.size() + 1); + updated_schema_info.emplace_back("index"); + updated_schema_info.insert(updated_schema_info.end(), + std::make_move_iterator(metadata.schema_info.begin()), + std::make_move_iterator(metadata.schema_info.end())); + metadata.schema_info = std::move(updated_schema_info); +} + +std::unique_ptr prepend_index_column_to_table( + std::unique_ptr&& table, std::unique_ptr&& row_index_column) +{ + auto index_and_table_columns = std::vector>{}; + index_and_table_columns.reserve(table->num_columns() + 1); + index_and_table_columns.push_back(std::move(row_index_column)); + auto table_columns = table->release(); + index_and_table_columns.insert(index_and_table_columns.end(), + std::make_move_iterator(table_columns.begin()), + std::make_move_iterator(table_columns.end())); + return std::make_unique(std::move(index_and_table_columns)); +} + +std::unique_ptr compute_row_index_column( + cudf::host_span row_group_offsets, + cudf::host_span row_group_num_rows, + std::optional start_row, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const num_row_groups = static_cast(row_group_num_rows.size()); + + if (row_group_offsets.empty()) { + auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); + auto row_indices_iter = static_cast(row_indices.data()); + thrust::sequence(rmm::exec_policy_nosync(stream), + row_indices_iter, + row_indices_iter + num_rows, + start_row.value_or(size_t{0})); + return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, + num_rows, + std::move(row_indices), + rmm::device_buffer{0, stream, mr}, + 0); + } + + auto row_group_span_offsets = + cudf::detail::make_host_vector(num_row_groups + 1, stream); + row_group_span_offsets[0] = 0; + std::inclusive_scan( + row_group_num_rows.begin(), row_group_num_rows.end(), row_group_span_offsets.begin() + 1); + + CUDF_EXPECTS(row_group_span_offsets.back() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the row group(s)"); + + auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); + auto row_indices_iter = static_cast(row_indices.data()); + thrust::fill(rmm::exec_policy_nosync(stream), row_indices_iter, row_indices_iter + num_rows, 1); + + auto row_group_keys = rmm::device_uvector(num_rows, stream); + thrust::fill(rmm::exec_policy_nosync(stream), row_group_keys.begin(), row_group_keys.end(), 0); + + auto d_row_group_offsets = cudf::detail::make_device_uvector_async( + row_group_offsets, stream, cudf::get_current_device_resource_ref()); + auto d_row_group_span_offsets = cudf::detail::make_device_uvector_async( + row_group_span_offsets, stream, cudf::get_current_device_resource_ref()); + auto in_iter = + cuda::make_zip_iterator(d_row_group_offsets.begin(), cuda::counting_iterator(0)); + auto out_iter = cuda::make_zip_iterator(row_indices_iter, row_group_keys.begin()); + thrust::scatter(rmm::exec_policy_nosync(stream), + in_iter, + in_iter + num_row_groups, + d_row_group_span_offsets.begin(), + out_iter); + + thrust::inclusive_scan(rmm::exec_policy_nosync(stream), + row_group_keys.begin(), + row_group_keys.end(), + row_group_keys.begin(), + cuda::maximum()); + + thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), + row_group_keys.begin(), + row_group_keys.end(), + row_indices_iter, + row_indices_iter); + + return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, + num_rows, + std::move(row_indices), + rmm::device_buffer{0, stream, mr}, + 0); +} + +std::unique_ptr compute_partial_row_index_column( + std::queue& row_group_offsets, + std::queue& row_group_num_rows, + size_t start_row, + size_type num_rows, + bool is_unspecified_row_group_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + if (is_unspecified_row_group_data) { + return compute_row_index_column({}, {}, start_row, num_rows, stream, mr); + } + + std::vector row_offsets; + std::vector row_counts; + size_type rows_filled = 0; + while (std::cmp_less(rows_filled, num_rows)) { + CUDF_EXPECTS( + not(row_group_offsets.empty() or row_group_num_rows.empty()), + "Unable to compute the row index column from the specified row group offsets and counts"); + + auto const row_count = std::min(num_rows - rows_filled, row_group_num_rows.front()); + row_counts.emplace_back(row_count); + row_offsets.emplace_back(row_group_offsets.front()); + + if (std::cmp_less(row_count, row_group_num_rows.front())) { + row_group_offsets.front() = row_group_offsets.front() + row_count; + row_group_num_rows.front() = row_group_num_rows.front() - row_count; + } else { + row_group_offsets.pop(); + row_group_num_rows.pop(); + } + + rows_filled += row_count; + } + + return compute_row_index_column(row_offsets, row_counts, std::nullopt, num_rows, stream, mr); +} + +namespace { + +/** + * @brief Queries the deletion vectors and writes the result to the output iterator + * + * @tparam OutputIterator The output iterator type + * + * @param row_index_column Row index column + * @param deletion_vector_refs Deletion vector refs + * @param rows_per_deletion_vector Rows per deletion vector + * @param output Output iterator + * @param stream CUDA stream to launch the query kernel + */ +template +void query_deletion_vectors( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + OutputIterator output, + rmm::cuda_stream_view stream) +{ + auto const num_rows = row_index_column.size(); + auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); + + if (num_deletion_vectors == 1) { + CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector"); + deletion_vector_refs.front().get().contains( + row_index_column.begin(), row_index_column.end(), output, stream); + return; + } + + auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); + deletion_vector_row_offsets[0] = 0; + std::inclusive_scan(rows_per_deletion_vector.begin(), + rows_per_deletion_vector.end(), + deletion_vector_row_offsets.begin() + 1); + + CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector(s)"); + + constexpr auto stream_fork_threshold = 8; + if (num_deletion_vectors >= stream_fork_threshold) { + auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + output + deletion_vector_row_offsets[dv_idx], + streams[dv_idx]); + }); + cudf::detail::join_streams(streams, stream); + } else { + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + output + deletion_vector_row_offsets[dv_idx], + stream); + }); + stream.synchronize(); + } +} + +} // namespace + +std::unique_ptr compute_row_mask_column( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const num_rows = row_index_column.size(); + auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); + auto row_mask_iter = cuda::make_transform_output_iterator( + static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); + query_deletion_vectors( + row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); + return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, + num_rows, + std::move(row_mask), + rmm::device_buffer{0, stream, mr}, + 0); +} + +size_t compute_deleted_row_count( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + auto const num_rows = row_index_column.size(); + auto row_mask = + rmm::device_buffer(num_rows * sizeof(bool), stream, cudf::get_current_device_resource_ref()); + auto row_mask_iter = static_cast(row_mask.data()); + query_deletion_vectors( + row_index_column, deletion_vector_refs, deletion_vector_row_counts, row_mask_iter, stream); + return static_cast( + thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); +} + +namespace { + +/** + * @brief Consumes a specified number of rows from a queue of deletion vectors into vectors of + * currently active deletion vectors, their corresponding refs, and row counts + * + * @param num_rows The number of rows + * @param deletion_vectors The deletion vectors + * @param deletion_vector_row_counts The deletion vector row counts + * @param stream The stream + * @return A tuple of currently active deletion vectors, their corresponding refs, and row counts + */ +auto consume_deletion_vectors( + size_type num_rows, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + std::vector row_counts; + std::vector deletion_vectors_impls; + std::vector> deletion_vector_refs; + size_type rows_filled = 0; + + while (std::cmp_less(rows_filled, num_rows)) { + CUDF_EXPECTS( + not(deletion_vector_row_counts.empty() or deletion_vectors.empty()), + "Encountered insufficient number of deletion vector row counts or deletion vectors: " + + std::to_string(deletion_vector_row_counts.size()) + " " + + std::to_string(deletion_vectors.size())); + + auto const row_count = + std::min(num_rows - rows_filled, deletion_vector_row_counts.front()); + row_counts.emplace_back(row_count); + + auto& deletion_vector = deletion_vectors.front(); + deletion_vector.construct_roaring_bitmap(rmm::mr::polymorphic_allocator{}, stream); + CUDF_EXPECTS(deletion_vector.roaring_bitmap != nullptr, "Failed to construct roaring_bitmap"); + deletion_vector_refs.emplace_back(std::ref(*(deletion_vector.roaring_bitmap))); + + if (std::cmp_less(row_count, deletion_vector_row_counts.front())) { + deletion_vector_row_counts.front() = deletion_vector_row_counts.front() - row_count; + } else { + deletion_vectors_impls.emplace_back(std::move(deletion_vectors.front())); + deletion_vectors.pop(); + deletion_vector_row_counts.pop(); + } + + rows_filled += row_count; + } + + return std::tuple{ + std::move(deletion_vectors_impls), std::move(deletion_vector_refs), std::move(row_counts)}; +} + +} // namespace + +std::unique_ptr compute_partial_row_mask_column( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto [_, dv_refs, dv_row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_row_mask_column(row_index_column, dv_refs, dv_row_counts, stream, mr); +} + +size_t compute_partial_deleted_row_count( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + auto [_, dv_refs, dv_row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_deleted_row_count(row_index_column, dv_refs, dv_row_counts, stream); +} + +} // namespace cudf::io::parquet::experimental diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp new file mode 100644 index 00000000000..88f72a80657 --- /dev/null +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp @@ -0,0 +1,182 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace cudf::io::parquet::experimental { + +// Type alias for the cuco 64-bit roaring bitmap +using roaring_bitmap_type = + cuco::experimental::roaring_bitmap>; + +/** + * @brief Opaque wrapper class for cuco's 64-bit roaring bitmap + */ +struct chunked_parquet_reader::roaring_bitmap_impl { + /// Unique pointer to the roaring bitmap + std::unique_ptr roaring_bitmap; + /// Host span of the serialized roaring bitmap data + cudf::host_span const roaring_bitmap_data; + + explicit roaring_bitmap_impl( + cudf::host_span const& serialized_roaring_bitmap) + : roaring_bitmap_data{serialized_roaring_bitmap} + { + } + + roaring_bitmap_impl(roaring_bitmap_impl&&) = default; + roaring_bitmap_impl(roaring_bitmap_impl const&) = delete; + + /** + * @brief Constructs a roaring bitmap from the serialized data + * + * @param allocator Memory allocator + * @param stream CUDA stream to launch the query kernel + */ + void construct_roaring_bitmap(rmm::mr::polymorphic_allocator const& allocator, + rmm::cuda_stream_view stream) + { + if (roaring_bitmap == nullptr) { + CUDF_EXPECTS(not roaring_bitmap_data.empty(), + "Encountered empty data while constructing roaring bitmap"); + roaring_bitmap = std::make_unique( + static_cast(roaring_bitmap_data.data()), allocator, stream); + } + } +}; + +/** + * @brief Prepends the index column information to the table metadata + * + * @param metadata Table metadata + */ +void prepend_index_column_to_table_metadata(table_metadata& metadata); + +/** + * @brief Prepends the index column to the table columns + * + * @param table Table + * @param row_index_column Row index column + * @return A unique pointer to the prepended table + */ +std::unique_ptr prepend_index_column_to_table( + std::unique_ptr&& table, std::unique_ptr&& row_index_column); + +/** + * @brief Computes a row index column from the specified row group row offsets and counts + * + * @param row_group_offsets Row group offsets + * @param row_group_num_rows Row group row counts + * @param start_row Start row + * @param num_rows Number of rows + * @param stream CUDA stream to launch the query kernel + * @param mr Memory resource to allocate the output column's memory + * @return A unique pointer to the computed row index column + */ +std::unique_ptr compute_row_index_column( + cudf::host_span row_group_offsets, + cudf::host_span row_group_num_rows, + std::optional start_row, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes a chunk of the row index column from the specified row group offsets and counts + */ +std::unique_ptr compute_partial_row_index_column( + std::queue& row_group_offsets, + std::queue& row_group_num_rows, + size_t start_row, + size_type num_rows, + bool is_unspecified_row_group_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors + * + * @param row_index_column Row index column + * @param deletion_vector_refs Deletion vector refs + * @param rows_per_deletion_vector Rows per deletion vector + * @param stream CUDA stream to launch the query kernel + * @param mr Memory resource to allocate the output column's memory + * @return A unique pointer to the computed BOOL8 row mask column + */ +std::unique_ptr compute_row_mask_column( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes the number of rows deleted by the deletion vectors + * + * @param row_index_column Row index column + * @param deletion_vector_refs Deletion vector refs + * @param deletion_vector_row_counts Rows per deletion vector + * @param stream CUDA stream to launch the query kernel + * @return The number of deleted rows + */ +size_t compute_deleted_row_count( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span deletion_vector_row_counts, + rmm::cuda_stream_view stream); + +/** + * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion + * vectors + * + * @param row_index_column Row index column + * @param deletion_vectors Deletion vectors + * @param deletion_vector_row_counts Rows per deletion vector + * @param stream CUDA stream to launch the query kernel + * @param mr Memory resource to allocate the output column's memory + * @return A unique pointer to the computed BOOL8 row mask column + */ +std::unique_ptr compute_partial_row_mask_column( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes the number of deleted rows for a chunk of the row index column + * + * @param row_index_column Row index column + * @param deletion_vectors Deletion vectors + * @param deletion_vector_row_counts Rows per deletion vector + * @param stream CUDA stream to launch the query kernel + * @return The number of deleted rows + */ +size_t compute_partial_deleted_row_count( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream); + +} // namespace cudf::io::parquet::experimental diff --git a/cpp/tests/io/parquet_deletion_vectors_test.cu b/cpp/tests/io/parquet_deletion_vectors_test.cu index f3a43a1ef65..18386cc9f22 100644 --- a/cpp/tests/io/parquet_deletion_vectors_test.cu +++ b/cpp/tests/io/parquet_deletion_vectors_test.cu @@ -636,9 +636,10 @@ TEST_F(ParquetDeletionVectorsTest, CustomRowIndexColumn) } } -// Tests for compute_num_deleted_rows +// Test fixture for deletion vectors delete count API +struct DeletionVectorsCountTests : public ParquetDeletionVectorsTest {}; -TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsEmpty) +TEST_F(DeletionVectorsCountTests, EmptyDeletionVector) { auto const stream = cudf::get_default_stream(); @@ -648,7 +649,7 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsEmpty) EXPECT_EQ(result, 0); } -TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsNoRowIndex) +TEST_F(DeletionVectorsCountTests, NoRowIndex) { auto constexpr num_rows = 50'000; auto constexpr deletion_probability = 0.5; @@ -671,14 +672,16 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsNoRowIndex) .serialized_roaring_bitmaps = {deletion_vector}, .deletion_vector_row_counts = {num_rows}, .row_group_offsets = {}, - .row_group_num_rows = {num_rows}}; + .row_group_num_rows = {}}; - auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, std::numeric_limits::max(), stream); - EXPECT_EQ(result, static_cast(expected_deleted)); + for (auto chunk_size : {num_rows, num_rows / 2}) { + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(result, expected_deleted); + } } -TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) +TEST_F(DeletionVectorsCountTests, CustomRowIndex) { auto constexpr num_rows = 25'000; auto constexpr num_row_groups = 5; @@ -696,10 +699,10 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) row_group_offsets.begin() + 1, [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); - // Split num_rows into num_row_groups spans + // Split num_rows into spans auto row_group_splits = std::vector(num_row_groups - 1); { - std::mt19937 engine{0xf00d}; + std::mt19937 engine{0xdeaL}; std::uniform_int_distribution dist{1, num_rows}; std::generate(row_group_splits.begin(), row_group_splits.end(), [&]() { return dist(engine); }); std::sort(row_group_splits.begin(), row_group_splits.end()); @@ -728,7 +731,7 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) auto const expected_row_mask = cudf::detail::make_host_vector( cudf::device_span(expected_row_mask_column->view().data(), num_rows), stream); - auto const expected_deleted = + size_t const expected_deleted = std::count(expected_row_mask.begin(), expected_row_mask.end(), false); auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ @@ -737,22 +740,19 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsCustomRowIndex) .row_group_offsets = row_group_offsets, .row_group_num_rows = row_group_num_rows}; - auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, std::numeric_limits::max(), stream); - EXPECT_EQ(result, static_cast(expected_deleted)); - - auto const result_chunked = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, - static_cast(std::llround(row_group_splits.front() * 1.2)), - stream); - EXPECT_EQ(result_chunked, static_cast(expected_deleted)); + for (auto chunk_size : + {num_rows, static_cast(std::llround(row_group_splits.front() * 1.2))}) { + auto const num_deleted_rows = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(num_deleted_rows, expected_deleted); + } } -TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) +TEST_F(DeletionVectorsCountTests, MultipleDeletionVectors) { auto constexpr num_rows_per_dv = 10'000; auto constexpr num_deletion_vectors = 5; - auto constexpr total_rows = num_rows_per_dv * num_deletion_vectors; + auto constexpr num_rows = num_rows_per_dv * num_deletion_vectors; auto constexpr deletion_probability = 0.3; auto const stream = cudf::get_default_stream(); @@ -761,12 +761,13 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) // Each deletion vector covers a separate row group with a distinct offset auto row_group_offsets = std::vector(num_deletion_vectors); auto row_group_num_rows = std::vector(num_deletion_vectors, num_rows_per_dv); - for (int i = 0; i < num_deletion_vectors; ++i) { - row_group_offsets[i] = static_cast(i) * 100'000; - } + std::transform(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + row_group_offsets.begin(), + [](auto i) { return static_cast(i) * 100'000; }); auto expected_row_indices = - build_expected_row_indices(row_group_offsets, row_group_num_rows, total_rows); + build_expected_row_indices(row_group_offsets, row_group_num_rows, num_rows); size_t total_expected_deleted = 0; auto serialized_bitmaps = std::vector>{}; @@ -800,13 +801,11 @@ TEST_F(ParquetDeletionVectorsTest, ComputeNumDeletedRowsMultipleDeletionVectors) .row_group_offsets = row_group_offsets, .row_group_num_rows = row_group_num_rows}; - auto chunk_max_rows = std::numeric_limits::max(); - auto const result_default = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, chunk_max_rows, stream); - EXPECT_EQ(result_default, total_expected_deleted); - - chunk_max_rows = static_cast(std::llround(num_rows_per_dv * 1.5)); - auto const result_chunked = cudf::io::parquet::experimental::compute_num_deleted_rows( - deletion_vector_info, chunk_max_rows, stream); - EXPECT_EQ(result_chunked, total_expected_deleted); + // All rows in one chunk + for (auto chunk_size : + {num_rows, static_cast(std::llround(num_rows_per_dv * 1.2))}) { + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(result, total_expected_deleted); + } } From 8166a540aa73ac55af667bbc9e0a1e94a8b71fee Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:20:31 +0000 Subject: [PATCH 07/14] Apply review suggestions --- .../cudf/io/experimental/deletion_vectors.hpp | 2 +- .../parquet/experimental/deletion_vectors.cu | 21 ++-- .../experimental/deletion_vectors_helpers.hpp | 104 ++++++++++-------- 3 files changed, 72 insertions(+), 55 deletions(-) diff --git a/cpp/include/cudf/io/experimental/deletion_vectors.hpp b/cpp/include/cudf/io/experimental/deletion_vectors.hpp index f4b2d4d73f1..d2101cc139c 100644 --- a/cpp/include/cudf/io/experimental/deletion_vectors.hpp +++ b/cpp/include/cudf/io/experimental/deletion_vectors.hpp @@ -187,7 +187,7 @@ table_with_metadata read_parquet( * @param stream CUDA stream used for device memory operations and kernel launches * @return Number of rows deleted by the specified 64-bit roaring bitmap deletion vectors */ -size_t compute_num_deleted_rows( +[[nodiscard]] size_t compute_num_deleted_rows( deletion_vector_info const& deletion_vector_info, cudf::size_type max_chunk_rows = std::numeric_limits::max(), rmm::cuda_stream_view stream = cudf::get_default_stream()); diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index aa1bd126cf1..2d7fd4541e1 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -4,7 +4,8 @@ */ #include "deletion_vectors_helpers.hpp" - +# +#include #include #include #include @@ -109,6 +110,8 @@ bool chunked_parquet_reader::has_next() const { return _reader->has_next(); } */ table_with_metadata chunked_parquet_reader::read_chunk() { + CUDF_FUNC_RANGE(); + // Read a chunk of the parquet table auto [table, metadata] = _reader->read_chunk(); auto const num_rows = table->num_rows(); @@ -156,6 +159,8 @@ table_with_metadata read_parquet(parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + CUDF_FUNC_RANGE(); + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; auto const& row_group_offsets = deletion_vector_info.row_group_offsets; @@ -230,10 +235,12 @@ table_with_metadata read_parquet(parquet_reader_options const& options, /** * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows */ -size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, - cudf::size_type max_chunk_rows, - rmm::cuda_stream_view stream) +[[nodiscard]] size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, + cudf::size_type max_chunk_rows, + rmm::cuda_stream_view stream) { + CUDF_FUNC_RANGE(); + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; auto const& row_group_offsets = deletion_vector_info.row_group_offsets; @@ -248,9 +255,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info CUDF_EXPECTS( row_group_offsets.size() == row_group_num_rows.size(), "Encountered a mismatch in the number of row group offsets and row group row counts"); - CUDF_EXPECTS(max_chunk_rows > 0 and - std::cmp_less_equal(max_chunk_rows, std::numeric_limits::max()), - "max_chunk_rows must be in range (0, size_type max]"); + CUDF_EXPECTS(max_chunk_rows > 0, "Encountered an invalid chunk size"); auto const is_row_group_data_unspecified = row_group_offsets.empty(); @@ -292,7 +297,7 @@ size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info compute_partial_row_index_column(rg_offsets_queue, rg_counts_queue, start_row, - chunk_rows, + static_cast(chunk_rows), is_row_group_data_unspecified, stream, cudf::get_current_device_resource_ref()); diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp index 88f72a80657..bfc3c25026c 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp @@ -17,10 +17,8 @@ #include #include -#include #include -#include #include #include @@ -69,32 +67,33 @@ struct chunked_parquet_reader::roaring_bitmap_impl { /** * @brief Prepends the index column information to the table metadata * - * @param metadata Table metadata + * @param[in,out] metadata Table metadata to update with the index column name */ void prepend_index_column_to_table_metadata(table_metadata& metadata); /** * @brief Prepends the index column to the table columns * - * @param table Table - * @param row_index_column Row index column - * @return A unique pointer to the prepended table + * @param table Input table to prepend the index column to + * @param row_index_column Row index column to prepend + * @return A new table with the index column prepended to the input table columns */ -std::unique_ptr prepend_index_column_to_table( +[[nodiscard]] std::unique_ptr prepend_index_column_to_table( std::unique_ptr&& table, std::unique_ptr&& row_index_column); /** * @brief Computes a row index column from the specified row group row offsets and counts * - * @param row_group_offsets Row group offsets - * @param row_group_num_rows Row group row counts - * @param start_row Start row - * @param num_rows Number of rows - * @param stream CUDA stream to launch the query kernel - * @param mr Memory resource to allocate the output column's memory - * @return A unique pointer to the computed row index column + * @param row_group_offsets Host span of row offsets of each row group + * @param row_group_num_rows Host span of number of rows in each row group + * @param start_row Starting row index when row group offsets are empty + * @param num_rows Total number of rows in the output column + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return UINT64 column containing row indices */ -std::unique_ptr compute_row_index_column( +[[nodiscard]] std::unique_ptr compute_row_index_column( cudf::host_span row_group_offsets, cudf::host_span row_group_num_rows, std::optional start_row, @@ -103,9 +102,19 @@ std::unique_ptr compute_row_index_column( rmm::device_async_resource_ref mr); /** - * @brief Computes a chunk of the row index column from the specified row group offsets and counts + * @brief Computes a chunk of the row index column by consuming row group data from queues + * + * @param[in,out] row_group_offsets Queue of per-row-group starting row offsets + * @param[in,out] row_group_num_rows Queue of per-row-group row counts + * @param start_row Starting row index when row group data is unspecified + * @param num_rows Number of rows in the output column + * @param is_unspecified_row_group_data Whether to ignore queues and produce a simple sequence + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return UINT64 column containing row indices for this chunk */ -std::unique_ptr compute_partial_row_index_column( +[[nodiscard]] std::unique_ptr compute_partial_row_index_column( std::queue& row_group_offsets, std::queue& row_group_num_rows, size_t start_row, @@ -117,14 +126,15 @@ std::unique_ptr compute_partial_row_index_column( /** * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors * - * @param row_index_column Row index column - * @param deletion_vector_refs Deletion vector refs - * @param rows_per_deletion_vector Rows per deletion vector - * @param stream CUDA stream to launch the query kernel - * @param mr Memory resource to allocate the output column's memory - * @return A unique pointer to the computed BOOL8 row mask column + * @param row_index_column View of the row index column + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param rows_per_deletion_vector Host span of number of rows per deletion vector + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return BOOL8 column where `true` indicates a non-deleted row */ -std::unique_ptr compute_row_mask_column( +[[nodiscard]] std::unique_ptr compute_row_mask_column( cudf::column_view const& row_index_column, cudf::host_span const> deletion_vector_refs, cudf::host_span rows_per_deletion_vector, @@ -134,30 +144,31 @@ std::unique_ptr compute_row_mask_column( /** * @brief Computes the number of rows deleted by the deletion vectors * - * @param row_index_column Row index column - * @param deletion_vector_refs Deletion vector refs - * @param deletion_vector_row_counts Rows per deletion vector - * @param stream CUDA stream to launch the query kernel - * @return The number of deleted rows + * @param row_index_column View of the row index column + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param deletion_vector_row_counts Host span of number of rows per deletion vector + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Number of rows marked as deleted */ -size_t compute_deleted_row_count( +[[nodiscard]] size_t compute_deleted_row_count( cudf::column_view const& row_index_column, cudf::host_span const> deletion_vector_refs, cudf::host_span deletion_vector_row_counts, rmm::cuda_stream_view stream); /** - * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion - * vectors + * @brief Computes a chunk of the BOOL8 row mask column by consuming deletion vectors from queues + * + * @param row_index_column View of the row index column for the current chunk + * @param[in,out] deletion_vectors Queue of roaring bitmap wrappers to consume from + * @param[in,out] deletion_vector_row_counts Queue of per-deletion-vector row counts + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column * - * @param row_index_column Row index column - * @param deletion_vectors Deletion vectors - * @param deletion_vector_row_counts Rows per deletion vector - * @param stream CUDA stream to launch the query kernel - * @param mr Memory resource to allocate the output column's memory - * @return A unique pointer to the computed BOOL8 row mask column + * @return BOOL8 column where `true` indicates a non-deleted row */ -std::unique_ptr compute_partial_row_mask_column( +[[nodiscard]] std::unique_ptr compute_partial_row_mask_column( cudf::column_view const& row_index_column, std::queue& deletion_vectors, std::queue& deletion_vector_row_counts, @@ -165,15 +176,16 @@ std::unique_ptr compute_partial_row_mask_column( rmm::device_async_resource_ref mr); /** - * @brief Computes the number of deleted rows for a chunk of the row index column + * @brief Computes the number of deleted rows for a chunk by consuming deletion vectors from queues + * + * @param row_index_column View of the row index column for the current chunk + * @param[in,out] deletion_vectors Queue of roaring bitmap wrappers to consume from + * @param[in,out] deletion_vector_row_counts Queue of per-deletion-vector row counts + * @param stream CUDA stream used for device memory operations and kernel launches * - * @param row_index_column Row index column - * @param deletion_vectors Deletion vectors - * @param deletion_vector_row_counts Rows per deletion vector - * @param stream CUDA stream to launch the query kernel - * @return The number of deleted rows + * @return Number of deleted rows in this chunk */ -size_t compute_partial_deleted_row_count( +[[nodiscard]] size_t compute_partial_deleted_row_count( cudf::column_view const& row_index_column, std::queue& deletion_vectors, std::queue& deletion_vector_row_counts, From cff7844b419a6a74d2e0b7ef2a89840e6f6454ce Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:11:10 +0000 Subject: [PATCH 08/14] Update headers --- cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu | 2 +- cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu index 473f70cc7a4..aa9ebf4665c 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp index bfc3c25026c..49f8380f617 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ From 351153a0e76e34ffe891830ae988729826ffd34f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:14:54 +0000 Subject: [PATCH 09/14] Minor --- .../io/parquet/experimental/deletion_vectors.cu | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index 2d7fd4541e1..da6d9fa77a3 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -286,13 +286,13 @@ table_with_metadata read_parquet(parquet_reader_options const& options, dv_row_counts_queue.push(deletion_vector_row_counts[i]); } - size_t rows_deleted = 0; - size_t rows_remaining = num_rows; + size_t deleted_rows = 0; + size_t remaining_rows = num_rows; size_t start_row = 0; - while (rows_remaining > 0) { - auto const chunk_rows = std::min(rows_remaining, max_chunk_rows); - + while (remaining_rows > 0) { + // Maximum number of rows to process in this chunk + auto const chunk_rows = std::min(remaining_rows, max_chunk_rows); auto row_index_column = compute_partial_row_index_column(rg_offsets_queue, rg_counts_queue, @@ -301,15 +301,14 @@ table_with_metadata read_parquet(parquet_reader_options const& options, is_row_group_data_unspecified, stream, cudf::get_current_device_resource_ref()); - - rows_deleted += compute_partial_deleted_row_count( + deleted_rows += compute_partial_deleted_row_count( row_index_column->view(), dv_queue, dv_row_counts_queue, stream); start_row += chunk_rows; - rows_remaining -= chunk_rows; + remaining_rows -= chunk_rows; } - return rows_deleted; + return deleted_rows; } } // namespace cudf::io::parquet::experimental From ed506a27c9dc806f1cb20ba2cc96331598e3f731 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 14:21:42 -0700 Subject: [PATCH 10/14] Update cpp/tests/io/parquet_deletion_vectors_test.cu Co-authored-by: David Wendt <45795991+davidwendt@users.noreply.github.com> --- cpp/tests/io/parquet_deletion_vectors_test.cu | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/tests/io/parquet_deletion_vectors_test.cu b/cpp/tests/io/parquet_deletion_vectors_test.cu index 18386cc9f22..5a1e1055f63 100644 --- a/cpp/tests/io/parquet_deletion_vectors_test.cu +++ b/cpp/tests/io/parquet_deletion_vectors_test.cu @@ -443,9 +443,7 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) // Validate stream.synchronize(); - EXPECT_TRUE(std::all_of(cuda::counting_iterator(0), - cuda::counting_iterator(num_keys), - [&](auto key) { return results[key] == is_even[key]; })); + EXPECT_TRUE(std::equal(results.begin(), results.end(), is_even)); } // Base test fixture for API tests From d4c734928f9993a7ad2d25d3fafc4ba63dd8fc3e Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:24:31 +0000 Subject: [PATCH 11/14] Apply suggestions --- .../parquet/experimental/deletion_vectors_helpers.cu | 11 +++++++++++ .../experimental/deletion_vectors_helpers.hpp | 12 ++---------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu index aa9ebf4665c..12fe06d711d 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu @@ -28,6 +28,17 @@ namespace cudf::io::parquet::experimental { +void chunked_parquet_reader::roaring_bitmap_impl::construct_roaring_bitmap( + rmm::mr::polymorphic_allocator const& allocator, rmm::cuda_stream_view stream) +{ + if (roaring_bitmap == nullptr) { + CUDF_EXPECTS(not roaring_bitmap_data.empty(), + "Encountered empty data while constructing roaring bitmap"); + roaring_bitmap = std::make_unique( + static_cast(roaring_bitmap_data.data()), allocator, stream); + } +} + void prepend_index_column_to_table_metadata(table_metadata& metadata) { auto updated_schema_info = std::vector{}; diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp index 49f8380f617..eae6086a20c 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp @@ -47,21 +47,13 @@ struct chunked_parquet_reader::roaring_bitmap_impl { roaring_bitmap_impl(roaring_bitmap_impl const&) = delete; /** - * @brief Constructs a roaring bitmap from the serialized data + * @brief Constructs a roaring bitmap from the serialized data if not already constructed * * @param allocator Memory allocator * @param stream CUDA stream to launch the query kernel */ void construct_roaring_bitmap(rmm::mr::polymorphic_allocator const& allocator, - rmm::cuda_stream_view stream) - { - if (roaring_bitmap == nullptr) { - CUDF_EXPECTS(not roaring_bitmap_data.empty(), - "Encountered empty data while constructing roaring bitmap"); - roaring_bitmap = std::make_unique( - static_cast(roaring_bitmap_data.data()), allocator, stream); - } - } + rmm::cuda_stream_view stream); }; /** From 3c9a027eca55674937785736c1a452c2a938b5a2 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:30:51 +0000 Subject: [PATCH 12/14] Remove typo --- cpp/src/io/parquet/experimental/deletion_vectors.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index da6d9fa77a3..45ff5712961 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -4,7 +4,7 @@ */ #include "deletion_vectors_helpers.hpp" -# + #include #include #include From caaaeaaa68f67539972626a0b20175bb5c6cf50a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 31 Mar 2026 22:27:09 +0000 Subject: [PATCH 13/14] Java tests --- cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu index 12fe06d711d..ee30f916fa9 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu @@ -200,9 +200,9 @@ void query_deletion_vectors( auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); if (num_deletion_vectors == 1) { - CUDF_EXPECTS(rows_per_deletion_vector.front() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector"); + CUDF_EXPECTS( + std::cmp_greater_equal(rows_per_deletion_vector.front(), num_rows), + "Encountered insufficient deletion vector size to query the entire row index column"); deletion_vector_refs.front().get().contains( row_index_column.begin(), row_index_column.end(), output, stream); return; From d7d02429fd55c04fb20aa861f4fdd9766cac0555 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Wed, 1 Apr 2026 20:16:31 +0000 Subject: [PATCH 14/14] Refactor --- .../parquet/experimental/deletion_vectors.cu | 299 +++++++++--------- 1 file changed, 158 insertions(+), 141 deletions(-) diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index 45ff5712961..41883f7cc7c 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -20,147 +20,13 @@ namespace cudf::io::parquet::experimental { -/** - * @copydoc - * cudf::io::parquet::experimental::chunked_parquet_reader::chunked_parquet_reader - */ -chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, - std::size_t pass_read_limit, - parquet_reader_options const& options, - deletion_vector_info const& deletion_vector_info, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) - : _start_row{0}, - _is_unspecified_row_group_data{deletion_vector_info.row_group_offsets.empty()}, - _stream{stream}, - _mr{mr}, - // Use default mr for the internal chunked reader and row index column if we will - // be applying the deletion vector to produce the output table - _table_mr{deletion_vector_info.serialized_roaring_bitmaps.empty() - ? mr - : cudf::get_current_device_resource_ref()} -{ - auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; - auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; - auto const& row_group_offsets = deletion_vector_info.row_group_offsets; - auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; - - CUDF_EXPECTS( - row_group_offsets.size() == row_group_num_rows.size(), - "Encountered a mismatch in the number of row group offsets and row group row counts"); - CUDF_EXPECTS( - not options.get_filter().has_value(), - "Encountered a non-empty AST filter expression. Use a roaring64 bitmap deletion vector to " - "filter the table instead"); - CUDF_EXPECTS(serialized_roaring_bitmaps.empty() or - serialized_roaring_bitmaps.size() == deletion_vector_row_counts.size(), - "Encountered a mismatch in the number of deletion vectors and the number of rows " - "per deletion vector"); +namespace detail { - // Initialize the internal chunked parquet reader - _reader = std::make_unique( - chunk_read_limit, pass_read_limit, options, _stream, _table_mr); - - // Push row group offsets and counts to the internal queues - if (not row_group_offsets.empty()) { - auto iter = cuda::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); - std::for_each(iter, iter + row_group_offsets.size(), [&](auto const& elem) { - _row_group_row_offsets.push(cuda::std::get<0>(elem)); - _row_group_row_counts.push(cuda::std::get<1>(elem)); - }); - } - - // Push deletion vector data spans and row counts to the internal queues - if (not serialized_roaring_bitmaps.empty()) { - auto iter = cuda::make_zip_iterator(serialized_roaring_bitmaps.begin(), - deletion_vector_row_counts.begin()); - std::for_each(iter, iter + serialized_roaring_bitmaps.size(), [&](auto const& elem) { - _deletion_vectors.emplace(cuda::std::get<0>(elem)); - _deletion_vector_row_counts.push(cuda::std::get<1>(elem)); - }); - } -} - -/** - * @copydoc - * cudf::io::parquet::experimental::chunked_parquet_reader::chunked_parquet_reader - */ -chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, - parquet_reader_options const& options, +[[nodiscard]] table_with_metadata read_parquet(parquet_reader_options const& options, deletion_vector_info const& deletion_vector_info, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) - : chunked_parquet_reader( - chunk_read_limit, std::size_t{0}, options, deletion_vector_info, stream, mr) -{ -} - -/** - * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::~chunked_parquet_reader - */ -chunked_parquet_reader::~chunked_parquet_reader() = default; - -/** - * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::has_next - */ -bool chunked_parquet_reader::has_next() const { return _reader->has_next(); } - -/** - * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::read_chunk - */ -table_with_metadata chunked_parquet_reader::read_chunk() { - CUDF_FUNC_RANGE(); - - // Read a chunk of the parquet table - auto [table, metadata] = _reader->read_chunk(); - auto const num_rows = table->num_rows(); - - // Compute a chunk of the row index column from the specified row group offsets and counts - auto row_index_column = compute_partial_row_index_column(_row_group_row_offsets, - _row_group_row_counts, - _start_row, - num_rows, - _is_unspecified_row_group_data, - _stream, - _table_mr); - // Update the start row index for the next chunk - _start_row += num_rows; - - // Prepend row index column to the table columns - auto table_with_index = - prepend_index_column_to_table(std::move(table), std::move(row_index_column)); - - // Also prepend the row index column's metadata to the table schema - prepend_index_column_to_table_metadata(metadata); - - // Return early if deletion vector is not present - if (_deletion_vectors.empty()) { - return table_with_metadata{std::move(table_with_index), std::move(metadata)}; - } - - // Filter the table using the deletion vectors - auto row_mask = compute_partial_row_mask_column(table_with_index->get_column(0).view(), - _deletion_vectors, - _deletion_vector_row_counts, - _stream, - cudf::get_current_device_resource_ref()); - return table_with_metadata{ - // Supply user-provided mr to apply_boolean_mask to allocate output table's memory - cudf::apply_boolean_mask(table_with_index->view(), row_mask->view(), _stream, _mr), - std::move(metadata)}; -} - -/** - * @copydoc cudf::io::parquet::experimental::read_parquet - */ -table_with_metadata read_parquet(parquet_reader_options const& options, - deletion_vector_info const& deletion_vector_info, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; auto const& row_group_offsets = deletion_vector_info.row_group_offsets; @@ -232,15 +98,10 @@ table_with_metadata read_parquet(parquet_reader_options const& options, std::move(metadata)}; } -/** - * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows - */ [[nodiscard]] size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, cudf::size_type max_chunk_rows, rmm::cuda_stream_view stream) { - CUDF_FUNC_RANGE(); - auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; auto const& row_group_offsets = deletion_vector_info.row_group_offsets; @@ -311,4 +172,160 @@ table_with_metadata read_parquet(parquet_reader_options const& options, return deleted_rows; } +} // namespace detail + +/** + * @copydoc + * cudf::io::parquet::experimental::chunked_parquet_reader::chunked_parquet_reader + */ +chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, + parquet_reader_options const& options, + deletion_vector_info const& deletion_vector_info, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + : _start_row{0}, + _is_unspecified_row_group_data{deletion_vector_info.row_group_offsets.empty()}, + _stream{stream}, + _mr{mr}, + // Use default mr for the internal chunked reader and row index column if we will + // be applying the deletion vector to produce the output table + _table_mr{deletion_vector_info.serialized_roaring_bitmaps.empty() + ? mr + : cudf::get_current_device_resource_ref()} +{ + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; + auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; + auto const& row_group_offsets = deletion_vector_info.row_group_offsets; + auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; + + CUDF_EXPECTS( + row_group_offsets.size() == row_group_num_rows.size(), + "Encountered a mismatch in the number of row group offsets and row group row counts"); + CUDF_EXPECTS( + not options.get_filter().has_value(), + "Encountered a non-empty AST filter expression. Use a roaring64 bitmap deletion vector to " + "filter the table instead"); + CUDF_EXPECTS(serialized_roaring_bitmaps.empty() or + serialized_roaring_bitmaps.size() == deletion_vector_row_counts.size(), + "Encountered a mismatch in the number of deletion vectors and the number of rows " + "per deletion vector"); + + // Initialize the internal chunked parquet reader + _reader = std::make_unique( + chunk_read_limit, pass_read_limit, options, _stream, _table_mr); + + // Push row group offsets and counts to the internal queues + if (not row_group_offsets.empty()) { + auto iter = cuda::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); + std::for_each(iter, iter + row_group_offsets.size(), [&](auto const& elem) { + _row_group_row_offsets.push(cuda::std::get<0>(elem)); + _row_group_row_counts.push(cuda::std::get<1>(elem)); + }); + } + + // Push deletion vector data spans and row counts to the internal queues + if (not serialized_roaring_bitmaps.empty()) { + auto iter = cuda::make_zip_iterator(serialized_roaring_bitmaps.begin(), + deletion_vector_row_counts.begin()); + std::for_each(iter, iter + serialized_roaring_bitmaps.size(), [&](auto const& elem) { + _deletion_vectors.emplace(cuda::std::get<0>(elem)); + _deletion_vector_row_counts.push(cuda::std::get<1>(elem)); + }); + } +} + +/** + * @copydoc + * cudf::io::parquet::experimental::chunked_parquet_reader::chunked_parquet_reader + */ +chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, + parquet_reader_options const& options, + deletion_vector_info const& deletion_vector_info, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + : chunked_parquet_reader( + chunk_read_limit, std::size_t{0}, options, deletion_vector_info, stream, mr) +{ +} + +/** + * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::~chunked_parquet_reader + */ +chunked_parquet_reader::~chunked_parquet_reader() = default; + +/** + * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::has_next + */ +bool chunked_parquet_reader::has_next() const { return _reader->has_next(); } + +/** + * @copydoc cudf::io::parquet::experimental::chunked_parquet_reader::read_chunk + */ +table_with_metadata chunked_parquet_reader::read_chunk() +{ + CUDF_FUNC_RANGE(); + + // Read a chunk of the parquet table + auto [table, metadata] = _reader->read_chunk(); + auto const num_rows = table->num_rows(); + + // Compute a chunk of the row index column from the specified row group offsets and counts + auto row_index_column = compute_partial_row_index_column(_row_group_row_offsets, + _row_group_row_counts, + _start_row, + num_rows, + _is_unspecified_row_group_data, + _stream, + _table_mr); + // Update the start row index for the next chunk + _start_row += num_rows; + + // Prepend row index column to the table columns + auto table_with_index = + prepend_index_column_to_table(std::move(table), std::move(row_index_column)); + + // Also prepend the row index column's metadata to the table schema + prepend_index_column_to_table_metadata(metadata); + + // Return early if deletion vector is not present + if (_deletion_vectors.empty()) { + return table_with_metadata{std::move(table_with_index), std::move(metadata)}; + } + + // Filter the table using the deletion vectors + auto row_mask = compute_partial_row_mask_column(table_with_index->get_column(0).view(), + _deletion_vectors, + _deletion_vector_row_counts, + _stream, + cudf::get_current_device_resource_ref()); + return table_with_metadata{ + // Supply user-provided mr to apply_boolean_mask to allocate output table's memory + cudf::apply_boolean_mask(table_with_index->view(), row_mask->view(), _stream, _mr), + std::move(metadata)}; +} + +/** + * @copydoc cudf::io::parquet::experimental::read_parquet + */ +table_with_metadata read_parquet(parquet_reader_options const& options, + deletion_vector_info const& deletion_vector_info, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::read_parquet(options, deletion_vector_info, stream, mr); +} + +/** + * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows + */ +size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, + cudf::size_type max_chunk_rows, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + return detail::compute_num_deleted_rows(deletion_vector_info, max_chunk_rows, stream); +} + } // namespace cudf::io::parquet::experimental