Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/iceberg/src/arrow/reader/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
use crate::error::Result;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::metadata_columns::{
RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_SPEC_ID, is_metadata_field,
};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::Datum;
use crate::{Error, ErrorKind};
Expand Down Expand Up @@ -248,6 +250,16 @@ impl FileScanTaskReader {
record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
}

if task
.project_field_ids()
.contains(&RESERVED_FIELD_ID_SPEC_ID)
&& let Some(partition_spec) = &task.partition_spec

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should always add spec_id into the constant map if it's selected/projected.

The task.partition_spec should be present even for unpartitioned table(spec_id = 0).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@advancedxy already raised this; this is a +1 to second it, to post as a reply on that thread:

"+1. If the invariant is that task.partition_spec is always present (spec_id = 0 even for unpartitioned tables), then gating the column on let Some(partition_spec) means a projected _spec_id can silently produce no column when that invariant is ever violated, which would be a confusing 'column vanished' bug rather than a clean error. Adding _spec_id to the constant map whenever it's projected (per @advancedxy) is the safer contract. Worth a test on an unpartitioned table to lock in spec_id = 0."

{
let spec_id_datum = Datum::int(partition_spec.spec_id());
record_batch_transformer_builder = record_batch_transformer_builder

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Routing _spec_id through with_constant makes it RunEndEncoded (metadata-field constants get datum_to_arrow_type_with_ree), same as _file. The spec types _spec_id as a plain int (https://iceberg.apache.org/spec/#reserved-field-ids). For the Comet/Spark consumer, does REE get decoded cheaply downstream, or would a plain Int32 avoid a cast? Not necessarily a change, just confirming the consumer is happy with REE here as it is for _file.

.with_constant(RESERVED_FIELD_ID_SPEC_ID, spec_id_datum);
}

if let (Some(partition_spec), Some(partition_data)) =
(task.partition_spec.clone(), task.partition.clone())
{
Expand Down
21 changes: 15 additions & 6 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,7 @@ impl RecordBatchTransformer {
.get(field_id)
.ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
.0;
let datum = constant_fields.get(field_id).ok_or(Error::new(
ErrorKind::Unexpected,
"constant field not found",
))?;
let arrow_type = datum_to_arrow_type_with_ree(datum);
let arrow_type = field.data_type().clone();
// Use the type from constant_fields (REE for constants)
let constant_field =
Field::new(field.name(), arrow_type, field.is_nullable())
Expand Down Expand Up @@ -486,7 +482,20 @@ impl RecordBatchTransformer {
// they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata
// is authoritative and should be preferred over file data.
if let Some(datum) = constant_fields.get(field_id) {
let arrow_type = datum_to_arrow_type_with_ree(datum);
let arrow_type = if get_metadata_field(*field_id).is_ok() {
datum_to_arrow_type_with_ree(datum)
} else {
field_id_to_mapped_schema_map
.get(field_id)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separating "metadata field becomes REE" from "identity-partition field becomes schema type" looks like a real correctness improvement: identity-partition constants shouldn't be forced to RunEndEncoded. Worth calling out to the #2668/#2746 authors so they rebase on top rather than reintroducing the old derivation. This branch is the scalar arm that #2668 would fold into a MetadataColumnSource::ScalarConstant; keeping it in this shape makes that easy.

.ok_or(Error::new(
ErrorKind::Unexpected,
"could not find field in schema",
))?
.0
.data_type()
.clone()
};

return Ok(ColumnSource::Add {
value: Some(datum.literal().clone()),
target_type: arrow_type,
Expand Down
5 changes: 4 additions & 1 deletion crates/iceberg/src/arrow/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,9 @@ pub(crate) fn create_primitive_array_repeated(
let vals: Vec<Option<i32>> = vec![None; num_rows];
Arc::new(Date32Array::from(vals))
}
(DataType::Int64, Some(PrimitiveLiteral::Int(value))) => {
Arc::new(Int64Array::from(vec![i64::from(*value); num_rows]))
}
(DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
Arc::new(Int64Array::from(vec![*value; num_rows]))
}
Expand Down Expand Up @@ -969,7 +972,7 @@ pub(crate) fn create_primitive_array_repeated(
(dt, _) => {
return Err(Error::new(
ErrorKind::Unexpected,
format!("unexpected target column type {dt}"),
format!("unexpected target column type {dt}, prim_lit {:?}", prim_lit),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mixed styles in one string; ...{prim_lit:?} matches the inline-arg convention used right above it.

));
}
})
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,12 @@ mod tests {
-- child 2: "lower_bound" (Utf8)
StringArray
[
"100",
"1",

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This expected-output edit only exists because the fixture changed; if the fixture stays, this revert isn't needed. Flagging so reviewers know it's coupled to the above, not an independent behavior change.

]
-- child 3: "upper_bound" (Utf8)
StringArray
[
"300",
"1",
]
],
]"#]],
Expand Down
15 changes: 11 additions & 4 deletions crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::scan::{
PartitionFilterCache,
};
use crate::spec::{
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, NameMapping, SchemaRef,
SnapshotRef, TableMetadataRef,
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, NameMapping,
PartitionSpecRef, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

Expand All @@ -48,6 +48,7 @@ pub(crate) struct ManifestFileContext {
delete_file_index: DeleteFileIndex,
name_mapping: Option<Arc<NameMapping>>,
case_sensitive: bool,
partition_spec: Option<PartitionSpecRef>,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand All @@ -63,6 +64,7 @@ pub(crate) struct ManifestEntryContext {
pub delete_file_index: DeleteFileIndex,
pub name_mapping: Option<Arc<NameMapping>>,
pub case_sensitive: bool,
pub partition_spec: Option<PartitionSpecRef>,
}

impl ManifestFileContext {
Expand All @@ -80,6 +82,7 @@ impl ManifestFileContext {
delete_file_index,
name_mapping,
case_sensitive,
partition_spec,
} = self;

let manifest = object_cache.get_manifest(&manifest_file).await?;
Expand All @@ -96,6 +99,7 @@ impl ManifestFileContext {
delete_file_index: delete_file_index.clone(),
name_mapping: name_mapping.clone(),
case_sensitive,
partition_spec: partition_spec.clone(),
};

sender
Expand Down Expand Up @@ -135,8 +139,7 @@ impl ManifestEntryContext {
)
.with_deletes(deletes)
.with_partition(Some(self.manifest_entry.data_file.partition.clone()))
// TODO: Pass actual PartitionSpec through context chain for native flow
.with_partition_spec(None)
.with_partition_spec(self.partition_spec.clone())
.with_name_mapping(self.name_mapping)
.with_case_sensitive(self.case_sensitive)
.build())
Expand Down Expand Up @@ -284,6 +287,10 @@ impl PlanContext {
delete_file_index,
name_mapping: self.name_mapping.clone(),
case_sensitive: self.case_sensitive,
partition_spec: self
.table_metadata
.partition_spec_by_id(manifest_file.partition_spec_id)
.cloned(),
}
}
}
125 changes: 117 additions & 8 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,8 +625,9 @@ pub mod tests {
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{
Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, RunArray,
StringArray,
};
use futures::{TryStreamExt, stream};
Expand All @@ -641,7 +642,7 @@ pub mod tests {
use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
use crate::metadata_columns::RESERVED_COL_NAME_FILE;
use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_SPEC_ID};
use crate::scan::FileScanTask;
use crate::spec::{
DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, DataFileBuilder, DataFileFormat, Datum,
Expand Down Expand Up @@ -862,7 +863,7 @@ pub mod tests {
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.partition(Struct::from_iter([Some(Literal::long(1))]))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collapsing three distinct partition values to 1/1/1 reduces distinctness for every test on setup_manifest_files and forces the inspect/manifests.rs bound edits below. Was there a failure with the original values that motivated this? If the new constant-type handling needs specific values, a separate fixture might keep the shared one intact. But if the old values actually broke, that itself sounds like a test worth keeping. What did you hit here?

.key_metadata(None)
.build()
.unwrap(),
Expand All @@ -885,7 +886,7 @@ pub mod tests {
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.partition(Struct::from_iter([Some(Literal::long(1))]))
.build()
.unwrap(),
)
Expand All @@ -907,7 +908,7 @@ pub mod tests {
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.partition(Struct::from_iter([Some(Literal::long(1))]))
.build()
.unwrap(),
)
Expand Down Expand Up @@ -2025,8 +2026,6 @@ pub mod tests {

#[tokio::test]
async fn test_select_with_file_column() {
use arrow_array::cast::AsArray;

let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

Expand Down Expand Up @@ -2070,7 +2069,7 @@ pub mod tests {
// Decode the RunArray to verify it contains the file path
let run_array = file_col
.as_any()
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
.downcast_ref::<RunArray<Int32Type>>()
.expect("_file column should be a RunArray");

let values = run_array.values();
Expand Down Expand Up @@ -2369,4 +2368,114 @@ pub mod tests {
// Assert it finished (didn't timeout)
assert!(result.is_ok(), "Scan timed out - deadlock detected");
}

#[tokio::test]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both new tests assert 0 on a single-spec table. The whole point of _spec_id (https://iceberg.apache.org/spec/#reserved-field-ids) is distinguishing files written under different specs. A partition-evolved fixture where two files carry different spec ids would validate that directly. Would that be easy to add on top of the existing fixtures?

async fn test_select_with_spec_id_column() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

// Select regular columns plus the _spec_id column
let table_scan = fixture
.table
.scan()
.select(["x", RESERVED_COL_NAME_SPEC_ID, "z"])
.with_row_selection_enabled(true)
.build()
.unwrap();

let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

// Verify we have 3 columns: x, _spec_id, and z
assert_eq!(batches[0].num_columns(), 3);

// Verify the x column exists and has correct data
let col1 = batches[0].column_by_name("x").unwrap();
let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);

// Verify the _spec_id column exists
let spec_id_col = batches[0].column_by_name(RESERVED_COL_NAME_SPEC_ID);
assert!(
spec_id_col.is_some(),
"_spec_id column should be present in the batch"
);

// Verify the _spec_id data type
let spec_id_col = spec_id_col.unwrap();
assert!(
matches!(
spec_id_col.data_type(),
arrow_schema::DataType::RunEndEncoded(_, _)
),
"_spec_id column should use RunEndEncoded type"
);

// Decode the RunArray to verify it contains the spec id
let run_array = spec_id_col
.as_any()
.downcast_ref::<RunArray<Int32Type>>()
.expect("_spec_id column should be a RunArray");

let values = run_array.values();
let int_values = values.as_primitive::<Int32Type>();
assert_eq!(int_values.len(), 1, "Should have a single _spec_id");

let spec_id = int_values.value(0);
assert_eq!(spec_id, 0, "_spec_id should be 0, got: {spec_id}");

// Verify 'z' column exists
assert!(batches[0].column_by_name("z").is_some());
}

#[tokio::test]
async fn test_select_with_spec_id_column_from_unpartitioned_table() {
let mut fixture = TableTestFixture::new_unpartitioned();
fixture.setup_unpartitioned_manifest_files().await;

// Select regular columns plus the _spec_id column
let table_scan = fixture
.table
.scan()
.select(["x", RESERVED_COL_NAME_SPEC_ID])
.with_row_selection_enabled(true)
.build()
.unwrap();

let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

// Verify we have 2 columns: x and _spec_id
assert_eq!(batches[0].num_columns(), 2);

// Verify the _spec_id column exists
let spec_id_col = batches[0].column_by_name(RESERVED_COL_NAME_SPEC_ID);
assert!(
spec_id_col.is_some(),
"_spec_id column should be present in the batch"
);

// Verify the _spec_id data type
let spec_id_col = spec_id_col.unwrap();
assert!(
matches!(
spec_id_col.data_type(),
arrow_schema::DataType::RunEndEncoded(_, _)
),
"_spec_id column should use RunEndEncoded type"
);

// Decode the RunArray to verify it contains the spec id
let run_array = spec_id_col
.as_any()
.downcast_ref::<RunArray<Int32Type>>()
.expect("_spec_id column should be a RunArray");

let values = run_array.values();
let int_values = values.as_primitive::<Int32Type>();
assert_eq!(int_values.len(), 1, "Should have a single _spec_id");

let spec_id = int_values.value(0);
assert_eq!(spec_id, 0, "_spec_id should be 0, got: {spec_id}");
}
}
Loading