Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/integrations/datafusion/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ impl datafusion_catalog::catalog::CatalogProvider for iceberg_datafusion::Iceber
pub fn iceberg_datafusion::IcebergCatalogProvider::as_any(&self) -> &dyn core::any::Any
pub fn iceberg_datafusion::IcebergCatalogProvider::schema(&self, name: &str) -> core::option::Option<alloc::sync::Arc<dyn datafusion_catalog::schema::SchemaProvider>>
pub fn iceberg_datafusion::IcebergCatalogProvider::schema_names(&self) -> alloc::vec::Vec<alloc::string::String>
#[non_exhaustive] pub struct iceberg_datafusion::IcebergDataFusionConfig
pub iceberg_datafusion::IcebergDataFusionConfig::enable_eager_scan_planning: bool
impl core::clone::Clone for iceberg_datafusion::IcebergDataFusionConfig
pub fn iceberg_datafusion::IcebergDataFusionConfig::clone(&self) -> iceberg_datafusion::IcebergDataFusionConfig
impl core::default::Default for iceberg_datafusion::IcebergDataFusionConfig
pub fn iceberg_datafusion::IcebergDataFusionConfig::default() -> Self
impl core::fmt::Debug for iceberg_datafusion::IcebergDataFusionConfig
pub fn iceberg_datafusion::IcebergDataFusionConfig::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl datafusion_common::config::ConfigExtension for iceberg_datafusion::IcebergDataFusionConfig
pub const iceberg_datafusion::IcebergDataFusionConfig::PREFIX: &'static str
impl datafusion_common::config::ConfigField for iceberg_datafusion::IcebergDataFusionConfig
pub fn iceberg_datafusion::IcebergDataFusionConfig::set(&mut self, key: &str, value: &str) -> datafusion_common::error::Result<()>
pub fn iceberg_datafusion::IcebergDataFusionConfig::visit<V: datafusion_common::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str)
impl datafusion_common::config::ExtensionOptions for iceberg_datafusion::IcebergDataFusionConfig
pub fn iceberg_datafusion::IcebergDataFusionConfig::as_any(&self) -> &dyn core::any::Any
pub fn iceberg_datafusion::IcebergDataFusionConfig::as_any_mut(&mut self) -> &mut dyn core::any::Any
pub fn iceberg_datafusion::IcebergDataFusionConfig::cloned(&self) -> alloc::boxed::Box<dyn datafusion_common::config::ExtensionOptions>
pub fn iceberg_datafusion::IcebergDataFusionConfig::entries(&self) -> alloc::vec::Vec<datafusion_common::config::ConfigEntry>
pub fn iceberg_datafusion::IcebergDataFusionConfig::set(&mut self, key: &str, value: &str) -> datafusion_common::error::Result<()>
pub struct iceberg_datafusion::IcebergStaticTableProvider
impl iceberg_datafusion::IcebergStaticTableProvider
pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result<Self>
Expand Down
31 changes: 31 additions & 0 deletions crates/integrations/datafusion/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::config::ConfigExtension;
use datafusion::common::extensions_options;

// Declares `IcebergDataFusionConfig` through DataFusion's `extensions_options!`
// macro rather than as a hand-written struct. The macro consumes the `///`
// lines below as part of its input, so treat their wording as part of the
// definition and not as free-form commentary.
//
// The single option, `enable_eager_scan_planning`, defaults to `false`.
// NOTE(review): combined with the `PREFIX` declared for this type, the option
// is presumably addressed as `iceberg.enable_eager_scan_planning` in session
// configuration — confirm against DataFusion's config-extension docs.
extensions_options! {
/// Configuration options for Iceberg's DataFusion integration.
pub struct IcebergDataFusionConfig {
/// Plan Iceberg file scan tasks during TableProvider::scan().
pub enable_eager_scan_planning: bool, default = false
}
}

/// Registers [`IcebergDataFusionConfig`] as a DataFusion configuration
/// extension.
impl ConfigExtension for IcebergDataFusionConfig {
/// Prefix identifying this extension's options.
// NOTE(review): presumably the namespace under which DataFusion resolves
// keys such as `iceberg.<option>` — confirm against `ConfigExtension` docs.
const PREFIX: &'static str = "iceberg";
}
3 changes: 3 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
mod catalog;
pub use catalog::*;

