From efbb12a49400181b9d6cfc49d895d776b6cbd282 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 15 Jun 2026 14:30:03 -0700 Subject: [PATCH 1/5] feat(catalog-rest): add server-side REST scan planning (Variant B: ScanPlanner injection) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the client side of the Iceberg REST scan-planning protocol (planTableScan / fetchPlanningResult / fetchScanTasks) so that, when a REST catalog advertises the planning endpoints, table scans delegate planning to the server instead of reading manifests locally. Injection design — Variant B (narrow capability trait): - New `iceberg::scan::ScanPlanner` trait + `ScanPlanRequest`; `Table`/ `TableScanBuilder` gain an optional `Arc`; `TableScan:: plan_files()` delegates to it and falls back to native planning on `FeatureUnsupported`. The core `Catalog` trait is left untouched. REST crate: - `scan_planning` module: wire DTOs, endpoint negotiation (parses the `endpoints` field of /v1/config and gates the scan-plan calls), the async submit/poll/fetch state machine with exponential backoff and best-effort Drop-based cancel, and conversion of wire content-files into FileScanTasks via their public builders (no DataFile internals needed). - `RestScanPlanner` is attached to every table the catalog returns. - Per-task row predicate is the client's own bound scan filter (correct), and the scan filter is pushed down as Iceberg expression JSON when encodable. Tested: DTO/endpoint/expr unit tests, conversion, and end-to-end mockito tests for completed-inline, submitted-then-polled, and plan-task fan-out paths. DataFusion needs no changes. --- Cargo.lock | 1 + crates/catalog/rest/Cargo.toml | 1 + crates/catalog/rest/src/catalog.rs | 284 +++++++++++++++- crates/catalog/rest/src/lib.rs | 2 + .../catalog/rest/src/scan_planning/convert.rs | 181 ++++++++++ .../rest/src/scan_planning/endpoint.rs | 163 +++++++++ crates/catalog/rest/src/scan_planning/expr.rs | 160 +++++++++ crates/catalog/rest/src/scan_planning/mod.rs | 57 ++++ .../catalog/rest/src/scan_planning/planner.rs | 83 +++++ .../catalog/rest/src/scan_planning/stream.rs | 316 ++++++++++++++++++ .../catalog/rest/src/scan_planning/types.rs | 270 +++++++++++++++ crates/catalog/rest/src/types.rs | 7 +- crates/iceberg/src/scan/mod.rs | 73 +++- crates/iceberg/src/scan/planner.rs | 116 +++++++ crates/iceberg/src/table.rs | 23 +- 15 files changed, 1729 insertions(+), 8 deletions(-) create mode 100644 crates/catalog/rest/src/scan_planning/convert.rs create mode 100644 crates/catalog/rest/src/scan_planning/endpoint.rs create mode 100644 crates/catalog/rest/src/scan_planning/expr.rs create mode 100644 crates/catalog/rest/src/scan_planning/mod.rs create mode 100644 crates/catalog/rest/src/scan_planning/planner.rs create mode 100644 crates/catalog/rest/src/scan_planning/stream.rs create mode 100644 crates/catalog/rest/src/scan_planning/types.rs create mode 100644 crates/iceberg/src/scan/planner.rs diff --git a/Cargo.lock b/Cargo.lock index 0a2f0a8206..a0be7b9a30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3873,6 +3873,7 @@ version = "0.9.0" dependencies = [ "async-trait", "chrono", + "futures", "http 1.4.0", "iceberg", "iceberg_test_utils", diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index e043c195ef..fe7e9aaf47 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -32,6 +32,7 @@ repository = { workspace = true } [dependencies] async-trait = { workspace = true } chrono = { workspace = true } +futures = { workspace = true } http = { workspace = true } iceberg = { workspace = true } itertools = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 0626ce5061..aabbf826e2 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -17,7 +17,7 @@ //! This module contains the iceberg REST catalog implementation. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::str::FromStr; use std::sync::Arc; @@ -40,6 +40,8 @@ use typed_builder::TypedBuilder; use crate::client::{ HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, }; +use crate::scan_planning::RestScanPlanner; +use crate::scan_planning::endpoint::Endpoint; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult, @@ -215,6 +217,40 @@ impl RestCatalogConfig { ]) } + /// `POST .../tables/{table}/plan` — submit a scan for server-side planning. + pub(crate) fn scan_plan_endpoint(&self, table: &TableIdent) -> String { + self.url_prefixed(&[ + "namespaces", + &table.namespace.to_url_string(), + "tables", + &table.name, + "plan", + ]) + } + + /// `.../tables/{table}/plan/{plan-id}` — poll or cancel an async plan. + pub(crate) fn scan_plan_id_endpoint(&self, table: &TableIdent, plan_id: &str) -> String { + self.url_prefixed(&[ + "namespaces", + &table.namespace.to_url_string(), + "tables", + &table.name, + "plan", + plan_id, + ]) + } + + /// `POST .../tables/{table}/tasks` — fetch tasks for a plan-task token. + pub(crate) fn scan_tasks_endpoint(&self, table: &TableIdent) -> String { + self.url_prefixed(&[ + "namespaces", + &table.namespace.to_url_string(), + "tables", + &table.name, + "tasks", + ]) + } + /// Get the client from the config. pub(crate) fn client(&self) -> Option { self.client.clone() @@ -342,11 +378,15 @@ impl RestCatalogConfig { #[derive(Debug)] struct RestContext { - client: HttpClient, + client: Arc, /// Runtime config is fetched from rest server and stored here. /// /// It's could be different from the user config. config: RestCatalogConfig, + /// Endpoints advertised by the server, used for capability negotiation + /// (e.g. gating server-side scan planning). Empty when the server does not + /// advertise an `endpoints` list. + endpoints: Arc>, } /// Rest catalog implementation. @@ -412,10 +452,16 @@ impl RestCatalog { .get_or_try_init(|| async { let client = HttpClient::new(&self.user_config)?; let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?; + let endpoints: HashSet = + catalog_config.endpoints.iter().cloned().collect(); let config = self.user_config.clone().merge_with_config(catalog_config); let client = client.update_with(&config)?; - Ok(RestContext { config, client }) + Ok(RestContext { + config, + client: Arc::new(client), + endpoints: Arc::new(endpoints), + }) }) .await } @@ -488,6 +534,17 @@ impl RestCatalog { Ok(file_io) } + /// Builds a [`RestScanPlanner`] handle to attach to tables so their scans + /// can delegate planning to the server. + fn scan_planner(&self, context: &RestContext) -> Arc { + Arc::new(RestScanPlanner::new( + context.client.clone(), + context.config.clone(), + context.endpoints.clone(), + self.runtime.clone(), + )) + } + /// Invalidate the current token without generating a new one. On the next request, the client /// will attempt to generate a new token. pub async fn invalidate_token(&self) -> Result<()> { @@ -805,7 +862,8 @@ impl Catalog for RestCatalog { .identifier(table_ident.clone()) .file_io(file_io) .metadata(response.metadata) - .runtime(self.runtime.clone()); + .runtime(self.runtime.clone()) + .scan_planner(self.scan_planner(context)); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -862,7 +920,8 @@ impl Catalog for RestCatalog { .identifier(table_ident.clone()) .file_io(file_io) .metadata(response.metadata) - .runtime(self.runtime.clone()); + .runtime(self.runtime.clone()) + .scan_planner(self.scan_planner(context)); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -999,6 +1058,7 @@ impl Catalog for RestCatalog { .metadata(response.metadata) .metadata_location(metadata_location.clone()) .runtime(self.runtime.clone()) + .scan_planner(self.scan_planner(context)) .build() } @@ -1072,6 +1132,7 @@ impl Catalog for RestCatalog { .metadata(response.metadata) .metadata_location(response.metadata_location) .runtime(self.runtime.clone()) + .scan_planner(self.scan_planner(context)) .build() } } @@ -2346,6 +2407,219 @@ mod tests { rename_table_mock.assert_async().await; } + /// Config mock that advertises the server-side scan-planning endpoints. + async fn create_scan_planning_config_mock(server: &mut ServerGuard) -> Mock { + server + .mock("GET", "/v1/config") + .with_status(200) + .with_body( + r#"{ + "overrides": { "warehouse": "s3://iceberg-catalog" }, + "defaults": {}, + "endpoints": [ + "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan", + "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}", + "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks", + "DELETE /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}" + ] + }"#, + ) + .create_async() + .await + } + + fn data_file_json(path: &str, size: u64, records: u64) -> serde_json::Value { + json!({ + "spec-id": 0, + "content": "data", + "file-path": path, + "file-format": "parquet", + "file-size-in-bytes": size, + "record-count": records + }) + } + + async fn load_scan_test_table(catalog: &RestCatalog) -> Table { + catalog + .load_table(&TableIdent::new( + NamespaceIdent::new("ns1".to_string()), + "test1".to_string(), + )) + .await + .unwrap() + } + + fn scan_test_catalog(server: &ServerGuard) -> RestCatalog { + RestCatalog::new( + RestCatalogConfig::builder().uri(server.url()).build(), + Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), + ) + } + + async fn collect_paths(table: &Table) -> Vec { + use futures::TryStreamExt; + let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + let tasks: Vec<_> = table + .scan() + .snapshot_id(snapshot_id) + .build() + .unwrap() + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + tasks + .into_iter() + .map(|t: iceberg::scan::FileScanTask| t.data_file_path) + .collect() + } + + #[tokio::test] + async fn test_server_scan_planning_completed_inline() { + let mut server = Server::new_async().await; + let config_mock = create_scan_planning_config_mock(&mut server).await; + let load_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/load_table_response.json", + env!("CARGO_MANIFEST_DIR") + )) + .create_async() + .await; + let plan_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1/plan") + .with_status(200) + .with_body( + json!({ + "status": "completed", + "file-scan-tasks": [ + { "data-file": data_file_json("s3://warehouse/t/data/a.parquet", 697, 1) } + ] + }) + .to_string(), + ) + .create_async() + .await; + + let catalog = scan_test_catalog(&server); + let table = load_scan_test_table(&catalog).await; + let paths = collect_paths(&table).await; + + assert_eq!(paths, vec!["s3://warehouse/t/data/a.parquet".to_string()]); + config_mock.assert_async().await; + load_mock.assert_async().await; + plan_mock.assert_async().await; + } + + #[tokio::test] + async fn test_server_scan_planning_submitted_then_polled() { + let mut server = Server::new_async().await; + let config_mock = create_scan_planning_config_mock(&mut server).await; + let load_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/load_table_response.json", + env!("CARGO_MANIFEST_DIR") + )) + .create_async() + .await; + let submit_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1/plan") + .with_status(200) + .with_body(json!({ "status": "submitted", "plan-id": "p1" }).to_string()) + .create_async() + .await; + // First poll returns completed immediately (no backoff sleep needed). + let poll_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1/plan/p1") + .with_status(200) + .with_body( + json!({ + "status": "completed", + "file-scan-tasks": [ + { "data-file": data_file_json("s3://warehouse/t/data/b.parquet", 1, 1) } + ] + }) + .to_string(), + ) + .create_async() + .await; + + let catalog = scan_test_catalog(&server); + let table = load_scan_test_table(&catalog).await; + let paths = collect_paths(&table).await; + + assert_eq!(paths, vec!["s3://warehouse/t/data/b.parquet".to_string()]); + config_mock.assert_async().await; + load_mock.assert_async().await; + submit_mock.assert_async().await; + poll_mock.assert_async().await; + } + + #[tokio::test] + async fn test_server_scan_planning_plan_task_fanout() { + let mut server = Server::new_async().await; + let config_mock = create_scan_planning_config_mock(&mut server).await; + let load_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/load_table_response.json", + env!("CARGO_MANIFEST_DIR") + )) + .create_async() + .await; + // Completed with an inline task plus a plan-task token to fetch. + let plan_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1/plan") + .with_status(200) + .with_body( + json!({ + "status": "completed", + "file-scan-tasks": [ + { "data-file": data_file_json("s3://warehouse/t/data/inline.parquet", 1, 1) } + ], + "plan-tasks": ["task-1"] + }) + .to_string(), + ) + .create_async() + .await; + // fetchScanTasks returns one more task (and no further plan-tasks). + let tasks_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1/tasks") + .with_status(200) + .with_body( + json!({ + "file-scan-tasks": [ + { "data-file": data_file_json("s3://warehouse/t/data/fetched.parquet", 1, 1) } + ] + }) + .to_string(), + ) + .create_async() + .await; + + let catalog = scan_test_catalog(&server); + let table = load_scan_test_table(&catalog).await; + let mut paths = collect_paths(&table).await; + paths.sort(); + + assert_eq!(paths, vec![ + "s3://warehouse/t/data/fetched.parquet".to_string(), + "s3://warehouse/t/data/inline.parquet".to_string(), + ]); + config_mock.assert_async().await; + load_mock.assert_async().await; + plan_mock.assert_async().await; + tasks_mock.assert_async().await; + } + #[tokio::test] async fn test_load_table_404() { let mut server = Server::new_async().await; diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 6bee950970..f285c13187 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -53,7 +53,9 @@ mod catalog; mod client; +mod scan_planning; mod types; pub use catalog::*; +pub use scan_planning::RestScanPlanner; pub use types::*; diff --git a/crates/catalog/rest/src/scan_planning/convert.rs b/crates/catalog/rest/src/scan_planning/convert.rs new file mode 100644 index 0000000000..b3fff62843 --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/convert.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Conversion from REST wire content-file objects to +//! [`iceberg::scan::FileScanTask`]s. +//! +//! `FileScanTask`s are built directly through their public builders, so this +//! never needs access to `iceberg`'s crate-private `DataFile` internals. The +//! row-level predicate is the client's own scan filter (already bound), not the +//! server's `residual-filter`, which keeps results identical to native planning. + +use iceberg::expr::BoundPredicate; +use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile}; +use iceberg::spec::{DataFileFormat, Literal, SchemaRef, Struct, TableMetadataRef}; +use iceberg::{Error, ErrorKind, Result}; +use serde_json::Value; + +use super::types::{RestContentFile, RestFileScanTask}; + +/// Resolved per-scan context needed to materialize tasks. +pub(crate) struct ConvertContext { + pub(crate) metadata: TableMetadataRef, + pub(crate) snapshot_schema: SchemaRef, + pub(crate) project_field_ids: Vec, + pub(crate) case_sensitive: bool, + /// The client's bound scan filter, applied as the per-task row predicate. + pub(crate) bound_filter: Option, +} + +/// Convert a `delete-files[]` entry into a [`FileScanTaskDeleteFile`]. +pub(crate) fn to_delete_file(rcf: &RestContentFile) -> FileScanTaskDeleteFile { + FileScanTaskDeleteFile::builder() + .with_file_path(rcf.file_path.clone()) + .with_file_size_in_bytes(rcf.file_size_in_bytes) + .with_file_type(rcf.content.into()) + .with_partition_spec_id(rcf.spec_id) + .with_equality_ids(rcf.equality_ids.clone()) + .build() +} + +/// Convert a `file-scan-tasks[]` entry into a [`FileScanTask`], resolving its +/// delete references against the response's shared `delete-files` list. +pub(crate) fn to_file_scan_task( + task: RestFileScanTask, + all_deletes: &[FileScanTaskDeleteFile], + ctx: &ConvertContext, +) -> Result { + let rcf = task.data_file; + + let spec = ctx + .metadata + .partition_spec_by_id(rcf.spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Scan plan referenced unknown partition spec id {}", + rcf.spec_id + ), + ) + })?; + + let data_file_format = rcf.file_format.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unsupported data file format {:?} in scan plan", + rcf.file_format + ), + ) + .with_source(e) + })?; + + let partition = decode_partition(&rcf, spec, &ctx.snapshot_schema)?; + + let deletes = match task.delete_file_references { + Some(refs) => refs + .into_iter() + .map(|idx| { + all_deletes.get(idx).cloned().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("delete-file-reference {idx} out of range"), + ) + }) + }) + .collect::>>()?, + None => Vec::new(), + }; + + Ok(FileScanTask::builder() + .with_file_size_in_bytes(rcf.file_size_in_bytes) + .with_start(0) + .with_length(rcf.file_size_in_bytes) + .with_record_count(Some(rcf.record_count)) + .with_data_file_path(rcf.file_path) + .with_data_file_format(data_file_format) + .with_schema(ctx.snapshot_schema.clone()) + .with_project_field_ids(ctx.project_field_ids.clone()) + .with_predicate(ctx.bound_filter.clone()) + .with_deletes(deletes) + .with_partition(Some(partition)) + .with_partition_spec(Some(spec.clone())) + .with_name_mapping(None) + .with_case_sensitive(ctx.case_sensitive) + .build()) +} + +/// Decode the wire `partition` value into a [`Struct`] keyed by the spec's +/// partition type. Accepts both the positional-array and field-id-keyed-object +/// encodings; an absent partition yields an empty struct. +fn decode_partition( + rcf: &RestContentFile, + spec: &iceberg::spec::PartitionSpec, + schema: &SchemaRef, +) -> Result { + let partition_type = spec.partition_type(schema)?; + let fields = partition_type.fields(); + + if fields.is_empty() { + return Ok(Struct::empty()); + } + + let Some(value) = rcf.partition.as_ref() else { + // Partitioned table but no partition data supplied: treat all as null. + return Ok(Struct::from_iter(fields.iter().map(|_| None))); + }; + + let mut literals: Vec> = Vec::with_capacity(fields.len()); + match value { + Value::Array(values) => { + if values.len() != fields.len() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Partition array has {} values but spec expects {}", + values.len(), + fields.len() + ), + )); + } + for (field, v) in fields.iter().zip(values) { + literals.push(Literal::try_from_json(v.clone(), &field.field_type)?); + } + } + Value::Object(map) => { + for field in fields { + let v = map + .get(&field.id.to_string()) + .cloned() + .unwrap_or(Value::Null); + literals.push(Literal::try_from_json(v, &field.field_type)?); + } + } + Value::Null => { + return Ok(Struct::from_iter(fields.iter().map(|_| None))); + } + other => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Unexpected partition encoding in scan plan: {other}"), + )); + } + } + + Ok(Struct::from_iter(literals)) +} diff --git a/crates/catalog/rest/src/scan_planning/endpoint.rs b/crates/catalog/rest/src/scan_planning/endpoint.rs new file mode 100644 index 0000000000..2be66ac4c5 --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/endpoint.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Endpoint capability negotiation. +//! +//! The REST server advertises the set of endpoints it supports in the +//! `endpoints` field of the `GET /v1/config` response, each encoded as a +//! `" "` string (mirroring the Java +//! `org.apache.iceberg.rest.Endpoint`). The client gates optional features — +//! notably server-side scan planning — on the presence of the corresponding +//! [`Endpoint`] in the negotiated set. + +use std::collections::HashSet; + +use iceberg::{Error, ErrorKind, Result}; +use serde::de::{Error as DeError, Visitor}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +// Path templates, matching `org.apache.iceberg.rest.ResourcePaths`. These are +// the *template* forms (with `{prefix}`/`{namespace}`/`{table}`/`{plan-id}`), +// not resolved URLs — the server advertises them verbatim and we compare by +// string equality. +const V1_TABLE_SCAN_PLAN_SUBMIT: &str = "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"; +const V1_TABLE_SCAN_PLAN: &str = + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"; +const V1_TABLE_SCAN_PLAN_TASKS: &str = "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"; + +/// A server endpoint identified by its HTTP method and path template. +/// +/// Serialized as `" "` (e.g. `"POST /v1/{prefix}/.../plan"`), +/// matching the wire form used in the `endpoints` field of the config response. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct Endpoint { + method: String, + path: String, +} + +impl Endpoint { + fn new(method: &str, path: &str) -> Self { + Self { + method: method.to_ascii_uppercase(), + path: path.to_string(), + } + } + + /// `POST .../plan` — submit a table scan for server-side planning. + pub(crate) fn submit_table_scan_plan() -> Self { + Self::new("POST", V1_TABLE_SCAN_PLAN_SUBMIT) + } + + /// `GET .../plan/{plan-id}` — poll an asynchronous planning result. + pub(crate) fn fetch_table_scan_plan() -> Self { + Self::new("GET", V1_TABLE_SCAN_PLAN) + } + + /// `DELETE .../plan/{plan-id}` — cancel an in-progress plan. + pub(crate) fn cancel_table_scan_plan() -> Self { + Self::new("DELETE", V1_TABLE_SCAN_PLAN) + } + + /// `POST .../tasks` — fetch concrete scan tasks for a plan task token. + pub(crate) fn fetch_table_scan_plan_tasks() -> Self { + Self::new("POST", V1_TABLE_SCAN_PLAN_TASKS) + } +} + +impl std::fmt::Display for Endpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {}", self.method, self.path) + } +} + +impl Serialize for Endpoint { + fn serialize(&self, serializer: S) -> std::result::Result + where S: Serializer { + serializer.serialize_str(&format!("{} {}", self.method, self.path)) + } +} + +impl<'de> Deserialize<'de> for Endpoint { + fn deserialize(deserializer: D) -> std::result::Result + where D: Deserializer<'de> { + struct EndpointVisitor; + impl Visitor<'_> for EndpointVisitor { + type Value = Endpoint; + + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("an endpoint string of the form \" \"") + } + + fn visit_str(self, v: &str) -> std::result::Result + where E: DeError { + let (method, path) = v.split_once(' ').ok_or_else(|| { + E::custom(format!( + "invalid endpoint {v:?}: expected \" \"" + )) + })?; + Ok(Endpoint::new(method.trim(), path.trim())) + } + } + deserializer.deserialize_str(EndpointVisitor) + } +} + +/// Returns an error if `endpoint` is absent from the negotiated `supported` +/// set, using [`ErrorKind::FeatureUnsupported`] so callers can fall back. +pub(crate) fn check(supported: &HashSet, endpoint: &Endpoint) -> Result<()> { + if supported.contains(endpoint) { + Ok(()) + } else { + Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Server does not support endpoint: {endpoint}"), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn endpoint_string_round_trip() { + let ep = Endpoint::submit_table_scan_plan(); + let json = serde_json::to_string(&ep).unwrap(); + assert_eq!( + json, + "\"POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan\"" + ); + let back: Endpoint = serde_json::from_str(&json).unwrap(); + assert_eq!(ep, back); + } + + #[test] + fn endpoint_set_parses_from_config_array() { + let arr = serde_json::json!([ + "GET /v1/{prefix}/namespaces", + "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan", + "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}", + "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks", + ]); + let set: HashSet = serde_json::from_value(arr).unwrap(); + assert!(check(&set, &Endpoint::submit_table_scan_plan()).is_ok()); + assert!(check(&set, &Endpoint::fetch_table_scan_plan()).is_ok()); + assert!(check(&set, &Endpoint::fetch_table_scan_plan_tasks()).is_ok()); + // Cancel was not advertised. + assert!(check(&set, &Endpoint::cancel_table_scan_plan()).is_err()); + } +} diff --git a/crates/catalog/rest/src/scan_planning/expr.rs b/crates/catalog/rest/src/scan_planning/expr.rs new file mode 100644 index 0000000000..57058b7f6e --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/expr.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Serialization of an [`iceberg::expr::Predicate`] to the Iceberg expression +//! JSON wire format (matching Java `org.apache.iceberg.expressions.ExpressionParser`), +//! used to push a scan filter down to the planning server. +//! +//! This is intentionally conservative: any literal whose JSON encoding we +//! cannot reproduce faithfully (temporals, decimals, binary, uuid, …) makes the +//! whole conversion return `None`, in which case the caller simply omits the +//! `filter` from the request. Server-side filtering is an optimization — the +//! client still applies its own bound predicate when reading — so omitting a +//! filter never changes results. + +use iceberg::expr::{Predicate, PredicateOperator}; +use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType}; +use serde_json::{Value, json}; + +/// Convert a predicate to Iceberg expression JSON, or `None` if it contains a +/// literal we cannot encode losslessly. +pub(crate) fn predicate_to_json(predicate: &Predicate) -> Option { + match predicate { + Predicate::AlwaysTrue => Some(json!({ "type": "true" })), + Predicate::AlwaysFalse => Some(json!({ "type": "false" })), + Predicate::And(expr) => { + let [left, right] = expr.inputs(); + Some(json!({ + "type": "and", + "left": predicate_to_json(left)?, + "right": predicate_to_json(right)?, + })) + } + Predicate::Or(expr) => { + let [left, right] = expr.inputs(); + Some(json!({ + "type": "or", + "left": predicate_to_json(left)?, + "right": predicate_to_json(right)?, + })) + } + Predicate::Not(expr) => { + let [child] = expr.inputs(); + Some(json!({ + "type": "not", + "child": predicate_to_json(child)?, + })) + } + Predicate::Unary(expr) => { + let op = op_str(expr.op())?; + Some(json!({ "type": op, "term": expr.term().name() })) + } + Predicate::Binary(expr) => { + let op = op_str(expr.op())?; + Some(json!({ + "type": op, + "term": expr.term().name(), + "value": datum_to_json(expr.literal())?, + })) + } + Predicate::Set(expr) => { + let op = op_str(expr.op())?; + let mut values = Vec::with_capacity(expr.literals().len()); + for datum in expr.literals() { + values.push(datum_to_json(datum)?); + } + Some(json!({ + "type": op, + "term": expr.term().name(), + "values": values, + })) + } + } +} + +/// Maps an operator to its Iceberg JSON tag, or `None` for any operator we +/// don't recognize (the enum is `#[non_exhaustive]`). +fn op_str(op: PredicateOperator) -> Option<&'static str> { + Some(match op { + PredicateOperator::IsNull => "is-null", + PredicateOperator::NotNull => "not-null", + PredicateOperator::IsNan => "is-nan", + PredicateOperator::NotNan => "not-nan", + PredicateOperator::LessThan => "lt", + PredicateOperator::LessThanOrEq => "lt-eq", + PredicateOperator::GreaterThan => "gt", + PredicateOperator::GreaterThanOrEq => "gt-eq", + PredicateOperator::Eq => "eq", + PredicateOperator::NotEq => "not-eq", + PredicateOperator::StartsWith => "starts-with", + PredicateOperator::NotStartsWith => "not-starts-with", + PredicateOperator::In => "in", + PredicateOperator::NotIn => "not-in", + _ => return None, + }) +} + +/// Encode a literal as Iceberg single-value JSON for the "plain" primitive +/// types. Returns `None` for types whose single-value encoding we don't +/// reproduce here (dates/times/timestamps/decimals/binary/uuid/fixed). +fn datum_to_json(datum: &Datum) -> Option { + match (datum.data_type(), datum.literal()) { + (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(b)) => Some(json!(b)), + (PrimitiveType::Int, PrimitiveLiteral::Int(i)) => Some(json!(i)), + (PrimitiveType::Long, PrimitiveLiteral::Long(l)) => Some(json!(l)), + (PrimitiveType::Float, PrimitiveLiteral::Float(f)) => Some(json!(f.0)), + (PrimitiveType::Double, PrimitiveLiteral::Double(f)) => Some(json!(f.0)), + (PrimitiveType::String, PrimitiveLiteral::String(s)) => Some(json!(s)), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use iceberg::expr::Reference; + use iceberg::spec::Datum; + + use super::*; + + #[test] + fn binary_eq_predicate() { + let p = Reference::new("id").equal_to(Datum::long(5)); + let json = predicate_to_json(&p).unwrap(); + assert_eq!(json["type"], "eq"); + assert_eq!(json["term"], "id"); + assert_eq!(json["value"], 5); + } + + #[test] + fn and_of_unary_and_binary() { + let p = Reference::new("a") + .is_null() + .and(Reference::new("b").greater_than(Datum::int(3))); + let json = predicate_to_json(&p).unwrap(); + assert_eq!(json["type"], "and"); + assert_eq!(json["left"]["type"], "is-null"); + assert_eq!(json["right"]["type"], "gt"); + assert_eq!(json["right"]["value"], 3); + } + + #[test] + fn unsupported_literal_yields_none() { + // A binary predicate on a date literal cannot be encoded -> None. + let p = Reference::new("d").equal_to(Datum::date(1)); + assert!(predicate_to_json(&p).is_none()); + } +} diff --git a/crates/catalog/rest/src/scan_planning/mod.rs b/crates/catalog/rest/src/scan_planning/mod.rs new file mode 100644 index 0000000000..b354a10e79 --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/mod.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Client implementation of the Iceberg REST server-side scan-planning +//! protocol (`planTableScan` / `fetchPlanningResult` / `fetchScanTasks`). + +mod convert; +pub(crate) mod endpoint; +mod expr; +mod planner; +mod stream; +mod types; + +use std::collections::HashSet; +use std::sync::Arc; + +use iceberg::expr::Predicate; +use iceberg::spec::{SchemaRef, TableMetadataRef}; +use iceberg::{Runtime, TableIdent}; +pub use planner::RestScanPlanner; +pub(crate) use stream::plan_table_scan; + +use crate::RestCatalogConfig; +use crate::client::HttpClient; +use crate::scan_planning::endpoint::Endpoint; + +/// All resources and parameters needed to run one server-side scan plan. +pub(crate) struct PlanScanContext { + pub(crate) client: Arc, + pub(crate) config: RestCatalogConfig, + pub(crate) endpoints: Arc>, + pub(crate) runtime: Runtime, + pub(crate) table_ident: TableIdent, + pub(crate) metadata: TableMetadataRef, + pub(crate) snapshot_schema: SchemaRef, + pub(crate) project_field_ids: Vec, + pub(crate) case_sensitive: bool, + pub(crate) snapshot_id: Option, + pub(crate) start_snapshot_id: Option, + pub(crate) end_snapshot_id: Option, + pub(crate) select: Option>, + pub(crate) filter: Option, +} diff --git a/crates/catalog/rest/src/scan_planning/planner.rs b/crates/catalog/rest/src/scan_planning/planner.rs new file mode 100644 index 0000000000..18007b3e96 --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/planner.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`RestScanPlanner`]: the REST catalog's [`ScanPlanner`] implementation. + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; +use iceberg::scan::{FileScanTaskStream, ScanPlanRequest, ScanPlanner}; +use iceberg::{Result, Runtime}; + +use super::endpoint::Endpoint; +use super::{PlanScanContext, plan_table_scan}; +use crate::RestCatalogConfig; +use crate::client::HttpClient; + +/// A [`ScanPlanner`] backed by a REST catalog's server-side planning endpoints. +/// +/// Instances are created by `RestCatalog` and attached to the tables it loads; +/// each holds a handle to the shared HTTP client, the negotiated runtime config +/// and endpoint set, so planning can run independently of the catalog object. +#[derive(Debug)] +pub struct RestScanPlanner { + client: Arc, + config: RestCatalogConfig, + endpoints: Arc>, + runtime: Runtime, +} + +impl RestScanPlanner { + pub(crate) fn new( + client: Arc, + config: RestCatalogConfig, + endpoints: Arc>, + runtime: Runtime, + ) -> Self { + Self { + client, + config, + endpoints, + runtime, + } + } +} + +#[async_trait] +impl ScanPlanner for RestScanPlanner { + async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result { + request.validate()?; + let ctx = PlanScanContext { + client: self.client.clone(), + config: self.config.clone(), + endpoints: self.endpoints.clone(), + runtime: self.runtime.clone(), + table_ident: request.table_ident, + metadata: request.metadata, + snapshot_schema: request.snapshot_schema, + project_field_ids: request.project_field_ids, + case_sensitive: request.case_sensitive, + snapshot_id: request.snapshot_id, + start_snapshot_id: request.start_snapshot_id, + end_snapshot_id: request.end_snapshot_id, + select: request.select, + filter: request.filter, + }; + plan_table_scan(ctx).await + } +} diff --git a/crates/catalog/rest/src/scan_planning/stream.rs b/crates/catalog/rest/src/scan_planning/stream.rs new file mode 100644 index 0000000000..c1678ea436 --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/stream.rs @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The server-side scan-planning state machine: submit → (poll) → fetch tasks. +//! +//! Planning is driven to completion and the resulting [`FileScanTask`]s are +//! returned as a stream. If the planning future is dropped before it resolves +//! (e.g. the caller aborts the scan), a [`CancelGuard`] fires a best-effort +//! `DELETE .../plan/{plan-id}` so the server can release the in-flight plan. + +use std::collections::{HashSet, VecDeque}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use iceberg::expr::Bind; +use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; +use iceberg::{Error, ErrorKind, Result, TableIdent}; +use reqwest::{Method, StatusCode}; +use serde::Serialize; +use serde::de::DeserializeOwned; + +use super::PlanScanContext; +use super::convert::{ConvertContext, to_delete_file, to_file_scan_task}; +use super::endpoint::{self, Endpoint}; +use super::types::{ + FetchPlanningResultResponse, FetchScanTasksRequest, FetchScanTasksResponse, PlanStatus, + PlanTableScanRequest, PlanTableScanResponse, RestContentFile, RestFileScanTask, +}; +use crate::RestCatalogConfig; +use crate::client::{ + HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, +}; + +const MIN_SLEEP: Duration = Duration::from_secs(1); +const MAX_SLEEP: Duration = Duration::from_secs(60); +const MAX_RETRIES: u32 = 10; +const MAX_WAIT: Duration = Duration::from_secs(300); + +/// Plan a table scan on the server and return its file scan tasks as a stream. +/// +/// Returns [`ErrorKind::FeatureUnsupported`] if the server does not advertise +/// the submit endpoint, allowing the scan engine to fall back to native +/// planning. +pub(crate) async fn plan_table_scan(ctx: PlanScanContext) -> Result { + endpoint::check(&ctx.endpoints, &Endpoint::submit_table_scan_plan())?; + + let bound_filter = match &ctx.filter { + Some(predicate) => Some(predicate.bind(ctx.snapshot_schema.clone(), ctx.case_sensitive)?), + None => None, + }; + let convert_ctx = ConvertContext { + metadata: ctx.metadata.clone(), + snapshot_schema: ctx.snapshot_schema.clone(), + project_field_ids: ctx.project_field_ids.clone(), + case_sensitive: ctx.case_sensitive, + bound_filter, + }; + + let mut guard = CancelGuard { + client: ctx.client.clone(), + config: ctx.config.clone(), + endpoints: ctx.endpoints.clone(), + table_ident: ctx.table_ident.clone(), + runtime: ctx.runtime.clone(), + plan_id: None, + armed: true, + }; + + let tasks = drive(&ctx, &convert_ctx, &mut guard).await?; + guard.disarm(); + + Ok(Box::pin(futures::stream::iter( + tasks.into_iter().map(Ok::), + ))) +} + +/// Run the full state machine and collect all produced tasks. +async fn drive( + ctx: &PlanScanContext, + convert_ctx: &ConvertContext, + guard: &mut CancelGuard, +) -> Result> { + let request = build_request(ctx); + let submit_url = ctx.config.scan_plan_endpoint(&ctx.table_ident); + let response: PlanTableScanResponse = post_json(&ctx.client, &submit_url, &request).await?; + + if let Some(plan_id) = &response.plan_id { + guard.plan_id = Some(plan_id.clone()); + } + + let mut out = Vec::new(); + let (delete_files, file_scan_tasks, plan_tasks) = match response.status { + PlanStatus::Completed => ( + response.delete_files, + response.file_scan_tasks, + response.plan_tasks, + ), + PlanStatus::Submitted => { + let plan_id = response.plan_id.ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Server returned status=submitted without a plan-id", + ) + })?; + poll_until_complete(ctx, &plan_id).await? + } + PlanStatus::Failed => return Err(failure_error(response.error.as_ref())), + PlanStatus::Cancelled => { + return Err(Error::new( + ErrorKind::Unexpected, + "Server cancelled the scan plan", + )); + } + }; + + convert_into(delete_files, file_scan_tasks, convert_ctx, &mut out)?; + + // Fan out over plan-task tokens; fetchScanTasks may itself return more. + let mut queue: VecDeque = plan_tasks.unwrap_or_default().into_iter().collect(); + if !queue.is_empty() { + endpoint::check(&ctx.endpoints, &Endpoint::fetch_table_scan_plan_tasks())?; + } + let tasks_url = ctx.config.scan_tasks_endpoint(&ctx.table_ident); + while let Some(plan_task) = queue.pop_front() { + let response: FetchScanTasksResponse = + post_json(&ctx.client, &tasks_url, &FetchScanTasksRequest { + plan_task, + }) + .await?; + convert_into( + response.delete_files, + response.file_scan_tasks, + convert_ctx, + &mut out, + )?; + if let Some(more) = response.plan_tasks { + queue.extend(more); + } + } + + Ok(out) +} + +/// Poll `GET .../plan/{plan-id}` with exponential backoff until the plan +/// reaches a terminal state. +async fn poll_until_complete( + ctx: &PlanScanContext, + plan_id: &str, +) -> Result<( + Option>, + Option>, + Option>, +)> { + endpoint::check(&ctx.endpoints, &Endpoint::fetch_table_scan_plan())?; + let url = ctx.config.scan_plan_id_endpoint(&ctx.table_ident, plan_id); + + let mut backoff = MIN_SLEEP; + let mut attempts = 0u32; + let started = Instant::now(); + loop { + let response: FetchPlanningResultResponse = get_json(&ctx.client, &url).await?; + match response.status { + PlanStatus::Completed => { + return Ok(( + response.delete_files, + response.file_scan_tasks, + response.plan_tasks, + )); + } + PlanStatus::Submitted => { + attempts += 1; + if attempts > MAX_RETRIES || started.elapsed() > MAX_WAIT { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Scan plan {plan_id} did not complete within {MAX_WAIT:?} ({MAX_RETRIES} retries)" + ), + )); + } + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_SLEEP); + } + PlanStatus::Failed => return Err(failure_error(response.error.as_ref())), + PlanStatus::Cancelled => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Scan plan {plan_id} was cancelled by the server"), + )); + } + } + } +} + +fn build_request(ctx: &PlanScanContext) -> PlanTableScanRequest { + let incremental = ctx.start_snapshot_id.is_some() || ctx.end_snapshot_id.is_some(); + PlanTableScanRequest { + snapshot_id: if incremental { None } else { ctx.snapshot_id }, + start_snapshot_id: ctx.start_snapshot_id, + end_snapshot_id: ctx.end_snapshot_id, + select: ctx.select.clone(), + filter: ctx.filter.as_ref().and_then(super::expr::predicate_to_json), + case_sensitive: Some(ctx.case_sensitive), + use_snapshot_schema: incremental.then_some(true), + stats_fields: None, + min_rows_requested: None, + } +} + +fn convert_into( + delete_files: Option>, + file_scan_tasks: Option>, + convert_ctx: &ConvertContext, + out: &mut Vec, +) -> Result<()> { + let deletes: Vec = delete_files + .unwrap_or_default() + .iter() + .map(to_delete_file) + .collect(); + for task in file_scan_tasks.unwrap_or_default() { + out.push(to_file_scan_task(task, &deletes, convert_ctx)?); + } + Ok(()) +} + +fn failure_error(error: Option<&super::types::PlanError>) -> Error { + let detail = error + .map(|e| e.describe()) + .unwrap_or_else(|| "unknown".to_string()); + Error::new( + ErrorKind::Unexpected, + format!("Server-side scan planning failed: {detail}"), + ) +} + +async fn post_json( + client: &HttpClient, + url: &str, + body: &B, +) -> Result { + let request = client.request(Method::POST, url).json(body).build()?; + let response = client.query_catalog(request).await?; + match response.status() { + StatusCode::OK => deserialize_catalog_response(response).await, + _ => Err( + deserialize_unexpected_catalog_error(response, client.disable_header_redaction()).await, + ), + } +} + +async fn get_json(client: &HttpClient, url: &str) -> Result { + let request = client.request(Method::GET, url).build()?; + let response = client.query_catalog(request).await?; + match response.status() { + StatusCode::OK => deserialize_catalog_response(response).await, + _ => Err( + deserialize_unexpected_catalog_error(response, client.disable_header_redaction()).await, + ), + } +} + +/// Fires a best-effort `DELETE .../plan/{plan-id}` if the planning future is +/// dropped while still armed (i.e. before planning completed successfully). +struct CancelGuard { + client: Arc, + config: RestCatalogConfig, + endpoints: Arc>, + table_ident: TableIdent, + runtime: iceberg::Runtime, + plan_id: Option, + armed: bool, +} + +impl CancelGuard { + fn disarm(&mut self) { + self.armed = false; + } +} + +impl Drop for CancelGuard { + fn drop(&mut self) { + if !self.armed { + return; + } + let Some(plan_id) = self.plan_id.take() else { + return; + }; + if endpoint::check(&self.endpoints, &Endpoint::cancel_table_scan_plan()).is_err() { + return; + } + let client = self.client.clone(); + let url = self + .config + .scan_plan_id_endpoint(&self.table_ident, &plan_id); + // Detached, best-effort cancellation. + self.runtime.io().spawn(async move { + if let Ok(request) = client.request(Method::DELETE, url).build() { + let _ = client.execute(request).await; + } + }); + } +} diff --git a/crates/catalog/rest/src/scan_planning/types.rs b/crates/catalog/rest/src/scan_planning/types.rs new file mode 100644 index 0000000000..64c10701eb --- /dev/null +++ b/crates/catalog/rest/src/scan_planning/types.rs @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Wire types for the REST scan-planning protocol, matching the Iceberg +//! OpenAPI spec (`planTableScan` / `fetchPlanningResult` / `fetchScanTasks`). +//! +//! Only the fields needed to construct an [`iceberg::scan::FileScanTask`] are +//! modelled on the content-file objects; unknown fields (column stats, bounds, +//! key metadata, …) are ignored on deserialization since a `FileScanTask` +//! carries no statistics. + +use iceberg::spec::DataContentType; +use serde_derive::{Deserialize, Serialize}; + +use crate::types::StorageCredential; + +/// Status of a server-side scan plan. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) enum PlanStatus { + /// Planning finished; tasks (and/or plan-task tokens) are available. + Completed, + /// Planning is in progress; poll the plan id until it completes. + Submitted, + /// Planning was cancelled. + Cancelled, + /// Planning failed. + Failed, +} + +/// Body of `POST .../plan`. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct PlanTableScanRequest { + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub start_snapshot_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub end_snapshot_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub select: Option>, + /// Filter predicate as Iceberg expression JSON (see `expr.rs`). + #[serde(skip_serializing_if = "Option::is_none")] + pub filter: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub case_sensitive: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub use_snapshot_schema: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stats_fields: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub min_rows_requested: Option, +} + +/// Error payload embedded in a `failed` plan response. +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct PlanError { + #[serde(default)] + pub message: Option, + #[serde(default, rename = "type")] + pub r#type: Option, + #[serde(default)] + pub code: Option, +} + +impl PlanError { + pub(crate) fn describe(&self) -> String { + format!( + "{} (type={}, code={})", + self.message.as_deref().unwrap_or("unknown"), + self.r#type.as_deref().unwrap_or("unknown"), + self.code.unwrap_or(0) + ) + } +} + +/// Response of `POST .../plan`. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct PlanTableScanResponse { + pub status: PlanStatus, + #[serde(default)] + pub error: Option, + #[serde(default)] + pub plan_id: Option, + #[serde(default)] + pub plan_tasks: Option>, + /// Vended credentials. Parsed for protocol completeness; reads currently use + /// the table's existing `FileIO` (see the scan-planning module docs). + #[serde(default)] + #[allow(dead_code)] + pub storage_credentials: Option>, + #[serde(default)] + pub delete_files: Option>, + #[serde(default)] + pub file_scan_tasks: Option>, +} + +/// Response of `GET .../plan/{plan-id}` (no `plan-id` — it is in the URL). +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct FetchPlanningResultResponse { + pub status: PlanStatus, + #[serde(default)] + pub error: Option, + #[serde(default)] + pub plan_tasks: Option>, + /// Vended credentials. Parsed for protocol completeness; reads currently use + /// the table's existing `FileIO` (see the scan-planning module docs). + #[serde(default)] + #[allow(dead_code)] + pub storage_credentials: Option>, + #[serde(default)] + pub delete_files: Option>, + #[serde(default)] + pub file_scan_tasks: Option>, +} + +/// Body of `POST .../tasks`. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct FetchScanTasksRequest { + pub plan_task: String, +} + +/// Response of `POST .../tasks`. May return further `plan-tasks` (recursive). +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct FetchScanTasksResponse { + #[serde(default)] + pub plan_tasks: Option>, + #[serde(default)] + pub delete_files: Option>, + #[serde(default)] + pub file_scan_tasks: Option>, +} + +/// One entry in a `file-scan-tasks` array. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct RestFileScanTask { + pub data_file: RestContentFile, + /// Indices into the response's shared `delete-files` array. + #[serde(default)] + pub delete_file_references: Option>, + /// Residual filter as Iceberg expression JSON. Parsed for protocol + /// completeness; the client's own scan filter is authoritative for row + /// filtering, so this is not consumed. + #[serde(default)] + #[allow(dead_code)] + pub residual_filter: Option, +} + +/// The `data-file` / `delete-files[]` object (a subset of `ContentFileParser`). +/// +/// Statistics/bounds/key-metadata are intentionally omitted: a +/// [`FileScanTask`](iceberg::scan::FileScanTask) does not carry them, and serde +/// ignores the unmodelled fields. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct RestContentFile { + pub spec_id: i32, + pub content: ContentTypeStr, + pub file_path: String, + /// Lowercase format string (`"parquet"`/`"avro"`/`"orc"`); parsed case-insensitively. + pub file_format: String, + /// Positional array (one element per partition field) or field-id-keyed object. + #[serde(default)] + pub partition: Option, + pub file_size_in_bytes: u64, + pub record_count: u64, + #[serde(default)] + pub equality_ids: Option>, +} + +/// Content type as a kebab-case string, tolerating legacy SCREAMING_CASE names. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +pub(crate) enum ContentTypeStr { + #[serde(rename = "data", alias = "DATA")] + Data, + #[serde(rename = "position-deletes", alias = "POSITION_DELETES")] + PositionDeletes, + #[serde(rename = "equality-deletes", alias = "EQUALITY_DELETES")] + EqualityDeletes, +} + +impl From for DataContentType { + fn from(value: ContentTypeStr) -> Self { + match value { + ContentTypeStr::Data => DataContentType::Data, + ContentTypeStr::PositionDeletes => DataContentType::PositionDeletes, + ContentTypeStr::EqualityDeletes => DataContentType::EqualityDeletes, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn plan_status_kebab() { + assert_eq!( + serde_json::from_value::(serde_json::json!("completed")).unwrap(), + PlanStatus::Completed + ); + assert_eq!( + serde_json::from_value::(serde_json::json!("position-deletes")).is_err(), + true + ); + } + + #[test] + fn request_omits_none_fields() { + let req = PlanTableScanRequest { + snapshot_id: Some(42), + case_sensitive: Some(true), + ..Default::default() + }; + let v = serde_json::to_value(&req).unwrap(); + assert_eq!(v["snapshot-id"], 42); + assert_eq!(v["case-sensitive"], true); + assert!(v.get("select").is_none()); + assert!(v.get("filter").is_none()); + } + + #[test] + fn content_type_kebab_and_legacy() { + let kebab: ContentTypeStr = + serde_json::from_value(serde_json::json!("equality-deletes")).unwrap(); + assert_eq!(kebab, ContentTypeStr::EqualityDeletes); + let legacy: ContentTypeStr = + serde_json::from_value(serde_json::json!("POSITION_DELETES")).unwrap(); + assert_eq!(legacy, ContentTypeStr::PositionDeletes); + } + + #[test] + fn content_file_ignores_unmodelled_fields() { + let json = serde_json::json!({ + "spec-id": 0, + "content": "data", + "file-path": "s3://b/f.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 10, + "column-sizes": {"keys": [1], "values": [100]}, + "lower-bounds": {"keys": [1], "values": ["00000000"]}, + "key-metadata": "abcd" + }); + let cf: RestContentFile = serde_json::from_value(json).unwrap(); + assert_eq!(cf.file_path, "s3://b/f.parquet"); + assert_eq!(cf.record_count, 10); + assert_eq!(cf.content, ContentTypeStr::Data); + } +} diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index ab44c40ee3..47115cb386 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -25,10 +25,15 @@ use iceberg::{ }; use serde_derive::{Deserialize, Serialize}; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub(super) struct CatalogConfig { pub(super) overrides: HashMap, pub(super) defaults: HashMap, + /// Endpoints advertised by the server for capability negotiation. Absent on + /// older servers, in which case optional features (e.g. server-side scan + /// planning) are treated as unsupported. + #[serde(default)] + pub(super) endpoints: Vec, } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 368e8143e2..e27a9701ea 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,6 +21,7 @@ mod cache; use cache::*; mod context; use context::*; +mod planner; mod task; use std::sync::Arc; @@ -29,6 +30,7 @@ use arrow_array::RecordBatch; use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; +pub use planner::{ScanPlanRequest, ScanPlanner}; pub use task::*; use crate::arrow::ArrowReaderBuilder; @@ -42,7 +44,7 @@ use crate::runtime::Runtime; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::util::available_parallelism; -use crate::{Error, ErrorKind, Result}; +use crate::{Error, ErrorKind, Result, TableIdent}; /// A stream of arrow [`RecordBatch`]es. pub type ArrowRecordBatchStream = BoxStream<'static, Result>; @@ -53,6 +55,10 @@ pub struct TableScanBuilder<'a> { // Defaults to none which means select all columns column_names: Option>, snapshot_id: Option, + // Exclusive start / inclusive end snapshots for an incremental scan. Only + // honored by server-side scan planning; the native planner ignores them. + start_snapshot_id: Option, + end_snapshot_id: Option, batch_size: Option, case_sensitive: bool, filter: Option, @@ -71,6 +77,8 @@ impl<'a> TableScanBuilder<'a> { table, column_names: None, snapshot_id: None, + start_snapshot_id: None, + end_snapshot_id: None, batch_size: None, case_sensitive: true, filter: None, @@ -132,6 +140,18 @@ impl<'a> TableScanBuilder<'a> { self } + /// Configure an incremental scan between an exclusive `start` snapshot and + /// an inclusive `end` snapshot. + /// + /// Incremental scans are only supported when the table is backed by a + /// catalog that performs server-side scan planning; the native planner + /// ignores these bounds and scans the resolved snapshot instead. + pub fn with_incremental(mut self, start_snapshot_id: i64, end_snapshot_id: i64) -> Self { + self.start_snapshot_id = Some(start_snapshot_id); + self.end_snapshot_id = Some(end_snapshot_id); + self + } + /// Sets the concurrency limit for both manifest files and manifest /// entries for this scan pub fn with_concurrency_limit(mut self, limit: usize) -> Self { @@ -212,6 +232,10 @@ impl<'a> TableScanBuilder<'a> { row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, runtime: self.table.runtime().clone(), + scan_planner: self.table.scan_planner(), + table_ident: self.table.identifier().clone(), + start_snapshot_id: self.start_snapshot_id, + end_snapshot_id: self.end_snapshot_id, }); }; current_snapshot_id.clone() @@ -306,6 +330,10 @@ impl<'a> TableScanBuilder<'a> { row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, runtime: self.table.runtime().clone(), + scan_planner: self.table.scan_planner(), + table_ident: self.table.identifier().clone(), + start_snapshot_id: self.start_snapshot_id, + end_snapshot_id: self.end_snapshot_id, }) } } @@ -336,6 +364,16 @@ pub struct TableScan { row_selection_enabled: bool, runtime: Runtime, + + /// Optional server-side scan planner injected by the catalog. When present, + /// [`plan_files`](Self::plan_files) delegates planning to it. + scan_planner: Option>, + /// Identifier of the scanned table, required to address the server-side + /// planning endpoints. + table_ident: TableIdent, + /// Incremental scan bounds, forwarded to the server-side planner. + start_snapshot_id: Option, + end_snapshot_id: Option, } impl TableScan { @@ -345,6 +383,39 @@ impl TableScan { return Ok(Box::pin(futures::stream::empty())); }; + // Delegate to server-side scan planning when the catalog provides a + // planner. A `FeatureUnsupported` error means the server does not + // advertise the planning endpoints, so we fall back to native planning. + if let Some(planner) = self.scan_planner.as_ref() { + let snapshot_id = if self.start_snapshot_id.is_some() || self.end_snapshot_id.is_some() + { + None + } else { + Some(plan_context.snapshot.snapshot_id()) + }; + + let request = ScanPlanRequest { + table_ident: self.table_ident.clone(), + snapshot_id, + start_snapshot_id: self.start_snapshot_id, + end_snapshot_id: self.end_snapshot_id, + select: self.column_names.clone(), + filter: plan_context.predicate.as_ref().map(|p| p.as_ref().clone()), + case_sensitive: plan_context.case_sensitive, + project_field_ids: plan_context.field_ids.as_ref().clone(), + metadata: plan_context.table_metadata.clone(), + snapshot_schema: plan_context.snapshot_schema.clone(), + }; + + match planner.plan_table_scan(request).await { + Ok(stream) => return Ok(stream), + Err(e) if e.kind() == ErrorKind::FeatureUnsupported => { + // Fall through to native, client-side planning below. + } + Err(e) => return Err(e), + } + } + let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; diff --git a/crates/iceberg/src/scan/planner.rs b/crates/iceberg/src/scan/planner.rs new file mode 100644 index 0000000000..3a8838b345 --- /dev/null +++ b/crates/iceberg/src/scan/planner.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Server-side scan planning abstraction. +//! +//! Iceberg's REST protocol allows a catalog server to perform scan planning +//! (file pruning, delete-file resolution) on the client's behalf and return a +//! ready-made list of [`FileScanTask`](crate::scan::FileScanTask)s. The +//! [`ScanPlanner`] trait is the seam through which a catalog injects that +//! capability into a [`Table`](crate::table::Table): when a planner is present, +//! [`TableScan::plan_files`](crate::scan::TableScan::plan_files) delegates to it +//! instead of reading manifests locally. +//! +//! This is the narrow, purpose-built capability trait used by the "Variant B" +//! injection design — only catalogs that actually support remote planning +//! implement it, and the core [`Catalog`](crate::Catalog) trait is left +//! untouched. + +use std::fmt::Debug; + +use async_trait::async_trait; + +use crate::expr::Predicate; +use crate::scan::FileScanTaskStream; +use crate::spec::{SchemaRef, TableMetadataRef}; +use crate::{Error, ErrorKind, Result, TableIdent}; + +/// A neutral, catalog-agnostic description of the scan to be planned remotely. +/// +/// This is the contract between the core scan engine (which assembles it from a +/// [`TableScan`](crate::scan::TableScan)) and a catalog-specific +/// [`ScanPlanner`] implementation (which maps it onto its wire protocol). +#[derive(Debug, Clone)] +pub struct ScanPlanRequest { + /// Identifier of the table being scanned. + pub table_ident: TableIdent, + + /// The snapshot to scan. Mutually exclusive with + /// [`start_snapshot_id`](Self::start_snapshot_id)/[`end_snapshot_id`](Self::end_snapshot_id). + pub snapshot_id: Option, + + /// Exclusive start snapshot for an incremental scan. + pub start_snapshot_id: Option, + + /// Inclusive end snapshot for an incremental scan. + pub end_snapshot_id: Option, + + /// The column names to project, or `None` to select all top-level columns. + pub select: Option>, + + /// An optional, still-unbound filter predicate to push down to the server. + pub filter: Option, + + /// Whether predicate binding should treat column names case-sensitively. + pub case_sensitive: bool, + + /// The resolved field ids being projected. These are not carried on the + /// wire by the server, so the planner must stamp them onto every produced + /// task. + pub project_field_ids: Vec, + + /// The metadata of the table at scan time, used to resolve partition specs + /// and schemas while converting the server's response. + pub metadata: TableMetadataRef, + + /// The schema associated with the scanned snapshot. + pub snapshot_schema: SchemaRef, +} + +/// A catalog capability for planning table scans on the server. +/// +/// Implementations should return an error with +/// [`ErrorKind::FeatureUnsupported`] when the server does not advertise the +/// scan-planning endpoints, which signals the scan engine to fall back to local +/// (client-side) planning. Any other error is treated as a hard failure. +#[async_trait] +pub trait ScanPlanner: Debug + Send + Sync { + /// Plan a table scan on the server and return a stream of file scan tasks. + async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result; +} + +impl ScanPlanRequest { + /// Returns an error if the snapshot selectors are inconsistent: a request + /// must specify either a single `snapshot_id` or *both* incremental + /// boundaries, never a mix. + pub fn validate(&self) -> Result<()> { + let incremental = self.start_snapshot_id.is_some() || self.end_snapshot_id.is_some(); + if incremental && self.snapshot_id.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Scan plan request cannot set both snapshot_id and an incremental snapshot range", + )); + } + if incremental && (self.start_snapshot_id.is_none() || self.end_snapshot_id.is_none()) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Incremental scan plan request requires both start_snapshot_id and end_snapshot_id", + )); + } + Ok(()) + } +} diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 31feade038..4386b0c653 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -26,7 +26,7 @@ use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::runtime::Runtime; -use crate::scan::TableScanBuilder; +use crate::scan::{ScanPlanner, TableScanBuilder}; use crate::spec::{ManifestListReader, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -41,6 +41,7 @@ pub struct TableBuilder { disable_cache: bool, cache_size_bytes: Option, runtime: Option, + scan_planner: Option>, } impl TableBuilder { @@ -55,6 +56,7 @@ impl TableBuilder { disable_cache: false, cache_size_bytes: None, runtime: None, + scan_planner: None, } } @@ -108,6 +110,16 @@ impl TableBuilder { self } + /// optional - sets a server-side [`ScanPlanner`] for this table. + /// + /// When provided, [`Table::scan`] produces scans that delegate planning to + /// this planner (typically the REST catalog), falling back to native, + /// client-side planning if the server does not support it. + pub fn scan_planner(mut self, scan_planner: Arc) -> Self { + self.scan_planner = Some(scan_planner); + self + } + /// optional - sets the KMS client used to unwrap keys for table encryption. /// /// If the table metadata has the `encryption.key-id` property set, a @@ -130,6 +142,7 @@ impl TableBuilder { disable_cache, cache_size_bytes, runtime, + scan_planner, } = self; let Some(file_io) = file_io else { @@ -190,6 +203,7 @@ impl TableBuilder { object_cache, runtime, encryption_manager, + scan_planner, }) } } @@ -205,6 +219,8 @@ pub struct Table { object_cache: Arc, runtime: Runtime, encryption_manager: Option>, + /// Optional server-side scan planner injected by the catalog (e.g. REST). + scan_planner: Option>, } impl Table { @@ -291,6 +307,11 @@ impl Table { &self.runtime } + /// Returns the server-side [`ScanPlanner`] for this table, if one was set. + pub(crate) fn scan_planner(&self) -> Option> { + self.scan_planner.clone() + } + /// Returns the flag indicating whether the `Table` is readonly or not pub fn readonly(&self) -> bool { self.readonly From 551d64aaa456252de5cb94e9a84b94225ad5cf61 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 15 Jun 2026 22:09:22 -0700 Subject: [PATCH 2/5] feat(catalog-rest): vended-credential reads for server-side scan planning Builds on the Variant B server-side scan planning so a server-planned scan can actually read its data files end-to-end against Unity Catalog FGAC tables (verified live: server applies column masks, client reads the masked rows). - Vended storage credentials: REST `storage-credentials` (from load-table and from scan-plan responses) are attached to a `FileIO` via the new `FileIOBuilder::with_storage_credentials` / `StorageConfig` credential support, resolved per object path (longest-prefix) by `OpenDalResolvingStorageFactory`. - R10 (plan-scoped credentials): `ScanPlanner::plan_table_scan` now returns `ServerScanPlan { tasks, file_io }`. The REST planner builds a plan-scoped `FileIO` from the credentials the server vends in the plan/poll responses, and `TableScan::to_arrow` reads through it. Server-planned tables typically vend credentials only in the plan response, not at load-table time. - PlanStatus / content type now also accept the SCREAMING_CASE forms UC emits (e.g. `COMPLETED`, `DATA`) in addition to the kebab-case spec forms. Note: UC's scan planning currently rejects `case-sensitive=true`; callers must build scans with `with_case_sensitive(false)` against such servers. --- crates/catalog/rest/src/catalog.rs | 39 +++++- crates/catalog/rest/src/scan_planning/mod.rs | 7 +- .../catalog/rest/src/scan_planning/planner.rs | 18 ++- .../catalog/rest/src/scan_planning/stream.rs | 62 +++++++-- .../catalog/rest/src/scan_planning/types.rs | 4 + crates/iceberg/src/io/file_io.rs | 9 +- crates/iceberg/src/io/storage/config/mod.rs | 124 +++++++++++++++++- crates/iceberg/src/scan/mod.rs | 36 +++-- crates/iceberg/src/scan/planner.rs | 17 ++- crates/storage/opendal/src/resolving.rs | 117 +++++++++++++++-- 10 files changed, 386 insertions(+), 47 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index aabbf826e2..51e1202a25 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -23,7 +23,7 @@ use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; -use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; +use iceberg::io::{FileIO, FileIOBuilder, StorageCredential, StorageFactory}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime, @@ -59,6 +59,18 @@ const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const PATH_V1: &str = "v1"; +/// Convert REST `storage-credentials` entries into core [`StorageCredential`]s +/// that can be attached to a [`FileIO`] via its [`StorageConfig`](iceberg::io::StorageConfig). +fn to_storage_credentials( + creds: Option>, +) -> Vec { + creds + .unwrap_or_default() + .into_iter() + .map(|c| StorageCredential::new(c.prefix, c.config)) + .collect() +} + /// Builder for [`RestCatalog`]. #[derive(Debug)] pub struct RestCatalogBuilder { @@ -497,6 +509,7 @@ impl RestCatalog { &self, metadata_location: Option<&str>, extra_config: Option>, + credentials: Vec, ) -> Result { let mut props = self.context().await?.config.props.clone(); if let Some(config) = extra_config { @@ -529,7 +542,10 @@ impl RestCatalog { ) })?; - let file_io = FileIOBuilder::new(factory).with_props(props).build(); + let file_io = FileIOBuilder::new(factory) + .with_props(props) + .with_storage_credentials(credentials) + .build(); Ok(file_io) } @@ -542,6 +558,8 @@ impl RestCatalog { context.config.clone(), context.endpoints.clone(), self.runtime.clone(), + self.storage_factory.clone(), + context.config.props.clone(), )) } @@ -854,8 +872,9 @@ impl Catalog for RestCatalog { .chain(self.user_config.props.clone()) .collect(); + let credentials = to_storage_credentials(response.storage_credentials); let file_io = self - .load_file_io(Some(metadata_location), Some(config)) + .load_file_io(Some(metadata_location), Some(config), credentials) .await?; let table_builder = Table::builder() @@ -912,8 +931,13 @@ impl Catalog for RestCatalog { .chain(self.user_config.props.clone()) .collect(); + let credentials = to_storage_credentials(response.storage_credentials); let file_io = self - .load_file_io(response.metadata_location.as_deref(), Some(config)) + .load_file_io( + response.metadata_location.as_deref(), + Some(config), + credentials, + ) .await?; let table_builder = Table::builder() @@ -1050,7 +1074,10 @@ impl Catalog for RestCatalog { "Metadata location missing in `register_table` response!", ))?; - let file_io = self.load_file_io(Some(metadata_location), None).await?; + let credentials = to_storage_credentials(response.storage_credentials); + let file_io = self + .load_file_io(Some(metadata_location), None, credentials) + .await?; Table::builder() .identifier(table_ident.clone()) @@ -1123,7 +1150,7 @@ impl Catalog for RestCatalog { }; let file_io = self - .load_file_io(Some(&response.metadata_location), None) + .load_file_io(Some(&response.metadata_location), None, Vec::new()) .await?; Table::builder() diff --git a/crates/catalog/rest/src/scan_planning/mod.rs b/crates/catalog/rest/src/scan_planning/mod.rs index b354a10e79..38081120c1 100644 --- a/crates/catalog/rest/src/scan_planning/mod.rs +++ b/crates/catalog/rest/src/scan_planning/mod.rs @@ -25,10 +25,11 @@ mod planner; mod stream; mod types; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use iceberg::expr::Predicate; +use iceberg::io::StorageFactory; use iceberg::spec::{SchemaRef, TableMetadataRef}; use iceberg::{Runtime, TableIdent}; pub use planner::RestScanPlanner; @@ -54,4 +55,8 @@ pub(crate) struct PlanScanContext { pub(crate) end_snapshot_id: Option, pub(crate) select: Option>, pub(crate) filter: Option, + /// Storage factory + base props used to build a plan-scoped `FileIO` from + /// the credentials the server vends in the plan response. + pub(crate) storage_factory: Option>, + pub(crate) base_props: HashMap, } diff --git a/crates/catalog/rest/src/scan_planning/planner.rs b/crates/catalog/rest/src/scan_planning/planner.rs index 18007b3e96..3eafe4c89e 100644 --- a/crates/catalog/rest/src/scan_planning/planner.rs +++ b/crates/catalog/rest/src/scan_planning/planner.rs @@ -17,11 +17,12 @@ //! [`RestScanPlanner`]: the REST catalog's [`ScanPlanner`] implementation. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; -use iceberg::scan::{FileScanTaskStream, ScanPlanRequest, ScanPlanner}; +use iceberg::io::StorageFactory; +use iceberg::scan::{ScanPlanRequest, ScanPlanner, ServerScanPlan}; use iceberg::{Result, Runtime}; use super::endpoint::Endpoint; @@ -33,13 +34,16 @@ use crate::client::HttpClient; /// /// Instances are created by `RestCatalog` and attached to the tables it loads; /// each holds a handle to the shared HTTP client, the negotiated runtime config -/// and endpoint set, so planning can run independently of the catalog object. +/// and endpoint set, plus the storage factory and base props needed to build a +/// plan-scoped `FileIO` from the credentials the server vends per scan. #[derive(Debug)] pub struct RestScanPlanner { client: Arc, config: RestCatalogConfig, endpoints: Arc>, runtime: Runtime, + storage_factory: Option>, + base_props: HashMap, } impl RestScanPlanner { @@ -48,19 +52,23 @@ impl RestScanPlanner { config: RestCatalogConfig, endpoints: Arc>, runtime: Runtime, + storage_factory: Option>, + base_props: HashMap, ) -> Self { Self { client, config, endpoints, runtime, + storage_factory, + base_props, } } } #[async_trait] impl ScanPlanner for RestScanPlanner { - async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result { + async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result { request.validate()?; let ctx = PlanScanContext { client: self.client.clone(), @@ -77,6 +85,8 @@ impl ScanPlanner for RestScanPlanner { end_snapshot_id: request.end_snapshot_id, select: request.select, filter: request.filter, + storage_factory: self.storage_factory.clone(), + base_props: self.base_props.clone(), }; plan_table_scan(ctx).await } diff --git a/crates/catalog/rest/src/scan_planning/stream.rs b/crates/catalog/rest/src/scan_planning/stream.rs index c1678ea436..be83a45030 100644 --- a/crates/catalog/rest/src/scan_planning/stream.rs +++ b/crates/catalog/rest/src/scan_planning/stream.rs @@ -27,7 +27,8 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use iceberg::expr::Bind; -use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; +use iceberg::io::{FileIO, FileIOBuilder, StorageCredential}; +use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile, ServerScanPlan}; use iceberg::{Error, ErrorKind, Result, TableIdent}; use reqwest::{Method, StatusCode}; use serde::Serialize; @@ -55,7 +56,7 @@ const MAX_WAIT: Duration = Duration::from_secs(300); /// Returns [`ErrorKind::FeatureUnsupported`] if the server does not advertise /// the submit endpoint, allowing the scan engine to fall back to native /// planning. -pub(crate) async fn plan_table_scan(ctx: PlanScanContext) -> Result { +pub(crate) async fn plan_table_scan(ctx: PlanScanContext) -> Result { endpoint::check(&ctx.endpoints, &Endpoint::submit_table_scan_plan())?; let bound_filter = match &ctx.filter { @@ -80,12 +81,37 @@ pub(crate) async fn plan_table_scan(ctx: PlanScanContext) -> Result), - ))) + Ok(ServerScanPlan { + tasks: Box::pin(futures::stream::iter( + tasks.into_iter().map(Ok::), + )), + file_io: build_scan_file_io(&ctx, wire_creds), + }) +} + +/// Build a plan-scoped `FileIO` from the credentials the server vended for this +/// scan, or `None` if there were none (or no storage factory is configured). +fn build_scan_file_io( + ctx: &PlanScanContext, + wire_creds: Vec, +) -> Option { + if wire_creds.is_empty() { + return None; + } + let factory = ctx.storage_factory.clone()?; + let credentials: Vec = wire_creds + .into_iter() + .map(|c| StorageCredential::new(c.prefix, c.config)) + .collect(); + Some( + FileIOBuilder::new(factory) + .with_props(ctx.base_props.clone()) + .with_storage_credentials(credentials) + .build(), + ) } /// Run the full state machine and collect all produced tasks. @@ -93,7 +119,7 @@ async fn drive( ctx: &PlanScanContext, convert_ctx: &ConvertContext, guard: &mut CancelGuard, -) -> Result> { +) -> Result<(Vec, Vec)> { let request = build_request(ctx); let submit_url = ctx.config.scan_plan_endpoint(&ctx.table_ident); let response: PlanTableScanResponse = post_json(&ctx.client, &submit_url, &request).await?; @@ -103,12 +129,16 @@ async fn drive( } let mut out = Vec::new(); + let mut credentials: Vec = Vec::new(); let (delete_files, file_scan_tasks, plan_tasks) = match response.status { - PlanStatus::Completed => ( - response.delete_files, - response.file_scan_tasks, - response.plan_tasks, - ), + PlanStatus::Completed => { + credentials.extend(response.storage_credentials.unwrap_or_default()); + ( + response.delete_files, + response.file_scan_tasks, + response.plan_tasks, + ) + } PlanStatus::Submitted => { let plan_id = response.plan_id.ok_or_else(|| { Error::new( @@ -116,7 +146,9 @@ async fn drive( "Server returned status=submitted without a plan-id", ) })?; - poll_until_complete(ctx, &plan_id).await? + let (d, f, p, c) = poll_until_complete(ctx, &plan_id).await?; + credentials.extend(c); + (d, f, p) } PlanStatus::Failed => return Err(failure_error(response.error.as_ref())), PlanStatus::Cancelled => { @@ -152,7 +184,7 @@ async fn drive( } } - Ok(out) + Ok((out, credentials)) } /// Poll `GET .../plan/{plan-id}` with exponential backoff until the plan @@ -164,6 +196,7 @@ async fn poll_until_complete( Option>, Option>, Option>, + Vec, )> { endpoint::check(&ctx.endpoints, &Endpoint::fetch_table_scan_plan())?; let url = ctx.config.scan_plan_id_endpoint(&ctx.table_ident, plan_id); @@ -179,6 +212,7 @@ async fn poll_until_complete( response.delete_files, response.file_scan_tasks, response.plan_tasks, + response.storage_credentials.unwrap_or_default(), )); } PlanStatus::Submitted => { diff --git a/crates/catalog/rest/src/scan_planning/types.rs b/crates/catalog/rest/src/scan_planning/types.rs index 64c10701eb..30b6bd5c56 100644 --- a/crates/catalog/rest/src/scan_planning/types.rs +++ b/crates/catalog/rest/src/scan_planning/types.rs @@ -33,12 +33,16 @@ use crate::types::StorageCredential; #[serde(rename_all = "kebab-case")] pub(crate) enum PlanStatus { /// Planning finished; tasks (and/or plan-task tokens) are available. + #[serde(alias = "COMPLETED")] Completed, /// Planning is in progress; poll the plan id until it completes. + #[serde(alias = "SUBMITTED")] Submitted, /// Planning was cancelled. + #[serde(alias = "CANCELLED")] Cancelled, /// Planning failed. + #[serde(alias = "FAILED")] Failed, } diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 227d8f4d5b..7624b34435 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -22,7 +22,8 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use super::storage::{ - LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory, + LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageCredential, + StorageFactory, }; use crate::Result; @@ -219,6 +220,12 @@ impl FileIOBuilder { self } + /// Attach per-prefix storage credentials (e.g. vended by a REST catalog). + pub fn with_storage_credentials(mut self, credentials: Vec) -> Self { + self.config = self.config.with_credentials(credentials); + self + } + /// Get the storage configuration. pub fn config(&self) -> &StorageConfig { &self.config diff --git a/crates/iceberg/src/io/storage/config/mod.rs b/crates/iceberg/src/io/storage/config/mod.rs index 2350aab6dd..b7ac173fcc 100644 --- a/crates/iceberg/src/io/storage/config/mod.rs +++ b/crates/iceberg/src/io/storage/config/mod.rs @@ -45,6 +45,30 @@ pub use oss::*; pub use s3::*; use serde::{Deserialize, Serialize}; +/// A storage credential scoped to a location prefix. +/// +/// This mirrors the `storage-credentials` entries returned by an Iceberg REST +/// catalog (see the REST spec). Each credential applies to objects whose path +/// starts with `prefix`; when several credentials match a path, the one with +/// the longest prefix should be preferred. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct StorageCredential { + /// Storage location prefix this credential applies to (e.g. `s3://bucket/db/table`). + pub prefix: String, + /// Backend configuration carrying the credential (e.g. `s3.access-key-id`, ...). + pub config: HashMap, +} + +impl StorageCredential { + /// Create a new storage credential for the given location prefix. + pub fn new(prefix: impl Into, config: HashMap) -> Self { + Self { + prefix: prefix.into(), + config, + } + } +} + /// Configuration properties for storage backends. /// /// This struct contains only configuration properties without specifying @@ -55,6 +79,9 @@ use serde::{Deserialize, Serialize}; pub struct StorageConfig { /// Configuration properties for the storage backend props: HashMap, + /// Per-prefix storage credentials, typically vended by a catalog. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + credentials: Vec, } impl StorageConfig { @@ -62,6 +89,7 @@ impl StorageConfig { pub fn new() -> Self { Self { props: HashMap::new(), + credentials: Vec::new(), } } @@ -71,7 +99,10 @@ impl StorageConfig { /// /// * `props` - Configuration properties for the storage backend pub fn from_props(props: HashMap) -> Self { - Self { props } + Self { + props, + credentials: Vec::new(), + } } /// Get all configuration properties. @@ -79,6 +110,41 @@ impl StorageConfig { &self.props } + /// Get the per-prefix storage credentials. + pub fn credentials(&self) -> &[StorageCredential] { + &self.credentials + } + + /// Attach per-prefix storage credentials (e.g. vended by a REST catalog). + /// + /// This is a builder-style method that returns `self` for chaining. + pub fn with_credentials(mut self, credentials: Vec) -> Self { + self.credentials = credentials; + self + } + + /// Resolve the most specific credential for `location`. + /// + /// Returns the credential whose `prefix` is the longest match for the given + /// location, mirroring the selection rule from the Iceberg REST spec. Returns + /// `None` when no credential prefix matches. + pub fn resolve_credential(&self, location: &str) -> Option<&StorageCredential> { + self.credentials + .iter() + .filter(|c| location.starts_with(&c.prefix)) + .max_by_key(|c| c.prefix.len()) + } + + /// Build the effective property map for `location`: base props overlaid with + /// the most specific matching credential's config (credential wins, per spec). + pub fn resolved_props(&self, location: &str) -> HashMap { + let mut props = self.props.clone(); + if let Some(cred) = self.resolve_credential(location) { + props.extend(cred.config.clone()); + } + props + } + /// Get a specific configuration property by key. /// /// # Arguments @@ -224,6 +290,62 @@ mod tests { assert!(config.props().is_empty()); } + #[test] + fn test_resolve_credential_longest_prefix_wins() { + let config = StorageConfig::new().with_credentials(vec![ + StorageCredential::new( + "s3://bucket", + HashMap::from([("s3.access-key-id".to_string(), "broad".to_string())]), + ), + StorageCredential::new( + "s3://bucket/db/table", + HashMap::from([("s3.access-key-id".to_string(), "specific".to_string())]), + ), + ]); + + let cred = config + .resolve_credential("s3://bucket/db/table/data/0.parquet") + .expect("a credential should match"); + assert_eq!(cred.prefix, "s3://bucket/db/table"); + assert_eq!(cred.config.get("s3.access-key-id").unwrap(), "specific"); + + // A path only under the broad prefix falls back to it. + let cred = config + .resolve_credential("s3://bucket/other/file.parquet") + .unwrap(); + assert_eq!(cred.prefix, "s3://bucket"); + } + + #[test] + fn test_resolve_credential_no_match() { + let config = StorageConfig::new().with_credentials(vec![StorageCredential::new( + "s3://bucket-a", + HashMap::from([("s3.access-key-id".to_string(), "a".to_string())]), + )]); + + assert!(config.resolve_credential("s3://bucket-b/x").is_none()); + } + + #[test] + fn test_resolved_props_credential_overrides_base() { + let config = StorageConfig::new() + .with_prop("s3.region", "us-east-1") + .with_prop("s3.access-key-id", "base") + .with_credentials(vec![StorageCredential::new( + "s3://bucket", + HashMap::from([("s3.access-key-id".to_string(), "vended".to_string())]), + )]); + + let props = config.resolved_props("s3://bucket/db/table/data/0.parquet"); + // Credential wins over base for the same key, base-only keys are preserved. + assert_eq!(props.get("s3.access-key-id").unwrap(), "vended"); + assert_eq!(props.get("s3.region").unwrap(), "us-east-1"); + + // No matching credential: base props returned unchanged. + let props = config.resolved_props("gs://other/file"); + assert_eq!(props.get("s3.access-key-id").unwrap(), "base"); + } + #[test] fn test_storage_config_serialization_empty() { let config = StorageConfig::new(); diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e27a9701ea..3e97bf54d8 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -24,13 +24,13 @@ use context::*; mod planner; mod task; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use arrow_array::RecordBatch; use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; -pub use planner::{ScanPlanRequest, ScanPlanner}; +pub use planner::{ScanPlanRequest, ScanPlanner, ServerScanPlan}; pub use task::*; use crate::arrow::ArrowReaderBuilder; @@ -236,6 +236,7 @@ impl<'a> TableScanBuilder<'a> { table_ident: self.table.identifier().clone(), start_snapshot_id: self.start_snapshot_id, end_snapshot_id: self.end_snapshot_id, + scan_io_override: Arc::new(OnceLock::new()), }); }; current_snapshot_id.clone() @@ -334,6 +335,7 @@ impl<'a> TableScanBuilder<'a> { table_ident: self.table.identifier().clone(), start_snapshot_id: self.start_snapshot_id, end_snapshot_id: self.end_snapshot_id, + scan_io_override: Arc::new(OnceLock::new()), }) } } @@ -374,6 +376,9 @@ pub struct TableScan { /// Incremental scan bounds, forwarded to the server-side planner. start_snapshot_id: Option, end_snapshot_id: Option, + /// Set by server-side planning to a plan-scoped [`FileIO`] carrying vended + /// credentials; [`to_arrow`](Self::to_arrow) reads through it when present. + scan_io_override: Arc>, } impl TableScan { @@ -408,7 +413,12 @@ impl TableScan { }; match planner.plan_table_scan(request).await { - Ok(stream) => return Ok(stream), + Ok(plan) => { + if let Some(io) = plan.file_io { + let _ = self.scan_io_override.set(io); + } + return Ok(plan.tasks); + } Err(e) if e.kind() == ErrorKind::FeatureUnsupported => { // Fall through to native, client-side planning below. } @@ -533,11 +543,19 @@ impl TableScan { /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { - let mut arrow_reader_builder = - ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone()) - .with_data_file_concurrency_limit(self.concurrency_limit_data_files) - .with_row_group_filtering_enabled(self.row_group_filtering_enabled) - .with_row_selection_enabled(self.row_selection_enabled); + // Plan first: server-side planning may install a plan-scoped FileIO + // (with vended credentials) that we must read through. + let tasks = self.plan_files().await?; + let file_io = self + .scan_io_override + .get() + .cloned() + .unwrap_or_else(|| self.file_io.clone()); + + let mut arrow_reader_builder = ArrowReaderBuilder::new(file_io, self.runtime.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(self.row_group_filtering_enabled) + .with_row_selection_enabled(self.row_selection_enabled); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); @@ -545,7 +563,7 @@ impl TableScan { arrow_reader_builder .build() - .read(self.plan_files().await?) + .read(tasks) .map(|result| result.stream()) } diff --git a/crates/iceberg/src/scan/planner.rs b/crates/iceberg/src/scan/planner.rs index 3a8838b345..2456e40d38 100644 --- a/crates/iceberg/src/scan/planner.rs +++ b/crates/iceberg/src/scan/planner.rs @@ -35,10 +35,22 @@ use std::fmt::Debug; use async_trait::async_trait; use crate::expr::Predicate; +use crate::io::FileIO; use crate::scan::FileScanTaskStream; use crate::spec::{SchemaRef, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; +/// The result of a server-side scan plan: the file scan tasks plus an optional +/// plan-scoped [`FileIO`] built from the credentials the server vended for this +/// scan. When present, the scan engine reads data files through this `FileIO` +/// instead of the table's default one. +pub struct ServerScanPlan { + /// The planned file scan tasks. + pub tasks: FileScanTaskStream, + /// A `FileIO` carrying the plan's vended storage credentials, if any. + pub file_io: Option, +} + /// A neutral, catalog-agnostic description of the scan to be planned remotely. /// /// This is the contract between the core scan engine (which assembles it from a @@ -89,8 +101,9 @@ pub struct ScanPlanRequest { /// (client-side) planning. Any other error is treated as a hard failure. #[async_trait] pub trait ScanPlanner: Debug + Send + Sync { - /// Plan a table scan on the server and return a stream of file scan tasks. - async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result; + /// Plan a table scan on the server, returning the file scan tasks and an + /// optional plan-scoped [`FileIO`] carrying any vended credentials. + async fn plan_table_scan(&self, request: ScanPlanRequest) -> Result; } impl ScanPlanRequest { diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 86993220a8..8dc174c357 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -27,7 +27,7 @@ use futures::StreamExt; use futures::stream::BoxStream; use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, - StorageFactory, + StorageCredential, StorageFactory, }; use iceberg::{Error, ErrorKind, Result}; use serde::{Deserialize, Serialize}; @@ -188,6 +188,7 @@ impl StorageFactory for OpenDalResolvingStorageFactory { fn build(&self, config: &StorageConfig) -> Result> { Ok(Arc::new(OpenDalResolvingStorage { props: config.props().clone(), + credentials: config.credentials().to_vec(), storages: RwLock::new(HashMap::new()), #[cfg(feature = "opendal-s3")] customized_credential_load: self.customized_credential_load.clone(), @@ -205,9 +206,16 @@ impl StorageFactory for OpenDalResolvingStorageFactory { pub struct OpenDalResolvingStorage { /// Configuration properties shared across all backends. props: HashMap, - /// Cache of canonical scheme to storage mappings. + /// Per-prefix storage credentials (e.g. vended by a REST catalog). + #[serde(default, skip_serializing_if = "Vec::is_empty")] + credentials: Vec, + /// Cache of resolution-key to storage mappings. + /// + /// The key is the canonical scheme when no credential matches, or the matched + /// credential prefix otherwise, so that different prefixes (with different + /// vended credentials) under the same scheme get distinct storage instances. #[serde(skip, default)] - storages: RwLock>>, + storages: RwLock>>, /// Custom AWS credential loader for S3 storage. #[cfg(feature = "opendal-s3")] #[serde(skip)] @@ -215,10 +223,26 @@ pub struct OpenDalResolvingStorage { } impl OpenDalResolvingStorage { - /// Resolve the storage for the given path by extracting the canonical scheme and - /// returning the cached or newly-created [`OpenDalStorage`]. + /// Select the most specific (longest-prefix) credential matching `path`. + fn resolve_credential(&self, path: &str) -> Option<&StorageCredential> { + self.credentials + .iter() + .filter(|c| path.starts_with(&c.prefix)) + .max_by_key(|c| c.prefix.len()) + } + + /// Resolve the storage for the given path by extracting the canonical scheme, + /// overlaying any matching vended credential, and returning the cached or + /// newly-created [`OpenDalStorage`]. fn resolve(&self, path: &str) -> Result> { let scheme = extract_scheme(path)?; + let credential = self.resolve_credential(path); + + // Cache key: matched credential prefix (most specific) or the scheme. + let cache_key = match credential { + Some(cred) => format!("{scheme}\u{0}{}", cred.prefix), + None => scheme.to_string(), + }; // Fast path: check read lock first. { @@ -226,7 +250,7 @@ impl OpenDalResolvingStorage { .storages .read() .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?; - if let Some(storage) = cache.get(&scheme) { + if let Some(storage) = cache.get(&cache_key) { return Ok(storage.clone()); } } @@ -238,18 +262,29 @@ impl OpenDalResolvingStorage { .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?; // Double-check after acquiring write lock. - if let Some(storage) = cache.get(&scheme) { + if let Some(storage) = cache.get(&cache_key) { return Ok(storage.clone()); } + // Overlay the matching credential's config over the base props (credential + // wins, per the Iceberg REST spec preference for `storage-credentials`). + let props = match credential { + Some(cred) => { + let mut merged = self.props.clone(); + merged.extend(cred.config.clone()); + merged + } + None => self.props.clone(), + }; + let storage = build_storage_for_scheme( scheme, - &self.props, + &props, #[cfg(feature = "opendal-s3")] &self.customized_credential_load, )?; let storage = Arc::new(storage); - cache.insert(scheme, storage.clone()); + cache.insert(cache_key, storage.clone()); Ok(storage) } } @@ -331,12 +366,76 @@ mod tests { fn empty_resolving_storage() -> OpenDalResolvingStorage { OpenDalResolvingStorage { props: HashMap::new(), + credentials: Vec::new(), storages: RwLock::new(HashMap::new()), #[cfg(feature = "opendal-s3")] customized_credential_load: None, } } + #[cfg(feature = "opendal-s3")] + fn resolving_storage_with_credentials( + credentials: Vec, + ) -> OpenDalResolvingStorage { + OpenDalResolvingStorage { + props: HashMap::from([("s3.region".to_string(), "us-east-1".to_string())]), + credentials, + storages: RwLock::new(HashMap::new()), + customized_credential_load: None, + } + } + + #[cfg(feature = "opendal-s3")] + #[test] + fn test_resolve_distinct_credentials_yield_distinct_storages() { + let storage = resolving_storage_with_credentials(vec![ + StorageCredential::new( + "s3://bucket/db/t1", + HashMap::from([ + ("s3.access-key-id".to_string(), "k1".to_string()), + ("s3.secret-access-key".to_string(), "s1".to_string()), + ]), + ), + StorageCredential::new( + "s3://bucket/db/t2", + HashMap::from([ + ("s3.access-key-id".to_string(), "k2".to_string()), + ("s3.secret-access-key".to_string(), "s2".to_string()), + ]), + ), + ]); + + let t1_a = storage.resolve("s3://bucket/db/t1/data/0.parquet").unwrap(); + let t1_b = storage.resolve("s3://bucket/db/t1/data/1.parquet").unwrap(); + let t2 = storage.resolve("s3://bucket/db/t2/data/0.parquet").unwrap(); + + // Same credential prefix -> shared cached storage. + assert!( + Arc::ptr_eq(&t1_a, &t1_b), + "paths under the same credential prefix should share a storage" + ); + // Different credential prefix -> distinct storage instances. + assert!( + !Arc::ptr_eq(&t1_a, &t2), + "paths under different credential prefixes must not share a storage" + ); + } + + #[cfg(feature = "opendal-s3")] + #[test] + fn test_resolve_no_credential_falls_back_to_scheme_sharing() { + // With credentials present but a path that matches none, resolution falls + // back to scheme-level sharing (keyed by scheme, not prefix). + let storage = resolving_storage_with_credentials(vec![StorageCredential::new( + "s3://bucket/db/t1", + HashMap::from([("s3.access-key-id".to_string(), "k1".to_string())]), + )]); + + let other_a = storage.resolve("s3://other/x.parquet").unwrap(); + let other_b = storage.resolve("s3://other/y.parquet").unwrap(); + assert!(Arc::ptr_eq(&other_a, &other_b)); + } + #[cfg(feature = "opendal-s3")] #[test] fn test_resolve_s3_aliases_share_instance() { From 356d502f2d47f8baa5ad06332e7257583680fea9 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 19 Jun 2026 14:32:58 -0700 Subject: [PATCH 3/5] test(catalog-rest): integration test for server-side scan planning End-to-end test: create a table, append a data file, then scan a reloaded table (which carries the injected RestScanPlanner) and assert the rows. With a scan-planning-capable fixture (>= 1.11.x) the scan is served via POST .../plan; otherwise it falls back to native planning and the same assertions hold. --- .../tests/scan_planning_test.rs | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 crates/integration_tests/tests/scan_planning_test.rs diff --git a/crates/integration_tests/tests/scan_planning_test.rs b/crates/integration_tests/tests/scan_planning_test.rs new file mode 100644 index 0000000000..d298af0288 --- /dev/null +++ b/crates/integration_tests/tests/scan_planning_test.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration test for server-side REST scan planning. +//! +//! Drives the full client protocol end-to-end against the REST fixture: create a +//! table, append a data file, then scan a freshly-loaded table and assert the +//! rows come back. A table loaded through `RestCatalog` carries an injected +//! `RestScanPlanner`, so its scan negotiates the planning endpoints and, when the +//! fixture advertises them (rest-fixture >= 1.11.x), is served by the server via +//! `POST .../plan`. Against an older fixture the client transparently falls back +//! to native planning and the same assertions hold. + +mod common; + +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use common::{random_ns, test_schema}; +use futures::TryStreamExt; +use iceberg::spec::DataFileFormat; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, CatalogBuilder, TableCreation}; +use iceberg_catalog_rest::RestCatalogBuilder; +use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; +use parquet::file::properties::WriterProperties; + +#[tokio::test] +async fn test_server_side_scan_planning() { + let fixture = get_test_fixture(); + let rest_catalog = RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load: None, + })) + .load("rest", fixture.catalog_config.clone()) + .await + .unwrap(); + + let ns = random_ns().await; + let schema = test_schema(); + let table = rest_catalog + .create_table( + ns.name(), + TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(), + ) + .await + .unwrap(); + + // Write a single data file with four rows. + let arrow_schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata()).unwrap(); + let file_name_generator = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + ); + let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + table.file_io().clone(), + location_generator, + file_name_generator, + ); + let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder) + .build(None) + .await + .unwrap(); + + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + let tx = Transaction::new(&table); + let append_action = tx.fast_append().add_data_files(data_file); + let tx = append_action.apply(tx).unwrap(); + tx.commit(&rest_catalog).await.unwrap(); + + // Reload so the table carries the catalog-injected `RestScanPlanner`, then + // scan. With a scan-planning-capable fixture this round-trips through the + // server's `POST .../plan`; otherwise it falls back to native planning. + let table = rest_catalog.load_table(table.identifier()).await.unwrap(); + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec = batch_stream.try_collect().await.unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 4, "expected the four rows that were written"); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); +} From 80bc282c0c5f28f15a642bc2d0d1301f1ff725fa Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 20 Jun 2026 13:35:16 -0700 Subject: [PATCH 4/5] chore(dev): use latest iceberg-rest-fixture for scan-planning test The 1.10.x fixtures only ship the scan-planning wire types, not the server-side implementation (the handler + endpoint advertising landed in 1.11.0). Against those images the integration test silently falls back to native planning, so it never exercises the server path. Use `latest` (currently the only published fixture image that serves server-side scan planning) so CI actually drives `POST .../plan`. Pin to a 1.11.x tag once one is published. --- dev/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 21920c9ce6..9eb6829497 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -80,7 +80,7 @@ services: # REST Catalog - Apache Iceberg REST Catalog # ============================================================================= rest: - image: apache/iceberg-rest-fixture:1.10.0 + image: apache/iceberg-rest-fixture:latest environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password From 9e6a898efdf18e96a2f03809672ebb1942546881 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 22 Jun 2026 14:26:23 -0700 Subject: [PATCH 5/5] perf(catalog-rest): stream scan-plan tasks lazily with bounded concurrency The planner previously drained every `plan-task` token in sequence and collected all `FileScanTask`s into a `Vec` before returning. Replace that with a lazy stream: the tasks produced inline by planning are yielded first, then the outstanding tokens are fetched on demand (up to a bounded number in flight), queueing any further tokens a fetch returns. The bounded buffer applies natural backpressure, and the cancel guard now travels with the stream, so dropping the scan early cancels the plan on the server. Add a mock-server test for the recursive plan-task fan-out and termination, and fix a `clippy::bool_assert_comparison` in an existing test. --- .../catalog/rest/src/scan_planning/stream.rs | 281 +++++++++++++++--- .../catalog/rest/src/scan_planning/types.rs | 5 +- 2 files changed, 242 insertions(+), 44 deletions(-) diff --git a/crates/catalog/rest/src/scan_planning/stream.rs b/crates/catalog/rest/src/scan_planning/stream.rs index be83a45030..e324be8098 100644 --- a/crates/catalog/rest/src/scan_planning/stream.rs +++ b/crates/catalog/rest/src/scan_planning/stream.rs @@ -17,18 +17,23 @@ //! The server-side scan-planning state machine: submit → (poll) → fetch tasks. //! -//! Planning is driven to completion and the resulting [`FileScanTask`]s are -//! returned as a stream. If the planning future is dropped before it resolves -//! (e.g. the caller aborts the scan), a [`CancelGuard`] fires a best-effort -//! `DELETE .../plan/{plan-id}` so the server can release the in-flight plan. +//! Planning is driven to completion, then the resulting [`FileScanTask`]s are +//! produced as a lazy stream: tasks returned inline come first, then any +//! `plan-task` tokens are fetched on demand with bounded concurrency (a token's +//! response may carry further tokens, which are queued the same way). A +//! [`CancelGuard`] held by the stream fires a best-effort `DELETE .../plan/{id}` +//! if the stream is dropped before it is fully drained, so the server can +//! release the plan. use std::collections::{HashSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; +use futures::future::BoxFuture; +use futures::stream::{FuturesUnordered, StreamExt}; use iceberg::expr::Bind; use iceberg::io::{FileIO, FileIOBuilder, StorageCredential}; -use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile, ServerScanPlan}; +use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream, ServerScanPlan}; use iceberg::{Error, ErrorKind, Result, TableIdent}; use reqwest::{Method, StatusCode}; use serde::Serialize; @@ -81,15 +86,24 @@ pub(crate) async fn plan_table_scan(ctx: PlanScanContext) -> Result), - )), - file_io: build_scan_file_io(&ctx, wire_creds), - }) + // `plan-task` tokens require the fetch-tasks endpoint; check once up front + // (the same endpoint covers any further tokens that fetching returns). + if !planned.plan_tasks.is_empty() { + endpoint::check(&ctx.endpoints, &Endpoint::fetch_table_scan_plan_tasks())?; + } + + let tasks = task_stream( + &ctx, + convert_ctx, + planned.initial_tasks, + planned.plan_tasks, + guard, + ); + + Ok(ServerScanPlan { tasks, file_io }) } /// Build a plan-scoped `FileIO` from the credentials the server vended for this @@ -114,12 +128,26 @@ fn build_scan_file_io( ) } -/// Run the full state machine and collect all produced tasks. -async fn drive( +/// Maximum number of `fetchScanTasks` requests in flight at once. +const MAX_CONCURRENT_TASK_FETCHES: usize = 8; + +/// Outcome of the planning phase: tasks produced inline, the `plan-task` tokens +/// still to be fetched, and the credentials the server vended for this plan. +struct PlannedScan { + initial_tasks: Vec, + plan_tasks: VecDeque, + credentials: Vec, +} + +/// Submit the scan plan and drive it to completion, returning the tasks the +/// server produced inline along with any outstanding `plan-task` tokens and +/// vended credentials. Tokens are not fetched here — that happens lazily in +/// [`task_stream`]. +async fn plan_initial( ctx: &PlanScanContext, convert_ctx: &ConvertContext, guard: &mut CancelGuard, -) -> Result<(Vec, Vec)> { +) -> Result { let request = build_request(ctx); let submit_url = ctx.config.scan_plan_endpoint(&ctx.table_ident); let response: PlanTableScanResponse = post_json(&ctx.client, &submit_url, &request).await?; @@ -128,7 +156,6 @@ async fn drive( guard.plan_id = Some(plan_id.clone()); } - let mut out = Vec::new(); let mut credentials: Vec = Vec::new(); let (delete_files, file_scan_tasks, plan_tasks) = match response.status { PlanStatus::Completed => { @@ -159,32 +186,110 @@ async fn drive( } }; - convert_into(delete_files, file_scan_tasks, convert_ctx, &mut out)?; + let mut initial_tasks = Vec::new(); + convert_into( + delete_files, + file_scan_tasks, + convert_ctx, + &mut initial_tasks, + )?; - // Fan out over plan-task tokens; fetchScanTasks may itself return more. - let mut queue: VecDeque = plan_tasks.unwrap_or_default().into_iter().collect(); - if !queue.is_empty() { - endpoint::check(&ctx.endpoints, &Endpoint::fetch_table_scan_plan_tasks())?; - } - let tasks_url = ctx.config.scan_tasks_endpoint(&ctx.table_ident); - while let Some(plan_task) = queue.pop_front() { - let response: FetchScanTasksResponse = - post_json(&ctx.client, &tasks_url, &FetchScanTasksRequest { - plan_task, - }) - .await?; - convert_into( - response.delete_files, - response.file_scan_tasks, - convert_ctx, - &mut out, - )?; - if let Some(more) = response.plan_tasks { - queue.extend(more); - } - } + Ok(PlannedScan { + initial_tasks, + plan_tasks: plan_tasks.unwrap_or_default().into_iter().collect(), + credentials, + }) +} - Ok((out, credentials)) +/// Mutable state threaded through the lazy task stream. +struct TaskStreamState { + client: Arc, + tasks_url: String, + convert_ctx: ConvertContext, + /// `plan-task` tokens still to fetch (grows as fetches return more). + pending: VecDeque, + inflight: FuturesUnordered>>, + /// Converted tasks ready to be yielded. + buffer: VecDeque, + /// Held until the stream is drained; dropping the stream early cancels the + /// plan on the server. + guard: CancelGuard, +} + +/// Produce the file scan tasks as a lazy stream. +/// +/// The tasks returned inline by planning are yielded first; the outstanding +/// `plan-task` tokens are then fetched on demand, up to +/// [`MAX_CONCURRENT_TASK_FETCHES`] at a time, and any further tokens a fetch +/// returns are queued and fetched the same way. Fetches only run while the +/// consumer is polling, so the bounded buffer applies natural backpressure. +fn task_stream( + ctx: &PlanScanContext, + convert_ctx: ConvertContext, + initial_tasks: Vec, + plan_tasks: VecDeque, + guard: CancelGuard, +) -> FileScanTaskStream { + let state = TaskStreamState { + client: ctx.client.clone(), + tasks_url: ctx.config.scan_tasks_endpoint(&ctx.table_ident), + convert_ctx, + pending: plan_tasks, + inflight: FuturesUnordered::new(), + buffer: initial_tasks.into_iter().collect(), + guard, + }; + + Box::pin(futures::stream::unfold(state, |mut st| async move { + loop { + if let Some(task) = st.buffer.pop_front() { + return Some((Ok(task), st)); + } + + // Top up in-flight fetches from the pending token queue. + while st.inflight.len() < MAX_CONCURRENT_TASK_FETCHES { + let Some(plan_task) = st.pending.pop_front() else { + break; + }; + let client = st.client.clone(); + let url = st.tasks_url.clone(); + let future: BoxFuture<'static, Result> = + Box::pin(async move { + post_json(&client, &url, &FetchScanTasksRequest { plan_task }).await + }); + st.inflight.push(future); + } + + // No buffered tasks, no pending tokens, nothing in flight → done. + if st.inflight.is_empty() { + st.guard.disarm(); + return None; + } + + match st.inflight.next().await { + Some(Ok(response)) => { + let mut produced = Vec::new(); + if let Err(err) = convert_into( + response.delete_files, + response.file_scan_tasks, + &st.convert_ctx, + &mut produced, + ) { + return Some((Err(err), st)); + } + st.buffer.extend(produced); + if let Some(more) = response.plan_tasks { + st.pending.extend(more); + } + } + Some(Err(err)) => return Some((Err(err), st)), + None => { + st.guard.disarm(); + return None; + } + } + } + })) } /// Poll `GET .../plan/{plan-id}` with exponential backoff until the plan @@ -348,3 +453,97 @@ impl Drop for CancelGuard { }); } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use futures::TryStreamExt; + use iceberg::Runtime; + use mockito::Matcher; + + use super::*; + use crate::types::LoadTableResult; + + /// Build a minimal [`PlanScanContext`] pointed at a mock server. The table + /// metadata is borrowed from the load-table fixture; it is only consulted + /// when converting file-scan tasks, which these tests deliberately leave + /// empty so the focus stays on the planning/fan-out control flow. + fn test_context(server_url: &str) -> PlanScanContext { + let load: LoadTableResult = + serde_json::from_str(include_str!("../../testdata/load_table_response.json")).unwrap(); + let metadata = Arc::new(load.metadata); + let snapshot_schema = metadata.current_schema().clone(); + let config = RestCatalogConfig::builder() + .uri(server_url.to_string()) + .build(); + let endpoints: HashSet = [ + Endpoint::submit_table_scan_plan(), + Endpoint::fetch_table_scan_plan(), + Endpoint::fetch_table_scan_plan_tasks(), + Endpoint::cancel_table_scan_plan(), + ] + .into_iter() + .collect(); + + PlanScanContext { + client: Arc::new(HttpClient::new(&config).unwrap()), + config, + endpoints: Arc::new(endpoints), + runtime: Runtime::current(), + table_ident: TableIdent::from_strs(["ns", "t"]).unwrap(), + metadata, + snapshot_schema, + project_field_ids: vec![], + case_sensitive: true, + snapshot_id: None, + start_snapshot_id: None, + end_snapshot_id: None, + select: None, + filter: None, + storage_factory: None, + base_props: HashMap::new(), + } + } + + /// A completed plan that hands back a `plan-task` token, whose fetch in turn + /// returns a further token, must drive both fetches and then terminate. + #[tokio::test] + async fn fan_out_fetches_recursive_plan_tasks() { + let mut server = mockito::Server::new_async().await; + + let plan_mock = server + .mock("POST", Matcher::Regex(r"/plan$".to_string())) + .with_status(200) + .with_body(r#"{"status":"completed","plan-tasks":["t1"]}"#) + .create_async() + .await; + + // Fetching "t1" yields no tasks but a further token "t2". + let t1_mock = server + .mock("POST", Matcher::Regex(r"/tasks$".to_string())) + .match_body(Matcher::Regex(r#""t1""#.to_string())) + .with_status(200) + .with_body(r#"{"plan-tasks":["t2"]}"#) + .create_async() + .await; + + // Fetching "t2" returns nothing further, ending the fan-out. + let t2_mock = server + .mock("POST", Matcher::Regex(r"/tasks$".to_string())) + .match_body(Matcher::Regex(r#""t2""#.to_string())) + .with_status(200) + .with_body(r#"{}"#) + .create_async() + .await; + + let plan = plan_table_scan(test_context(&server.url())).await.unwrap(); + let tasks: Vec = plan.tasks.try_collect().await.unwrap(); + + assert!(tasks.is_empty()); + plan_mock.assert_async().await; + t1_mock.assert_async().await; + // The recursive token was fetched and the stream terminated. + t2_mock.assert_async().await; + } +} diff --git a/crates/catalog/rest/src/scan_planning/types.rs b/crates/catalog/rest/src/scan_planning/types.rs index 30b6bd5c56..a345c930c4 100644 --- a/crates/catalog/rest/src/scan_planning/types.rs +++ b/crates/catalog/rest/src/scan_planning/types.rs @@ -223,9 +223,8 @@ mod tests { serde_json::from_value::(serde_json::json!("completed")).unwrap(), PlanStatus::Completed ); - assert_eq!( - serde_json::from_value::(serde_json::json!("position-deletes")).is_err(), - true + assert!( + serde_json::from_value::(serde_json::json!("position-deletes")).is_err() ); }