Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,8 @@ pub fn iceberg::metadata_columns::partition_field(partition_fields: alloc::vec::
pub fn iceberg::metadata_columns::pos_field() -> &'static iceberg::spec::NestedFieldRef
pub fn iceberg::metadata_columns::row_id_field() -> &'static iceberg::spec::NestedFieldRef
pub fn iceberg::metadata_columns::spec_id_field() -> &'static iceberg::spec::NestedFieldRef
pub mod iceberg::partitioning
pub fn iceberg::partitioning::compute_unified_partition_type<'a>(partition_specs: impl core::iter::traits::iterator::Iterator<Item = &'a iceberg::spec::PartitionSpec>, schema: &iceberg::spec::Schema) -> iceberg::Result<iceberg::spec::StructType>
pub mod iceberg::puffin
pub enum iceberg::puffin::CompressionCodec
pub iceberg::puffin::CompressionCodec::Gzip(u8)
Expand Down Expand Up @@ -1234,6 +1236,7 @@ pub iceberg::scan::FileScanTask::project_field_ids: alloc::vec::Vec<i32>
pub iceberg::scan::FileScanTask::record_count: core::option::Option<u64>
pub iceberg::scan::FileScanTask::schema: iceberg::spec::SchemaRef
pub iceberg::scan::FileScanTask::start: u64
pub iceberg::scan::FileScanTask::unified_partition_type: core::option::Option<alloc::sync::Arc<iceberg::spec::StructType>>
impl iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::data_file_path(&self) -> &str
pub fn iceberg::scan::FileScanTask::predicate(&self) -> core::option::Option<&iceberg::expr::BoundPredicate>
Expand All @@ -1248,7 +1251,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTask
impl iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), ())>
pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())>
impl serde_core::ser::Serialize for iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTask
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod reader;
/// RecordBatch projection utilities
pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
pub(crate) use record_batch_transformer::build_partition_column_constant;
mod scan_metrics;
mod value;

Expand Down
33 changes: 31 additions & 2 deletions crates/iceberg/src/arrow/reader/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ use super::{
ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
apply_name_mapping_to_arrow_schema,
};
use crate::arrow::build_partition_column_constant;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::int96::coerce_int96_timestamps;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::record_batch_transformer::{
PartitionColumnConstant, RecordBatchTransformerBuilder,
};
use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
use crate::error::Result;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::metadata_columns::{
RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_PARTITION, is_metadata_field,
};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::Datum;
use crate::{Error, ErrorKind};
Expand Down Expand Up @@ -255,6 +260,30 @@ impl FileScanTaskReader {
record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
}

// Add the _partition metadata struct column if it's in the projected fields.
// Computed lazily here at read time from the unified partition type + task's spec + data.
if task
.project_field_ids()
.contains(&RESERVED_FIELD_ID_PARTITION)
&& let Some(unified_type) = &task.unified_partition_type
{
if unified_type.fields().is_empty() {
// Unpartitioned table: empty struct rendered as null
let empty_constant = PartitionColumnConstant {
fields: arrow_schema::Fields::empty(),
child_values: vec![],
};
record_batch_transformer_builder = record_batch_transformer_builder
.with_partition_column_precomputed(empty_constant);
} else if let (Some(spec), Some(partition_data)) =
(&task.partition_spec, &task.partition)
{
let constant = build_partition_column_constant(unified_type, spec, partition_data)?;
record_batch_transformer_builder =
record_batch_transformer_builder.with_partition_column_precomputed(constant);
}
}

let mut record_batch_transformer = record_batch_transformer_builder.build();

if let Some(batch_size) = self.batch_size {
Expand Down
Loading
Loading