mod config;
pub use config::IcebergDataFusionConfig;

mod error;
pub use error::*;

Expand Down
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) mod metadata_scan;
pub(crate) mod project;
pub(crate) mod repartition;
pub(crate) mod scan;
pub(crate) mod scan_planning;
pub(crate) mod sort;
pub(crate) mod write;

Expand Down
173 changes: 85 additions & 88 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::scan::{FileScanTask, FileScanTaskStream};
use iceberg::table::Table;

use super::expr_to_predicate::convert_filters_to_predicate;
use super::scan_planning::{IcebergScanConfig, build_table_scan};
use crate::to_datafusion_error;

/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
Expand All @@ -42,44 +42,40 @@ use crate::to_datafusion_error;
pub struct IcebergTableScan {
/// A table in the catalog.
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
/// Snapshot, projection, output schema, and pushed predicates for this scan.
scan_config: IcebergScanConfig,
/// Stores certain, often expensive to compute,
/// plan properties used in query optimization.
plan_properties: Arc<PlanProperties>,
/// Projection column names, None means all columns
projection: Option<Vec<String>>,
/// Filters to apply to the table scan
predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
limit: Option<usize>,
/// Pre-planned file scan tasks, grouped by partition. `None` keeps planning lazy.
file_task_groups: Option<Vec<Arc<[FileScanTask]>>>,
}

