From 26c8eb0d682d11c5dba4d2720a91c8a364d32383 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 15 Jun 2026 11:47:09 -0700 Subject: [PATCH 1/5] feat: add support for _partition metadata column --- crates/iceberg/src/arrow/mod.rs | 1 + crates/iceberg/src/arrow/reader/pipeline.rs | 10 +- .../src/arrow/record_batch_transformer.rs | 390 +++++++++++++++++- crates/iceberg/src/arrow/value.rs | 32 ++ crates/iceberg/src/metadata_columns.rs | 49 ++- crates/iceberg/src/scan/context.rs | 37 +- crates/iceberg/src/scan/mod.rs | 17 +- crates/iceberg/src/scan/task.rs | 16 + crates/iceberg/src/spec/schema/_serde.rs | 132 ++++-- 9 files changed, 640 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index bf53633cfc..fa99944dca 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -32,6 +32,7 @@ mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; +pub use record_batch_transformer::{PartitionColumnConstant, build_partition_column_constant}; mod scan_metrics; mod value; diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index ef38bc8b7c..d4d919fe1d 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -37,7 +37,7 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_PARTITION, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::Datum; use crate::{Error, ErrorKind}; @@ -255,6 +255,14 @@ impl FileScanTaskReader { record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } + // Add the _partition metadata struct column if it's in the projected fields + if task.project_field_ids().contains(&RESERVED_FIELD_ID_PARTITION) + && let Some(partition_column_constant) = &task.partition_column_constant + { + record_batch_transformer_builder = record_batch_transformer_builder + .with_partition_column_precomputed((**partition_column_constant).clone()); + } + let mut record_batch_transformer = record_batch_transformer_builder.build(); if let Some(batch_size) = self.batch_size { diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 439358435c..4b9fb82bf5 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -20,18 +20,21 @@ use std::sync::Arc; use arrow_array::{ Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions, RunArray, + StructArray, }; use arrow_cast::cast; use arrow_schema::{ - DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element}; use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema}; -use crate::metadata_columns::get_metadata_field; +use crate::metadata_columns::{RESERVED_FIELD_ID_PARTITION, get_metadata_field}; use crate::spec::{ - Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, + Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, + Struct, StructType, Transform, }; use crate::{Error, ErrorKind, Result}; @@ -59,11 +62,22 @@ fn constants_map( for (pos, field) in partition_spec.fields().iter().enumerate() { // Only identity transforms should use constant values from partition metadata if matches!(field.transform, Transform::Identity) { - // Get the field from schema to extract its type - let iceberg_field = schema.field_by_id(field.source_id).ok_or(Error::new( - ErrorKind::Unexpected, - format!("Field {} not found in schema", field.source_id), - ))?; + // Get the field from schema to extract its type. + // If the source column isn't in the schema (not projected), skip it -- + // the constant is only needed when the column is projected but can be + // served from partition metadata instead of reading from the data file. + let iceberg_field = match schema.field_by_id(field.source_id) { + Some(f) => f, + None => { + tracing::trace!( + "Skipping identity constant for partition field '{}' \ + (source_id {} not in projected schema)", + field.name, + field.source_id + ); + continue; + } + }; // Ensure the field type is primitive let prim_type = match &*iceberg_field.field_type { @@ -140,6 +154,13 @@ pub(crate) enum ColumnSource { target_type: DataType, value: Option, }, + + // A struct column where each child is a constant primitive value. + // Used for the _partition metadata column. + AddStructConstant { + fields: Fields, + child_values: Vec>, + }, // The iceberg spec refers to other permissible schema evolution actions // (see https://iceberg.apache.org/spec/#schema-evolution): // renaming fields, deleting fields and reordering fields. @@ -193,6 +214,16 @@ pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, constant_fields: HashMap, + partition_column: Option, +} + +/// Pre-computed data for the _partition struct constant. +#[derive(Debug, Clone, PartialEq)] +pub struct PartitionColumnConstant { + /// Arrow struct fields (names, types, nullability) for the _partition column. + pub fields: Fields, + /// Constant value for each child field. None means null (e.g., partition evolution). + pub child_values: Vec>, } impl RecordBatchTransformerBuilder { @@ -204,6 +235,7 @@ impl RecordBatchTransformerBuilder { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), constant_fields: HashMap::new(), + partition_column: None, } } @@ -240,11 +272,43 @@ impl RecordBatchTransformerBuilder { Ok(self) } + /// Set the _partition metadata column constant. + /// + /// This builds the struct constant for the _partition column from the unified partition + /// type (across all specs) and the current file's partition data. + /// + /// # Arguments + /// * `unified_partition_type` - The unified partition type across all specs + /// * `partition_spec` - The partition spec for this specific file + /// * `partition_data` - The partition values for this file + #[cfg(test)] + pub(crate) fn with_partition_column( + mut self, + unified_partition_type: &StructType, + partition_spec: &PartitionSpec, + partition_data: &Struct, + ) -> Result { + let partition_column = + build_partition_column_constant(unified_partition_type, partition_spec, partition_data)?; + self.partition_column = Some(partition_column); + Ok(self) + } + + /// Set a pre-computed _partition column constant directly. + pub(crate) fn with_partition_column_precomputed( + mut self, + partition_column: PartitionColumnConstant, + ) -> Self { + self.partition_column = Some(partition_column); + self + } + pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, constant_fields: self.constant_fields, + partition_column: self.partition_column, batch_transform: None, } } @@ -288,6 +352,8 @@ pub(crate) struct RecordBatchTransformer { // Includes both virtual/metadata fields (like _file) and identity-partitioned fields // Datum holds both the Iceberg type and the value constant_fields: HashMap, + // Pre-computed _partition struct constant + partition_column: Option, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -330,6 +396,7 @@ impl RecordBatchTransformer { self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, &self.constant_fields, + &self.partition_column, )?); self.process_record_batch(record_batch)? @@ -349,6 +416,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], constant_fields: &HashMap, + partition_column: &Option, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -359,6 +427,20 @@ impl RecordBatchTransformer { let fields: Result> = projected_iceberg_field_ids .iter() .map(|field_id| { + // Handle _partition struct column + if *field_id == RESERVED_FIELD_ID_PARTITION + && let Some(pc) = partition_column + { + let struct_type = DataType::Struct(pc.fields.clone()); + let nullable = pc.fields.is_empty(); + let arrow_field = Field::new("_partition", struct_type, nullable) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_PARTITION.to_string(), + )])); + return Ok(Arc::new(arrow_field)); + } + // Check if this is a constant field if constant_fields.contains_key(field_id) { // For metadata/virtual fields (like _file), get name from metadata_columns @@ -417,6 +499,7 @@ impl RecordBatchTransformer { projected_iceberg_field_ids, field_id_to_mapped_schema_map, constant_fields, + partition_column, )?, target_schema, }), @@ -474,6 +557,7 @@ impl RecordBatchTransformer { projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, constant_fields: &HashMap, + partition_column: &Option, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -481,6 +565,16 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Handle _partition struct column + if *field_id == RESERVED_FIELD_ID_PARTITION + && let Some(pc) = partition_column + { + return Ok(ColumnSource::AddStructConstant { + fields: pc.fields.clone(), + child_values: pc.child_values.clone(), + }); + } + // Check if this is a constant field (metadata/virtual or identity-partitioned) // Constant fields always use their pre-computed constant values, regardless of whether // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata @@ -615,6 +709,11 @@ impl RecordBatchTransformer { ColumnSource::Add { target_type, value } => { Self::create_column(target_type, value, num_rows)? } + + ColumnSource::AddStructConstant { + fields, + child_values, + } => Self::create_struct_column(fields, child_values, num_rows)?, }) }) .collect() @@ -656,6 +755,85 @@ impl RecordBatchTransformer { create_primitive_array_repeated(target_type, prim_lit, num_rows) } } + + fn create_struct_column( + fields: &Fields, + child_values: &[Option], + num_rows: usize, + ) -> Result { + if fields.is_empty() { + let nulls = arrow_buffer::NullBuffer::new_null(num_rows); + return Ok(Arc::new(StructArray::new_empty_fields(num_rows, Some(nulls)))); + } + + let child_arrays: Vec = fields + .iter() + .zip(child_values.iter()) + .map(|(field, value)| { + create_primitive_array_repeated(field.data_type(), value, num_rows) + }) + .collect::>()?; + + Ok(Arc::new(StructArray::new( + fields.clone(), + child_arrays, + None, + ))) + } +} + +/// Builds a [`PartitionColumnConstant`] from the unified partition type and a file's +/// partition spec/data. +/// +/// For each field in the unified partition type: +/// - If it corresponds to a field in this file's partition spec, use the value from partition_data +/// - Otherwise (partition evolution), use null +pub fn build_partition_column_constant( + unified_partition_type: &StructType, + partition_spec: &PartitionSpec, + partition_data: &Struct, +) -> Result { + use crate::arrow::type_to_arrow_type; + + let spec_fields = partition_spec.fields(); + + let mut arrow_fields = Vec::with_capacity(unified_partition_type.fields().len()); + let mut child_values = Vec::with_capacity(unified_partition_type.fields().len()); + + for unified_field in unified_partition_type.fields() { + let arrow_type = type_to_arrow_type(&unified_field.field_type)?; + // Don't attach PARQUET_FIELD_ID_META_KEY metadata to child fields -- + // Spark's output schema for _partition doesn't include it, and Arrow's + // Field::eq checks metadata, causing the schema adapter to insert an + // unnecessary cast operation per batch. + // Always nullable: partition evolution means any field may be absent + // for files written under a different spec. + let arrow_field = Field::new( + &unified_field.name, + arrow_type, + true, + ); + arrow_fields.push(Arc::new(arrow_field)); + + // Find matching field in this file's partition spec by field_id + let value = spec_fields + .iter() + .position(|f| f.field_id == unified_field.id) + .and_then(|pos| { + // Get the value from partition_data at this position + match &partition_data[pos] { + Some(Literal::Primitive(prim)) => Some(prim.clone()), + _ => None, + } + }); + + child_values.push(value); + } + + Ok(PartitionColumnConstant { + fields: Fields::from(arrow_fields), + child_values, + }) } #[cfg(test)] @@ -1675,4 +1853,200 @@ mod test { assert!(data_col.is_null(1)); assert!(data_col.is_null(2)); } + + #[test] + fn partition_column_struct_constant() { + use arrow_array::StructArray; + + use crate::metadata_columns::RESERVED_FIELD_ID_PARTITION; + use crate::spec::Transform; + + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: identity(id) + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Unified partition type: just one field (same as the spec's type) + let unified_partition_type = + partition_spec.partition_type(&snapshot_schema).unwrap(); + + // Partition data: id=42 + let partition_data = Struct::from_iter(vec![Some(Literal::int(42))]); + + // Parquet file has both columns + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + // Project id, name, and _partition + let projected_field_ids = [1, 2, RESERVED_FIELD_ID_PARTITION]; + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition_column(&unified_partition_type, &partition_spec, &partition_data) + .unwrap() + .build(); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 3); + assert_eq!(result.num_rows(), 3); + + // id column from file + let id_col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_col.values(), &[100, 200, 300]); + + // name column from file + let name_col = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_col.value(0), "a"); + + // _partition struct column + let partition_col = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(partition_col.num_columns(), 1); + assert_eq!(partition_col.len(), 3); + + let inner = partition_col + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(inner.value(0), 42); + assert_eq!(inner.value(1), 42); + assert_eq!(inner.value(2), 42); + } + + #[test] + fn partition_column_with_evolution() { + use arrow_array::StructArray; + + use crate::metadata_columns::{RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type}; + use crate::spec::Transform; + + // Schema with two fields that could be partition sources + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "year", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "month", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Old spec: partition by year only + let spec_v0 = crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("year", "year", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // New spec: partition by year and month + let spec_v1 = crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(1) + .add_partition_field("year", "year", Transform::Identity) + .unwrap() + .add_partition_field("month", "month", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Unified type includes both year and month + let unified_partition_type = compute_unified_partition_type( + [&spec_v0, &spec_v1].into_iter(), + &snapshot_schema, + ) + .unwrap(); + + assert_eq!(unified_partition_type.fields().len(), 2); + + // File written with spec_v0 (only has year=2023) + let partition_data = Struct::from_iter(vec![Some(Literal::int(2023))]); + + let parquet_schema = Arc::new(ArrowSchema::new(vec![simple_field( + "data", + DataType::Utf8, + true, + "3", + )])); + + let projected_field_ids = [3, RESERVED_FIELD_ID_PARTITION]; + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition_column(&unified_partition_type, &spec_v0, &partition_data) + .unwrap() + .build(); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![Arc::new( + StringArray::from(vec!["hello", "world"]), + )]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 2); + + // _partition struct has 2 fields: year (present) and month (null for this old spec file) + let partition_col = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(partition_col.num_columns(), 2); + + let year_col = partition_col + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(year_col.value(0), 2023); + assert_eq!(year_col.value(1), 2023); + + let month_col = partition_col + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(month_col.is_null(0)); + assert!(month_col.is_null(1)); + } } diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..221bf38820 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -909,6 +909,38 @@ pub(crate) fn create_primitive_array_repeated( let vals: Vec> = vec![None; num_rows]; Arc::new(BinaryArray::from_opt_vec(vals)) } + (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => { + Arc::new(LargeBinaryArray::from_vec(vec![value; num_rows])) + } + (DataType::LargeBinary, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(LargeBinaryArray::from_opt_vec(vals)) + } + (DataType::FixedSizeBinary(len), Some(PrimitiveLiteral::Binary(value))) => { + let repeated: Vec<&[u8]> = vec![value.as_slice(); num_rows]; + Arc::new(FixedSizeBinaryArray::try_from_iter(repeated.into_iter()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create FixedSizeBinary({len}) array: {e}"), + ) + })?) + } + (DataType::FixedSizeBinary(len), None) => { + let repeated: Vec> = vec![None; num_rows]; + Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(repeated.into_iter(), *len).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create null FixedSizeBinary({len}) array: {e}"), + ) + })?) + } + (DataType::Time64(TimeUnit::Microsecond), Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Time64MicrosecondArray::from(vec![*value; num_rows])) + } + (DataType::Time64(TimeUnit::Microsecond), None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Time64MicrosecondArray::from(vals)) + } (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => { Arc::new( Decimal128Array::from(vec![*value; num_rows]) diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs index b622a76edc..d272c38a98 100644 --- a/crates/iceberg/src/metadata_columns.rs +++ b/crates/iceberg/src/metadata_columns.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use once_cell::sync::Lazy; -use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type}; +use crate::spec::{NestedField, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, StructType, Type}; use crate::{Error, ErrorKind, Result}; /// Reserved field ID for the file path (_file) column per Iceberg spec @@ -379,6 +379,53 @@ pub fn partition_field(partition_fields: Vec) -> NestedFieldRef ) } +/// Computes the unified partition type across all partition specs in the table. +/// +/// This is equivalent to Java's `Partitioning.partitionType(table)`. The result is a +/// StructType containing all partition fields ever used across all specs, enabling correct +/// representation of the `_partition` metadata column when partition evolution has occurred. +/// +/// For each spec, the partition fields are derived from the transform applied to the source +/// column. Fields are deduplicated by field_id - each unique field_id appears exactly once +/// in the result. +/// +/// # Arguments +/// * `partition_specs` - Iterator over all partition specs in the table +/// * `schema` - The current table schema (needed to determine result types of transforms) +pub fn compute_unified_partition_type<'a>( + partition_specs: impl Iterator, + schema: &Schema, +) -> Result { + let mut seen_field_ids = std::collections::HashSet::new(); + let mut struct_fields: Vec = Vec::new(); + + for spec in partition_specs { + for field in spec.fields() { + if seen_field_ids.contains(&field.field_id) { + continue; + } + seen_field_ids.insert(field.field_id); + + let source_field = schema.field_by_id(field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "No column with source column id {} in schema for partition field {}", + field.source_id, field.name + ), + ) + })?; + + let res_type = field.transform.result_type(&source_field.field_type)?; + let nested = + NestedField::optional(field.field_id, &field.name, res_type).into(); + struct_fields.push(nested); + } + } + + Ok(StructType::new(struct_fields)) +} + /// Returns the Iceberg field definition for a metadata field ID. /// /// Note: This function does not support `_partition` (field ID `i32::MAX - 5`) because diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 75672d9cbb..732f318844 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::{SinkExt, TryFutureExt}; +use crate::arrow::record_batch_transformer::build_partition_column_constant; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::object_cache::ObjectCache; @@ -29,7 +30,7 @@ use crate::scan::{ }; use crate::spec::{ ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, NameMapping, SchemaRef, - SnapshotRef, TableMetadataRef, + SnapshotRef, StructType, TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; @@ -48,6 +49,8 @@ pub(crate) struct ManifestFileContext { delete_file_index: DeleteFileIndex, name_mapping: Option>, case_sensitive: bool, + table_metadata: TableMetadataRef, + unified_partition_type: Option>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -63,6 +66,8 @@ pub(crate) struct ManifestEntryContext { pub delete_file_index: DeleteFileIndex, pub name_mapping: Option>, pub case_sensitive: bool, + pub table_metadata: TableMetadataRef, + pub unified_partition_type: Option>, } impl ManifestFileContext { @@ -79,7 +84,9 @@ impl ManifestFileContext { expression_evaluator_cache, delete_file_index, name_mapping, - case_sensitive, + table_metadata, + unified_partition_type, + .. } = self; let manifest = object_cache.get_manifest(&manifest_file).await?; @@ -96,6 +103,8 @@ impl ManifestFileContext { delete_file_index: delete_file_index.clone(), name_mapping: name_mapping.clone(), case_sensitive, + table_metadata: table_metadata.clone(), + unified_partition_type: unified_partition_type.clone(), }; sender @@ -120,6 +129,25 @@ impl ManifestEntryContext { ) .await; + // Compute the _partition struct constant if the unified partition type is available + let partition_column_constant = if let Some(ref unified_partition_type) = self.unified_partition_type { + let partition_spec = self + .table_metadata + .partition_spec_by_id(self.partition_spec_id); + if let Some(spec) = partition_spec { + let constant = build_partition_column_constant( + unified_partition_type, + spec, + &self.manifest_entry.data_file.partition, + )?; + Some(Arc::new(constant)) + } else { + None + } + } else { + None + }; + Ok(FileScanTask::builder() .with_file_size_in_bytes(self.manifest_entry.file_size_in_bytes()) .with_start(0) @@ -138,6 +166,7 @@ impl ManifestEntryContext { // TODO: Pass actual PartitionSpec through context chain for native flow .with_partition_spec(None) .with_name_mapping(self.name_mapping) + .with_partition_column_constant(partition_column_constant) .with_case_sensitive(self.case_sensitive) .build()) } @@ -161,6 +190,8 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, + + pub unified_partition_type: Option>, } impl PlanContext { @@ -284,6 +315,8 @@ impl PlanContext { delete_file_index, name_mapping: self.name_mapping.clone(), case_sensitive: self.case_sensitive, + table_metadata: self.table_metadata.clone(), + unified_partition_type: self.unified_partition_type.clone(), } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 86092313c6..ba813a1267 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -37,7 +37,10 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; +use crate::metadata_columns::{ + RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type, get_metadata_field_id, + is_metadata_column_name, +}; use crate::runtime::Runtime; use crate::spec::{DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, NameMapping, SnapshotRef}; use crate::table::Table; @@ -300,6 +303,17 @@ impl<'a> TableScanBuilder<'a> { .transpose()? .map(Arc::new); + // Compute unified partition type if _partition is projected + let unified_partition_type = if field_ids.contains(&RESERVED_FIELD_ID_PARTITION) { + let upt = compute_unified_partition_type( + self.table.metadata().partition_specs_iter().map(|s| s.as_ref()), + &schema, + )?; + Some(Arc::new(upt)) + } else { + None + }; + let plan_context = PlanContext { snapshot, table_metadata: self.table.metadata_ref(), @@ -313,6 +327,7 @@ impl<'a> TableScanBuilder<'a> { partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + unified_partition_type, }; Ok(TableScan { diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index f3b556bcbf..9a413d7fee 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize, Serializer}; use typed_builder::TypedBuilder; use crate::Result; +use crate::arrow::record_batch_transformer::PartitionColumnConstant; use crate::expr::BoundPredicate; use crate::spec::{ DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema, @@ -116,6 +117,21 @@ pub struct FileScanTask { #[builder(default)] pub name_mapping: Option>, + /// Pre-computed constant for the `_partition` metadata struct column. + /// Populated by scan planning (or externally by Comet's planner) when + /// `RESERVED_FIELD_ID_PARTITION` is in the projected field IDs. + /// + /// NOTE: This field is not serializable. FileScanTask cannot be + /// serialized/deserialized when this is populated. This is consistent with + /// partition_spec and name_mapping above, and is acceptable because Comet + /// builds tasks in-process (no serialization round-trip). + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + #[builder(default)] + pub partition_column_constant: Option>, + /// Whether this scan task should treat column names as case-sensitive when binding predicates. pub case_sensitive: bool, } diff --git a/crates/iceberg/src/spec/schema/_serde.rs b/crates/iceberg/src/spec/schema/_serde.rs index 4b0011835a..32832c5b89 100644 --- a/crates/iceberg/src/spec/schema/_serde.rs +++ b/crates/iceberg/src/spec/schema/_serde.rs @@ -15,15 +15,13 @@ // specific language governing permissions and limitations // under the License. -/// This is a helper module that defines types to help with serialization/deserialization. -/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct -/// and then converted into the [Schema] struct. Serialization works the other way around. -/// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. +//! Helper types for Schema serialization/deserialization. +//! +//! For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct +//! and then converted into the [Schema] struct. Serialization works the other way around. +//! [SchemaV1] and [SchemaV2] are internal structs only used for serialization and deserialization. + use serde::Deserialize; -/// This is a helper module that defines types to help with serialization/deserialization. -/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct -/// and then converted into the [Schema] struct. Serialization works the other way around. -/// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. use serde::Serialize; use super::{DEFAULT_SCHEMA_ID, Schema}; @@ -32,7 +30,9 @@ use crate::{Error, Result}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(untagged)] -/// Enum for Schema serialization/deserializaion +// IMPORTANT: V2 must precede V1. Serde untagged tries variants in declaration order; +// V2's required `schema_id: i32` distinguishes it from V1's optional field. +// Swapping the order will silently break deserialization. pub(super) enum SchemaEnum { V2(SchemaV2), V1(SchemaV1), @@ -51,7 +51,11 @@ pub(crate) struct SchemaV2 { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] -/// Defines the structure of a v1 schema for serialization/deserialization +/// Defines the structure of a v1 schema for serialization/deserialization. +/// +/// This is a permissive fallback shape for JSON lacking `schema-id`, +/// not a faithful V1 spec representation. It accepts `identifier-field-ids` +/// even though the V1 spec doesn't define them (Postel's law). pub(crate) struct SchemaV1 { #[serde(skip_serializing_if = "Option::is_none")] pub schema_id: Option, @@ -61,7 +65,6 @@ pub(crate) struct SchemaV1 { pub fields: StructType, } -/// Helper to serialize/deserializa Schema impl TryFrom for Schema { type Error = Error; fn try_from(value: SchemaEnum) -> Result { @@ -72,6 +75,9 @@ impl TryFrom for Schema { } } +// Always serialize as V2. The Iceberg spec treats V2 as the canonical format; +// V1 schemas are upgraded on read. Schema-id defaults to 0 when absent from +// the source JSON. impl From for SchemaEnum { fn from(value: Schema) -> Self { SchemaEnum::V2(value.into()) @@ -100,6 +106,9 @@ impl TryFrom for Schema { } } +// Note: alias_to_id is intentionally excluded from serialization. +// Per Iceberg spec, aliases are not part of the JSON schema representation. +// They must be reconstructed from external sources after deserialization. impl From for SchemaV2 { fn from(value: Schema) -> Self { SchemaV2 { @@ -107,7 +116,9 @@ impl From for SchemaV2 { identifier_field_ids: if value.identifier_field_ids.is_empty() { None } else { - Some(value.identifier_field_ids.into_iter().collect()) + let mut ids: Vec = value.identifier_field_ids.into_iter().collect(); + ids.sort_unstable(); + Some(ids) }, fields: value.r#struct, } @@ -121,7 +132,9 @@ impl From for SchemaV1 { identifier_field_ids: if value.identifier_field_ids.is_empty() { None } else { - Some(value.identifier_field_ids.into_iter().collect()) + let mut ids: Vec = value.identifier_field_ids.into_iter().collect(); + ids.sort_unstable(); + Some(ids) }, fields: value.r#struct, } @@ -134,39 +147,48 @@ mod tests { use crate::spec::schema::tests::table_schema_simple; use crate::spec::{PrimitiveType, Type}; - fn check_schema_serde(json: &str, expected_type: Schema, _expected_enum: SchemaEnum) { - let desered_type: Schema = serde_json::from_str(json).unwrap(); - assert_eq!(desered_type, expected_type); - assert!(matches!(desered_type.clone(), _expected_enum)); + fn check_schema_serde(json: &str, expected_schema: Schema) { + let deserialized_schema: Schema = serde_json::from_str(json).unwrap(); + assert_eq!(deserialized_schema, expected_schema); - let sered_json = serde_json::to_string(&expected_type).unwrap(); - let parsed_json_value = serde_json::from_str::(&sered_json).unwrap(); + let serialized_json = serde_json::to_string(&expected_schema).unwrap(); + let round_tripped: Schema = serde_json::from_str(&serialized_json).unwrap(); - assert_eq!(parsed_json_value, desered_type); + assert_eq!(round_tripped, deserialized_schema); } #[test] fn test_serde_with_schema_id() { let (schema, record) = table_schema_simple(); - let x: SchemaV2 = serde_json::from_str(record).unwrap(); - check_schema_serde(record, schema, SchemaEnum::V2(x)); + check_schema_serde(record, schema.clone()); + + // Verify it deserializes as V2 (has schema-id) + let schema_enum: SchemaEnum = serde_json::from_str(record).unwrap(); + assert!(matches!(schema_enum, SchemaEnum::V2(_))); } #[test] fn test_serde_without_schema_id() { - let (mut schema, record) = table_schema_simple(); - // we remove the ""schema-id": 1," string from example - let new_record = record.replace("\"schema-id\":1,", ""); - // By default schema_id field is set to DEFAULT_SCHEMA_ID when no value is set in json - schema.schema_id = DEFAULT_SCHEMA_ID; - - let x: SchemaV1 = serde_json::from_str(new_record.as_str()).unwrap(); - check_schema_serde(&new_record, schema, SchemaEnum::V1(x)); + let (schema, _) = table_schema_simple(); + + // Construct a V1 JSON without schema-id using programmatic manipulation + let mut json_value: serde_json::Value = serde_json::to_value(&schema).unwrap(); + json_value.as_object_mut().unwrap().remove("schema-id"); + let v1_json = serde_json::to_string(&json_value).unwrap(); + + let mut expected = schema; + expected.schema_id = DEFAULT_SCHEMA_ID; + + check_schema_serde(&v1_json, expected); + + // Verify it deserializes as V1 (no schema-id) + let schema_enum: SchemaEnum = serde_json::from_str(&v1_json).unwrap(); + assert!(matches!(schema_enum, SchemaEnum::V1(_))); } #[test] - fn schema() { + fn test_schema_v2_fields() { let record = r#" { "type": "struct", @@ -201,4 +223,52 @@ mod tests { assert_eq!(2, result.fields[1].id); assert!(!result.fields[1].required); } + + #[test] + fn test_derived_fields_work_after_round_trip() { + let (schema, record) = table_schema_simple(); + let deserialized: Schema = serde_json::from_str(record).unwrap(); + + // Verify lookup by name works (exercises name_to_id index) + assert_eq!( + deserialized.field_by_name("foo").map(|f| f.id), + schema.field_by_name("foo").map(|f| f.id) + ); + assert_eq!( + deserialized.field_by_name("bar").map(|f| f.id), + schema.field_by_name("bar").map(|f| f.id) + ); + + // Verify field_by_id works (exercises id_to_field index) + assert!(deserialized.field_by_id(1).is_some()); + assert!(deserialized.field_by_id(2).is_some()); + assert!(deserialized.field_by_id(999).is_none()); + } + + #[test] + fn test_identifier_field_ids_sorted_on_serialization() { + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![3, 1, 2]) + .with_fields(vec![ + crate::spec::NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)) + .into(), + crate::spec::NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)) + .into(), + crate::spec::NestedField::required(3, "c", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let serialized = serde_json::to_string(&schema).unwrap(); + let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + let ids = json_value["identifier-field-ids"] + .as_array() + .unwrap() + .iter() + .map(|v| v.as_i64().unwrap()) + .collect::>(); + assert_eq!(ids, vec![1, 2, 3]); + } } From ddfb4727dfc201ec0129c9f40650e95960bd2a33 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 17 Jun 2026 18:16:19 -0700 Subject: [PATCH 2/5] fix ci (public-api, and fmt) --- crates/iceberg/public-api.txt | 15 +++++- crates/iceberg/src/arrow/reader/pipeline.rs | 8 +++- .../src/arrow/record_batch_transformer.rs | 46 ++++++++++--------- crates/iceberg/src/metadata_columns.rs | 7 +-- crates/iceberg/src/scan/context.rs | 31 +++++++------ crates/iceberg/src/scan/mod.rs | 5 +- crates/iceberg/src/spec/schema/_serde.rs | 3 +- 7 files changed, 69 insertions(+), 46 deletions(-) diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index 653649e6cf..f259fbbf12 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -78,6 +78,16 @@ pub fn iceberg::arrow::ArrowReaderBuilder::with_range_coalesce_bytes(self, range pub fn iceberg::arrow::ArrowReaderBuilder::with_range_fetch_concurrency(self, range_fetch_concurrency: usize) -> Self pub fn iceberg::arrow::ArrowReaderBuilder::with_row_group_filtering_enabled(self, row_group_filtering_enabled: bool) -> Self pub fn iceberg::arrow::ArrowReaderBuilder::with_row_selection_enabled(self, row_selection_enabled: bool) -> Self +pub struct iceberg::arrow::PartitionColumnConstant +pub iceberg::arrow::PartitionColumnConstant::child_values: alloc::vec::Vec> +pub iceberg::arrow::PartitionColumnConstant::fields: arrow_schema::fields::Fields +impl core::clone::Clone for iceberg::arrow::PartitionColumnConstant +pub fn iceberg::arrow::PartitionColumnConstant::clone(&self) -> iceberg::arrow::PartitionColumnConstant +impl core::cmp::PartialEq for iceberg::arrow::PartitionColumnConstant +pub fn iceberg::arrow::PartitionColumnConstant::eq(&self, other: &iceberg::arrow::PartitionColumnConstant) -> bool +impl core::fmt::Debug for iceberg::arrow::PartitionColumnConstant +pub fn iceberg::arrow::PartitionColumnConstant::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::marker::StructuralPartialEq for iceberg::arrow::PartitionColumnConstant pub struct iceberg::arrow::PartitionValueCalculator impl iceberg::arrow::PartitionValueCalculator pub fn iceberg::arrow::PartitionValueCalculator::calculate(&self, batch: &arrow_array::record_batch::RecordBatch) -> iceberg::Result @@ -127,6 +137,7 @@ pub fn iceberg::arrow::arrow_schema_to_schema(schema: &arrow_schema::schema::Sch pub fn iceberg::arrow::arrow_schema_to_schema_auto_assign_ids(schema: &arrow_schema::schema::Schema) -> iceberg::Result pub fn iceberg::arrow::arrow_struct_to_literal(struct_array: &arrow_array::array::ArrayRef, ty: &iceberg::spec::StructType) -> iceberg::Result>> pub fn iceberg::arrow::arrow_type_to_type(ty: &arrow_schema::datatype::DataType) -> iceberg::Result +pub fn iceberg::arrow::build_partition_column_constant(unified_partition_type: &iceberg::spec::StructType, partition_spec: &iceberg::spec::PartitionSpec, partition_data: &iceberg::spec::Struct) -> iceberg::Result pub fn iceberg::arrow::datum_to_arrow_type_with_ree(datum: &iceberg::spec::Datum) -> arrow_schema::datatype::DataType pub fn iceberg::arrow::schema_to_arrow_schema(schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::arrow::strip_metadata_from_schema(schema: &arrow_schema::schema::Schema) -> iceberg::Result @@ -1106,6 +1117,7 @@ pub const iceberg::metadata_columns::RESERVED_FIELD_ID_SPEC_ID: i32 pub fn iceberg::metadata_columns::change_ordinal_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::change_type_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::commit_snapshot_id_field() -> &'static iceberg::spec::NestedFieldRef +pub fn iceberg::metadata_columns::compute_unified_partition_type<'a>(partition_specs: impl core::iter::traits::iterator::Iterator, schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::metadata_columns::delete_file_path_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::delete_file_pos_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::deleted_field() -> &'static iceberg::spec::NestedFieldRef @@ -1228,6 +1240,7 @@ pub iceberg::scan::FileScanTask::file_size_in_bytes: u64 pub iceberg::scan::FileScanTask::length: u64 pub iceberg::scan::FileScanTask::name_mapping: core::option::Option> pub iceberg::scan::FileScanTask::partition: core::option::Option +pub iceberg::scan::FileScanTask::partition_column_constant: core::option::Option> pub iceberg::scan::FileScanTask::partition_spec: core::option::Option> pub iceberg::scan::FileScanTask::predicate: core::option::Option pub iceberg::scan::FileScanTask::project_field_ids: alloc::vec::Vec @@ -1248,7 +1261,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTask impl iceberg::scan::FileScanTask -pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), ())> +pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())> impl serde_core::ser::Serialize for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTask diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index d4d919fe1d..1ee11e9541 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -37,7 +37,9 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_PARTITION, is_metadata_field}; +use crate::metadata_columns::{ + RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_PARTITION, is_metadata_field, +}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::Datum; use crate::{Error, ErrorKind}; @@ -256,7 +258,9 @@ impl FileScanTaskReader { } // Add the _partition metadata struct column if it's in the projected fields - if task.project_field_ids().contains(&RESERVED_FIELD_ID_PARTITION) + if task + .project_field_ids() + .contains(&RESERVED_FIELD_ID_PARTITION) && let Some(partition_column_constant) = &task.partition_column_constant { record_batch_transformer_builder = record_batch_transformer_builder diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 4b9fb82bf5..0d714f784e 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -33,8 +33,8 @@ use crate::arrow::value::{create_primitive_array_repeated, create_primitive_arra use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema}; use crate::metadata_columns::{RESERVED_FIELD_ID_PARTITION, get_metadata_field}; use crate::spec::{ - Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, - Struct, StructType, Transform, + Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, StructType, + Transform, }; use crate::{Error, ErrorKind, Result}; @@ -288,8 +288,11 @@ impl RecordBatchTransformerBuilder { partition_spec: &PartitionSpec, partition_data: &Struct, ) -> Result { - let partition_column = - build_partition_column_constant(unified_partition_type, partition_spec, partition_data)?; + let partition_column = build_partition_column_constant( + unified_partition_type, + partition_spec, + partition_data, + )?; self.partition_column = Some(partition_column); Ok(self) } @@ -763,7 +766,10 @@ impl RecordBatchTransformer { ) -> Result { if fields.is_empty() { let nulls = arrow_buffer::NullBuffer::new_null(num_rows); - return Ok(Arc::new(StructArray::new_empty_fields(num_rows, Some(nulls)))); + return Ok(Arc::new(StructArray::new_empty_fields( + num_rows, + Some(nulls), + ))); } let child_arrays: Vec = fields @@ -808,11 +814,7 @@ pub fn build_partition_column_constant( // unnecessary cast operation per batch. // Always nullable: partition evolution means any field may be absent // for files written under a different spec. - let arrow_field = Field::new( - &unified_field.name, - arrow_type, - true, - ); + let arrow_field = Field::new(&unified_field.name, arrow_type, true); arrow_fields.push(Arc::new(arrow_field)); // Find matching field in this file's partition spec by field_id @@ -1883,8 +1885,7 @@ mod test { ); // Unified partition type: just one field (same as the spec's type) - let unified_partition_type = - partition_spec.partition_type(&snapshot_schema).unwrap(); + let unified_partition_type = partition_spec.partition_type(&snapshot_schema).unwrap(); // Partition data: id=42 let partition_data = Struct::from_iter(vec![Some(Literal::int(42))]); @@ -1954,7 +1955,9 @@ mod test { fn partition_column_with_evolution() { use arrow_array::StructArray; - use crate::metadata_columns::{RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type}; + use crate::metadata_columns::{ + RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type, + }; use crate::spec::Transform; // Schema with two fields that could be partition sources @@ -1989,11 +1992,9 @@ mod test { .unwrap(); // Unified type includes both year and month - let unified_partition_type = compute_unified_partition_type( - [&spec_v0, &spec_v1].into_iter(), - &snapshot_schema, - ) - .unwrap(); + let unified_partition_type = + compute_unified_partition_type([&spec_v0, &spec_v1].into_iter(), &snapshot_schema) + .unwrap(); assert_eq!(unified_partition_type.fields().len(), 2); @@ -2015,10 +2016,11 @@ mod test { .unwrap() .build(); - let parquet_batch = RecordBatch::try_new(parquet_schema, vec![Arc::new( - StringArray::from(vec!["hello", "world"]), - )]) - .unwrap(); + let parquet_batch = + RecordBatch::try_new(parquet_schema, vec![Arc::new(StringArray::from(vec![ + "hello", "world", + ]))]) + .unwrap(); let result = transformer.process_record_batch(parquet_batch).unwrap(); diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs index d272c38a98..6673f367b4 100644 --- a/crates/iceberg/src/metadata_columns.rs +++ b/crates/iceberg/src/metadata_columns.rs @@ -26,7 +26,9 @@ use std::sync::Arc; use once_cell::sync::Lazy; -use crate::spec::{NestedField, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, StructType, Type}; +use crate::spec::{ + NestedField, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, StructType, Type, +}; use crate::{Error, ErrorKind, Result}; /// Reserved field ID for the file path (_file) column per Iceberg spec @@ -417,8 +419,7 @@ pub fn compute_unified_partition_type<'a>( })?; let res_type = field.transform.result_type(&source_field.field_type)?; - let nested = - NestedField::optional(field.field_id, &field.name, res_type).into(); + let nested = NestedField::optional(field.field_id, &field.name, res_type).into(); struct_fields.push(nested); } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 732f318844..29355057b4 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -130,23 +130,24 @@ impl ManifestEntryContext { .await; // Compute the _partition struct constant if the unified partition type is available - let partition_column_constant = if let Some(ref unified_partition_type) = self.unified_partition_type { - let partition_spec = self - .table_metadata - .partition_spec_by_id(self.partition_spec_id); - if let Some(spec) = partition_spec { - let constant = build_partition_column_constant( - unified_partition_type, - spec, - &self.manifest_entry.data_file.partition, - )?; - Some(Arc::new(constant)) + let partition_column_constant = + if let Some(ref unified_partition_type) = self.unified_partition_type { + let partition_spec = self + .table_metadata + .partition_spec_by_id(self.partition_spec_id); + if let Some(spec) = partition_spec { + let constant = build_partition_column_constant( + unified_partition_type, + spec, + &self.manifest_entry.data_file.partition, + )?; + Some(Arc::new(constant)) + } else { + None + } } else { None - } - } else { - None - }; + }; Ok(FileScanTask::builder() .with_file_size_in_bytes(self.manifest_entry.file_size_in_bytes()) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index ba813a1267..5491cd685d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -306,7 +306,10 @@ impl<'a> TableScanBuilder<'a> { // Compute unified partition type if _partition is projected let unified_partition_type = if field_ids.contains(&RESERVED_FIELD_ID_PARTITION) { let upt = compute_unified_partition_type( - self.table.metadata().partition_specs_iter().map(|s| s.as_ref()), + self.table + .metadata() + .partition_specs_iter() + .map(|s| s.as_ref()), &schema, )?; Some(Arc::new(upt)) diff --git a/crates/iceberg/src/spec/schema/_serde.rs b/crates/iceberg/src/spec/schema/_serde.rs index 32832c5b89..cc537d453e 100644 --- a/crates/iceberg/src/spec/schema/_serde.rs +++ b/crates/iceberg/src/spec/schema/_serde.rs @@ -21,8 +21,7 @@ //! and then converted into the [Schema] struct. Serialization works the other way around. //! [SchemaV1] and [SchemaV2] are internal structs only used for serialization and deserialization. -use serde::Deserialize; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use super::{DEFAULT_SCHEMA_ID, Schema}; use crate::spec::StructType; From e95f9fc3882e7dea193ba6ad6ebb9cd01db27830 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 22 Jun 2026 11:18:54 -0700 Subject: [PATCH 3/5] address review comments --- crates/iceberg/src/arrow/mod.rs | 2 +- crates/iceberg/src/arrow/reader/pipeline.rs | 27 +++- .../src/arrow/record_batch_transformer.rs | 5 +- crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/metadata_columns.rs | 50 +------ crates/iceberg/src/partitioning.rs | 80 +++++++++++ crates/iceberg/src/scan/context.rs | 24 +--- crates/iceberg/src/scan/mod.rs | 4 +- crates/iceberg/src/scan/task.rs | 17 +-- crates/iceberg/src/spec/schema/_serde.rs | 135 +++++------------- 10 files changed, 149 insertions(+), 196 deletions(-) create mode 100644 crates/iceberg/src/partitioning.rs diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index fa99944dca..2330fc6940 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -32,7 +32,7 @@ mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; -pub use record_batch_transformer::{PartitionColumnConstant, build_partition_column_constant}; +pub(crate) use record_batch_transformer::build_partition_column_constant; mod scan_metrics; mod value; diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 1ee11e9541..8975b73dba 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -33,7 +33,8 @@ use super::{ }; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; -use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; +use crate::arrow::record_batch_transformer::{PartitionColumnConstant, RecordBatchTransformerBuilder}; +use crate::arrow::build_partition_column_constant; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; @@ -257,14 +258,30 @@ impl FileScanTaskReader { record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } - // Add the _partition metadata struct column if it's in the projected fields + // Add the _partition metadata struct column if it's in the projected fields. + // Computed lazily here at read time from the unified partition type + task's spec + data. if task .project_field_ids() .contains(&RESERVED_FIELD_ID_PARTITION) - && let Some(partition_column_constant) = &task.partition_column_constant { - record_batch_transformer_builder = record_batch_transformer_builder - .with_partition_column_precomputed((**partition_column_constant).clone()); + if let Some(unified_type) = &task.unified_partition_type { + if unified_type.fields().is_empty() { + // Unpartitioned table: empty struct rendered as null + let empty_constant = PartitionColumnConstant { + fields: arrow_schema::Fields::empty(), + child_values: vec![], + }; + record_batch_transformer_builder = record_batch_transformer_builder + .with_partition_column_precomputed(empty_constant); + } else if let (Some(spec), Some(partition_data)) = + (&task.partition_spec, &task.partition) + { + let constant = + build_partition_column_constant(unified_type, spec, partition_data)?; + record_batch_transformer_builder = record_batch_transformer_builder + .with_partition_column_precomputed(constant); + } + } } let mut record_batch_transformer = record_batch_transformer_builder.build(); diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 0d714f784e..afa403f33a 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -1955,9 +1955,8 @@ mod test { fn partition_column_with_evolution() { use arrow_array::StructArray; - use crate::metadata_columns::{ - RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type, - }; + use crate::metadata_columns::RESERVED_FIELD_ID_PARTITION; + use crate::partitioning::compute_unified_partition_type; use crate::spec::Transform; // Schema with two fields that could be partition sources diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 4e346460f5..301992d15e 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -100,6 +100,7 @@ pub mod writer; mod delete_vector; pub mod metadata_columns; +pub mod partitioning; pub mod puffin; /// Utility functions and modules. pub mod util; diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs index 6673f367b4..b622a76edc 100644 --- a/crates/iceberg/src/metadata_columns.rs +++ b/crates/iceberg/src/metadata_columns.rs @@ -26,9 +26,7 @@ use std::sync::Arc; use once_cell::sync::Lazy; -use crate::spec::{ - NestedField, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, StructType, Type, -}; +use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type}; use crate::{Error, ErrorKind, Result}; /// Reserved field ID for the file path (_file) column per Iceberg spec @@ -381,52 +379,6 @@ pub fn partition_field(partition_fields: Vec) -> NestedFieldRef ) } -/// Computes the unified partition type across all partition specs in the table. -/// -/// This is equivalent to Java's `Partitioning.partitionType(table)`. The result is a -/// StructType containing all partition fields ever used across all specs, enabling correct -/// representation of the `_partition` metadata column when partition evolution has occurred. -/// -/// For each spec, the partition fields are derived from the transform applied to the source -/// column. Fields are deduplicated by field_id - each unique field_id appears exactly once -/// in the result. -/// -/// # Arguments -/// * `partition_specs` - Iterator over all partition specs in the table -/// * `schema` - The current table schema (needed to determine result types of transforms) -pub fn compute_unified_partition_type<'a>( - partition_specs: impl Iterator, - schema: &Schema, -) -> Result { - let mut seen_field_ids = std::collections::HashSet::new(); - let mut struct_fields: Vec = Vec::new(); - - for spec in partition_specs { - for field in spec.fields() { - if seen_field_ids.contains(&field.field_id) { - continue; - } - seen_field_ids.insert(field.field_id); - - let source_field = schema.field_by_id(field.source_id).ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - format!( - "No column with source column id {} in schema for partition field {}", - field.source_id, field.name - ), - ) - })?; - - let res_type = field.transform.result_type(&source_field.field_type)?; - let nested = NestedField::optional(field.field_id, &field.name, res_type).into(); - struct_fields.push(nested); - } - } - - Ok(StructType::new(struct_fields)) -} - /// Returns the Iceberg field definition for a metadata field ID. /// /// Note: This function does not support `_partition` (field ID `i32::MAX - 5`) because diff --git a/crates/iceberg/src/partitioning.rs b/crates/iceberg/src/partitioning.rs new file mode 100644 index 0000000000..615499b031 --- /dev/null +++ b/crates/iceberg/src/partitioning.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Partition type utilities for Iceberg tables. + +use crate::spec::{NestedField, NestedFieldRef, PartitionSpec, Schema, StructType, Transform}; +use crate::{Error, ErrorKind, Result}; + +/// Computes the unified partition type across all partition specs in the table. +/// +/// This is equivalent to Java's `Partitioning.partitionType(table)`. The result is a +/// StructType containing all partition fields ever used across all specs, enabling correct +/// representation of the `_partition` metadata column when partition evolution has occurred. +/// +/// Matches Java's behavior: +/// - Specs are sorted by spec_id in descending order (newer specs first), so newer field +/// names take precedence when deduplicating by field_id. +/// - Void transform fields (dropped partition columns) are skipped. +/// - Fields are deduplicated by field_id — each unique field_id appears exactly once. +/// +/// # Arguments +/// * `partition_specs` - Iterator over all partition specs in the table +/// * `schema` - The current table schema (needed to determine result types of transforms) +pub fn compute_unified_partition_type<'a>( + partition_specs: impl Iterator, + schema: &Schema, +) -> Result { + let mut seen_field_ids = std::collections::HashSet::new(); + let mut struct_fields: Vec = Vec::new(); + + // Sort specs by spec_id descending (newer first) to match Java's behavior: + // newer field names take precedence when deduplicating by field_id. + let mut specs: Vec<&PartitionSpec> = partition_specs.collect(); + specs.sort_by(|a, b| b.spec_id().cmp(&a.spec_id())); + + for spec in specs { + for field in spec.fields() { + if seen_field_ids.contains(&field.field_id) { + continue; + } + + // Skip void transforms (dropped partition columns) + if matches!(field.transform, Transform::Void) { + continue; + } + + seen_field_ids.insert(field.field_id); + + let source_field = schema.field_by_id(field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "No column with source column id {} in schema for partition field {}", + field.source_id, field.name + ), + ) + })?; + + let res_type = field.transform.result_type(&source_field.field_type)?; + let nested = NestedField::optional(field.field_id, &field.name, res_type).into(); + struct_fields.push(nested); + } + } + + Ok(StructType::new(struct_fields)) +} diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 29355057b4..83d41e3ad4 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::{SinkExt, TryFutureExt}; -use crate::arrow::record_batch_transformer::build_partition_column_constant; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::object_cache::ObjectCache; @@ -129,26 +128,6 @@ impl ManifestEntryContext { ) .await; - // Compute the _partition struct constant if the unified partition type is available - let partition_column_constant = - if let Some(ref unified_partition_type) = self.unified_partition_type { - let partition_spec = self - .table_metadata - .partition_spec_by_id(self.partition_spec_id); - if let Some(spec) = partition_spec { - let constant = build_partition_column_constant( - unified_partition_type, - spec, - &self.manifest_entry.data_file.partition, - )?; - Some(Arc::new(constant)) - } else { - None - } - } else { - None - }; - Ok(FileScanTask::builder() .with_file_size_in_bytes(self.manifest_entry.file_size_in_bytes()) .with_start(0) @@ -167,7 +146,7 @@ impl ManifestEntryContext { // TODO: Pass actual PartitionSpec through context chain for native flow .with_partition_spec(None) .with_name_mapping(self.name_mapping) - .with_partition_column_constant(partition_column_constant) + .with_unified_partition_type(self.unified_partition_type.clone()) .with_case_sensitive(self.case_sensitive) .build()) } @@ -316,7 +295,6 @@ impl PlanContext { delete_file_index, name_mapping: self.name_mapping.clone(), case_sensitive: self.case_sensitive, - table_metadata: self.table_metadata.clone(), unified_partition_type: self.unified_partition_type.clone(), } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 5491cd685d..779b01c06d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -38,9 +38,9 @@ use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metadata_columns::{ - RESERVED_FIELD_ID_PARTITION, compute_unified_partition_type, get_metadata_field_id, - is_metadata_column_name, + RESERVED_FIELD_ID_PARTITION, get_metadata_field_id, is_metadata_column_name, }; +use crate::partitioning::compute_unified_partition_type; use crate::runtime::Runtime; use crate::spec::{DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, NameMapping, SnapshotRef}; use crate::table::Table; diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 9a413d7fee..0c36dd812c 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -22,11 +22,10 @@ use serde::{Deserialize, Serialize, Serializer}; use typed_builder::TypedBuilder; use crate::Result; -use crate::arrow::record_batch_transformer::PartitionColumnConstant; use crate::expr::BoundPredicate; use crate::spec::{ DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema, - SchemaRef, Struct, + SchemaRef, Struct, StructType, }; /// A stream of [`FileScanTask`]. @@ -117,20 +116,16 @@ pub struct FileScanTask { #[builder(default)] pub name_mapping: Option>, - /// Pre-computed constant for the `_partition` metadata struct column. - /// Populated by scan planning (or externally by Comet's planner) when - /// `RESERVED_FIELD_ID_PARTITION` is in the projected field IDs. - /// - /// NOTE: This field is not serializable. FileScanTask cannot be - /// serialized/deserialized when this is populated. This is consistent with - /// partition_spec and name_mapping above, and is acceptable because Comet - /// builds tasks in-process (no serialization round-trip). + /// The unified partition type across all specs in the table. + /// When `RESERVED_FIELD_ID_PARTITION` is in the projected field IDs, the reader + /// uses this type along with the task's partition_spec and partition data to + /// materialize the `_partition` struct column at read time. #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] #[serde(serialize_with = "serialize_not_implemented")] #[serde(deserialize_with = "deserialize_not_implemented")] #[builder(default)] - pub partition_column_constant: Option>, + pub unified_partition_type: Option>, /// Whether this scan task should treat column names as case-sensitive when binding predicates. pub case_sensitive: bool, diff --git a/crates/iceberg/src/spec/schema/_serde.rs b/crates/iceberg/src/spec/schema/_serde.rs index cc537d453e..4b0011835a 100644 --- a/crates/iceberg/src/spec/schema/_serde.rs +++ b/crates/iceberg/src/spec/schema/_serde.rs @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! Helper types for Schema serialization/deserialization. -//! -//! For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct -//! and then converted into the [Schema] struct. Serialization works the other way around. -//! [SchemaV1] and [SchemaV2] are internal structs only used for serialization and deserialization. - -use serde::{Deserialize, Serialize}; +/// This is a helper module that defines types to help with serialization/deserialization. +/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct +/// and then converted into the [Schema] struct. Serialization works the other way around. +/// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. +use serde::Deserialize; +/// This is a helper module that defines types to help with serialization/deserialization. +/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct +/// and then converted into the [Schema] struct. Serialization works the other way around. +/// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. +use serde::Serialize; use super::{DEFAULT_SCHEMA_ID, Schema}; use crate::spec::StructType; @@ -29,9 +32,7 @@ use crate::{Error, Result}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(untagged)] -// IMPORTANT: V2 must precede V1. Serde untagged tries variants in declaration order; -// V2's required `schema_id: i32` distinguishes it from V1's optional field. -// Swapping the order will silently break deserialization. +/// Enum for Schema serialization/deserializaion pub(super) enum SchemaEnum { V2(SchemaV2), V1(SchemaV1), @@ -50,11 +51,7 @@ pub(crate) struct SchemaV2 { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] -/// Defines the structure of a v1 schema for serialization/deserialization. -/// -/// This is a permissive fallback shape for JSON lacking `schema-id`, -/// not a faithful V1 spec representation. It accepts `identifier-field-ids` -/// even though the V1 spec doesn't define them (Postel's law). +/// Defines the structure of a v1 schema for serialization/deserialization pub(crate) struct SchemaV1 { #[serde(skip_serializing_if = "Option::is_none")] pub schema_id: Option, @@ -64,6 +61,7 @@ pub(crate) struct SchemaV1 { pub fields: StructType, } +/// Helper to serialize/deserializa Schema impl TryFrom for Schema { type Error = Error; fn try_from(value: SchemaEnum) -> Result { @@ -74,9 +72,6 @@ impl TryFrom for Schema { } } -// Always serialize as V2. The Iceberg spec treats V2 as the canonical format; -// V1 schemas are upgraded on read. Schema-id defaults to 0 when absent from -// the source JSON. impl From for SchemaEnum { fn from(value: Schema) -> Self { SchemaEnum::V2(value.into()) @@ -105,9 +100,6 @@ impl TryFrom for Schema { } } -// Note: alias_to_id is intentionally excluded from serialization. -// Per Iceberg spec, aliases are not part of the JSON schema representation. -// They must be reconstructed from external sources after deserialization. impl From for SchemaV2 { fn from(value: Schema) -> Self { SchemaV2 { @@ -115,9 +107,7 @@ impl From for SchemaV2 { identifier_field_ids: if value.identifier_field_ids.is_empty() { None } else { - let mut ids: Vec = value.identifier_field_ids.into_iter().collect(); - ids.sort_unstable(); - Some(ids) + Some(value.identifier_field_ids.into_iter().collect()) }, fields: value.r#struct, } @@ -131,9 +121,7 @@ impl From for SchemaV1 { identifier_field_ids: if value.identifier_field_ids.is_empty() { None } else { - let mut ids: Vec = value.identifier_field_ids.into_iter().collect(); - ids.sort_unstable(); - Some(ids) + Some(value.identifier_field_ids.into_iter().collect()) }, fields: value.r#struct, } @@ -146,48 +134,39 @@ mod tests { use crate::spec::schema::tests::table_schema_simple; use crate::spec::{PrimitiveType, Type}; - fn check_schema_serde(json: &str, expected_schema: Schema) { - let deserialized_schema: Schema = serde_json::from_str(json).unwrap(); - assert_eq!(deserialized_schema, expected_schema); + fn check_schema_serde(json: &str, expected_type: Schema, _expected_enum: SchemaEnum) { + let desered_type: Schema = serde_json::from_str(json).unwrap(); + assert_eq!(desered_type, expected_type); + assert!(matches!(desered_type.clone(), _expected_enum)); - let serialized_json = serde_json::to_string(&expected_schema).unwrap(); - let round_tripped: Schema = serde_json::from_str(&serialized_json).unwrap(); + let sered_json = serde_json::to_string(&expected_type).unwrap(); + let parsed_json_value = serde_json::from_str::(&sered_json).unwrap(); - assert_eq!(round_tripped, deserialized_schema); + assert_eq!(parsed_json_value, desered_type); } #[test] fn test_serde_with_schema_id() { let (schema, record) = table_schema_simple(); - check_schema_serde(record, schema.clone()); - - // Verify it deserializes as V2 (has schema-id) - let schema_enum: SchemaEnum = serde_json::from_str(record).unwrap(); - assert!(matches!(schema_enum, SchemaEnum::V2(_))); + let x: SchemaV2 = serde_json::from_str(record).unwrap(); + check_schema_serde(record, schema, SchemaEnum::V2(x)); } #[test] fn test_serde_without_schema_id() { - let (schema, _) = table_schema_simple(); - - // Construct a V1 JSON without schema-id using programmatic manipulation - let mut json_value: serde_json::Value = serde_json::to_value(&schema).unwrap(); - json_value.as_object_mut().unwrap().remove("schema-id"); - let v1_json = serde_json::to_string(&json_value).unwrap(); - - let mut expected = schema; - expected.schema_id = DEFAULT_SCHEMA_ID; - - check_schema_serde(&v1_json, expected); - - // Verify it deserializes as V1 (no schema-id) - let schema_enum: SchemaEnum = serde_json::from_str(&v1_json).unwrap(); - assert!(matches!(schema_enum, SchemaEnum::V1(_))); + let (mut schema, record) = table_schema_simple(); + // we remove the ""schema-id": 1," string from example + let new_record = record.replace("\"schema-id\":1,", ""); + // By default schema_id field is set to DEFAULT_SCHEMA_ID when no value is set in json + schema.schema_id = DEFAULT_SCHEMA_ID; + + let x: SchemaV1 = serde_json::from_str(new_record.as_str()).unwrap(); + check_schema_serde(&new_record, schema, SchemaEnum::V1(x)); } #[test] - fn test_schema_v2_fields() { + fn schema() { let record = r#" { "type": "struct", @@ -222,52 +201,4 @@ mod tests { assert_eq!(2, result.fields[1].id); assert!(!result.fields[1].required); } - - #[test] - fn test_derived_fields_work_after_round_trip() { - let (schema, record) = table_schema_simple(); - let deserialized: Schema = serde_json::from_str(record).unwrap(); - - // Verify lookup by name works (exercises name_to_id index) - assert_eq!( - deserialized.field_by_name("foo").map(|f| f.id), - schema.field_by_name("foo").map(|f| f.id) - ); - assert_eq!( - deserialized.field_by_name("bar").map(|f| f.id), - schema.field_by_name("bar").map(|f| f.id) - ); - - // Verify field_by_id works (exercises id_to_field index) - assert!(deserialized.field_by_id(1).is_some()); - assert!(deserialized.field_by_id(2).is_some()); - assert!(deserialized.field_by_id(999).is_none()); - } - - #[test] - fn test_identifier_field_ids_sorted_on_serialization() { - let schema = Schema::builder() - .with_schema_id(1) - .with_identifier_field_ids(vec![3, 1, 2]) - .with_fields(vec![ - crate::spec::NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)) - .into(), - crate::spec::NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)) - .into(), - crate::spec::NestedField::required(3, "c", Type::Primitive(PrimitiveType::Int)) - .into(), - ]) - .build() - .unwrap(); - - let serialized = serde_json::to_string(&schema).unwrap(); - let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap(); - let ids = json_value["identifier-field-ids"] - .as_array() - .unwrap() - .iter() - .map(|v| v.as_i64().unwrap()) - .collect::>(); - assert_eq!(ids, vec![1, 2, 3]); - } } From 33b690a78059e26d09ebda81bab4a52143708206 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 22 Jun 2026 13:39:14 -0700 Subject: [PATCH 4/5] more changes to address comments --- crates/iceberg/src/arrow/reader/pipeline.rs | 38 ++++++++++----------- crates/iceberg/src/scan/context.rs | 6 +--- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 8975b73dba..ad88fe7072 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -31,10 +31,12 @@ use super::{ ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema, }; +use crate::arrow::build_partition_column_constant; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; -use crate::arrow::record_batch_transformer::{PartitionColumnConstant, RecordBatchTransformerBuilder}; -use crate::arrow::build_partition_column_constant; +use crate::arrow::record_batch_transformer::{ + PartitionColumnConstant, RecordBatchTransformerBuilder, +}; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; @@ -263,24 +265,22 @@ impl FileScanTaskReader { if task .project_field_ids() .contains(&RESERVED_FIELD_ID_PARTITION) + && let Some(unified_type) = &task.unified_partition_type { - if let Some(unified_type) = &task.unified_partition_type { - if unified_type.fields().is_empty() { - // Unpartitioned table: empty struct rendered as null - let empty_constant = PartitionColumnConstant { - fields: arrow_schema::Fields::empty(), - child_values: vec![], - }; - record_batch_transformer_builder = record_batch_transformer_builder - .with_partition_column_precomputed(empty_constant); - } else if let (Some(spec), Some(partition_data)) = - (&task.partition_spec, &task.partition) - { - let constant = - build_partition_column_constant(unified_type, spec, partition_data)?; - record_batch_transformer_builder = record_batch_transformer_builder - .with_partition_column_precomputed(constant); - } + if unified_type.fields().is_empty() { + // Unpartitioned table: empty struct rendered as null + let empty_constant = PartitionColumnConstant { + fields: arrow_schema::Fields::empty(), + child_values: vec![], + }; + record_batch_transformer_builder = record_batch_transformer_builder + .with_partition_column_precomputed(empty_constant); + } else if let (Some(spec), Some(partition_data)) = + (&task.partition_spec, &task.partition) + { + let constant = build_partition_column_constant(unified_type, spec, partition_data)?; + record_batch_transformer_builder = + record_batch_transformer_builder.with_partition_column_precomputed(constant); } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 83d41e3ad4..c1e63cf020 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -48,7 +48,6 @@ pub(crate) struct ManifestFileContext { delete_file_index: DeleteFileIndex, name_mapping: Option>, case_sensitive: bool, - table_metadata: TableMetadataRef, unified_partition_type: Option>, } @@ -65,7 +64,6 @@ pub(crate) struct ManifestEntryContext { pub delete_file_index: DeleteFileIndex, pub name_mapping: Option>, pub case_sensitive: bool, - pub table_metadata: TableMetadataRef, pub unified_partition_type: Option>, } @@ -83,9 +81,8 @@ impl ManifestFileContext { expression_evaluator_cache, delete_file_index, name_mapping, - table_metadata, + case_sensitive, unified_partition_type, - .. } = self; let manifest = object_cache.get_manifest(&manifest_file).await?; @@ -102,7 +99,6 @@ impl ManifestFileContext { delete_file_index: delete_file_index.clone(), name_mapping: name_mapping.clone(), case_sensitive, - table_metadata: table_metadata.clone(), unified_partition_type: unified_partition_type.clone(), }; From c723ae2e67d9b5d0e9d045dec82116fb4e340226 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 22 Jun 2026 16:29:58 -0700 Subject: [PATCH 5/5] fix ci failures --- crates/iceberg/public-api.txt | 16 +++------------- crates/iceberg/src/partitioning.rs | 2 +- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index f259fbbf12..b26408d349 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -78,16 +78,6 @@ pub fn iceberg::arrow::ArrowReaderBuilder::with_range_coalesce_bytes(self, range pub fn iceberg::arrow::ArrowReaderBuilder::with_range_fetch_concurrency(self, range_fetch_concurrency: usize) -> Self pub fn iceberg::arrow::ArrowReaderBuilder::with_row_group_filtering_enabled(self, row_group_filtering_enabled: bool) -> Self pub fn iceberg::arrow::ArrowReaderBuilder::with_row_selection_enabled(self, row_selection_enabled: bool) -> Self -pub struct iceberg::arrow::PartitionColumnConstant -pub iceberg::arrow::PartitionColumnConstant::child_values: alloc::vec::Vec> -pub iceberg::arrow::PartitionColumnConstant::fields: arrow_schema::fields::Fields -impl core::clone::Clone for iceberg::arrow::PartitionColumnConstant -pub fn iceberg::arrow::PartitionColumnConstant::clone(&self) -> iceberg::arrow::PartitionColumnConstant -impl core::cmp::PartialEq for iceberg::arrow::PartitionColumnConstant -pub fn iceberg::arrow::PartitionColumnConstant::eq(&self, other: &iceberg::arrow::PartitionColumnConstant) -> bool -impl core::fmt::Debug for iceberg::arrow::PartitionColumnConstant -pub fn iceberg::arrow::PartitionColumnConstant::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result -impl core::marker::StructuralPartialEq for iceberg::arrow::PartitionColumnConstant pub struct iceberg::arrow::PartitionValueCalculator impl iceberg::arrow::PartitionValueCalculator pub fn iceberg::arrow::PartitionValueCalculator::calculate(&self, batch: &arrow_array::record_batch::RecordBatch) -> iceberg::Result @@ -137,7 +127,6 @@ pub fn iceberg::arrow::arrow_schema_to_schema(schema: &arrow_schema::schema::Sch pub fn iceberg::arrow::arrow_schema_to_schema_auto_assign_ids(schema: &arrow_schema::schema::Schema) -> iceberg::Result pub fn iceberg::arrow::arrow_struct_to_literal(struct_array: &arrow_array::array::ArrayRef, ty: &iceberg::spec::StructType) -> iceberg::Result>> pub fn iceberg::arrow::arrow_type_to_type(ty: &arrow_schema::datatype::DataType) -> iceberg::Result -pub fn iceberg::arrow::build_partition_column_constant(unified_partition_type: &iceberg::spec::StructType, partition_spec: &iceberg::spec::PartitionSpec, partition_data: &iceberg::spec::Struct) -> iceberg::Result pub fn iceberg::arrow::datum_to_arrow_type_with_ree(datum: &iceberg::spec::Datum) -> arrow_schema::datatype::DataType pub fn iceberg::arrow::schema_to_arrow_schema(schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::arrow::strip_metadata_from_schema(schema: &arrow_schema::schema::Schema) -> iceberg::Result @@ -1117,7 +1106,6 @@ pub const iceberg::metadata_columns::RESERVED_FIELD_ID_SPEC_ID: i32 pub fn iceberg::metadata_columns::change_ordinal_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::change_type_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::commit_snapshot_id_field() -> &'static iceberg::spec::NestedFieldRef -pub fn iceberg::metadata_columns::compute_unified_partition_type<'a>(partition_specs: impl core::iter::traits::iterator::Iterator, schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::metadata_columns::delete_file_path_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::delete_file_pos_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::deleted_field() -> &'static iceberg::spec::NestedFieldRef @@ -1131,6 +1119,8 @@ pub fn iceberg::metadata_columns::partition_field(partition_fields: alloc::vec:: pub fn iceberg::metadata_columns::pos_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::row_id_field() -> &'static iceberg::spec::NestedFieldRef pub fn iceberg::metadata_columns::spec_id_field() -> &'static iceberg::spec::NestedFieldRef +pub mod iceberg::partitioning +pub fn iceberg::partitioning::compute_unified_partition_type<'a>(partition_specs: impl core::iter::traits::iterator::Iterator, schema: &iceberg::spec::Schema) -> iceberg::Result pub mod iceberg::puffin pub enum iceberg::puffin::CompressionCodec pub iceberg::puffin::CompressionCodec::Gzip(u8) @@ -1240,13 +1230,13 @@ pub iceberg::scan::FileScanTask::file_size_in_bytes: u64 pub iceberg::scan::FileScanTask::length: u64 pub iceberg::scan::FileScanTask::name_mapping: core::option::Option> pub iceberg::scan::FileScanTask::partition: core::option::Option -pub iceberg::scan::FileScanTask::partition_column_constant: core::option::Option> pub iceberg::scan::FileScanTask::partition_spec: core::option::Option> pub iceberg::scan::FileScanTask::predicate: core::option::Option pub iceberg::scan::FileScanTask::project_field_ids: alloc::vec::Vec pub iceberg::scan::FileScanTask::record_count: core::option::Option pub iceberg::scan::FileScanTask::schema: iceberg::spec::SchemaRef pub iceberg::scan::FileScanTask::start: u64 +pub iceberg::scan::FileScanTask::unified_partition_type: core::option::Option> impl iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::data_file_path(&self) -> &str pub fn iceberg::scan::FileScanTask::predicate(&self) -> core::option::Option<&iceberg::expr::BoundPredicate> diff --git a/crates/iceberg/src/partitioning.rs b/crates/iceberg/src/partitioning.rs index 615499b031..7b749c58b6 100644 --- a/crates/iceberg/src/partitioning.rs +++ b/crates/iceberg/src/partitioning.rs @@ -45,7 +45,7 @@ pub fn compute_unified_partition_type<'a>( // Sort specs by spec_id descending (newer first) to match Java's behavior: // newer field names take precedence when deduplicating by field_id. let mut specs: Vec<&PartitionSpec> = partition_specs.collect(); - specs.sort_by(|a, b| b.spec_id().cmp(&a.spec_id())); + specs.sort_by_key(|s| std::cmp::Reverse(s.spec_id())); for spec in specs { for field in spec.fields() {