Skip to content

Commit 98867e2

Browse files
authored
[ #9047] feat(iceberg):Support submit table scan plan (#9050)
### What changes were proposed in this pull request? Support submit table scan plan. ### Why are the changes needed? Fix: #([9047](#9047)) ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested?
1 parent 75f4bff commit 98867e2

File tree

14 files changed

+709
-4
lines changed

14 files changed

+709
-4
lines changed

core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public enum OperationType {
2626
PURGE_TABLE,
2727
LOAD_TABLE,
2828
LOAD_TABLE_CREDENTIAL,
29+
PLAN_TABLE_SCAN,
2930
LIST_TABLE,
3031
ALTER_TABLE,
3132
RENAME_TABLE,

docs/iceberg-rest-service.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ There are some key difference between Gravitino Iceberg REST server and Gravitin
2222
- Supports the Apache Iceberg REST API defined in Iceberg 1.10, and supports all namespace and table interfaces. The following interfaces are not implemented yet:
2323
- multi table transaction
2424
- pagination
25-
- scan planning
2625
- Works as a catalog proxy, supporting `Hive` and `JDBC` as catalog backend.
2726
- Supports credential vending for `S3``GCS``OSS` and `ADLS`.
2827
- Supports different storages like `S3`, `HDFS`, `OSS`, `GCS`, `ADLS` and provides the capability to support other storages.

iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.ImmutableMap;
2424
import com.google.common.collect.ImmutableSet;
25+
import java.io.IOException;
26+
import java.util.ArrayList;
2527
import java.util.Collections;
2628
import java.util.HashMap;
29+
import java.util.List;
2730
import java.util.Map;
2831
import java.util.Set;
2932
import java.util.stream.Stream;
@@ -40,15 +43,27 @@
4043
import org.apache.gravitino.storage.GCSProperties;
4144
import org.apache.gravitino.utils.MapUtils;
4245
import org.apache.gravitino.utils.PrincipalUtils;
46+
import org.apache.iceberg.DeleteFile;
47+
import org.apache.iceberg.FileScanTask;
48+
import org.apache.iceberg.IncrementalAppendScan;
49+
import org.apache.iceberg.PartitionSpec;
50+
import org.apache.iceberg.Scan;
51+
import org.apache.iceberg.ScanTaskParser;
52+
import org.apache.iceberg.Table;
4353
import org.apache.iceberg.TableMetadata;
4454
import org.apache.iceberg.TableProperties;
55+
import org.apache.iceberg.TableScan;
4556
import org.apache.iceberg.catalog.Namespace;
4657
import org.apache.iceberg.catalog.TableIdentifier;
4758
import org.apache.iceberg.exceptions.ServiceUnavailableException;
59+
import org.apache.iceberg.io.CloseableIterable;
60+
import org.apache.iceberg.rest.PlanStatus;
4861
import org.apache.iceberg.rest.requests.CreateTableRequest;
62+
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
4963
import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
5064
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
5165
import org.apache.iceberg.rest.responses.LoadTableResponse;
66+
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
5267

5368
/** Process Iceberg REST specific operations, like credential vending. */
5469
public class CatalogWrapperForREST extends IcebergCatalogWrapper {
@@ -207,6 +222,211 @@ private Credential getCredential(
207222
return credential;
208223
}
209224

225+
/**
226+
* Plan table scan and return scan tasks.
227+
*
228+
* <p>This method performs server-side scan planning to optimize query performance by reducing
229+
* client-side metadata loading and enabling parallel task execution.
230+
*
231+
* <p>Implementation uses synchronous scan planning (COMPLETED status) where tasks are returned
232+
* immediately as serialized JSON strings. This is different from asynchronous mode (SUBMITTED
233+
* status) where a plan ID is returned for later retrieval.
234+
*
235+
* <p>Referenced from Iceberg PR #13400 for scan planning implementation.
236+
*
237+
* @param tableIdentifier The table identifier.
238+
* @param scanRequest The scan request parameters including filters, projections, snapshot-id,
239+
* etc.
240+
* @return PlanTableScanResponse with status=COMPLETED and serialized planTasks.
241+
* @throws IllegalArgumentException if scan request validation fails
242+
* @throws org.apache.gravitino.exceptions.NoSuchTableException if table doesn't exist
243+
* @throws RuntimeException for other scan planning failures
244+
*/
245+
public PlanTableScanResponse planTableScan(
246+
TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
247+
248+
LOG.debug(
249+
"Planning scan for table: {}, snapshotId: {}, startSnapshotId: {}, endSnapshotId: {}, select: {}, caseSensitive: {}",
250+
tableIdentifier,
251+
scanRequest.snapshotId(),
252+
scanRequest.startSnapshotId(),
253+
scanRequest.endSnapshotId(),
254+
scanRequest.select(),
255+
scanRequest.caseSensitive());
256+
257+
try {
258+
Table table = catalog.loadTable(tableIdentifier);
259+
CloseableIterable<FileScanTask> fileScanTasks =
260+
createFilePlanScanTasks(table, tableIdentifier, scanRequest);
261+
262+
List<String> planTasks = new ArrayList<>();
263+
Map<Integer, PartitionSpec> specsById = new HashMap<>();
264+
List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
265+
266+
try (fileScanTasks) {
267+
for (FileScanTask fileScanTask : fileScanTasks) {
268+
try {
269+
String taskString = ScanTaskParser.toJson(fileScanTask);
270+
planTasks.add(taskString);
271+
272+
int specId = fileScanTask.spec().specId();
273+
if (!specsById.containsKey(specId)) {
274+
specsById.put(specId, fileScanTask.spec());
275+
}
276+
277+
if (!fileScanTask.deletes().isEmpty()) {
278+
deleteFiles.addAll(fileScanTask.deletes());
279+
}
280+
} catch (Exception e) {
281+
throw new RuntimeException(
282+
String.format(
283+
"Failed to serialize scan task for table: %s. Error: %s",
284+
tableIdentifier, e.getMessage()),
285+
e);
286+
}
287+
}
288+
} catch (IOException e) {
289+
LOG.error("Failed to close scan task iterator for table: {}", tableIdentifier, e);
290+
throw new RuntimeException("Failed to plan scan tasks: " + e.getMessage(), e);
291+
}
292+
293+
List<DeleteFile> uniqueDeleteFiles =
294+
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
295+
296+
if (planTasks.isEmpty()) {
297+
LOG.info(
298+
"Scan planning returned no tasks for table: {}. Table may be empty or fully filtered.",
299+
tableIdentifier);
300+
}
301+
302+
PlanTableScanResponse.Builder responseBuilder =
303+
PlanTableScanResponse.builder()
304+
.withPlanStatus(PlanStatus.COMPLETED)
305+
.withPlanTasks(planTasks)
306+
.withSpecsById(specsById);
307+
308+
if (!uniqueDeleteFiles.isEmpty()) {
309+
responseBuilder.withDeleteFiles(uniqueDeleteFiles);
310+
LOG.debug(
311+
"Included {} delete files in scan plan for table: {}",
312+
uniqueDeleteFiles.size(),
313+
tableIdentifier);
314+
}
315+
316+
return responseBuilder.build();
317+
318+
} catch (IllegalArgumentException e) {
319+
LOG.error("Invalid scan request for table {}: {}", tableIdentifier, e.getMessage());
320+
throw new IllegalArgumentException("Invalid scan parameters: " + e.getMessage(), e);
321+
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
322+
LOG.error("Table not found during scan planning: {}", tableIdentifier);
323+
throw e;
324+
} catch (Exception e) {
325+
LOG.error("Unexpected error during scan planning for table: {}", tableIdentifier, e);
326+
throw new RuntimeException(
327+
"Scan planning failed for table " + tableIdentifier + ": " + e.getMessage(), e);
328+
}
329+
}
330+
331+
/**
332+
* Create and plan a scan based on the scan request.
333+
*
334+
* <p>If both start and end snapshot IDs are provided, uses IncrementalAppendScan. Otherwise, uses
335+
* regular TableScan.
336+
*
337+
* @param table The table to scan
338+
* @param tableIdentifier The table identifier for logging
339+
* @param scanRequest The scan request parameters
340+
* @return CloseableIterable of FileScanTask
341+
*/
342+
private CloseableIterable<FileScanTask> createFilePlanScanTasks(
343+
Table table, TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
344+
Long startSnapshotId = scanRequest.startSnapshotId();
345+
Long endSnapshotId = scanRequest.endSnapshotId();
346+
// Use IncrementalAppendScan if both start and end snapshot IDs are provided
347+
if (startSnapshotId != null && endSnapshotId != null) {
348+
if (startSnapshotId >= endSnapshotId) {
349+
throw new IllegalArgumentException(
350+
String.format(
351+
"Invalid snapshot range: startSnapshotId (%d) must be less than endSnapshotId (%d)",
352+
startSnapshotId, endSnapshotId));
353+
}
354+
LOG.debug(
355+
"Using IncrementalAppendScan for table: {}, from snapshot: {} to snapshot: {}",
356+
tableIdentifier,
357+
startSnapshotId,
358+
endSnapshotId);
359+
IncrementalAppendScan incrementalScan =
360+
table
361+
.newIncrementalAppendScan()
362+
.fromSnapshotInclusive(startSnapshotId)
363+
.toSnapshot(endSnapshotId);
364+
incrementalScan = applyScanRequest(incrementalScan, scanRequest);
365+
return incrementalScan.planFiles();
366+
} else {
367+
TableScan tableScan = table.newScan();
368+
if (scanRequest.snapshotId() != null && scanRequest.snapshotId() != 0L) {
369+
tableScan = tableScan.useSnapshot(scanRequest.snapshotId());
370+
LOG.debug("Applied snapshot filter: snapshot-id={}", scanRequest.snapshotId());
371+
}
372+
tableScan = applyScanRequest(tableScan, scanRequest);
373+
return tableScan.planFiles();
374+
}
375+
}
376+
377+
@SuppressWarnings("unchecked")
378+
private <T extends Scan> T applyScanRequest(T scan, PlanTableScanRequest scanRequest) {
379+
scan = (T) scan.caseSensitive(scanRequest.caseSensitive());
380+
LOG.debug("Applied case-sensitive: {}", scanRequest.caseSensitive());
381+
scan = applyScanFilter(scan, scanRequest);
382+
scan = applyScanSelect(scan, scanRequest);
383+
scan = applyScanStatsFields(scan, scanRequest);
384+
385+
return scan;
386+
}
387+
388+
@SuppressWarnings("unchecked")
389+
private <T extends Scan> T applyScanFilter(T scan, PlanTableScanRequest scanRequest) {
390+
if (scanRequest.filter() != null) {
391+
try {
392+
scan = (T) scan.filter(scanRequest.filter());
393+
LOG.debug("Applied filter expression: {}", scanRequest.filter());
394+
} catch (Exception e) {
395+
LOG.error("Failed to apply filter expression: {}", e.getMessage(), e);
396+
throw new IllegalArgumentException("Invalid filter expression: " + e.getMessage(), e);
397+
}
398+
}
399+
return scan;
400+
}
401+
402+
@SuppressWarnings("unchecked")
403+
private <T extends Scan> T applyScanSelect(T scan, PlanTableScanRequest scanRequest) {
404+
if (scanRequest.select() != null && !scanRequest.select().isEmpty()) {
405+
try {
406+
scan = (T) scan.select(scanRequest.select());
407+
LOG.debug("Applied column projection: {}", scanRequest.select());
408+
} catch (Exception e) {
409+
LOG.error("Failed to apply column projection: {}", e.getMessage(), e);
410+
throw new IllegalArgumentException("Invalid column selection: " + e.getMessage(), e);
411+
}
412+
}
413+
return scan;
414+
}
415+
416+
@SuppressWarnings("unchecked")
417+
private <T extends Scan> T applyScanStatsFields(T scan, PlanTableScanRequest scanRequest) {
418+
if (scanRequest.statsFields() != null && !scanRequest.statsFields().isEmpty()) {
419+
try {
420+
scan = (T) scan.includeColumnStats(scanRequest.statsFields());
421+
LOG.debug("Applied statistics fields: {}", scanRequest.statsFields());
422+
} catch (Exception e) {
423+
LOG.error("Failed to apply statistics fields: {}", e.getMessage(), e);
424+
throw new IllegalArgumentException("Invalid statistics fields: " + e.getMessage(), e);
425+
}
426+
}
427+
return scan;
428+
}
429+
210430
@VisibleForTesting
211431
static Map<String, String> checkForCompatibility(
212432
Map<String, String> properties, Map<String, String> deprecatedProperties) {

iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
4040
import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
4141
import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
42+
import org.apache.gravitino.listener.api.event.IcebergPlanTableScanEvent;
43+
import org.apache.gravitino.listener.api.event.IcebergPlanTableScanFailureEvent;
44+
import org.apache.gravitino.listener.api.event.IcebergPlanTableScanPreEvent;
4245
import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
4346
import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
4447
import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
@@ -52,11 +55,13 @@
5255
import org.apache.iceberg.catalog.Namespace;
5356
import org.apache.iceberg.catalog.TableIdentifier;
5457
import org.apache.iceberg.rest.requests.CreateTableRequest;
58+
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
5559
import org.apache.iceberg.rest.requests.RenameTableRequest;
5660
import org.apache.iceberg.rest.requests.UpdateTableRequest;
5761
import org.apache.iceberg.rest.responses.ListTablesResponse;
5862
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
5963
import org.apache.iceberg.rest.responses.LoadTableResponse;
64+
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
6065

6166
/**
6267
* {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationExecutor} that
@@ -263,4 +268,36 @@ public LoadCredentialsResponse getTableCredentials(
263268
eventBus.dispatchEvent(new IcebergLoadTableCredentialEvent(context, gravitinoNameIdentifier));
264269
return loadCredentialsResponse;
265270
}
271+
272+
/**
273+
* Plan table scan and return scan tasks.
274+
*
275+
* <p>Dispatches pre/post events for table scan planning operation.
276+
*
277+
* @param context Iceberg REST request context information.
278+
* @param tableIdentifier The Iceberg table identifier.
279+
* @param scanRequest The scan request parameters
280+
* @return A PlanTableScanResponse containing the scan plan with plan-id and tasks
281+
*/
282+
@Override
283+
public PlanTableScanResponse planTableScan(
284+
IcebergRequestContext context,
285+
TableIdentifier tableIdentifier,
286+
PlanTableScanRequest scanRequest) {
287+
NameIdentifier gravitinoNameIdentifier =
288+
IcebergRESTUtils.getGravitinoNameIdentifier(
289+
metalakeName, context.catalogName(), tableIdentifier);
290+
eventBus.dispatchEvent(new IcebergPlanTableScanPreEvent(context, gravitinoNameIdentifier));
291+
PlanTableScanResponse planTableScanResponse;
292+
try {
293+
planTableScanResponse =
294+
icebergTableOperationDispatcher.planTableScan(context, tableIdentifier, scanRequest);
295+
} catch (Exception e) {
296+
eventBus.dispatchEvent(
297+
new IcebergPlanTableScanFailureEvent(context, gravitinoNameIdentifier, e));
298+
throw e;
299+
}
300+
eventBus.dispatchEvent(new IcebergPlanTableScanEvent(context, gravitinoNameIdentifier));
301+
return planTableScanResponse;
302+
}
266303
}

iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535
import org.apache.iceberg.catalog.Namespace;
3636
import org.apache.iceberg.catalog.TableIdentifier;
3737
import org.apache.iceberg.rest.requests.CreateTableRequest;
38+
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
3839
import org.apache.iceberg.rest.requests.RenameTableRequest;
3940
import org.apache.iceberg.rest.requests.UpdateTableRequest;
4041
import org.apache.iceberg.rest.responses.ListTablesResponse;
4142
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
4243
import org.apache.iceberg.rest.responses.LoadTableResponse;
44+
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
4345

4446
public class IcebergTableHookDispatcher implements IcebergTableOperationDispatcher {
4547

@@ -156,6 +158,27 @@ public LoadCredentialsResponse getTableCredentials(
156158
return dispatcher.getTableCredentials(context, tableIdentifier);
157159
}
158160

161+
/**
162+
* Plan table scan and return scan tasks.
163+
*
164+
* <p>This method performs server-side scan planning to optimize query performance by reducing
165+
* client-side metadata loading and enabling parallel task execution.
166+
*
167+
* @param context Iceberg REST request context information.
168+
* @param tableIdentifier The Iceberg table identifier.
169+
* @param scanRequest The scan request parameters including filters, projections, snapshot-id,
170+
* etc.
171+
* @return A PlanTableScanResponse containing the scan plan with plan-id and tasks. The response
172+
* format follows Iceberg REST API specification.
173+
*/
174+
@Override
175+
public PlanTableScanResponse planTableScan(
176+
IcebergRequestContext context,
177+
TableIdentifier tableIdentifier,
178+
PlanTableScanRequest scanRequest) {
179+
return dispatcher.planTableScan(context, tableIdentifier, scanRequest);
180+
}
181+
159182
private void importTable(String catalogName, Namespace namespace, String tableName) {
160183
TableDispatcher tableDispatcher = GravitinoEnv.getInstance().tableDispatcher();
161184
if (tableDispatcher != null) {

0 commit comments

Comments
 (0)