impl IcebergTableScan {
/// Creates a new [`IcebergTableScan`] object.
pub(crate) fn new(
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
scan_config: IcebergScanConfig,
limit: Option<usize>,
file_task_groups: Option<Vec<Vec<FileScanTask>>>,
) -> Self {
let output_schema = match projection {
None => schema.clone(),
Some(projection) => Arc::new(schema.project(projection).unwrap()),
};
let plan_properties = Self::compute_properties(output_schema.clone());
let projection = get_column_names(schema.clone(), projection);
let predicates = convert_filters_to_predicate(filters);
let partition_count = file_task_groups.as_ref().map_or(1, |groups| groups.len());
let plan_properties =
IcebergTableScan::compute_properties(scan_config.output_schema(), partition_count);
let file_task_groups = file_task_groups.map(|groups| {
groups
.into_iter()
.map(Arc::<[FileScanTask]>::from)
.collect()
});

Self {
table,
snapshot_id,
scan_config,
plan_properties,
projection,
predicates,
limit,
file_task_groups,
}
}

Expand All @@ -88,29 +84,26 @@ impl IcebergTableScan {
}

pub fn snapshot_id(&self) -> Option<i64> {
self.snapshot_id
self.scan_config.snapshot_id()
}

pub fn projection(&self) -> Option<&[String]> {
self.projection.as_deref()
self.scan_config.column_names()
}

pub fn predicates(&self) -> Option<&Predicate> {
self.predicates.as_ref()
self.scan_config.predicates()
}

pub fn limit(&self) -> Option<usize> {
self.limit
}

/// Computes [`PlanProperties`] used in query optimization.
fn compute_properties(schema: ArrowSchemaRef) -> Arc<PlanProperties> {
// TODO:
// This is more or less a placeholder, to be replaced
// once we support output-partitioning
fn compute_properties(schema: ArrowSchemaRef, partition_count: usize) -> Arc<PlanProperties> {
Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
Partitioning::UnknownPartitioning(partition_count),
EmissionType::Incremental,
Boundedness::Bounded,
))
Expand Down Expand Up @@ -146,13 +139,55 @@ impl ExecutionPlan for IcebergTableScan {
_partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(
self.table.clone(),
self.snapshot_id,
self.projection.clone(),
self.predicates.clone(),
);
let stream = futures::stream::once(fut).try_flatten();
let stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> = match &self
.file_task_groups
{
Some(file_task_groups) => {
let Some(file_task_group) = file_task_groups.get(_partition).cloned() else {
return Err(datafusion::common::DataFusionError::Internal(format!(
"IcebergTableScan partition {_partition} does not exist; scan has {} partitions",
file_task_groups.len()
)));
};

let task_count = file_task_group.len();
let tasks: FileScanTaskStream = Box::pin(futures::stream::iter(
(0..task_count).map(move |idx| Ok(file_task_group[idx].clone())),
));
let stream = self
.table
.reader_builder()
// Eager planning lets DataFusion drive scan concurrency via output
// partitions. Match DataFusion's FileStream model, where each
// output partition owns one ScanState; keep one data file in
// flight per output partition here.
// https://github.com/apache/datafusion/blob/ad8e7b7f2babe3fcddc3a4f9b5cd1ac0d1b16ad9/datafusion/datasource/src/file_stream/scan_state.rs#L42-L43
.with_data_file_concurrency_limit(1)
.build()
// TODO: Avoid cloning FileScanTasks here once ArrowReader can accept shared tasks.
.read(tasks)
.map_err(to_datafusion_error)?
.stream()
.map_err(to_datafusion_error);

Box::pin(stream)
}
None => {
let table = self.table.clone();
let scan_config = self.scan_config.clone();
let fut = async move {
let table_scan = build_table_scan(&table, &scan_config)?;
let stream = table_scan
.to_arrow()
.await
.map_err(to_datafusion_error)?
.map_err(to_datafusion_error);
Ok::<_, datafusion::common::DataFusionError>(stream)
};

Box::pin(futures::stream::once(fut).try_flatten())
}
};

// Apply limit if specified
let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
Expand Down Expand Up @@ -190,60 +225,22 @@ impl DisplayAs for IcebergTableScan {
write!(
f,
"IcebergTableScan projection:[{}] predicate:[{}]",
self.projection
.clone()
.map_or(String::new(), |v| v.join(",")),
self.predicates
.clone()
.map_or(String::from(""), |p| format!("{p}"))
self.projection().map_or(String::new(), |v| v.join(",")),
self.predicates()
.map_or(String::from(""), |p| format!("{p}")),
)?;
if let Some(file_task_groups) = &self.file_task_groups {
let task_count: usize = file_task_groups.iter().map(|group| group.len()).sum();
write!(
f,
" task_groups:[{}] tasks:[{}]",
file_task_groups.len(),
task_count,
)?;
}
if let Some(limit) = self.limit {
write!(f, " limit:[{limit}]")?;
}
Ok(())
}
}

/// Produces a stream of Arrow [`RecordBatch`]es read from `table`.
///
/// The scan is configured in this order:
/// 1. pinned to `snapshot_id` when one is given;
/// 2. restricted to `column_names`, or all columns when `None`;
/// 3. filtered by `predicates` when present.
///
/// Errors from building the scan, from opening the Arrow stream, and from
/// each item of the stream are all converted with `to_datafusion_error`.
async fn get_batch_stream(
    table: Table,
    snapshot_id: Option<i64>,
    column_names: Option<Vec<String>>,
    predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    let mut builder = table.scan();

    if let Some(id) = snapshot_id {
        builder = builder.snapshot_id(id);
    }

    builder = match column_names {
        Some(names) => builder.select(names),
        None => builder.select_all(),
    };

    if let Some(filter) = predicates {
        builder = builder.with_filter(filter);
    }

    let scan = builder.build().map_err(to_datafusion_error)?;
    let batches = scan.to_arrow().await.map_err(to_datafusion_error)?;

    Ok(Box::pin(batches.map_err(to_datafusion_error)))
}

/// Resolves projected column indices to their field names in `schema`.
///
/// Returns `None` when `projection` is `None`; otherwise returns the names in
/// the same order as the indices.
fn get_column_names(
    schema: ArrowSchemaRef,
    projection: Option<&Vec<usize>>,
) -> Option<Vec<String>> {
    let indices = projection?;

    let mut names = Vec::with_capacity(indices.len());
    for &idx in indices {
        names.push(schema.field(idx).name().clone());
    }
    Some(names)
}
Loading
Loading