diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 428878e9456..1c7d4b01fc2 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,6 +530,7 @@ add_library( src/io/parquet/decode_preprocess.cu src/io/parquet/experimental/dictionary_page_filter.cu src/io/parquet/experimental/deletion_vectors.cu + src/io/parquet/experimental/deletion_vectors_helpers.cu src/io/parquet/experimental/hybrid_scan.cpp src/io/parquet/experimental/hybrid_scan_chunking.cu src/io/parquet/experimental/hybrid_scan_helpers.cpp diff --git a/cpp/include/cudf/io/experimental/deletion_vectors.hpp b/cpp/include/cudf/io/experimental/deletion_vectors.hpp index 8624c0d364e..d2101cc139c 100644 --- a/cpp/include/cudf/io/experimental/deletion_vectors.hpp +++ b/cpp/include/cudf/io/experimental/deletion_vectors.hpp @@ -178,6 +178,20 @@ table_with_metadata read_parquet( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); +/** + * @brief Computes the number of rows deleted by the serialized 64-bit roaring bitmap deletion + * vectors + * + * @param deletion_vector_info Information about the deletion vectors and the index column + * @param max_chunk_rows Maximum number of rows to process at a time + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Number of rows deleted by the specified 64-bit roaring bitmap deletion vectors + */ +[[nodiscard]] size_t compute_num_deleted_rows( + deletion_vector_info const& deletion_vector_info, + cudf::size_type max_chunk_rows = std::numeric_limits::max(), + rmm::cuda_stream_view stream = cudf::get_default_stream()); + /** @} */ // end of group } // namespace io::parquet::experimental diff --git a/cpp/src/io/parquet/experimental/deletion_vectors.cu b/cpp/src/io/parquet/experimental/deletion_vectors.cu index 380968b9d9c..0bad35ea03a 100644 --- a/cpp/src/io/parquet/experimental/deletion_vectors.cu +++ b/cpp/src/io/parquet/experimental/deletion_vectors.cu @@ -3,401 +3,178 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include -#include +#include "deletion_vectors_helpers.hpp" + +#include #include #include -#include #include #include -#include -#include #include -#include #include #include #include #include -#include -#include -#include -#include #include -#include namespace cudf::io::parquet::experimental { -// Type alias for the cuco 64-bit roaring bitmap -using roaring_bitmap_type = - cuco::experimental::roaring_bitmap>; - -/** - * @brief Opaque wrapper class for cuco's 64-bit roaring bitmap - */ -struct chunked_parquet_reader::roaring_bitmap_impl { - std::unique_ptr roaring_bitmap; - cudf::host_span const roaring_bitmap_data; - - explicit roaring_bitmap_impl( - cudf::host_span const& serialized_roaring_bitmap) - : roaring_bitmap_data{serialized_roaring_bitmap} - { - } - - roaring_bitmap_impl(roaring_bitmap_impl&&) = default; - roaring_bitmap_impl(roaring_bitmap_impl const&) = delete; +namespace detail { - void construct_roaring_bitmap(rmm::mr::polymorphic_allocator const& allocator, - rmm::cuda_stream_view stream) - { - if (roaring_bitmap == nullptr) { - CUDF_EXPECTS(not roaring_bitmap_data.empty(), - "Encountered empty data while constructing roaring bitmap"); - roaring_bitmap = std::make_unique( - static_cast(roaring_bitmap_data.data()), allocator, stream); - } - } -}; +[[nodiscard]] table_with_metadata read_parquet(parquet_reader_options const& options, + deletion_vector_info const& deletion_vector_info, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; + auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; + auto const& row_group_offsets = deletion_vector_info.row_group_offsets; + auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; -namespace { + CUDF_EXPECTS( + row_group_offsets.size() == row_group_num_rows.size(), + "Encountered a mismatch in the number of row group offsets and row group row counts"); + CUDF_EXPECTS( + not options.get_filter().has_value(), + "Encountered a non-empty AST filter expression. Use a roaring64 bitmap deletion vector to " + "filter the table instead"); + CUDF_EXPECTS(serialized_roaring_bitmaps.empty() or + serialized_roaring_bitmaps.size() == deletion_vector_row_counts.size(), + "Encountered a mismatch in the number of deletion vectors and the number of rows " + "per deletion vector"); -/** - * @brief Prepends the index column information to the table metadata - * - * @param[in,out] metadata Table metadata - */ -void prepend_index_column_to_table_metadata(table_metadata& metadata) -{ - auto updated_schema_info = std::vector{}; - updated_schema_info.reserve(metadata.schema_info.size() + 1); - updated_schema_info.emplace_back("index"); - updated_schema_info.insert(updated_schema_info.end(), - std::make_move_iterator(metadata.schema_info.begin()), - std::make_move_iterator(metadata.schema_info.end())); - metadata.schema_info = std::move(updated_schema_info); -} + // Use default mr to read parquet table and build row index column if we will be applying the + // deletion vector to produce a new table later + auto const table_mr = + serialized_roaring_bitmaps.empty() ? mr : cudf::get_current_device_resource_ref(); -/** - * @brief Prepends the index column to the table columns - * - * @param table Input table - * @param row_index_column row index column to prepend - * - * @return A table with the index column prepended to the table columns - */ -std::unique_ptr prepend_index_column_to_table( - std::unique_ptr&& table, std::unique_ptr&& row_index_column) -{ - auto index_and_table_columns = std::vector>{}; - index_and_table_columns.reserve(table->num_columns() + 1); - index_and_table_columns.push_back(std::move(row_index_column)); - auto table_columns = table->release(); - index_and_table_columns.insert(index_and_table_columns.end(), - std::make_move_iterator(table_columns.begin()), - std::make_move_iterator(table_columns.end())); - return std::make_unique(std::move(index_and_table_columns)); -} + auto [table, metadata] = cudf::io::read_parquet(options, stream, table_mr); + auto const num_rows = table->num_rows(); -/** - * @brief Computes a row index column from the specified row group row offsets and counts - * - * @param row_group_offsets Host span of row offsets of each row group - * @param row_group_num_rows Host span of number of rows in each row group - * @param start_row Starting row index if the row group offsets and counts are empty - * @param num_rows Number of rows in the table - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row index column - * - * @return UINT64 column containing row indices - */ -std::unique_ptr compute_row_index_column( - cudf::host_span row_group_offsets, - cudf::host_span row_group_num_rows, - std::optional start_row, - size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto const num_row_groups = static_cast(row_group_num_rows.size()); - - if (row_group_offsets.empty()) { - auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); - auto row_indices_iter = static_cast(row_indices.data()); - thrust::sequence(rmm::exec_policy_nosync(stream), - row_indices_iter, - row_indices_iter + num_rows, - start_row.value_or(size_t{0})); - return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, - num_rows, - std::move(row_indices), - rmm::device_buffer{0, stream, mr}, - 0); - } + CUDF_EXPECTS( + row_group_num_rows.empty() or + std::cmp_equal(metadata.num_input_row_groups, row_group_num_rows.size()), + "Encountered a mismatch in the number of row groups in parquet file and the specified " + "row group offsets/row counts vectors"); - // Convert number of rows in each row group to row group span offsets. Here, a span is the range - // of (table) rows that belong to a row group - auto row_group_span_offsets = - cudf::detail::make_host_vector(num_row_groups + 1, stream); - row_group_span_offsets[0] = 0; - std::inclusive_scan( - row_group_num_rows.begin(), row_group_num_rows.end(), row_group_span_offsets.begin() + 1); - - CUDF_EXPECTS(row_group_span_offsets.back() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the row group(s)"); - - auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); - auto row_indices_iter = static_cast(row_indices.data()); - thrust::fill(rmm::exec_policy_nosync(stream), row_indices_iter, row_indices_iter + num_rows, 1); - - auto row_group_keys = rmm::device_uvector(num_rows, stream); - thrust::fill(rmm::exec_policy_nosync(stream), row_group_keys.begin(), row_group_keys.end(), 0); - - // Scatter row group offsets and row group indices (or span indices) to their corresponding - // row group span offsets - auto d_row_group_offsets = cudf::detail::make_device_uvector_async( - row_group_offsets, stream, cudf::get_current_device_resource_ref()); - auto d_row_group_span_offsets = cudf::detail::make_device_uvector_async( - row_group_span_offsets, stream, cudf::get_current_device_resource_ref()); - auto in_iter = - thrust::make_zip_iterator(d_row_group_offsets.begin(), cuda::counting_iterator{0}); - auto out_iter = thrust::make_zip_iterator(row_indices_iter, row_group_keys.begin()); - thrust::scatter(rmm::exec_policy_nosync(stream), - in_iter, - in_iter + num_row_groups, - d_row_group_span_offsets.begin(), - out_iter); - - // Fill in the the rest of the row group span indices - thrust::inclusive_scan(rmm::exec_policy_nosync(stream), - row_group_keys.begin(), - row_group_keys.end(), - row_group_keys.begin(), - cuda::maximum()); - - // Segmented inclusive scan to compute the rest of the row indices - thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), - row_group_keys.begin(), - row_group_keys.end(), - row_indices_iter, - row_indices_iter); - - return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, - num_rows, - std::move(row_indices), - rmm::device_buffer{0, stream, mr}, - 0); -} + // Compute a row index column from the specified row group offsets and counts + auto row_index_column = compute_row_index_column( + row_group_offsets, row_group_num_rows, std::nullopt, num_rows, stream, table_mr); -/** - * @brief Computes a chunk of the row index column from the specified row group offsets and counts - * - * @param row_group_offsets Queue of row offsets of eachrow group - * @param row_group_num_rows Queue of number of rows in each row group - * @param start_row Starting row index of the current table chunk - * @param num_rows Total number of rows in the current table chunk - * @param is_unspecified_row_group_data Whether the row group offsets and counts are unspecified - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row index column - * - * @return UINT64 column containing row indices - */ -std::unique_ptr compute_partial_row_index_column( - std::queue& row_group_offsets, - std::queue& row_group_num_rows, - size_t start_row, - size_type num_rows, - bool is_unspecified_row_group_data, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - // Build a simple row index column if the row group offsets and counts are unspecified - if (is_unspecified_row_group_data) { - return compute_row_index_column({}, {}, start_row, num_rows, stream, mr); - } + // Prepend row index column to the table columns + auto table_with_index = + prepend_index_column_to_table(std::move(table), std::move(row_index_column)); - // Compute current table chunk's vectors of row group row offsets and counts from the input queues - std::vector row_offsets; - std::vector row_counts; - size_type rows_filled = 0; - while (std::cmp_less(rows_filled, num_rows)) { - CUDF_EXPECTS( - not(row_group_offsets.empty() or row_group_num_rows.empty()), - "Unable to compute the row index column from the specified row group offsets and counts"); - - // Compute how many rows can be extracted from the current row group - auto const row_count = std::min(num_rows - rows_filled, row_group_num_rows.front()); - row_counts.emplace_back(row_count); - row_offsets.emplace_back(row_group_offsets.front()); - - // If we still have remaining rows in the current row group, update its offset and row count - if (std::cmp_less(row_count, row_group_num_rows.front())) { - row_group_offsets.front() = row_group_offsets.front() + row_count; - row_group_num_rows.front() = row_group_num_rows.front() - row_count; - } else { - // Else if the row group is fully consumed, pop it from the queues - row_group_offsets.pop(); - row_group_num_rows.pop(); - } + prepend_index_column_to_table_metadata(metadata); - rows_filled += row_count; + if (serialized_roaring_bitmaps.empty()) { + return table_with_metadata{std::move(table_with_index), std::move(metadata)}; } - // Compute the row index column with the computed row group row offsets and counts - return compute_row_index_column(row_offsets, row_counts, std::nullopt, num_rows, stream, mr); + // Construct all deletion vectors and store their references + auto deletion_vectors = std::vector{}; + auto deletion_vectors_refs = std::vector>{}; + deletion_vectors.reserve(serialized_roaring_bitmaps.size()); + deletion_vectors_refs.reserve(serialized_roaring_bitmaps.size()); + std::transform(serialized_roaring_bitmaps.begin(), + serialized_roaring_bitmaps.end(), + std::back_inserter(deletion_vectors_refs), + [&](auto const& serialized_roaring_bitmap) { + deletion_vectors.emplace_back(serialized_roaring_bitmap.data(), + rmm::mr::polymorphic_allocator{}, + stream); + return std::ref(deletion_vectors.back()); + }); + auto row_mask = compute_row_mask_column(table_with_index->get_column(0).view(), + deletion_vectors_refs, + deletion_vector_row_counts, + stream, + cudf::get_current_device_resource_ref()); + // Filter the table using the deletion vector + return table_with_metadata{ + // Supply user-provided mr to apply_boolean_mask to allocate output table's memory + cudf::apply_boolean_mask(table_with_index->view(), row_mask->view(), stream, mr), + std::move(metadata)}; } -/** - * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors - * - * @note This function synchronizes the stream before returning the row mask column - * - * @param row_index_column View of the row index column - * @param deletion_vector_refs Host span of cuco roaring bitmap references - * @param rows_per_deletion_vector Host span of number of rows per deletion vector - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column - * - * @return Unique pointer to the row mask column - */ -std::unique_ptr compute_row_mask_column( - cudf::column_view const& row_index_column, - cudf::host_span const> deletion_vector_refs, - cudf::host_span rows_per_deletion_vector, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +[[nodiscard]] size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, + cudf::size_type max_chunk_rows, + rmm::cuda_stream_view stream) { - auto const num_rows = row_index_column.size(); - auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); - auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); - - // Iterator to negate and store the output value from `contains_async` - auto row_mask_iter = thrust::make_transform_output_iterator( - static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); - - if (num_deletion_vectors == 1) { - deletion_vector_refs.front().get().contains( - row_index_column.begin(), row_index_column.end(), row_mask_iter, stream); - return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, - num_rows, - std::move(row_mask), - rmm::device_buffer{0, stream, mr}, - 0); - } - - auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); - deletion_vector_row_offsets[0] = 0; - std::inclusive_scan(rows_per_deletion_vector.begin(), - rows_per_deletion_vector.end(), - deletion_vector_row_offsets.begin() + 1); - - CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, - "Encountered a mismatch in the number of rows in the row index column and the " - "number of rows in the deletion vector(s)"); - - // Fork the stream if the number of deletion vectors is greater than the threshold - constexpr auto stream_fork_threshold = 8; - if (num_deletion_vectors >= stream_fork_threshold) { - auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); - std::for_each(cuda::counting_iterator{0}, - cuda::counting_iterator{num_deletion_vectors}, - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], - streams[dv_idx]); - }); - cudf::detail::join_streams(streams, stream); - } else { - // Otherwise, launch the queries on the same stream - std::for_each(cuda::counting_iterator{0}, - cuda::counting_iterator{num_deletion_vectors}, - [&](auto const dv_idx) { - deletion_vector_refs[dv_idx].get().contains_async( - row_index_column.begin() + deletion_vector_row_offsets[dv_idx], - row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], - row_mask_iter + deletion_vector_row_offsets[dv_idx], - stream); - }); - stream.synchronize(); - } + auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; + auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; + auto const& row_group_offsets = deletion_vector_info.row_group_offsets; + auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; - return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, - num_rows, - std::move(row_mask), - rmm::device_buffer{0, stream, mr}, - 0); -} + if (serialized_roaring_bitmaps.empty()) { return 0; } -/** - * @brief Computes a chunk of the BOOL8 row mask column from the row index column and the deletion - * vectors - * - * @param row_index_column View of the row index column - * @param deletion_vectors Queue of roaring bitmap wrappers - * @param deletion_vector_row_counts Queue of number of rows in each deletion vector - * @param start_row Starting row index of the current table chunk - * @param stream CUDA stream for kernel launches and data transfers - * @param mr Device memory resource to allocate device memory for the row mask column - * - * @return Unique pointer to the row mask column - */ -std::unique_ptr compute_partial_row_mask_column( - cudf::column_view const& row_index_column, - std::queue& deletion_vectors, - std::queue& deletion_vector_row_counts, - size_t start_row, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto const num_rows = row_index_column.size(); - - std::vector row_counts; - std::vector deletion_vectors_impls; - std::vector> deletion_vector_refs; - size_type rows_filled = 0; - - while (std::cmp_less(rows_filled, num_rows)) { - CUDF_EXPECTS( - not(deletion_vector_row_counts.empty() or deletion_vectors.empty()), - "Encountered insufficient number of deletion vector row counts or deletion vectors: " + - std::to_string(deletion_vector_row_counts.size()) + " " + - std::to_string(deletion_vectors.size())); - - // Compute how many rows can be queried from the current roaring bitmap - auto const row_count = - std::min(num_rows - rows_filled, deletion_vector_row_counts.front()); - row_counts.emplace_back(row_count); - - auto& deletion_vector = deletion_vectors.front(); - deletion_vector.construct_roaring_bitmap(rmm::mr::polymorphic_allocator{}, stream); - CUDF_EXPECTS(deletion_vector.roaring_bitmap != nullptr, "Failed to construct roaring_bitmap"); - deletion_vector_refs.emplace_back(std::ref(*(deletion_vector.roaring_bitmap))); - - // If we still have remaining rows to query from the current roaring bitmap, update its row - // count - if (std::cmp_less(row_count, deletion_vector_row_counts.front())) { - deletion_vector_row_counts.front() = deletion_vector_row_counts.front() - row_count; - } else { - // Else if the deletion vector is fully queried, move it to the temporary vector and pop it - // from the queue - deletion_vectors_impls.emplace_back(std::move(deletion_vectors.front())); - deletion_vectors.pop(); - deletion_vector_row_counts.pop(); + // Validate input + CUDF_EXPECTS(std::cmp_equal(serialized_roaring_bitmaps.size(), deletion_vector_row_counts.size()), + "Encountered a mismatch in the number of deletion vector data spans and the number " + "of rows per deletion vector"); + CUDF_EXPECTS( + row_group_offsets.size() == row_group_num_rows.size(), + "Encountered a mismatch in the number of row group offsets and row group row counts"); + CUDF_EXPECTS(max_chunk_rows > 0, "Encountered an invalid chunk size"); + + auto const is_row_group_data_unspecified = row_group_offsets.empty(); + + auto const num_rows = [&]() { + auto const rows_in_dvs = std::accumulate( + deletion_vector_row_counts.begin(), deletion_vector_row_counts.end(), size_t{0}); + if (not is_row_group_data_unspecified) { + auto const rows_in_rgs = + std::accumulate(row_group_num_rows.begin(), row_group_num_rows.end(), size_t{0}); + CUDF_EXPECTS(std::cmp_equal(rows_in_dvs, rows_in_rgs), + "Encountered a mismatch in the number of rows across deletion vectors and the " + "number of rows across row groups"); } + return rows_in_dvs; + }(); + + std::queue rg_offsets_queue; + std::queue rg_counts_queue; + for (size_t i = 0; i < row_group_offsets.size(); ++i) { + rg_offsets_queue.push(row_group_offsets[i]); + rg_counts_queue.push(row_group_num_rows[i]); + } - rows_filled += row_count; + std::queue dv_queue; + std::queue dv_row_counts_queue; + for (size_t i = 0; i < serialized_roaring_bitmaps.size(); ++i) { + dv_queue.emplace(serialized_roaring_bitmaps[i]); + dv_row_counts_queue.push(deletion_vector_row_counts[i]); } - // Compute the row index column with the computed row group row offsets and counts - return compute_row_mask_column(row_index_column, deletion_vector_refs, row_counts, stream, mr); + size_t deleted_rows = 0; + size_t remaining_rows = num_rows; + size_t start_row = 0; + + while (remaining_rows > 0) { + // Maximum number of rows to process in this chunk + auto const chunk_rows = std::min(remaining_rows, max_chunk_rows); + auto row_index_column = + compute_partial_row_index_column(rg_offsets_queue, + rg_counts_queue, + start_row, + static_cast(chunk_rows), + is_row_group_data_unspecified, + stream, + cudf::get_current_device_resource_ref()); + deleted_rows += compute_partial_deleted_row_count( + row_index_column->view(), dv_queue, dv_row_counts_queue, stream); + + start_row += chunk_rows; + remaining_rows -= chunk_rows; + } + + return deleted_rows; } -} // namespace +} // namespace detail /** * @copydoc @@ -417,7 +194,7 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // be applying the deletion vector to produce the output table _table_mr{deletion_vector_info.serialized_roaring_bitmaps.empty() ? mr - : rmm::mr::get_current_device_resource_ref()} + : cudf::get_current_device_resource_ref()} { auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; @@ -442,7 +219,7 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // Push row group offsets and counts to the internal queues if (not row_group_offsets.empty()) { - auto iter = thrust::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); + auto iter = cuda::make_zip_iterator(row_group_offsets.begin(), row_group_num_rows.begin()); std::for_each(iter, iter + row_group_offsets.size(), [&](auto const& elem) { _row_group_row_offsets.push(cuda::std::get<0>(elem)); _row_group_row_counts.push(cuda::std::get<1>(elem)); @@ -451,8 +228,8 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, // Push deletion vector data spans and row counts to the internal queues if (not serialized_roaring_bitmaps.empty()) { - auto iter = thrust::make_zip_iterator(serialized_roaring_bitmaps.begin(), - deletion_vector_row_counts.begin()); + auto iter = cuda::make_zip_iterator(serialized_roaring_bitmaps.begin(), + deletion_vector_row_counts.begin()); std::for_each(iter, iter + serialized_roaring_bitmaps.size(), [&](auto const& elem) { _deletion_vectors.emplace(cuda::std::get<0>(elem)); _deletion_vector_row_counts.push(cuda::std::get<1>(elem)); @@ -489,6 +266,8 @@ bool chunked_parquet_reader::has_next() const { return _reader->has_next(); } */ table_with_metadata chunked_parquet_reader::read_chunk() { + CUDF_FUNC_RANGE(); + // Read a chunk of the parquet table auto [table, metadata] = _reader->read_chunk(); auto const num_rows = table->num_rows(); @@ -520,7 +299,6 @@ table_with_metadata chunked_parquet_reader::read_chunk() auto row_mask = compute_partial_row_mask_column(table_with_index->get_column(0).view(), _deletion_vectors, _deletion_vector_row_counts, - _start_row, _stream, cudf::get_current_device_resource_ref()); return table_with_metadata{ @@ -537,79 +315,19 @@ table_with_metadata read_parquet(parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto const& serialized_roaring_bitmaps = deletion_vector_info.serialized_roaring_bitmaps; - auto const& deletion_vector_row_counts = deletion_vector_info.deletion_vector_row_counts; - auto const& row_group_offsets = deletion_vector_info.row_group_offsets; - auto const& row_group_num_rows = deletion_vector_info.row_group_num_rows; - - CUDF_EXPECTS( - row_group_offsets.size() == row_group_num_rows.size(), - "Encountered a mismatch in the number of row group offsets and row group row counts"); - CUDF_EXPECTS( - not options.get_filter().has_value(), - "Encountered a non-empty AST filter expression. Use a roaring64 bitmap deletion vector to " - "filter the table instead"); - CUDF_EXPECTS(serialized_roaring_bitmaps.empty() or - serialized_roaring_bitmaps.size() == deletion_vector_row_counts.size(), - "Encountered a mismatch in the number of deletion vectors and the number of rows " - "per deletion vector"); - - // Use default mr to read parquet table and build row index column if we will be applying the - // deletion vector to produce a new table later - auto const table_mr = - serialized_roaring_bitmaps.empty() ? mr : rmm::mr::get_current_device_resource_ref(); - - // Read the parquet table - auto [table, metadata] = cudf::io::read_parquet(options, stream, table_mr); - auto const num_rows = table->num_rows(); - - CUDF_EXPECTS( - row_group_num_rows.empty() or - std::cmp_equal(metadata.num_input_row_groups, row_group_num_rows.size()), - "Encountered a mismatch in the number of row groups in parquet file and the specified " - "row group offsets/row counts vectors"); - - // Compute a row index column from the specified row group offsets and counts - auto row_index_column = compute_row_index_column( - row_group_offsets, row_group_num_rows, std::nullopt, num_rows, stream, table_mr); - - // Prepend row index column to the table columns - auto table_with_index = - prepend_index_column_to_table(std::move(table), std::move(row_index_column)); - - // Also prepend the row index column's metadata to the table schema - prepend_index_column_to_table_metadata(metadata); - - // Return early if roaring64 data is empty - if (serialized_roaring_bitmaps.empty()) { - return table_with_metadata{std::move(table_with_index), std::move(metadata)}; - } + CUDF_FUNC_RANGE(); + return detail::read_parquet(options, deletion_vector_info, stream, mr); +} - // Construct all deletion vectors and store their references - auto deletion_vectors = std::vector{}; - auto deletion_vectors_refs = std::vector>{}; - deletion_vectors.reserve(serialized_roaring_bitmaps.size()); - deletion_vectors_refs.reserve(serialized_roaring_bitmaps.size()); - std::transform(serialized_roaring_bitmaps.begin(), - serialized_roaring_bitmaps.end(), - std::back_inserter(deletion_vectors_refs), - [&](auto const& serialized_roaring_bitmap) { - deletion_vectors.emplace_back(serialized_roaring_bitmap.data(), - rmm::mr::polymorphic_allocator{}, - stream); - return std::ref(deletion_vectors.back()); - }); - // Compute the row mask column from the deletion vectors - auto row_mask = compute_row_mask_column(table_with_index->get_column(0).view(), - deletion_vectors_refs, - deletion_vector_row_counts, - stream, - cudf::get_current_device_resource_ref()); - // Filter the table using the deletion vector - return table_with_metadata{ - // Supply user-provided mr to apply_boolean_mask to allocate output table's memory - cudf::apply_boolean_mask(table_with_index->view(), row_mask->view(), stream, mr), - std::move(metadata)}; +/** + * @copydoc cudf::io::parquet::experimental::compute_num_deleted_rows + */ +size_t compute_num_deleted_rows(deletion_vector_info const& deletion_vector_info, + cudf::size_type max_chunk_rows, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + return detail::compute_num_deleted_rows(deletion_vector_info, max_chunk_rows, stream); } } // namespace cudf::io::parquet::experimental diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu new file mode 100644 index 00000000000..ee30f916fa9 --- /dev/null +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.cu @@ -0,0 +1,365 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "deletion_vectors_helpers.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace cudf::io::parquet::experimental { + +void chunked_parquet_reader::roaring_bitmap_impl::construct_roaring_bitmap( + rmm::mr::polymorphic_allocator const& allocator, rmm::cuda_stream_view stream) +{ + if (roaring_bitmap == nullptr) { + CUDF_EXPECTS(not roaring_bitmap_data.empty(), + "Encountered empty data while constructing roaring bitmap"); + roaring_bitmap = std::make_unique( + static_cast(roaring_bitmap_data.data()), allocator, stream); + } +} + +void prepend_index_column_to_table_metadata(table_metadata& metadata) +{ + auto updated_schema_info = std::vector{}; + updated_schema_info.reserve(metadata.schema_info.size() + 1); + updated_schema_info.emplace_back("index"); + updated_schema_info.insert(updated_schema_info.end(), + std::make_move_iterator(metadata.schema_info.begin()), + std::make_move_iterator(metadata.schema_info.end())); + metadata.schema_info = std::move(updated_schema_info); +} + +std::unique_ptr prepend_index_column_to_table( + std::unique_ptr&& table, std::unique_ptr&& row_index_column) +{ + auto index_and_table_columns = std::vector>{}; + index_and_table_columns.reserve(table->num_columns() + 1); + index_and_table_columns.push_back(std::move(row_index_column)); + auto table_columns = table->release(); + index_and_table_columns.insert(index_and_table_columns.end(), + std::make_move_iterator(table_columns.begin()), + std::make_move_iterator(table_columns.end())); + return std::make_unique(std::move(index_and_table_columns)); +} + +std::unique_ptr compute_row_index_column( + cudf::host_span row_group_offsets, + cudf::host_span row_group_num_rows, + std::optional start_row, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const num_row_groups = static_cast(row_group_num_rows.size()); + + if (row_group_offsets.empty()) { + auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); + auto row_indices_iter = static_cast(row_indices.data()); + thrust::sequence(rmm::exec_policy_nosync(stream), + row_indices_iter, + row_indices_iter + num_rows, + start_row.value_or(size_t{0})); + return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, + num_rows, + std::move(row_indices), + rmm::device_buffer{0, stream, mr}, + 0); + } + + auto row_group_span_offsets = + cudf::detail::make_host_vector(num_row_groups + 1, stream); + row_group_span_offsets[0] = 0; + std::inclusive_scan( + row_group_num_rows.begin(), row_group_num_rows.end(), row_group_span_offsets.begin() + 1); + + CUDF_EXPECTS(row_group_span_offsets.back() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the row group(s)"); + + auto row_indices = rmm::device_buffer(num_rows * sizeof(size_t), stream, mr); + auto row_indices_iter = static_cast(row_indices.data()); + thrust::fill(rmm::exec_policy_nosync(stream), row_indices_iter, row_indices_iter + num_rows, 1); + + auto row_group_keys = rmm::device_uvector(num_rows, stream); + thrust::fill(rmm::exec_policy_nosync(stream), row_group_keys.begin(), row_group_keys.end(), 0); + + auto d_row_group_offsets = cudf::detail::make_device_uvector_async( + row_group_offsets, stream, cudf::get_current_device_resource_ref()); + auto d_row_group_span_offsets = cudf::detail::make_device_uvector_async( + row_group_span_offsets, stream, cudf::get_current_device_resource_ref()); + auto in_iter = + cuda::make_zip_iterator(d_row_group_offsets.begin(), cuda::counting_iterator(0)); + auto out_iter = cuda::make_zip_iterator(row_indices_iter, row_group_keys.begin()); + thrust::scatter(rmm::exec_policy_nosync(stream), + in_iter, + in_iter + num_row_groups, + d_row_group_span_offsets.begin(), + out_iter); + + thrust::inclusive_scan(rmm::exec_policy_nosync(stream), + row_group_keys.begin(), + row_group_keys.end(), + row_group_keys.begin(), + cuda::maximum()); + + thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), + row_group_keys.begin(), + row_group_keys.end(), + row_indices_iter, + row_indices_iter); + + return std::make_unique(cudf::data_type{cudf::type_id::UINT64}, + num_rows, + std::move(row_indices), + rmm::device_buffer{0, stream, mr}, + 0); +} + +std::unique_ptr compute_partial_row_index_column( + std::queue& row_group_offsets, + std::queue& row_group_num_rows, + size_t start_row, + size_type num_rows, + bool is_unspecified_row_group_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + if (is_unspecified_row_group_data) { + return compute_row_index_column({}, {}, start_row, num_rows, stream, mr); + } + + std::vector row_offsets; + std::vector row_counts; + size_type rows_filled = 0; + while (std::cmp_less(rows_filled, num_rows)) { + CUDF_EXPECTS( + not(row_group_offsets.empty() or row_group_num_rows.empty()), + "Unable to compute the row index column from the specified row group offsets and counts"); + + auto const row_count = std::min(num_rows - rows_filled, row_group_num_rows.front()); + row_counts.emplace_back(row_count); + row_offsets.emplace_back(row_group_offsets.front()); + + if (std::cmp_less(row_count, row_group_num_rows.front())) { + row_group_offsets.front() = row_group_offsets.front() + row_count; + row_group_num_rows.front() = row_group_num_rows.front() - row_count; + } else { + row_group_offsets.pop(); + row_group_num_rows.pop(); + } + + rows_filled += row_count; + } + + return compute_row_index_column(row_offsets, row_counts, std::nullopt, num_rows, stream, mr); +} + +namespace { + +/** + * @brief Queries the deletion vectors and writes the result to the output iterator + * + * @tparam OutputIterator The output iterator type + * + * @param row_index_column Row index column + * @param deletion_vector_refs Deletion vector refs + * @param rows_per_deletion_vector Rows per deletion vector + * @param output Output iterator + * @param stream CUDA stream to launch the query kernel + */ +template +void query_deletion_vectors( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + OutputIterator output, + rmm::cuda_stream_view stream) +{ + auto const num_rows = row_index_column.size(); + auto const num_deletion_vectors = static_cast(deletion_vector_refs.size()); + + if (num_deletion_vectors == 1) { + CUDF_EXPECTS( + std::cmp_greater_equal(rows_per_deletion_vector.front(), num_rows), + "Encountered insufficient deletion vector size to query the entire row index column"); + deletion_vector_refs.front().get().contains( + row_index_column.begin(), row_index_column.end(), output, stream); + return; + } + + auto deletion_vector_row_offsets = std::vector(deletion_vector_refs.size() + 1); + deletion_vector_row_offsets[0] = 0; + std::inclusive_scan(rows_per_deletion_vector.begin(), + rows_per_deletion_vector.end(), + deletion_vector_row_offsets.begin() + 1); + + CUDF_EXPECTS(deletion_vector_row_offsets.back() == num_rows, + "Encountered a mismatch in the number of rows in the row index column and the " + "number of rows in the deletion vector(s)"); + + constexpr auto stream_fork_threshold = 8; + if (num_deletion_vectors >= stream_fork_threshold) { + auto streams = cudf::detail::fork_streams(stream, num_deletion_vectors); + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + output + deletion_vector_row_offsets[dv_idx], + streams[dv_idx]); + }); + cudf::detail::join_streams(streams, stream); + } else { + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + [&](auto const dv_idx) { + deletion_vector_refs[dv_idx].get().contains_async( + row_index_column.begin() + deletion_vector_row_offsets[dv_idx], + row_index_column.begin() + deletion_vector_row_offsets[dv_idx + 1], + output + deletion_vector_row_offsets[dv_idx], + stream); + }); + stream.synchronize(); + } +} + +} // namespace + +std::unique_ptr compute_row_mask_column( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto const num_rows = row_index_column.size(); + auto row_mask = rmm::device_buffer(num_rows * sizeof(bool), stream, mr); + auto row_mask_iter = cuda::make_transform_output_iterator( + static_cast(row_mask.data()), [] __device__(auto b) { return not b; }); + query_deletion_vectors( + row_index_column, deletion_vector_refs, rows_per_deletion_vector, row_mask_iter, stream); + return std::make_unique(cudf::data_type{cudf::type_id::BOOL8}, + num_rows, + std::move(row_mask), + rmm::device_buffer{0, stream, mr}, + 0); +} + +size_t compute_deleted_row_count( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + auto const num_rows = row_index_column.size(); + auto row_mask = + rmm::device_buffer(num_rows * sizeof(bool), stream, cudf::get_current_device_resource_ref()); + auto row_mask_iter = static_cast(row_mask.data()); + query_deletion_vectors( + row_index_column, deletion_vector_refs, deletion_vector_row_counts, row_mask_iter, stream); + return static_cast( + thrust::count(rmm::exec_policy_nosync(stream), row_mask_iter, row_mask_iter + num_rows, true)); +} + +namespace { + +/** + * @brief Consumes a specified number of rows from a queue of deletion vectors into vectors of + * currently active deletion vectors, their corresponding refs, and row counts + * + * @param num_rows The number of rows + * @param deletion_vectors The deletion vectors + * @param deletion_vector_row_counts The deletion vector row counts + * @param stream The stream + * @return A tuple of currently active deletion vectors, their corresponding refs, and row counts + */ +auto consume_deletion_vectors( + size_type num_rows, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + std::vector row_counts; + std::vector deletion_vectors_impls; + std::vector> deletion_vector_refs; + size_type rows_filled = 0; + + while (std::cmp_less(rows_filled, num_rows)) { + CUDF_EXPECTS( + not(deletion_vector_row_counts.empty() or deletion_vectors.empty()), + "Encountered insufficient number of deletion vector row counts or deletion vectors: " + + std::to_string(deletion_vector_row_counts.size()) + " " + + std::to_string(deletion_vectors.size())); + + auto const row_count = + std::min(num_rows - rows_filled, deletion_vector_row_counts.front()); + row_counts.emplace_back(row_count); + + auto& deletion_vector = deletion_vectors.front(); + deletion_vector.construct_roaring_bitmap(rmm::mr::polymorphic_allocator{}, stream); + CUDF_EXPECTS(deletion_vector.roaring_bitmap != nullptr, "Failed to construct roaring_bitmap"); + deletion_vector_refs.emplace_back(std::ref(*(deletion_vector.roaring_bitmap))); + + if (std::cmp_less(row_count, deletion_vector_row_counts.front())) { + deletion_vector_row_counts.front() = deletion_vector_row_counts.front() - row_count; + } else { + deletion_vectors_impls.emplace_back(std::move(deletion_vectors.front())); + deletion_vectors.pop(); + deletion_vector_row_counts.pop(); + } + + rows_filled += row_count; + } + + return std::tuple{ + std::move(deletion_vectors_impls), std::move(deletion_vector_refs), std::move(row_counts)}; +} + +} // namespace + +std::unique_ptr compute_partial_row_mask_column( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto [_, dv_refs, dv_row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_row_mask_column(row_index_column, dv_refs, dv_row_counts, stream, mr); +} + +size_t compute_partial_deleted_row_count( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream) +{ + auto [_, dv_refs, dv_row_counts] = consume_deletion_vectors( + row_index_column.size(), deletion_vectors, deletion_vector_row_counts, stream); + return compute_deleted_row_count(row_index_column, dv_refs, dv_row_counts, stream); +} + +} // namespace cudf::io::parquet::experimental diff --git a/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp new file mode 100644 index 00000000000..eae6086a20c --- /dev/null +++ b/cpp/src/io/parquet/experimental/deletion_vectors_helpers.hpp @@ -0,0 +1,186 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include + +namespace cudf::io::parquet::experimental { + +// Type alias for the cuco 64-bit roaring bitmap +using roaring_bitmap_type = + cuco::experimental::roaring_bitmap>; + +/** + * @brief Opaque wrapper class for cuco's 64-bit roaring bitmap + */ +struct chunked_parquet_reader::roaring_bitmap_impl { + /// Unique pointer to the roaring bitmap + std::unique_ptr roaring_bitmap; + /// Host span of the serialized roaring bitmap data + cudf::host_span const roaring_bitmap_data; + + explicit roaring_bitmap_impl( + cudf::host_span const& serialized_roaring_bitmap) + : roaring_bitmap_data{serialized_roaring_bitmap} + { + } + + roaring_bitmap_impl(roaring_bitmap_impl&&) = default; + roaring_bitmap_impl(roaring_bitmap_impl const&) = delete; + + /** + * @brief Constructs a roaring bitmap from the serialized data if not already constructed + * + * @param allocator Memory allocator + * @param stream CUDA stream to launch the query kernel + */ + void construct_roaring_bitmap(rmm::mr::polymorphic_allocator const& allocator, + rmm::cuda_stream_view stream); +}; + +/** + * @brief Prepends the index column information to the table metadata + * + * @param[in,out] metadata Table metadata to update with the index column name + */ +void prepend_index_column_to_table_metadata(table_metadata& metadata); + +/** + * @brief Prepends the index column to the table columns + * + * @param table Input table to prepend the index column to + * @param row_index_column Row index column to prepend + * @return A new table with the index column prepended to the input table columns + */ +[[nodiscard]] std::unique_ptr prepend_index_column_to_table( + std::unique_ptr&& table, std::unique_ptr&& row_index_column); + +/** + * @brief Computes a row index column from the specified row group row offsets and counts + * + * @param row_group_offsets Host span of row offsets of each row group + * @param row_group_num_rows Host span of number of rows in each row group + * @param start_row Starting row index when row group offsets are empty + * @param num_rows Total number of rows in the output column + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return UINT64 column containing row indices + */ +[[nodiscard]] std::unique_ptr compute_row_index_column( + cudf::host_span row_group_offsets, + cudf::host_span row_group_num_rows, + std::optional start_row, + size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes a chunk of the row index column by consuming row group data from queues + * + * @param[in,out] row_group_offsets Queue of per-row-group starting row offsets + * @param[in,out] row_group_num_rows Queue of per-row-group row counts + * @param start_row Starting row index when row group data is unspecified + * @param num_rows Number of rows in the output column + * @param is_unspecified_row_group_data Whether to ignore queues and produce a simple sequence + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return UINT64 column containing row indices for this chunk + */ +[[nodiscard]] std::unique_ptr compute_partial_row_index_column( + std::queue& row_group_offsets, + std::queue& row_group_num_rows, + size_t start_row, + size_type num_rows, + bool is_unspecified_row_group_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes a BOOL8 row mask column from the specified row index column and deletion vectors + * + * @param row_index_column View of the row index column + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param rows_per_deletion_vector Host span of number of rows per deletion vector + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return BOOL8 column where `true` indicates a non-deleted row + */ +[[nodiscard]] std::unique_ptr compute_row_mask_column( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span rows_per_deletion_vector, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes the number of rows deleted by the deletion vectors + * + * @param row_index_column View of the row index column + * @param deletion_vector_refs Host span of cuco roaring bitmap references + * @param deletion_vector_row_counts Host span of number of rows per deletion vector + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Number of rows marked as deleted + */ +[[nodiscard]] size_t compute_deleted_row_count( + cudf::column_view const& row_index_column, + cudf::host_span const> deletion_vector_refs, + cudf::host_span deletion_vector_row_counts, + rmm::cuda_stream_view stream); + +/** + * @brief Computes a chunk of the BOOL8 row mask column by consuming deletion vectors from queues + * + * @param row_index_column View of the row index column for the current chunk + * @param[in,out] deletion_vectors Queue of roaring bitmap wrappers to consume from + * @param[in,out] deletion_vector_row_counts Queue of per-deletion-vector row counts + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to allocate the output column + * + * @return BOOL8 column where `true` indicates a non-deleted row + */ +[[nodiscard]] std::unique_ptr compute_partial_row_mask_column( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Computes the number of deleted rows for a chunk by consuming deletion vectors from queues + * + * @param row_index_column View of the row index column for the current chunk + * @param[in,out] deletion_vectors Queue of roaring bitmap wrappers to consume from + * @param[in,out] deletion_vector_row_counts Queue of per-deletion-vector row counts + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Number of deleted rows in this chunk + */ +[[nodiscard]] size_t compute_partial_deleted_row_count( + cudf::column_view const& row_index_column, + std::queue& deletion_vectors, + std::queue& deletion_vector_row_counts, + rmm::cuda_stream_view stream); + +} // namespace cudf::io::parquet::experimental diff --git a/cpp/tests/io/parquet_deletion_vectors_test.cu b/cpp/tests/io/parquet_deletion_vectors_test.cu index 0c5d46301b7..98c9ccde61c 100644 --- a/cpp/tests/io/parquet_deletion_vectors_test.cu +++ b/cpp/tests/io/parquet_deletion_vectors_test.cu @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include +#include #include #include @@ -87,15 +89,13 @@ auto build_expected_row_indices(cudf::host_span row_group_off expected_row_indices.begin()); // Inclusive scan to compute the rest of the row indices - std::for_each(cuda::counting_iterator{0}, - cuda::counting_iterator{num_row_groups}, - [&](auto i) { - auto start_row_index = row_group_span_offsets[i]; - auto end_row_index = row_group_span_offsets[i + 1]; - std::inclusive_scan(expected_row_indices.begin() + start_row_index, - expected_row_indices.begin() + end_row_index, - expected_row_indices.begin() + start_row_index); - }); + std::for_each(cuda::counting_iterator(0), cuda::counting_iterator(num_row_groups), [&](auto i) { + auto start_row_index = row_group_span_offsets[i]; + auto end_row_index = row_group_span_offsets[i + 1]; + std::inclusive_scan(expected_row_indices.begin() + start_row_index, + expected_row_indices.begin() + end_row_index, + expected_row_indices.begin() + start_row_index); + }); return expected_row_indices; } @@ -154,8 +154,8 @@ auto build_deletion_vector_and_expected_row_mask(cudf::size_type num_rows, auto roaring64_context = roaring::api::roaring64_bulk_context_t{.high_bytes = {0, 0, 0, 0, 0, 0}, .leaf = nullptr}; - std::for_each(cuda::counting_iterator{0}, - cuda::counting_iterator{num_rows}, + std::for_each(cuda::counting_iterator(0), + cuda::counting_iterator(num_rows), [&](auto row_idx) { // Insert provided host row index if the row is deleted in the row mask if (not expected_row_mask[row_idx]) { @@ -201,8 +201,8 @@ std::unique_ptr build_expected_table( auto index_and_columns = std::vector{}; index_and_columns.reserve(input_table_view.num_columns() + 1); index_and_columns.push_back(expected_row_index_column); - std::transform(cuda::counting_iterator{0}, - cuda::counting_iterator{input_table_view.num_columns()}, + std::transform(cuda::counting_iterator(0), + cuda::counting_iterator(input_table_view.num_columns()), std::back_inserter(index_and_columns), [&](auto col_idx) { return input_table_view.column(col_idx); }); return cudf::apply_boolean_mask(cudf::table_view{index_and_columns}, row_mask_column, stream, mr); @@ -382,7 +382,7 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) roaring::api::roaring64_bulk_context_t{.high_bytes = {0, 0, 0, 0, 0, 0}, .leaf = nullptr}; std::for_each( - cuda::counting_iterator{0}, cuda::counting_iterator{num_keys}, [&](auto key) { + cuda::counting_iterator(0), cuda::counting_iterator(num_keys), [&](auto key) { if (is_even[key]) { roaring::api::roaring64_bitmap_add_bulk(roaring64_bitmap, &roaring64_context, key); } @@ -405,7 +405,7 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) auto roaring_context = roaring::api::roaring_bulk_context_t{}; std::for_each( - cuda::counting_iterator{0}, cuda::counting_iterator{num_keys}, [&](auto key) { + cuda::counting_iterator(0), cuda::counting_iterator(num_keys), [&](auto key) { if (is_even[key]) { roaring::api::roaring_bitmap_add_bulk(roaring_bitmap, &roaring_context, key); } @@ -435,17 +435,15 @@ TYPED_TEST(RoaringBitmapBasicsTest, BitmapSerialization) // Query the roaring bitmap auto contained = rmm::device_uvector(num_keys, stream, mr); - roaring_bitmap.contains_async(cuda::counting_iterator{0}, - cuda::counting_iterator{num_keys}, + roaring_bitmap.contains_async(cuda::counting_iterator(0), + cuda::counting_iterator(num_keys), contained.data(), stream); auto results = cudf::detail::make_host_vector_async(contained, stream); // Validate stream.synchronize(); - EXPECT_TRUE(std::all_of(cuda::counting_iterator{0}, - cuda::counting_iterator{num_keys}, - [&](auto key) { return results[key] == is_even[key]; })); + EXPECT_TRUE(std::equal(results.begin(), results.end(), is_even)); } // Base test fixture for API tests @@ -542,14 +540,13 @@ TEST_F(ParquetDeletionVectorsTest, CustomRowIndexColumn) } // Row offsets for each row group - arbitrary, only used to build the UINT64 `index` column - auto row_group_offsets = std::vector(num_row_groups); - row_group_offsets[0] = static_cast(std::llround(1e9)); - std::transform(cuda::counting_iterator{1}, - cuda::counting_iterator{num_row_groups}, - row_group_offsets.begin() + 1, - [&](auto i) { - return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); - }); + auto row_group_offsets = std::vector(num_row_groups); + row_group_offsets[0] = static_cast(std::llround(1e9)); + std::transform( + cuda::counting_iterator(1), + cuda::counting_iterator(num_row_groups), + row_group_offsets.begin() + 1, + [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); // Split the `num_rows` into `num_row_groups` spans auto row_group_splits = std::vector(num_row_groups - 1); @@ -636,3 +633,177 @@ TEST_F(ParquetDeletionVectorsTest, CustomRowIndexColumn) mr); } } + +// Test fixture for deletion vectors delete count API +struct DeletionVectorsCountTests : public ParquetDeletionVectorsTest {}; + +TEST_F(DeletionVectorsCountTests, EmptyDeletionVector) +{ + auto const stream = cudf::get_default_stream(); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{}; + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, std::numeric_limits::max(), stream); + EXPECT_EQ(result, 0); +} + +TEST_F(DeletionVectorsCountTests, NoRowIndex) +{ + auto constexpr num_rows = 50'000; + auto constexpr deletion_probability = 0.5; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + auto row_indices = thrust::host_vector(num_rows); + std::iota(row_indices.begin(), row_indices.end(), size_t{0}); + + auto [deletion_vector, expected_row_mask_column] = build_deletion_vector_and_expected_row_mask( + num_rows, deletion_probability, row_indices, stream, mr); + + auto const expected_row_mask = cudf::detail::make_host_vector( + cudf::device_span(expected_row_mask_column->view().data(), num_rows), stream); + auto const expected_deleted = + std::count(expected_row_mask.begin(), expected_row_mask.end(), false); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = {deletion_vector}, + .deletion_vector_row_counts = {num_rows}, + .row_group_offsets = {}, + .row_group_num_rows = {}}; + + for (auto chunk_size : {num_rows, num_rows / 2}) { + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(result, expected_deleted); + } +} + +TEST_F(DeletionVectorsCountTests, CustomRowIndex) +{ + auto constexpr num_rows = 25'000; + auto constexpr num_row_groups = 5; + auto constexpr deletion_probability = 0.4; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Arbitrary row group offsets + auto row_group_offsets = std::vector(num_row_groups); + row_group_offsets[0] = static_cast(std::llround(1e9)); + std::transform( + cuda::counting_iterator(1), + cuda::counting_iterator(num_row_groups), + row_group_offsets.begin() + 1, + [&](auto i) { return static_cast(std::llround(row_group_offsets[i - 1] + 0.5e9)); }); + + // Split num_rows into spans + auto row_group_splits = std::vector(num_row_groups - 1); + { + std::mt19937 engine{0xdeaL}; + std::uniform_int_distribution dist{1, num_rows}; + std::generate(row_group_splits.begin(), row_group_splits.end(), [&]() { return dist(engine); }); + std::sort(row_group_splits.begin(), row_group_splits.end()); + } + + auto row_group_num_rows = std::vector{}; + { + row_group_num_rows.reserve(num_row_groups); + auto previous_split = cudf::size_type{0}; + std::transform(row_group_splits.begin(), + row_group_splits.end(), + std::back_inserter(row_group_num_rows), + [&](auto current_split) { + auto current_split_size = current_split - previous_split; + previous_split = current_split; + return current_split_size; + }); + row_group_num_rows.push_back(num_rows - row_group_splits.back()); + } + + auto expected_row_indices = + build_expected_row_indices(row_group_offsets, row_group_num_rows, num_rows); + + auto [deletion_vector, expected_row_mask_column] = build_deletion_vector_and_expected_row_mask( + num_rows, deletion_probability, expected_row_indices, stream, mr); + + auto const expected_row_mask = cudf::detail::make_host_vector( + cudf::device_span(expected_row_mask_column->view().data(), num_rows), stream); + size_t const expected_deleted = + std::count(expected_row_mask.begin(), expected_row_mask.end(), false); + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = {deletion_vector}, + .deletion_vector_row_counts = {num_rows}, + .row_group_offsets = row_group_offsets, + .row_group_num_rows = row_group_num_rows}; + + for (auto chunk_size : + {num_rows, static_cast(std::llround(row_group_splits.front() * 1.2))}) { + auto const num_deleted_rows = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(num_deleted_rows, expected_deleted); + } +} + +TEST_F(DeletionVectorsCountTests, MultipleDeletionVectors) +{ + auto constexpr num_rows_per_dv = 10'000; + auto constexpr num_deletion_vectors = 5; + auto constexpr num_rows = num_rows_per_dv * num_deletion_vectors; + auto constexpr deletion_probability = 0.3; + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Each deletion vector covers a separate row group with a distinct offset + auto row_group_offsets = std::vector(num_deletion_vectors); + auto row_group_num_rows = std::vector(num_deletion_vectors, num_rows_per_dv); + std::transform(cuda::counting_iterator(0), + cuda::counting_iterator(num_deletion_vectors), + row_group_offsets.begin(), + [](auto i) { return static_cast(i) * 100'000; }); + + auto expected_row_indices = + build_expected_row_indices(row_group_offsets, row_group_num_rows, num_rows); + + size_t total_expected_deleted = 0; + auto serialized_bitmaps = std::vector>{}; + auto bitmap_spans = std::vector>{}; + auto dv_row_counts = std::vector{}; + + for (int i = 0; i < num_deletion_vectors; ++i) { + auto span_start = i * num_rows_per_dv; + auto local_indices = + cudf::host_span(expected_row_indices.data() + span_start, num_rows_per_dv); + + auto [dv, mask_col] = build_deletion_vector_and_expected_row_mask( + num_rows_per_dv, deletion_probability, local_indices, stream, mr); + + auto const host_mask = cudf::detail::make_host_vector( + cudf::device_span(mask_col->view().data(), num_rows_per_dv), stream); + total_expected_deleted += std::count(host_mask.begin(), host_mask.end(), false); + + serialized_bitmaps.emplace_back(std::move(dv)); + dv_row_counts.push_back(num_rows_per_dv); + } + + bitmap_spans.reserve(num_deletion_vectors); + for (auto const& bm : serialized_bitmaps) { + bitmap_spans.emplace_back(bm); + } + + auto deletion_vector_info = cudf::io::parquet::experimental::deletion_vector_info{ + .serialized_roaring_bitmaps = bitmap_spans, + .deletion_vector_row_counts = dv_row_counts, + .row_group_offsets = row_group_offsets, + .row_group_num_rows = row_group_num_rows}; + + // All rows in one chunk + for (auto chunk_size : + {num_rows, static_cast(std::llround(num_rows_per_dv * 1.2))}) { + auto const result = cudf::io::parquet::experimental::compute_num_deleted_rows( + deletion_vector_info, chunk_size, stream); + EXPECT_EQ(result, total_expected_deleted); + } +}