diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d24bd9fc9e..11d4b3b915 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -105,6 +105,25 @@ impl datafusion_catalog::catalog::CatalogProvider for iceberg_datafusion::Iceber pub fn iceberg_datafusion::IcebergCatalogProvider::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::IcebergCatalogProvider::schema(&self, name: &str) -> core::option::Option> pub fn iceberg_datafusion::IcebergCatalogProvider::schema_names(&self) -> alloc::vec::Vec +#[non_exhaustive] pub struct iceberg_datafusion::IcebergDataFusionConfig +pub iceberg_datafusion::IcebergDataFusionConfig::enable_eager_scan_planning: bool +impl core::clone::Clone for iceberg_datafusion::IcebergDataFusionConfig +pub fn iceberg_datafusion::IcebergDataFusionConfig::clone(&self) -> iceberg_datafusion::IcebergDataFusionConfig +impl core::default::Default for iceberg_datafusion::IcebergDataFusionConfig +pub fn iceberg_datafusion::IcebergDataFusionConfig::default() -> Self +impl core::fmt::Debug for iceberg_datafusion::IcebergDataFusionConfig +pub fn iceberg_datafusion::IcebergDataFusionConfig::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_common::config::ConfigExtension for iceberg_datafusion::IcebergDataFusionConfig +pub const iceberg_datafusion::IcebergDataFusionConfig::PREFIX: &'static str +impl datafusion_common::config::ConfigField for iceberg_datafusion::IcebergDataFusionConfig +pub fn iceberg_datafusion::IcebergDataFusionConfig::set(&mut self, key: &str, value: &str) -> datafusion_common::error::Result<()> +pub fn iceberg_datafusion::IcebergDataFusionConfig::visit(&self, v: &mut V, _key_prefix: &str, _description: &'static str) +impl datafusion_common::config::ExtensionOptions for iceberg_datafusion::IcebergDataFusionConfig +pub fn iceberg_datafusion::IcebergDataFusionConfig::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::IcebergDataFusionConfig::as_any_mut(&mut self) -> &mut dyn core::any::Any +pub fn iceberg_datafusion::IcebergDataFusionConfig::cloned(&self) -> alloc::boxed::Box +pub fn iceberg_datafusion::IcebergDataFusionConfig::entries(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::IcebergDataFusionConfig::set(&mut self, key: &str, value: &str) -> datafusion_common::error::Result<()> pub struct iceberg_datafusion::IcebergStaticTableProvider impl iceberg_datafusion::IcebergStaticTableProvider pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result diff --git a/crates/integrations/datafusion/src/config.rs b/crates/integrations/datafusion/src/config.rs new file mode 100644 index 0000000000..39d5f99119 --- /dev/null +++ b/crates/integrations/datafusion/src/config.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::config::ConfigExtension; +use datafusion::common::extensions_options; + +extensions_options! { + /// Configuration options for Iceberg's DataFusion integration. + pub struct IcebergDataFusionConfig { + /// Plan Iceberg file scan tasks during TableProvider::scan(). + pub enable_eager_scan_planning: bool, default = false + } +} + +impl ConfigExtension for IcebergDataFusionConfig { + const PREFIX: &'static str = "iceberg"; +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4b0ea8606d..3e415336d3 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -18,6 +18,9 @@ mod catalog; pub use catalog::*; +mod config; +pub use config::IcebergDataFusionConfig; + mod error; pub use error::*; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aeac30de32..d83b6b9c4f 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -21,6 +21,7 @@ pub(crate) mod metadata_scan; pub(crate) mod project; pub(crate) mod repartition; pub(crate) mod scan; +pub(crate) mod scan_planning; pub(crate) mod sort; pub(crate) mod write; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..6cf94c2b82 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -28,12 +28,12 @@ use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; -use datafusion::prelude::Expr; use futures::{Stream, TryStreamExt}; use iceberg::expr::Predicate; +use iceberg::scan::{FileScanTask, FileScanTaskStream}; use iceberg::table::Table; -use super::expr_to_predicate::convert_filters_to_predicate; +use super::scan_planning::{IcebergScanConfig, build_table_scan}; use crate::to_datafusion_error; /// Manages the scanning process of an Iceberg [`Table`], encapsulating the @@ -42,44 +42,40 @@ use crate::to_datafusion_error; pub struct IcebergTableScan { /// A table in the catalog. table: Table, - /// Snapshot of the table to scan. - snapshot_id: Option, + /// Snapshot, projection, output schema, and pushed predicates for this scan. + scan_config: IcebergScanConfig, /// Stores certain, often expensive to compute, /// plan properties used in query optimization. plan_properties: Arc, - /// Projection column names, None means all columns - projection: Option>, - /// Filters to apply to the table scan - predicates: Option, /// Optional limit on the number of rows to return limit: Option, + /// Pre-planned file scan tasks, grouped by partition. `None` keeps planning lazy. + file_task_groups: Option>>, } impl IcebergTableScan { - /// Creates a new [`IcebergTableScan`] object. pub(crate) fn new( table: Table, - snapshot_id: Option, - schema: ArrowSchemaRef, - projection: Option<&Vec>, - filters: &[Expr], + scan_config: IcebergScanConfig, limit: Option, + file_task_groups: Option>>, ) -> Self { - let output_schema = match projection { - None => schema.clone(), - Some(projection) => Arc::new(schema.project(projection).unwrap()), - }; - let plan_properties = Self::compute_properties(output_schema.clone()); - let projection = get_column_names(schema.clone(), projection); - let predicates = convert_filters_to_predicate(filters); + let partition_count = file_task_groups.as_ref().map_or(1, |groups| groups.len()); + let plan_properties = + IcebergTableScan::compute_properties(scan_config.output_schema(), partition_count); + let file_task_groups = file_task_groups.map(|groups| { + groups + .into_iter() + .map(Arc::<[FileScanTask]>::from) + .collect() + }); Self { table, - snapshot_id, + scan_config, plan_properties, - projection, - predicates, limit, + file_task_groups, } } @@ -88,15 +84,15 @@ impl IcebergTableScan { } pub fn snapshot_id(&self) -> Option { - self.snapshot_id + self.scan_config.snapshot_id() } pub fn projection(&self) -> Option<&[String]> { - self.projection.as_deref() + self.scan_config.column_names() } pub fn predicates(&self) -> Option<&Predicate> { - self.predicates.as_ref() + self.scan_config.predicates() } pub fn limit(&self) -> Option { @@ -104,13 +100,10 @@ impl IcebergTableScan { } /// Computes [`PlanProperties`] used in query optimization. - fn compute_properties(schema: ArrowSchemaRef) -> Arc { - // TODO: - // This is more or less a placeholder, to be replaced - // once we support output-partitioning + fn compute_properties(schema: ArrowSchemaRef, partition_count: usize) -> Arc { Arc::new(PlanProperties::new( EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(1), + Partitioning::UnknownPartitioning(partition_count), EmissionType::Incremental, Boundedness::Bounded, )) @@ -146,13 +139,55 @@ impl ExecutionPlan for IcebergTableScan { _partition: usize, _context: Arc, ) -> DFResult { - let fut = get_batch_stream( - self.table.clone(), - self.snapshot_id, - self.projection.clone(), - self.predicates.clone(), - ); - let stream = futures::stream::once(fut).try_flatten(); + let stream: Pin> + Send>> = match &self + .file_task_groups + { + Some(file_task_groups) => { + let Some(file_task_group) = file_task_groups.get(_partition).cloned() else { + return Err(datafusion::common::DataFusionError::Internal(format!( + "IcebergTableScan partition {_partition} does not exist; scan has {} partitions", + file_task_groups.len() + ))); + }; + + let task_count = file_task_group.len(); + let tasks: FileScanTaskStream = Box::pin(futures::stream::iter( + (0..task_count).map(move |idx| Ok(file_task_group[idx].clone())), + )); + let stream = self + .table + .reader_builder() + // Eager planning lets DataFusion drive scan concurrency via output + // partitions. Match DataFusion's FileStream model, where each + // output partition owns one ScanState; keep one data file in + // flight per output partition here. + // https://github.com/apache/datafusion/blob/ad8e7b7f2babe3fcddc3a4f9b5cd1ac0d1b16ad9/datafusion/datasource/src/file_stream/scan_state.rs#L42-L43 + .with_data_file_concurrency_limit(1) + .build() + // TODO: Avoid cloning FileScanTasks here once ArrowReader can accept shared tasks. + .read(tasks) + .map_err(to_datafusion_error)? + .stream() + .map_err(to_datafusion_error); + + Box::pin(stream) + } + None => { + let table = self.table.clone(); + let scan_config = self.scan_config.clone(); + let fut = async move { + let table_scan = build_table_scan(&table, &scan_config)?; + let stream = table_scan + .to_arrow() + .await + .map_err(to_datafusion_error)? + .map_err(to_datafusion_error); + Ok::<_, datafusion::common::DataFusionError>(stream) + }; + + Box::pin(futures::stream::once(fut).try_flatten()) + } + }; // Apply limit if specified let limited_stream: Pin> + Send>> = @@ -190,60 +225,22 @@ impl DisplayAs for IcebergTableScan { write!( f, "IcebergTableScan projection:[{}] predicate:[{}]", - self.projection - .clone() - .map_or(String::new(), |v| v.join(",")), - self.predicates - .clone() - .map_or(String::from(""), |p| format!("{p}")) + self.projection().map_or(String::new(), |v| v.join(",")), + self.predicates() + .map_or(String::from(""), |p| format!("{p}")), )?; + if let Some(file_task_groups) = &self.file_task_groups { + let task_count: usize = file_task_groups.iter().map(|group| group.len()).sum(); + write!( + f, + " task_groups:[{}] tasks:[{}]", + file_task_groups.len(), + task_count, + )?; + } if let Some(limit) = self.limit { write!(f, " limit:[{limit}]")?; } Ok(()) } } - -/// Asynchronously retrieves a stream of [`RecordBatch`] instances -/// from a given table. -/// -/// This function initializes a [`TableScan`], builds it, -/// and then converts it into a stream of Arrow [`RecordBatch`]es. -async fn get_batch_stream( - table: Table, - snapshot_id: Option, - column_names: Option>, - predicates: Option, -) -> DFResult> + Send>>> { - let scan_builder = match snapshot_id { - Some(snapshot_id) => table.scan().snapshot_id(snapshot_id), - None => table.scan(), - }; - - let mut scan_builder = match column_names { - Some(column_names) => scan_builder.select(column_names), - None => scan_builder.select_all(), - }; - if let Some(pred) = predicates { - scan_builder = scan_builder.with_filter(pred); - } - let table_scan = scan_builder.build().map_err(to_datafusion_error)?; - - let stream = table_scan - .to_arrow() - .await - .map_err(to_datafusion_error)? - .map_err(to_datafusion_error); - Ok(Box::pin(stream)) -} - -fn get_column_names( - schema: ArrowSchemaRef, - projection: Option<&Vec>, -) -> Option> { - projection.map(|v| { - v.iter() - .map(|p| schema.field(*p).name().clone()) - .collect::>() - }) -} diff --git a/crates/integrations/datafusion/src/physical_plan/scan_planning.rs b/crates/integrations/datafusion/src/physical_plan/scan_planning.rs new file mode 100644 index 0000000000..181e95fbf2 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/scan_planning.rs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::error::Result as DFResult; +use datafusion::prelude::Expr; +use futures::TryStreamExt; +use iceberg::expr::Predicate; +use iceberg::scan::{FileScanTask, TableScan}; +use iceberg::table::Table; + +use super::expr_to_predicate::convert_filters_to_predicate; +use crate::to_datafusion_error; + +#[derive(Debug, Clone)] +pub(crate) struct IcebergScanConfig { + /// Snapshot of the table to scan. + snapshot_id: Option, + /// Output schema after projection. + output_schema: ArrowSchemaRef, + /// Projection column names, None means all columns. + column_names: Option>, + /// Filters to apply to the table scan. + predicates: Option, +} + +impl IcebergScanConfig { + pub(crate) fn new( + schema: ArrowSchemaRef, + snapshot_id: Option, + projection: Option<&Vec>, + filters: &[Expr], + ) -> Self { + let output_schema = match projection { + None => schema.clone(), + Some(projection) => Arc::new(schema.project(projection).unwrap()), + }; + + Self { + snapshot_id, + output_schema, + column_names: get_column_names(schema, projection), + predicates: convert_filters_to_predicate(filters), + } + } + + pub(crate) fn snapshot_id(&self) -> Option { + self.snapshot_id + } + + pub(crate) fn output_schema(&self) -> ArrowSchemaRef { + self.output_schema.clone() + } + + pub(crate) fn column_names(&self) -> Option<&[String]> { + self.column_names.as_deref() + } + + pub(crate) fn predicates(&self) -> Option<&Predicate> { + self.predicates.as_ref() + } +} + +pub(crate) async fn plan_file_task_groups( + table: &Table, + scan_config: &IcebergScanConfig, + target_partitions: usize, +) -> DFResult>> { + // Do not cache planned FileScanTasks in the provider in v1. They are query-specific + // because projection, predicate binding, snapshot schema, and delete planning can differ + // between scans. Catalog-backed providers also need fresh metadata on each scan. + // TODO: Revisit provider-level caching for static tables with a precise cache key. + let tasks: Vec = build_table_scan(table, scan_config)? + .plan_files() + .await + .map_err(to_datafusion_error)? + .try_collect::>() + .await + .map_err(to_datafusion_error)?; + + Ok(group_file_scan_tasks_round_robin(tasks, target_partitions)) +} + +fn get_column_names( + schema: ArrowSchemaRef, + projection: Option<&Vec>, +) -> Option> { + projection.map(|v| { + v.iter() + .map(|p| schema.field(*p).name().clone()) + .collect::>() + }) +} + +/// Groups file scan tasks into `target_partitions` groups using a naive +/// round-robin assignment. `target_partitions` is clamped to a minimum of 1. +// TODO: Replace this naive round-robin grouping with size-based grouping once the +// first parallel scan path is stable. Keep this v1 simple and deterministic. +fn group_file_scan_tasks_round_robin( + tasks: Vec, + target_partitions: usize, +) -> Vec> { + if tasks.is_empty() { + return vec![vec![]]; + } + + let target_partitions = target_partitions.max(1); + + let mut groups: Vec> = vec![Vec::new(); target_partitions]; + for (i, task) in tasks.into_iter().enumerate() { + groups[i % target_partitions].push(task); + } + + groups.retain(|group| !group.is_empty()); + groups +} + +pub(crate) fn build_table_scan( + table: &Table, + scan_config: &IcebergScanConfig, +) -> DFResult { + let builder = match scan_config.snapshot_id { + Some(id) => table.scan().snapshot_id(id), + None => table.scan(), + }; + let mut builder = match scan_config.column_names.clone() { + Some(names) => builder.select(names), + None => builder.select_all(), + }; + if let Some(pred) = scan_config.predicates.clone() { + builder = builder.with_filter(pred); + } + builder.build().map_err(to_datafusion_error) +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..4f9c368867 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -49,14 +49,45 @@ use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; +use crate::IcebergDataFusionConfig; use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::scan_planning::{IcebergScanConfig, plan_file_task_groups}; use crate::physical_plan::sort::sort_by_partition; use crate::physical_plan::write::IcebergWriteExec; +fn enable_eager_scan_planning(state: &dyn Session) -> bool { + state + .config() + .options() + .extensions + .get::() + .is_some_and(|config| config.enable_eager_scan_planning) +} + +async fn create_scan_plan( + state: &dyn Session, + table: Table, + scan_config: IcebergScanConfig, + limit: Option, +) -> DFResult> { + let file_task_groups = if enable_eager_scan_planning(state) { + Some(plan_file_task_groups(&table, &scan_config, state.config().target_partitions()).await?) + } else { + None + }; + + Ok(Arc::new(IcebergTableScan::new( + table, + scan_config, + limit, + file_task_groups, + ))) +} + /// Catalog-backed table provider with automatic metadata refresh. /// /// This provider loads fresh table metadata from the catalog on every scan and write @@ -136,15 +167,15 @@ impl TableProvider for IcebergTableProvider { .await .map_err(to_datafusion_error)?; - // Create scan with fresh metadata (always use current snapshot) - Ok(Arc::new(IcebergTableScan::new( - table, - None, // Always use current snapshot for catalog-backed provider + let scan_config = IcebergScanConfig::new( self.schema.clone(), + None, // Always use current snapshot for catalog-backed provider. projection, filters, - limit, - ))) + ); + + // Create scan with fresh metadata (always use current snapshot) + create_scan_plan(_state, table, scan_config, limit).await } fn supports_filters_pushdown( @@ -314,15 +345,11 @@ impl TableProvider for IcebergStaticTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { + let scan_config = + IcebergScanConfig::new(self.schema.clone(), self.snapshot_id, projection, filters); + // Use cached table (no refresh) - Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), - self.snapshot_id, - self.schema.clone(), - projection, - filters, - limit, - ))) + create_scan_plan(_state, self.table.clone(), scan_config, limit).await } fn supports_filters_pushdown( @@ -364,6 +391,7 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::physical_plan::scan::IcebergTableScan; async fn get_test_table_from_metadata_file() -> Table { let metadata_file_name = "TableMetadataV2Valid.json"; diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index cebac75dd9..fcc7826f92 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -21,10 +21,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::vec; -use datafusion::arrow::array::{Array, StringArray, UInt64Array}; +use datafusion::arrow::array::{Array, ArrayRef, Int32Array, StringArray, UInt64Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use datafusion::prelude::SessionConfig; use expect_test::expect; use iceberg::io::LocalFsStorageFactory; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; @@ -35,7 +38,8 @@ use iceberg::test_utils::check_record_batches; use iceberg::{ Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation, TableIdent, }; -use iceberg_datafusion::IcebergCatalogProvider; +use iceberg_datafusion::physical_plan::IcebergTableScan; +use iceberg_datafusion::{IcebergCatalogProvider, IcebergDataFusionConfig}; use tempfile::TempDir; fn temp_path() -> String { @@ -95,6 +99,238 @@ fn get_table_creation( Ok(creation) } +async fn get_multi_file_table_context( + namespace_name: &str, + table_name: &str, + data_file_count: usize, +) -> Result<(Arc, NamespaceIdent, String)> { + let iceberg_catalog = Arc::new(get_iceberg_catalog().await); + let namespace = NamespaceIdent::new(namespace_name.to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let creation = get_table_creation(temp_path(), table_name, None)?; + iceberg_catalog.create_table(&namespace, creation).await?; + + let write_ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(data_file_count), + ); + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("foo1", DataType::Int32, false), + Field::new("foo2", DataType::Utf8, false), + ])); + + let batches: Vec = (1..=data_file_count as i32) + .map(|idx| { + RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![idx])) as ArrayRef, + Arc::new(StringArray::from(vec![format!("row-{idx}")])) as ArrayRef, + ]) + }) + .collect::>()?; + + let partitions = batches.into_iter().map(|batch| vec![batch]).collect(); + let source_table = Arc::new(MemTable::try_new(arrow_schema, partitions).unwrap()); + write_ctx + .register_table("source_table", source_table) + .unwrap(); + + let catalog = Arc::new(IcebergCatalogProvider::try_new(iceberg_catalog.clone()).await?); + write_ctx.register_catalog("catalog", catalog); + + let insert_sql = + format!("INSERT INTO catalog.{namespace_name}.{table_name} SELECT * FROM source_table"); + let batches = write_ctx + .sql(&insert_sql) + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(batches.len(), 1); + + let rows_inserted = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(rows_inserted.value(0), data_file_count as u64); + + Ok((iceberg_catalog, namespace, table_name.to_string())) +} + +async fn get_read_context( + catalog: Arc, + target_partitions: usize, + enable_eager_scan_planning: Option, +) -> Result { + let ctx = SessionContext::new_with_config(read_session_config( + target_partitions, + enable_eager_scan_planning, + )); + let catalog = Arc::new(IcebergCatalogProvider::try_new(catalog).await?); + ctx.register_catalog("catalog", catalog); + Ok(ctx) +} + +fn read_session_config( + target_partitions: usize, + enable_eager_scan_planning: Option, +) -> SessionConfig { + let config = SessionConfig::new().with_target_partitions(target_partitions); + + match enable_eager_scan_planning { + Some(enabled) => { + let mut iceberg_config = IcebergDataFusionConfig::default(); + iceberg_config.enable_eager_scan_planning = enabled; + config.with_option_extension(iceberg_config) + } + None => config, + } +} + +async fn scan_partition_count( + ctx: &SessionContext, + namespace: &NamespaceIdent, + table_name: &str, +) -> usize { + let provider = ctx.catalog("catalog").unwrap(); + let namespace_name = &namespace[0]; + let schema = provider.schema(namespace_name).unwrap(); + let table = schema.table(table_name).await.unwrap().unwrap(); + + let state = ctx.state(); + let plan = table.scan(&state, None, &[], None).await.unwrap(); + plan.as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + plan.properties().output_partitioning().partition_count() +} + +#[tokio::test] +async fn test_multi_file_scan_produces_multiple_partitions() -> Result<()> { + let data_file_count = 3; + // Ask for more partitions than files to verify scan planning does not expose empty partitions. + let target_partitions = data_file_count + 1; + let (iceberg_catalog, namespace, table_name) = get_multi_file_table_context( + "test_multi_file_scan_partitions", + "my_table", + data_file_count, + ) + .await?; + let ctx = get_read_context(iceberg_catalog, target_partitions, Some(true)).await?; + let actual_partition_count = scan_partition_count(&ctx, &namespace, &table_name).await; + + assert_eq!(actual_partition_count, data_file_count); + + Ok(()) +} + +#[tokio::test] +async fn test_multi_file_scan_defaults_to_single_lazy_partition() -> Result<()> { + let data_file_count = 3; + let target_partitions = data_file_count + 1; + let (iceberg_catalog, namespace, table_name) = get_multi_file_table_context( + "test_multi_file_scan_default_lazy", + "my_table", + data_file_count, + ) + .await?; + let ctx = get_read_context(iceberg_catalog, target_partitions, None).await?; + + let actual_partition_count = scan_partition_count(&ctx, &namespace, &table_name).await; + + assert_eq!(actual_partition_count, 1); + + Ok(()) +} + +#[tokio::test] +async fn test_set_enable_eager_scan_planning() -> Result<()> { + let data_file_count = 3; + let target_partitions = data_file_count + 1; + let (iceberg_catalog, namespace, table_name) = + get_multi_file_table_context("test_set_eager_scan_planning", "my_table", data_file_count) + .await?; + let ctx = get_read_context(iceberg_catalog, target_partitions, Some(false)).await?; + + ctx.sql("SET iceberg.enable_eager_scan_planning = true") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let actual_partition_count = scan_partition_count(&ctx, &namespace, &table_name).await; + + assert_eq!(actual_partition_count, data_file_count); + + Ok(()) +} + +#[tokio::test] +async fn test_multi_partition_scan_matches_single_partition_results() -> Result<()> { + let data_file_count = 3; + let target_partitions = data_file_count + 1; + let (iceberg_catalog, namespace, table_name) = get_multi_file_table_context( + "test_multi_partition_scan_results", + "my_table", + data_file_count, + ) + .await?; + let namespace_name = &namespace[0]; + + let single_partition_ctx = get_read_context(iceberg_catalog.clone(), 1, Some(true)).await?; + let multi_partition_ctx = + get_read_context(iceberg_catalog, target_partitions, Some(true)).await?; + + let query = + format!("SELECT foo1, foo2 FROM catalog.{namespace_name}.{table_name} ORDER BY foo1"); + + let single_partition_batches = single_partition_ctx + .sql(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); + let multi_partition_batches = multi_partition_ctx + .sql(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let assert_expected_batches = |batches| { + check_record_batches( + batches, + expect![[r#" + Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} }, + Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]], + expect![[r#" + foo1: PrimitiveArray + [ + 1, + 2, + 3, + ], + foo2: StringArray + [ + "row-1", + "row-2", + "row-3", + ]"#]], + &[], + None, + ); + }; + + assert_expected_batches(single_partition_batches); + assert_expected_batches(multi_partition_batches); + + Ok(()) +} + #[tokio::test] async fn test_provider_plan_stream_schema() -> Result<()> { let iceberg_catalog = get_iceberg_catalog().await;