diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index 653649e6cf..b26408d349 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -1119,6 +1119,8 @@ pub fn iceberg::metadata_columns::partition_field(partition_fields: alloc::vec:: pub fn iceberg::metadata_columns::pos_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::row_id_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::spec_id_field() -> &'static iceberg::spec::NestedFieldRef +pub mod iceberg::partitioning +pub fn iceberg::partitioning::compute_unified_partition_type<'a>(partition_specs: impl core::iter::traits::iterator::Iterator, schema: &iceberg::spec::Schema) -> iceberg::Result pub mod iceberg::puffin pub enum iceberg::puffin::CompressionCodec pub iceberg::puffin::CompressionCodec::Gzip(u8) @@ -1234,6 +1236,7 @@ pub iceberg::scan::FileScanTask::project_field_ids: alloc::vec::Vec pub iceberg::scan::FileScanTask::record_count: core::option::Option pub iceberg::scan::FileScanTask::schema: iceberg::spec::SchemaRef pub iceberg::scan::FileScanTask::start: u64 +pub iceberg::scan::FileScanTask::unified_partition_type: core::option::Option> impl iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::data_file_path(&self) -> &str pub fn iceberg::scan::FileScanTask::predicate(&self) -> core::option::Option<&iceberg::expr::BoundPredicate> @@ -1248,7 +1251,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTask impl iceberg::scan::FileScanTask -pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), ())> +pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())> impl serde_core::ser::Serialize for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTask diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index bf53633cfc..2330fc6940 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -32,6 +32,7 @@ mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; +pub(crate) use record_batch_transformer::build_partition_column_constant; mod scan_metrics; mod value; diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index ef38bc8b7c..ad88fe7072 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -31,13 +31,18 @@ use super::{ ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema, }; +use crate::arrow::build_partition_column_constant; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; -use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; +use crate::arrow::record_batch_transformer::{ + PartitionColumnConstant, RecordBatchTransformerBuilder, +}; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; +use crate::metadata_columns::{ + RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_PARTITION, is_metadata_field, +}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::Datum; use crate::{Error, ErrorKind}; @@ -255,6 +260,30 @@ impl FileScanTaskReader { record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } + // Add the _partition metadata struct column if it's in the projected fields. + // Computed lazily here at read time from the unified partition type + task's spec + data. + if task + .project_field_ids() + .contains(&RESERVED_FIELD_ID_PARTITION) + && let Some(unified_type) = &task.unified_partition_type + { + if unified_type.fields().is_empty() { + // Unpartitioned table: empty struct rendered as null + let empty_constant = PartitionColumnConstant { + fields: arrow_schema::Fields::empty(), + child_values: vec![], + }; + record_batch_transformer_builder = record_batch_transformer_builder + .with_partition_column_precomputed(empty_constant); + } else if let (Some(spec), Some(partition_data)) = + (&task.partition_spec, &task.partition) + { + let constant = build_partition_column_constant(unified_type, spec, partition_data)?; + record_batch_transformer_builder = + record_batch_transformer_builder.with_partition_column_precomputed(constant); + } + } + let mut record_batch_transformer = record_batch_transformer_builder.build(); if let Some(batch_size) = self.batch_size { diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 439358435c..afa403f33a 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -20,18 +20,21 @@ use std::sync::Arc; use arrow_array::{ Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions, RunArray, + StructArray, }; use arrow_cast::cast; use arrow_schema::{ - DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element}; use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema}; -use crate::metadata_columns::get_metadata_field; +use crate::metadata_columns::{RESERVED_FIELD_ID_PARTITION, get_metadata_field}; use crate::spec::{ - Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, + Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, StructType, + Transform, }; use crate::{Error, ErrorKind, Result}; @@ -59,11 +62,22 @@ fn constants_map( for (pos, field) in partition_spec.fields().iter().enumerate() { // Only identity transforms should use constant values from partition metadata if matches!(field.transform, Transform::Identity) { - // Get the field from schema to extract its type - let iceberg_field = schema.field_by_id(field.source_id).ok_or(Error::new( - ErrorKind::Unexpected, - format!("Field {} not found in schema", field.source_id), - ))?; + // Get the field from schema to extract its type. + // If the source column isn't in the schema (not projected), skip it -- + // the constant is only needed when the column is projected but can be + // served from partition metadata instead of reading from the data file. + let iceberg_field = match schema.field_by_id(field.source_id) { + Some(f) => f, + None => { + tracing::trace!( + "Skipping identity constant for partition field '{}' \ + (source_id {} not in projected schema)", + field.name, + field.source_id + ); + continue; + } + }; // Ensure the field type is primitive let prim_type = match &*iceberg_field.field_type { @@ -140,6 +154,13 @@ pub(crate) enum ColumnSource { target_type: DataType, value: Option, }, + + // A struct column where each child is a constant primitive value. + // Used for the _partition metadata column. + AddStructConstant { + fields: Fields, + child_values: Vec>, + }, // The iceberg spec refers to other permissible schema evolution actions // (see https://iceberg.apache.org/spec/#schema-evolution): // renaming fields, deleting fields and reordering fields. @@ -193,6 +214,16 @@ pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, constant_fields: HashMap, + partition_column: Option, +} + +/// Pre-computed data for the _partition struct constant. +#[derive(Debug, Clone, PartialEq)] +pub struct PartitionColumnConstant { + /// Arrow struct fields (names, types, nullability) for the _partition column. + pub fields: Fields, + /// Constant value for each child field. None means null (e.g., partition evolution). + pub child_values: Vec>, } impl RecordBatchTransformerBuilder { @@ -204,6 +235,7 @@ impl RecordBatchTransformerBuilder { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), constant_fields: HashMap::new(), + partition_column: None, } } @@ -240,11 +272,46 @@ impl RecordBatchTransformerBuilder { Ok(self) } + /// Set the _partition metadata column constant. + /// + /// This builds the struct constant for the _partition column from the unified partition + /// type (across all specs) and the current file's partition data. + /// + /// # Arguments + /// * `unified_partition_type` - The unified partition type across all specs + /// * `partition_spec` - The partition spec for this specific file + /// * `partition_data` - The partition values for this file + #[cfg(test)] + pub(crate) fn with_partition_column( + mut self, + unified_partition_type: &StructType, + partition_spec: &PartitionSpec, + partition_data: &Struct, + ) -> Result { + let partition_column = build_partition_column_constant( + unified_partition_type, + partition_spec, + partition_data, + )?; + self.partition_column = Some(partition_column); + Ok(self) + } + + /// Set a pre-computed _partition column constant directly. + pub(crate) fn with_partition_column_precomputed( + mut self, + partition_column: PartitionColumnConstant, + ) -> Self { + self.partition_column = Some(partition_column); + self + } + pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, constant_fields: self.constant_fields, + partition_column: self.partition_column, batch_transform: None, } } @@ -288,6 +355,8 @@ pub(crate) struct RecordBatchTransformer { // Includes both virtual/metadata fields (like _file) and identity-partitioned fields // Datum holds both the Iceberg type and the value constant_fields: HashMap, + // Pre-computed _partition struct constant + partition_column: Option, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -330,6 +399,7 @@ impl RecordBatchTransformer { self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, &self.constant_fields, + &self.partition_column, )?); self.process_record_batch(record_batch)? @@ -349,6 +419,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], constant_fields: &HashMap, + partition_column: &Option, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -359,6 +430,20 @@ impl RecordBatchTransformer { let fields: Result> = projected_iceberg_field_ids .iter() .map(|field_id| { + // Handle _partition struct column + if *field_id == RESERVED_FIELD_ID_PARTITION + && let Some(pc) = partition_column + { + let struct_type = DataType::Struct(pc.fields.clone()); + let nullable = pc.fields.is_empty(); + let arrow_field = Field::new("_partition", struct_type, nullable) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_PARTITION.to_string(), + )])); + return Ok(Arc::new(arrow_field)); + } + // Check if this is a constant field if constant_fields.contains_key(field_id) { // For metadata/virtual fields (like _file), get name from metadata_columns @@ -417,6 +502,7 @@ impl RecordBatchTransformer { projected_iceberg_field_ids, field_id_to_mapped_schema_map, constant_fields, + partition_column, )?, target_schema, }), @@ -474,6 +560,7 @@ impl RecordBatchTransformer { projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, constant_fields: &HashMap, + partition_column: &Option, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -481,6 +568,16 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Handle _partition struct column + if *field_id == RESERVED_FIELD_ID_PARTITION + && let Some(pc) = partition_column + { + return Ok(ColumnSource::AddStructConstant { + fields: pc.fields.clone(), + child_values: pc.child_values.clone(), + }); + } + // Check if this is a constant field (metadata/virtual or identity-partitioned) // Constant fields always use their pre-computed constant values, regardless of whether // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata @@ -615,6 +712,11 @@ impl RecordBatchTransformer { ColumnSource::Add { target_type, value } => { Self::create_column(target_type, value, num_rows)? } + + ColumnSource::AddStructConstant { + fields, + child_values, + } => Self::create_struct_column(fields, child_values, num_rows)?, }) }) .collect() @@ -656,6 +758,84 @@ impl RecordBatchTransformer { create_primitive_array_repeated(target_type, prim_lit, num_rows) } } + + fn create_struct_column( + fields: &Fields, + child_values: &[Option], + num_rows: usize, + ) -> Result { + if fields.is_empty() { + let nulls = arrow_buffer::NullBuffer::new_null(num_rows); + return Ok(Arc::new(StructArray::new_empty_fields( + num_rows, + Some(nulls), + ))); + } + + let child_arrays: Vec = fields + .iter() + .zip(child_values.iter()) + .map(|(field, value)| { + create_primitive_array_repeated(field.data_type(), value, num_rows) + }) + .collect::>()?; + + Ok(Arc::new(StructArray::new( + fields.clone(), + child_arrays, + None, + ))) + } +} + +/// Builds a [`PartitionColumnConstant`] from the unified partition type and a file's +/// partition spec/data. +/// +/// For each field in the unified partition type: +/// - If it corresponds to a field in this file's partition spec, use the value from partition_data +/// - Otherwise (partition evolution), use null +pub fn build_partition_column_constant( + unified_partition_type: &StructType, + partition_spec: &PartitionSpec, + partition_data: &Struct, +) -> Result { + use crate::arrow::type_to_arrow_type; + + let spec_fields = partition_spec.fields(); + + let mut arrow_fields = Vec::with_capacity(unified_partition_type.fields().len()); + let mut child_values = Vec::with_capacity(unified_partition_type.fields().len()); + + for unified_field in unified_partition_type.fields() { + let arrow_type = type_to_arrow_type(&unified_field.field_type)?; + // Don't attach PARQUET_FIELD_ID_META_KEY metadata to child fields -- + // Spark's output schema for _partition doesn't include it, and Arrow's + // Field::eq checks metadata, causing the schema adapter to insert an + // unnecessary cast operation per batch. + // Always nullable: partition evolution means any field may be absent + // for files written under a different spec. + let arrow_field = Field::new(&unified_field.name, arrow_type, true); + arrow_fields.push(Arc::new(arrow_field)); + + // Find matching field in this file's partition spec by field_id + let value = spec_fields + .iter() + .position(|f| f.field_id == unified_field.id) + .and_then(|pos| { + // Get the value from partition_data at this position + match &partition_data[pos] { + Some(Literal::Primitive(prim)) => Some(prim.clone()), + _ => None, + } + }); + + child_values.push(value); + } + + Ok(PartitionColumnConstant { + fields: Fields::from(arrow_fields), + child_values, + }) } #[cfg(test)] @@ -1675,4 +1855,199 @@ mod test { assert!(data_col.is_null(1)); assert!(data_col.is_null(2)); } + + #[test] + fn partition_column_struct_constant() { + use arrow_array::StructArray; + + use crate::metadata_columns::RESERVED_FIELD_ID_PARTITION; + use crate::spec::Transform; + + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: identity(id) + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Unified partition type: just one field (same as the spec's type) + let unified_partition_type = partition_spec.partition_type(&snapshot_schema).unwrap(); + + // Partition data: id=42 + let partition_data = Struct::from_iter(vec![Some(Literal::int(42))]); + + // Parquet file has both columns + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + // Project id, name, and _partition + let projected_field_ids = [1, 2, RESERVED_FIELD_ID_PARTITION]; + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition_column(&unified_partition_type, &partition_spec, &partition_data) + .unwrap() + .build(); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 3); + assert_eq!(result.num_rows(), 3); + + // id column from file + let id_col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_col.values(), &[100, 200, 300]); + + // name column from file + let name_col = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_col.value(0), "a"); + + // _partition struct column + let partition_col = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(partition_col.num_columns(), 1); + assert_eq!(partition_col.len(), 3); + + let inner = partition_col + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(inner.value(0), 42); + assert_eq!(inner.value(1), 42); + assert_eq!(inner.value(2), 42); + } + + #[test] + fn partition_column_with_evolution() { + use arrow_array::StructArray; + + use crate::metadata_columns::RESERVED_FIELD_ID_PARTITION; + use crate::partitioning::compute_unified_partition_type; + use crate::spec::Transform; + + // Schema with two fields that could be partition sources + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "year", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "month", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Old spec: partition by year only + let spec_v0 = crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("year", "year", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // New spec: partition by year and month + let spec_v1 = crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(1) + .add_partition_field("year", "year", Transform::Identity) + .unwrap() + .add_partition_field("month", "month", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Unified type includes both year and month + let unified_partition_type = + compute_unified_partition_type([&spec_v0, &spec_v1].into_iter(), &snapshot_schema) + .unwrap(); + + assert_eq!(unified_partition_type.fields().len(), 2); + + // File written with spec_v0 (only has year=2023) + let partition_data = Struct::from_iter(vec![Some(Literal::int(2023))]); + + let parquet_schema = Arc::new(ArrowSchema::new(vec![simple_field( + "data", + DataType::Utf8, + true, + "3", + )])); + + let projected_field_ids = [3, RESERVED_FIELD_ID_PARTITION]; + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition_column(&unified_partition_type, &spec_v0, &partition_data) + .unwrap() + .build(); + + let parquet_batch = + RecordBatch::try_new(parquet_schema, vec![Arc::new(StringArray::from(vec![ + "hello", "world", + ]))]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 2); + + // _partition struct has 2 fields: year (present) and month (null for this old spec file) + let partition_col = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(partition_col.num_columns(), 2); + + let year_col = partition_col + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(year_col.value(0), 2023); + assert_eq!(year_col.value(1), 2023); + + let month_col = partition_col + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(month_col.is_null(0)); + assert!(month_col.is_null(1)); + } } diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..221bf38820 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -909,6 +909,38 @@ pub(crate) fn create_primitive_array_repeated( let vals: Vec> = vec![None; num_rows]; Arc::new(BinaryArray::from_opt_vec(vals)) } + (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => { + Arc::new(LargeBinaryArray::from_vec(vec![value; num_rows])) + } + (DataType::LargeBinary, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(LargeBinaryArray::from_opt_vec(vals)) + } + (DataType::FixedSizeBinary(len), Some(PrimitiveLiteral::Binary(value))) => { + let repeated: Vec<&[u8]> = vec![value.as_slice(); num_rows]; + Arc::new(FixedSizeBinaryArray::try_from_iter(repeated.into_iter()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create FixedSizeBinary({len}) array: {e}"), + ) + })?) + } + (DataType::FixedSizeBinary(len), None) => { + let repeated: Vec> = vec![None; num_rows]; + Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(repeated.into_iter(), *len).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create null FixedSizeBinary({len}) array: {e}"), + ) + })?) + } + (DataType::Time64(TimeUnit::Microsecond), Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Time64MicrosecondArray::from(vec![*value; num_rows])) + } + (DataType::Time64(TimeUnit::Microsecond), None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Time64MicrosecondArray::from(vals)) + } (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => { Arc::new( Decimal128Array::from(vec![*value; num_rows]) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 4e346460f5..301992d15e 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -100,6 +100,7 @@ pub mod writer; mod delete_vector; pub mod metadata_columns; +pub mod partitioning; pub mod puffin; /// Utility functions and modules. pub mod util; diff --git a/crates/iceberg/src/partitioning.rs b/crates/iceberg/src/partitioning.rs new file mode 100644 index 0000000000..7b749c58b6 --- /dev/null +++ b/crates/iceberg/src/partitioning.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Partition type utilities for Iceberg tables. + +use crate::spec::{NestedField, NestedFieldRef, PartitionSpec, Schema, StructType, Transform}; +use crate::{Error, ErrorKind, Result}; + +/// Computes the unified partition type across all partition specs in the table. +/// +/// This is equivalent to Java's `Partitioning.partitionType(table)`. The result is a +/// StructType containing all partition fields ever used across all specs, enabling correct +/// representation of the `_partition` metadata column when partition evolution has occurred. +/// +/// Matches Java's behavior: +/// - Specs are sorted by spec_id in descending order (newer specs first), so newer field +/// names take precedence when deduplicating by field_id. +/// - Void transform fields (dropped partition columns) are skipped. +/// - Fields are deduplicated by field_id — each unique field_id appears exactly once. +/// +/// # Arguments +/// * `partition_specs` - Iterator over all partition specs in the table +/// * `schema` - The current table schema (needed to determine result types of transforms) +pub fn compute_unified_partition_type<'a>( + partition_specs: impl Iterator, + schema: &Schema, +) -> Result { + let mut seen_field_ids = std::collections::HashSet::new(); + let mut struct_fields: Vec = Vec::new(); + + // Sort specs by spec_id descending (newer first) to match Java's behavior: + // newer field names take precedence when deduplicating by field_id. + let mut specs: Vec<&PartitionSpec> = partition_specs.collect(); + specs.sort_by_key(|s| std::cmp::Reverse(s.spec_id())); + + for spec in specs { + for field in spec.fields() { + if seen_field_ids.contains(&field.field_id) { + continue; + } + + // Skip void transforms (dropped partition columns) + if matches!(field.transform, Transform::Void) { + continue; + } + + seen_field_ids.insert(field.field_id); + + let source_field = schema.field_by_id(field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "No column with source column id {} in schema for partition field {}", + field.source_id, field.name + ), + ) + })?; + + let res_type = field.transform.result_type(&source_field.field_type)?; + let nested = NestedField::optional(field.field_id, &field.name, res_type).into(); + struct_fields.push(nested); + } + } + + Ok(StructType::new(struct_fields)) +} diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 75672d9cbb..c1e63cf020 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -29,7 +29,7 @@ use crate::scan::{ }; use crate::spec::{ ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, NameMapping, SchemaRef, - SnapshotRef, TableMetadataRef, + SnapshotRef, StructType, TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; @@ -48,6 +48,7 @@ pub(crate) struct ManifestFileContext { delete_file_index: DeleteFileIndex, name_mapping: Option>, case_sensitive: bool, + unified_partition_type: Option>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -63,6 +64,7 @@ pub(crate) struct ManifestEntryContext { pub delete_file_index: DeleteFileIndex, pub name_mapping: Option>, pub case_sensitive: bool, + pub unified_partition_type: Option>, } impl ManifestFileContext { @@ -80,6 +82,7 @@ impl ManifestFileContext { delete_file_index, name_mapping, case_sensitive, + unified_partition_type, } = self; let manifest = object_cache.get_manifest(&manifest_file).await?; @@ -96,6 +99,7 @@ impl ManifestFileContext { delete_file_index: delete_file_index.clone(), name_mapping: name_mapping.clone(), case_sensitive, + unified_partition_type: unified_partition_type.clone(), }; sender @@ -138,6 +142,7 @@ impl ManifestEntryContext { // TODO: Pass actual PartitionSpec through context chain for native flow .with_partition_spec(None) .with_name_mapping(self.name_mapping) + .with_unified_partition_type(self.unified_partition_type.clone()) .with_case_sensitive(self.case_sensitive) .build()) } @@ -161,6 +166,8 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, + + pub unified_partition_type: Option>, } impl PlanContext { @@ -284,6 +291,7 @@ impl PlanContext { delete_file_index, name_mapping: self.name_mapping.clone(), case_sensitive: self.case_sensitive, + unified_partition_type: self.unified_partition_type.clone(), } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 86092313c6..779b01c06d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -37,7 +37,10 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; +use crate::metadata_columns::{ + RESERVED_FIELD_ID_PARTITION, get_metadata_field_id, is_metadata_column_name, +}; +use crate::partitioning::compute_unified_partition_type; use crate::runtime::Runtime; use crate::spec::{DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, NameMapping, SnapshotRef}; use crate::table::Table; @@ -300,6 +303,20 @@ impl<'a> TableScanBuilder<'a> { .transpose()? .map(Arc::new); + // Compute unified partition type if _partition is projected + let unified_partition_type = if field_ids.contains(&RESERVED_FIELD_ID_PARTITION) { + let upt = compute_unified_partition_type( + self.table + .metadata() + .partition_specs_iter() + .map(|s| s.as_ref()), + &schema, + )?; + Some(Arc::new(upt)) + } else { + None + }; + let plan_context = PlanContext { snapshot, table_metadata: self.table.metadata_ref(), @@ -313,6 +330,7 @@ impl<'a> TableScanBuilder<'a> { partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + unified_partition_type, }; Ok(TableScan { diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index f3b556bcbf..0c36dd812c 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -25,7 +25,7 @@ use crate::Result; use crate::expr::BoundPredicate; use crate::spec::{ DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema, - SchemaRef, Struct, + SchemaRef, Struct, StructType, }; /// A stream of [`FileScanTask`]. @@ -116,6 +116,17 @@ pub struct FileScanTask { #[builder(default)] pub name_mapping: Option>, + /// The unified partition type across all specs in the table. + /// When `RESERVED_FIELD_ID_PARTITION` is in the projected field IDs, the reader + /// uses this type along with the task's partition_spec and partition data to + /// materialize the `_partition` struct column at read time. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + #[builder(default)] + pub unified_partition_type: Option>, + /// Whether this scan task should treat column names as case-sensitive when binding predicates. pub case_sensitive: bool, }