From eb87f2ade9c3f4b5e1dd049fa16b0581c5b11b47 Mon Sep 17 00:00:00 2001 From: Anurag Mantripragada Date: Mon, 22 Dec 2025 17:47:51 -0800 Subject: [PATCH 1/2] Spark 4.1: Implement SupportsReportOrdering DSv2 API --- .../apache/iceberg/spark/SparkReadConf.java | 8 + .../iceberg/spark/SparkSQLProperties.java | 5 + .../source/MergingSortedRowDataReader.java | 299 +++++++ .../spark/source/SortOrderAnalyzer.java | 142 +++ .../iceberg/spark/source/SparkBatch.java | 42 +- .../spark/source/SparkInputPartition.java | 9 +- .../spark/source/SparkMicroBatchStream.java | 3 +- .../source/SparkPartitioningAwareScan.java | 30 +- .../spark/source/SparkRowReaderFactory.java | 6 +- .../iceberg/spark/source/SparkScan.java | 7 + .../org/apache/iceberg/spark/TestBase.java | 24 + .../spark/source/TestSortOrderAnalyzer.java | 228 +++++ .../source/TestSupportsReportOrdering.java | 837 ++++++++++++++++++ 13 files changed, 1634 insertions(+), 6 deletions(-) create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSortOrderAnalyzer.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 36c34251c317..8440ba685c3a 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -267,6 +267,14 @@ public boolean preserveDataGrouping() { .parse(); } + public boolean preserveDataOrdering() { + return confParser + .booleanConf() + .sessionConf(SparkSQLProperties.PRESERVE_DATA_ORDERING) + 
.defaultValue(SparkSQLProperties.PRESERVE_DATA_ORDERING_DEFAULT) + .parse(); + } + public boolean aggregatePushDownEnabled() { return confParser .booleanConf() diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 161f09d53e2c..e265e68e73f0 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -40,6 +40,11 @@ private SparkSQLProperties() {} "spark.sql.iceberg.planning.preserve-data-grouping"; public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false; + // Controls whether to preserve data ordering and report it to Spark + public static final String PRESERVE_DATA_ORDERING = + "spark.sql.iceberg.planning.preserve-data-ordering"; + public static final boolean PRESERVE_DATA_ORDERING_DEFAULT = false; + // Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg public static final String AGGREGATE_PUSH_DOWN_ENABLED = "spark.sql.iceberg.aggregate-push-down.enabled"; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java new file mode 100644 index 000000000000..6fbad877cd54 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseScanTaskGroup; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; +import org.apache.iceberg.spark.source.metrics.TaskNumSplits; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SortedMerge; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +/** + * A {@link PartitionReader} that reads multiple sorted files and merges them into a single sorted + * stream using a k-way heap merge ({@link SortedMerge}). + * + *

This reader is used when {@code preserve-data-ordering} is enabled and the task group contains + * multiple files that all have the same sort order. + * + *

class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
  private static final Logger LOG = LoggerFactory.getLogger(MergingSortedRowDataReader.class);

  private final CloseableGroup resources;
  private final CloseableIterator<InternalRow> mergedIterator;
  private final List<RowDataReader> fileReaders;
List<CloseableIterable<InternalRow>> fileIterables =
        fileReaders.stream().map(this::readerToIterable).collect(Collectors.toList());
    SortedMerge<InternalRow> sortedMerge =
        new SortedMerge<>(buildComparator(mergeReadSchema, sortOrder), fileIterables);
+ } + }); + } + + @Override + public boolean next() throws IOException { + if (!mergedIterator.hasNext()) { + return false; + } + + InternalRow merged = mergedIterator.next(); + if (outputPositions == null) { + this.current = merged; + } else { + // Strip the extra sort key columns that were added for comparison purposes. + Object[] values = new Object[outputPositions.length]; + for (int i = 0; i < outputPositions.length; i++) { + values[i] = merged.get(outputPositions[i], outputDataTypes[i]); + } + this.current = new GenericInternalRow(values); + } + + return true; + } + + @Override + public InternalRow get() { + return current; + } + + @Override + public void close() throws IOException { + resources.close(); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + long totalDeletes = + fileReaders.stream() + .flatMap(reader -> Arrays.stream(reader.currentMetricsValues())) + .filter(metric -> metric instanceof TaskNumDeletes) + .mapToLong(CustomTaskMetric::value) + .sum(); + return new CustomTaskMetric[] { + new TaskNumSplits(fileReaders.size()), new TaskNumDeletes(totalDeletes) + }; + } + + /** + * Builds a comparator for merging {@link InternalRow}s by the given sort order. Uses {@link + * SortOrderComparators} which handles all transform types (identity, bucket, truncate), ASC/DESC + * directions, and null ordering. The two {@link InternalRowWrapper} instances are allocated once + * and reused — {@code wrap()} just updates an internal reference. 
private static Comparator<InternalRow> buildComparator(
      Schema mergeReadSchema, SortOrder sortOrder) {
    StructType sparkSchema = SparkSchemaUtil.convert(mergeReadSchema);
    Comparator<StructLike> keyComparator =
        SortOrderComparators.forSchema(mergeReadSchema, sortOrder);
Preconditions.checkState(
          found, "Projection field id=%s not found in merge read schema — this is a bug", fieldId);
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Analyzes whether sort ordering can be reported for a table's task groups. + * + *

For sort ordering to be reported, ALL of these conditions must hold:
 *
 * <ul>
 *   <li>the table defines a (non-unsorted) sort order
 *   <li>the scan produced at least one task group
 *   <li>no two task groups share the same partition (grouping) key
 *   <li>every file in every task group carries the table's current sort order ID
 * </ul>
 */

When multiple {@code InputPartition}s share the same partition key, Spark's {@code + * EnsureRequirements} coalesces them into a single task at join/aggregate time. Because this + * coalescing simply concatenates the partitions rather than merge-sorting them, it destroys the + * within-partition ordering guarantee. Reporting ordering in this situation would cause incorrect + * query results. + */ + private static boolean hasUniquePartitionKeys( + List> taskGroups, Types.StructType groupingKeyType) { + + if (groupingKeyType == null || groupingKeyType.fields().isEmpty()) { + return true; + } + + StructLikeSet seenKeys = StructLikeSet.create(groupingKeyType); + for (ScanTaskGroup taskGroup : taskGroups) { + StructLike key = taskGroup.groupingKey(); + if (key != null && !seenKeys.add(key)) { + return false; + } + } + + return true; + } + + /** + * Checks that every {@link FileScanTask} in the task group carries a sort order ID that matches + * the table's current sort order. + * + *

Non-{@code FileScanTask} entries (e.g. changelog tasks) are skipped. + */ + private static boolean allFilesHaveSortOrder( + ScanTaskGroup taskGroup, int expectedSortOrderId) { + for (ScanTask task : taskGroup.tasks()) { + if (!(task instanceof FileScanTask)) { + continue; + } + + FileScanTask fileTask = (FileScanTask) task; + Integer fileSortOrderId = fileTask.file().sortOrderId(); + + if (fileSortOrderId == null || fileSortOrderId != expectedSortOrderId) { + return false; + } + } + + return true; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 22a4b171b331..30bd16598204 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -57,6 +57,7 @@ class SparkBatch implements Batch { private final boolean executorCacheLocalityEnabled; private final int scanHashCode; private final boolean cacheDeleteFilesOnExecutors; + private Boolean orderingEnabled = null; // lazily computed, driver-only SparkBatch( JavaSparkContext sparkContext, @@ -94,21 +95,46 @@ public InputPartition[] planInputPartitions() { InputPartition[] partitions = new InputPartition[taskGroups.size()]; for (int index = 0; index < taskGroups.size(); index++) { + ScanTaskGroup taskGroup = taskGroups.get(index); + partitions[index] = new SparkInputPartition( groupingKeyType, - taskGroups.get(index), + taskGroup, tableBroadcast, fileIOBroadcast, projectionString, caseSensitive, locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE, - cacheDeleteFilesOnExecutors); + cacheDeleteFilesOnExecutors, + shouldUseMergingSortedReader(taskGroup)); } return partitions; } + /** Returns true if the table's sort ordering can be reported to Spark for this batch. 
enabled at the table level (validated by {@link #isOrderingEnabled()}), multiple files in the
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java index a3d78b43a919..997ca1bd7526 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java @@ -40,6 +40,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab private final boolean caseSensitive; private final transient String[] preferredLocations; private final boolean cacheDeleteFilesOnExecutors; + private final boolean useMergingSortedReader; private transient Schema projection = null; @@ -51,7 +52,8 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab String projectionString, boolean caseSensitive, String[] preferredLocations, - boolean cacheDeleteFilesOnExecutors) { + boolean cacheDeleteFilesOnExecutors, + boolean useMergingSortedReader) { this.groupingKeyType = groupingKeyType; this.taskGroup = taskGroup; this.tableBroadcast = tableBroadcast; @@ -60,6 +62,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab this.caseSensitive = caseSensitive; this.preferredLocations = preferredLocations; this.cacheDeleteFilesOnExecutors = cacheDeleteFilesOnExecutors; + this.useMergingSortedReader = useMergingSortedReader; } @Override @@ -104,4 +107,8 @@ public Schema projection() { return projection; } + + public boolean useMergingSortedReader() { + return useMergingSortedReader; + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 7adf3c633cd0..5f12154bfece 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -168,7 +168,8 @@ public InputPartition[] 
planInputPartitions(Offset start, Offset end) { projection, caseSensitive, locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE, - cacheDeleteFilesOnExecutors); + cacheDeleteFilesOnExecutors, + false); } return partitions; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index fe5eeee8fb10..d630be3bff1b 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -48,7 +48,9 @@ import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.SortOrder; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.SupportsReportOrdering; import org.apache.spark.sql.connector.read.SupportsReportPartitioning; import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; import org.apache.spark.sql.connector.read.partitioning.Partitioning; @@ -57,12 +59,13 @@ import org.slf4j.LoggerFactory; abstract class SparkPartitioningAwareScan extends SparkScan - implements SupportsReportPartitioning { + implements SupportsReportPartitioning, SupportsReportOrdering { private static final Logger LOG = LoggerFactory.getLogger(SparkPartitioningAwareScan.class); private final Scan> scan; private final boolean preserveDataGrouping; + private final boolean preserveDataOrdering; private Set specs = null; // lazy cache of scanned specs private List tasks = null; // lazy cache of uncombined tasks @@ -91,6 +94,7 @@ abstract class SparkPartitioningAwareScan extends S this.scan = scan; this.preserveDataGrouping = readConf.preserveDataGrouping(); + this.preserveDataOrdering = 
readConf.preserveDataOrdering(); if (scan == null) { this.specs = Collections.emptySet(); @@ -123,6 +127,30 @@ public Partitioning outputPartitioning() { } } + @Override + public SortOrder[] outputOrdering() { + if (!preserveDataOrdering) { + return new SortOrder[0]; + } + + if (groupingKeyType().fields().isEmpty()) { + LOG.info("Not reporting ordering for unpartitioned table {}", table().name()); + return new SortOrder[0]; + } + + if (!SortOrderAnalyzer.canReportOrdering(table(), taskGroups(), groupingKeyType())) { + LOG.info("Not reporting ordering for table {}", table().name()); + return new SortOrder[0]; + } + + org.apache.iceberg.SortOrder sortOrder = table().sortOrder(); + SortOrder[] ordering = Spark3Util.toOrdering(sortOrder); + LOG.info( + "Reporting sort order {} for table {}: {}", sortOrder.orderId(), table().name(), ordering); + + return ordering; + } + @Override protected StructType groupingKeyType() { if (groupingKeyType == null) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java index 23699aeb167c..b5db9a847d11 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java @@ -42,7 +42,11 @@ public PartitionReader createReader(InputPartition inputPartition) SparkInputPartition partition = (SparkInputPartition) inputPartition; if (partition.allTasksOfType(FileScanTask.class)) { - return new RowDataReader(partition); + if (partition.useMergingSortedReader()) { + return new MergingSortedRowDataReader(partition); + } else { + return new RowDataReader(partition); + } } else if (partition.allTasksOfType(ChangelogScanTask.class)) { return new ChangelogRowReader(partition); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 6b80199a255c..c8f4b157fe3f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -367,6 +367,13 @@ public CustomMetric[] supportedCustomMetrics() { } protected long adjustSplitSize(List tasks, long splitSize) { + if (readConf.preserveDataOrdering()) { + // Disable splitting tasks into multiple groups when we need to preserve ordering. + // This prevents multiple InputPartitions with the same partitionKey, which would + // cause Spark to suppress outputOrdering. + return Long.MAX_VALUE; + } + if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) { long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum(); int parallelism = readConf.parallelism(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index daf4e29ac075..5f39a14a1a9e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -53,13 +53,18 @@ import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec; +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.util.QueryExecutionListener; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import scala.PartialFunction; +import scala.collection.JavaConverters; public abstract class TestBase extends SparkTestHelperBase { + private static final AdaptiveSparkPlanHelper SPARK_HELPER = new AdaptiveSparkPlanHelper() {}; + protected static TestHiveMetastore metastore = null; protected static 
protected <T extends SparkPlan> List<T> collectPlans(SparkPlan plan, Class<T> planClass) {
    scala.collection.Seq<T> seq =
        SPARK_HELPER.collect(
            plan,
            new PartialFunction<SparkPlan, T>() {
+ */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestSortOrderAnalyzer { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build(); + + // Represents the grouping key type for a single-column string partition + private static final Types.StructType KEY_TYPE = + Types.StructType.of(Types.NestedField.required(3, "partition", Types.StringType.get())); + + @Test + public void testUnsortedTableReturnsFalse() { + Table table = mockTable(SortOrder.unsorted()); + List> groups = ImmutableList.of(taskGroupWithKey(SORT_ORDER.orderId(), "P1")); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, groups, KEY_TYPE)).isFalse(); + } + + @Test + public void testNullTaskGroupsReturnsFalse() { + Table table = mockTable(SORT_ORDER); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, null, KEY_TYPE)).isFalse(); + } + + @Test + public void testEmptyTaskGroupsReturnsFalse() { + Table table = mockTable(SORT_ORDER); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(), KEY_TYPE)).isFalse(); + } + + @Test + public void 
testFileWithMismatchedSortOrderIdReturnsFalse() { + Table table = mockTable(SORT_ORDER); + // File was written with sort order ID 999, which doesn't match the table's current order + ScanTaskGroup group = taskGroupWithKey(999, "P1"); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE)) + .isFalse(); + } + + @Test + public void testFileWithNullSortOrderIdReturnsFalse() { + Table table = mockTable(SORT_ORDER); + // File has no sort order recorded (null) — written before sort order was set + ScanTaskGroup group = taskGroupWithNullSortOrderId("P1"); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE)) + .isFalse(); + } + + @Test + public void testOnlyOneFileWithWrongSortOrderInMultiFileGroupReturnsFalse() { + Table table = mockTable(SORT_ORDER); + // Two files: one correct, one from old sort order — the group must fail + FileScanTask goodTask = fileTask(SORT_ORDER.orderId()); + FileScanTask badTask = fileTask(999); + ScanTaskGroup group = taskGroupWithTasks("P1", ImmutableList.of(goodTask, badTask)); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE)) + .isFalse(); + } + + @Test + public void testDuplicatePartitionKeyReturnsFalse() { + Table table = mockTable(SORT_ORDER); + // Two task groups that share the same partition key value — Spark coalesces them + // without merge-sorting, destroying the ordering guarantee. 
+ ScanTaskGroup group1 = taskGroupWithKey(SORT_ORDER.orderId(), "same-partition"); + ScanTaskGroup group2 = taskGroupWithKey(SORT_ORDER.orderId(), "same-partition"); + + assertThat( + SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group1, group2), KEY_TYPE)) + .isFalse(); + } + + @Test + public void testUniquePartitionKeysReturnsTrue() { + Table table = mockTable(SORT_ORDER); + ScanTaskGroup group1 = taskGroupWithKey(SORT_ORDER.orderId(), "partition-A"); + ScanTaskGroup group2 = taskGroupWithKey(SORT_ORDER.orderId(), "partition-B"); + + assertThat( + SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group1, group2), KEY_TYPE)) + .isTrue(); + } + + @Test + public void testUnpartitionedTableSkipsUniquenessCheck() { + Table table = mockTable(SORT_ORDER); + // Both groups have null grouping key — acceptable for unpartitioned tables + ScanTaskGroup group1 = taskGroupWithNullKey(SORT_ORDER.orderId()); + ScanTaskGroup group2 = taskGroupWithNullKey(SORT_ORDER.orderId()); + + // Empty struct type signals unpartitioned; uniqueness check is skipped + assertThat( + SortOrderAnalyzer.canReportOrdering( + table, ImmutableList.of(group1, group2), Types.StructType.of())) + .isTrue(); + } + + @Test + public void testSingleGroupSingleFileReturnsTrue() { + Table table = mockTable(SORT_ORDER); + ScanTaskGroup group = taskGroupWithKey(SORT_ORDER.orderId(), "P1"); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE)) + .isTrue(); + } + + @Test + public void testSingleGroupMultipleFilesAllMatchingReturnsTrue() { + Table table = mockTable(SORT_ORDER); + FileScanTask task1 = fileTask(SORT_ORDER.orderId()); + FileScanTask task2 = fileTask(SORT_ORDER.orderId()); + ScanTaskGroup group = taskGroupWithTasks("P1", ImmutableList.of(task1, task2)); + + assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE)) + .isTrue(); + } + + @Test + public void testMultipleGroupsAllValidReturnsTrue() { + Table table 
= mockTable(SORT_ORDER); + ScanTaskGroup group1 = taskGroupWithKey(SORT_ORDER.orderId(), "P1"); + ScanTaskGroup group2 = taskGroupWithKey(SORT_ORDER.orderId(), "P2"); + ScanTaskGroup group3 = taskGroupWithKey(SORT_ORDER.orderId(), "P3"); + + assertThat( + SortOrderAnalyzer.canReportOrdering( + table, ImmutableList.of(group1, group2, group3), KEY_TYPE)) + .isTrue(); + } + + private static Table mockTable(SortOrder sortOrder) { + Table table = mock(Table.class); + when(table.sortOrder()).thenReturn(sortOrder); + when(table.name()).thenReturn("test_table"); + return table; + } + + private static ScanTaskGroup taskGroupWithKey(int sortOrderId, String partitionValue) { + return taskGroupWithTasks(partitionValue, ImmutableList.of(fileTask(sortOrderId))); + } + + private static ScanTaskGroup taskGroupWithNullSortOrderId(String partitionValue) { + FileScanTask task = mock(FileScanTask.class); + DataFile file = mock(DataFile.class); + when(file.sortOrderId()).thenReturn(null); + when(task.file()).thenReturn(file); + + ScanTaskGroup group = mock(ScanTaskGroup.class); + doReturn(ImmutableList.of(task)).when(group).tasks(); + when(group.groupingKey()).thenReturn(makeKey(partitionValue)); + return group; + } + + private static ScanTaskGroup taskGroupWithNullKey(int sortOrderId) { + ScanTaskGroup group = mock(ScanTaskGroup.class); + doReturn(ImmutableList.of(fileTask(sortOrderId))).when(group).tasks(); + when(group.groupingKey()).thenReturn(null); + return group; + } + + private static ScanTaskGroup taskGroupWithTasks( + String partitionValue, List tasks) { + ScanTaskGroup group = mock(ScanTaskGroup.class); + doReturn(tasks).when(group).tasks(); + when(group.groupingKey()).thenReturn(makeKey(partitionValue)); + return group; + } + + private static FileScanTask fileTask(int sortOrderId) { + FileScanTask task = mock(FileScanTask.class); + DataFile file = mock(DataFile.class); + when(file.sortOrderId()).thenReturn(sortOrderId); + when(task.file()).thenReturn(file); + return task; + 
} + + private static StructLike makeKey(String value) { + GenericRecord record = GenericRecord.create(KEY_TYPE); + record.setField("partition", value); + return record; + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java new file mode 100644 index 000000000000..ce7aeb75d4d7 --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java @@ -0,0 +1,837 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.TestBaseWithCatalog; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.execution.SortExec; +import org.apache.spark.sql.execution.SparkPlan; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSupportsReportOrdering extends TestBaseWithCatalog { + + private static final Map ENABLED_ORDERING_SQL_CONF = orderingConfig(true); + private static final Map DISABLED_ORDERING_SQL_CONF = orderingConfig(false); + + private static Map orderingConfig(boolean preserveOrdering) { + return ImmutableMap.builder() + .put(SparkSQLProperties.PRESERVE_DATA_ORDERING, String.valueOf(preserveOrdering)) + .put(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true") + .put("spark.sql.autoBroadcastJoinThreshold", "-1") + .put("spark.sql.adaptive.enabled", "false") + .put("spark.sql.sources.v2.bucketing.enabled", "true") + 
.put("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true") + .put("spark.sql.requireAllClusterKeysForCoPartition", "false") + .buildOrThrow(); + } + + @BeforeEach + public void useCatalog() { + sql("USE %s", catalogName); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS %s", tableName("table_source")); + spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING); + } + + @TestTemplate + public void testMergingMultipleSortedFiles() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + setSortOrder(table, "id"); + + writeBatches( + tableName, + SimpleRecord.class, + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")), + ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d")), + ImmutableList.of(new SimpleRecord(5, "e"), new SimpleRecord(6, "f")), + ImmutableList.of(new SimpleRecord(7, "g"), new SimpleRecord(8, "h"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset result = + spark.sql(String.format("SELECT id, data FROM %s ORDER BY id", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows) + .hasSize(8) + .containsExactly( + row(1, "a"), + row(2, "b"), + row(3, "c"), + row(4, "d"), + row(5, "e"), + row(6, "f"), + row(7, "g"), + row(8, "h")); + } + + @TestTemplate + public void testMergingWithDuplicateSortKeyValues() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + setSortOrder(table, "id"); + + // The same id values appear across multiple files — the k-way merge must correctly + // interleave rows with equal keys rather than dropping or mis-ordering them. 
+ writeBatches( + tableName, + SimpleRecord.class, + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")), + ImmutableList.of(new SimpleRecord(1, "c"), new SimpleRecord(2, "d")), + ImmutableList.of(new SimpleRecord(1, "e"), new SimpleRecord(3, "f"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset result = + spark.sql(String.format("SELECT id, data FROM %s ORDER BY id, data", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows) + .hasSize(6) + .containsExactly( + row(1, "a"), row(1, "c"), row(1, "e"), row(2, "b"), row(2, "d"), row(3, "f")); + } + + @TestTemplate + public void testDescendingSortOrder() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + table.replaceSortOrder().desc("id").commit(); + + writeBatches( + tableName, + SimpleRecord.class, + ImmutableList.of(new SimpleRecord(10, "j"), new SimpleRecord(9, "i")), + ImmutableList.of(new SimpleRecord(8, "h"), new SimpleRecord(7, "g")), + ImmutableList.of(new SimpleRecord(6, "f"), new SimpleRecord(4, "d"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset result = spark.sql(String.format("SELECT id FROM %s ORDER BY id DESC", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows).hasSize(6).containsExactly(row(10), row(9), row(8), row(7), row(6), row(4)); + } + + @TestTemplate + public void testMultiColumnSortOrder() throws NoSuchTableException { + Table table = createThreeColumnTable(tableName); + setSortOrder(table, "c3", "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "a", "A"), new ThreeColumnRecord(3, "c", "A")), + ImmutableList.of(new ThreeColumnRecord(2, "b", "A"), new ThreeColumnRecord(1, "a", "B")), + ImmutableList.of(new ThreeColumnRecord(2, "b", "B"), new ThreeColumnRecord(3, "c", "B"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + 
Dataset result = + spark.sql(String.format("SELECT c3, c1, c2 FROM %s ORDER BY c3, c1", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows) + .hasSize(6) + .containsExactly( + row("A", 1, "a"), + row("A", 2, "b"), + row("A", 3, "c"), + row("B", 1, "a"), + row("B", 2, "b"), + row("B", 3, "c")); + } + + @TestTemplate + public void testSingleFileDoesNotRequireMerging() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + setSortOrder(table, "id"); + + List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append(); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows).hasSize(2); + } + + @TestTemplate + public void testPartitionedTableWithMultipleFilesPerPartition() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (c3)", + tableName); + + Table table = validationCatalog.loadTable(tableIdent); + setSortOrder(table, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new ThreeColumnRecord(3, "c", "P1")), + ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new ThreeColumnRecord(4, "d", "P1")), + ImmutableList.of(new ThreeColumnRecord(5, "e", "P2"), new ThreeColumnRecord(7, "g", "P2")), + ImmutableList.of(new ThreeColumnRecord(6, "f", "P2"), new ThreeColumnRecord(8, "h", "P2"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset p1Result = + spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1", tableName)); + List p1Rows = rowsToJava(p1Result.collectAsList()); + + assertThat(p1Rows) + .hasSize(4) + .containsExactly(row(1, "a"), row(2, 
"b"), row(3, "c"), row(4, "d")); + + Dataset p2Result = + spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P2' ORDER BY c1", tableName)); + List p2Rows = rowsToJava(p2Result.collectAsList()); + + assertThat(p2Rows) + .hasSize(4) + .containsExactly(row(5, "e"), row(6, "f"), row(7, "g"), row(8, "h")); + } + + @TestTemplate + public void testOrderingNotReportedWhenDisabled() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + setSortOrder(table, "id"); + + List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append(); + + spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING); + + Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows).hasSize(2); + } + + @TestTemplate + public void testOrderingNotReportedForUnsortedTable() throws NoSuchTableException { + createSimpleTable(tableName); + + List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append(); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows).hasSize(2); + } + + @TestTemplate + public void testNoMergeReaderForUnpartitionedSortedTable() throws NoSuchTableException { + Table table = createSimpleTable(tableName); // unpartitioned + setSortOrder(table, "id"); + + // Multiple files so that a merge reader *would* be created if the bug were present + writeBatches( + tableName, + SimpleRecord.class, + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(3, "c")), + ImmutableList.of(new SimpleRecord(2, "b"), new SimpleRecord(4, "d"))); + + 
spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + SparkPlan plan = + executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY id", tableName)); + List sorts = collectPlans(plan, SortExec.class); + + assertThat(sorts).isNotEmpty(); + + Dataset result = spark.sql(String.format("SELECT id FROM %s ORDER BY id", tableName)); + List rows = rowsToJava(result.collectAsList()); + assertThat(rows).containsExactly(row(1), row(2), row(3), row(4)); + } + + @TestTemplate + public void testSortRequiredWhenOrderingNotReported() throws NoSuchTableException { + Table table = createSimpleTable(tableName); + setSortOrder(table, "id"); + + writeBatches( + tableName, + SimpleRecord.class, + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")), + ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d"))); + + spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING); + + SparkPlan plan = + executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY id", tableName)); + + List sorts = collectPlans(plan, SortExec.class); + + assertThat(sorts).isNotEmpty(); + } + + @TestTemplate + public void testSortMergeJoinWithSortedTables() throws NoSuchTableException { + createBucketedTable(tableName, "c1"); + createBucketedTable(tableName("table_source"), "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X")), + ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X"))); + + writeBatches( + tableName("table_source"), + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y")), + ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y"))); + + assertPlanWithoutSort( + 0, + 2, + null, + "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1", + tableName, + tableName("table_source")); + } + + @TestTemplate 
+ public void testMergeWithSortedBucketedTables() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' = '%d')", + tableName, TableProperties.SPLIT_SIZE, 1024); + + Table targetTable = validationCatalog.loadTable(tableIdent); + setSortOrder(targetTable, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")), + ImmutableList.of( + new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4"))); + + String sourceTableName = tableName("table_source"); + TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source"); + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + sourceTableName, TableProperties.SPLIT_SIZE, 1024); + + Table sourceTable = validationCatalog.loadTable(sourceTableIdent); + setSortOrder(sourceTable, "c1"); + + writeBatches( + sourceTableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")), + ImmutableList.of( + new ThreeColumnRecord(5, "new5", "data5"), new ThreeColumnRecord(6, "new6", "data6"))); + + refreshTables(tableName, sourceTableName); + + validationCatalog.loadTable(tableIdent).refresh(); + validationCatalog.loadTable(sourceTableIdent).refresh(); + + assertPlanWithoutSort( + 1, + 3, + this::verifyMergeResults, + "MERGE INTO %s t USING %s s ON t.c1 = s.c1 " + + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 " + + "WHEN NOT MATCHED THEN INSERT *", + tableName, + sourceTableName); + } + + @TestTemplate + public void testHistoricalSortOrderInJoin() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 
STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + tableName, TableProperties.SPLIT_SIZE, 1024); + + Table table1 = validationCatalog.loadTable(tableIdent); + setSortOrder(table1, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X")), + ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X"))); + + table1.replaceSortOrder().asc("c2").asc("c1").commit(); + + String table2Name = tableName("table_source"); + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + table2Name, TableProperties.SPLIT_SIZE, 1024); + + TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"), "table_source"); + Table table2 = validationCatalog.loadTable(table2Ident); + setSortOrder(table2, "c1"); + + writeBatches( + table2Name, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y")), + ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y"))); + + table2.replaceSortOrder().asc("c2").asc("c1").commit(); + + // Both tables have files with historical sort order [c1 ASC] + // but current table sort order is [c2 ASC, c1 ASC] + assertPlanWithoutSort( + 2, + 2, + null, + "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1", + tableName, + table2Name); + } + + @TestTemplate + public void testMixedSortOrdersNoReporting() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + tableName, TableProperties.SPLIT_SIZE, 1024); + + Table table1 = validationCatalog.loadTable(tableIdent); + setSortOrder(table1, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + 
ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X"))); + + table1.replaceSortOrder().asc("c2").asc("c1").commit(); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X"))); + + String table2Name = tableName("table_source"); + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + table2Name, TableProperties.SPLIT_SIZE, 1024); + + TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"), "table_source"); + Table table2 = validationCatalog.loadTable(table2Ident); + setSortOrder(table2, "c1"); + + writeBatches( + table2Name, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y"))); + + table2.replaceSortOrder().asc("c2").asc("c1").commit(); + + writeBatches( + table2Name, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y"))); + + assertPlanWithoutSort( + 2, + 2, + null, + "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1", + tableName, + table2Name); + } + + @TestTemplate + public void testSPJWithDifferentPartitionAndSortKeys() throws NoSuchTableException { + createBucketedTable(tableName, "c3", "c1"); + createBucketedTable(tableName("table_source"), "c3", "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(1, "a", "2024-01-01"), + new ThreeColumnRecord(2, "b", "2024-01-02")), + ImmutableList.of( + new ThreeColumnRecord(1, "c", "2024-01-03"), + new ThreeColumnRecord(2, "d", "2024-01-04"))); + + writeBatches( + tableName("table_source"), + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(1, "A", "2024-01-01"), + new ThreeColumnRecord(2, "B", "2024-01-02")), + ImmutableList.of( + new ThreeColumnRecord(1, "C", 
"2024-01-03"), + new ThreeColumnRecord(2, "D", "2024-01-04"))); + + refreshTables(tableName, tableName("table_source")); + + assertPlanWithoutSort( + 0, + 2, + null, + "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c3 = t2.c3 AND t1.c1 = t2.c1", + tableName, + tableName("table_source")); + } + + @TestTemplate + public void testHistoricalSortOrderInMerge() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' = '%d')", + tableName, TableProperties.SPLIT_SIZE, 1024); + + Table targetTable = validationCatalog.loadTable(tableIdent); + setSortOrder(targetTable, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")), + ImmutableList.of( + new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4"))); + + targetTable.replaceSortOrder().asc("c2").asc("c1").commit(); + + String sourceTableName = tableName("table_source"); + TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source"); + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + sourceTableName, TableProperties.SPLIT_SIZE, 1024); + + Table sourceTable = validationCatalog.loadTable(sourceTableIdent); + setSortOrder(sourceTable, "c1"); + + writeBatches( + sourceTableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")), + ImmutableList.of( + new ThreeColumnRecord(5, "new5", "data5"), new ThreeColumnRecord(6, "new6", "data6"))); + + sourceTable.replaceSortOrder().asc("c2").asc("c1").commit(); + + refreshTables(tableName, sourceTableName); + + validationCatalog.loadTable(tableIdent).refresh(); + 
validationCatalog.loadTable(sourceTableIdent).refresh(); + + // Files have historical sort order [c1 ASC] but tables have [c2 ASC, c1 ASC] + assertPlanWithoutSort( + 3, + 3, + this::verifyMergeResults, + "MERGE INTO %s t USING %s s ON t.c1 = s.c1 " + + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 " + + "WHEN NOT MATCHED THEN INSERT *", + tableName, + sourceTableName); + } + + @TestTemplate + public void testProjectionPushdownSortKeyNotProjected() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (c3)", + tableName); + + Table table = validationCatalog.loadTable(tableIdent); + setSortOrder(table, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new ThreeColumnRecord(3, "c", "P1")), + ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new ThreeColumnRecord(4, "d", "P1"))); + + spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true"); + + // c1 is the sort key but is NOT in the SELECT list. 
+ Dataset result = spark.sql(String.format("SELECT c2 FROM %s WHERE c3 = 'P1'", tableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows).hasSize(4).containsExactlyInAnyOrder(row("a"), row("b"), row("c"), row("d")); + } + + @TestTemplate + public void testMergeOnReadWithDeleteFiles() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES(" + + " 'write.delete.mode' = 'merge-on-read', " + + " 'write.merge.mode' = 'merge-on-read', " + + " 'format-version' = '2', " + + " '%s' = '%d'" + + ")", + tableName, TableProperties.SPLIT_SIZE, 1024); + + Table targetTable = validationCatalog.loadTable(tableIdent); + setSortOrder(targetTable, "c1"); + + writeBatches( + tableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")), + ImmutableList.of( + new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4"))); + + sql("DELETE FROM %s WHERE c1 = 2", tableName); + + targetTable.refresh(); + long deleteFileCount = + Iterables.size(targetTable.currentSnapshot().addedDeleteFiles(targetTable.io())); + assertThat(deleteFileCount).isGreaterThan(0); + + String sourceTableName = tableName("table_source"); + TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source"); + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + sourceTableName, TableProperties.SPLIT_SIZE, 1024); + + Table sourceTable = validationCatalog.loadTable(sourceTableIdent); + setSortOrder(sourceTable, "c1"); + + writeBatches( + sourceTableName, + ThreeColumnRecord.class, + ImmutableList.of( + new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")), + ImmutableList.of( + new ThreeColumnRecord(5, "new5", "data5"), 
new ThreeColumnRecord(6, "new6", "data6"))); + + refreshTables(tableName, sourceTableName); + + validationCatalog.loadTable(tableIdent).refresh(); + validationCatalog.loadTable(sourceTableIdent).refresh(); + + assertPlanWithoutSort( + 1, + 3, + this::verifyMergeResults, + "MERGE INTO %s t USING %s s ON t.c1 = s.c1 " + + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 " + + "WHEN NOT MATCHED THEN INSERT *", + tableName, + sourceTableName); + } + + private Table createSimpleTable(String name) { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", name); + return validationCatalog.loadTable(tableIdent); + } + + private Table createThreeColumnTable(String name) { + sql("CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) USING iceberg", name); + return validationCatalog.loadTable(tableIdent); + } + + private void createBucketedTable(String name, String... sortCols) { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, c1)) " + + "TBLPROPERTIES('%s' = '%d')", + name, TableProperties.SPLIT_SIZE, 1024); + + TableIdentifier ident = + name.equals(tableName) + ? tableIdent + : TableIdentifier.of(Namespace.of("default"), "table_source"); + Table table = validationCatalog.loadTable(ident); + + if (sortCols.length > 0) { + ReplaceSortOrder sortOrder = table.replaceSortOrder(); + for (String col : sortCols) { + sortOrder = sortOrder.asc(col); + } + sortOrder.commit(); + } + } + + @SafeVarargs + private void writeBatches(String tableName, Class recordClass, List... batches) + throws NoSuchTableException { + for (List batch : batches) { + spark.createDataFrame(batch, recordClass).coalesce(1).writeTo(tableName).append(); + } + } + + private void setSortOrder(Table table, String... 
columns) { + ReplaceSortOrder sortOrder = table.replaceSortOrder(); + for (String col : columns) { + sortOrder = sortOrder.asc(col); + } + sortOrder.commit(); + } + + private void refreshTable(String table) { + sql("REFRESH TABLE %s", table); + } + + private void refreshTables(String... tables) { + for (String table : tables) { + refreshTable(table); + } + } + + private void verifyMergeResults(String targetTableName) { + Dataset result = + spark.sql(String.format("SELECT c1, c2, c3 FROM %s ORDER BY c1", targetTableName)); + List rows = rowsToJava(result.collectAsList()); + + assertThat(rows) + .hasSize(6) + .containsExactly( + row(1, "old1", "data1"), // unchanged + row(2, "new2", "data2"), // updated from source + row(3, "new3", "data3"), // updated from source + row(4, "old4", "data4"), // unchanged + row(5, "new5", "data5"), // inserted from source + row(6, "new6", "data6")); // inserted from source + } + + private void assertPlanWithoutSort( + int expectedNumSortsWithOrdering, + int expectedNumSortsWithoutOrdering, + Consumer dataVerification, + String query, + Object... 
args) { + + AtomicReference> rowsWithOrdering = new AtomicReference<>(); + AtomicReference> rowsWithoutOrdering = new AtomicReference<>(); + + Table targetTable = validationCatalog.loadTable(tableIdent); + long snapshotBeforeExecution = targetTable.currentSnapshot().snapshotId(); + + withSQLConf( + ENABLED_ORDERING_SQL_CONF, + () -> { + String plan = executeAndKeepPlan(query, args).toString(); + int actualNumSorts = StringUtils.countMatches(plan, "Sort ["); + assertThat(actualNumSorts) + .as("Number of sorts with enabled ordering must match") + .isEqualTo(expectedNumSortsWithOrdering); + + sql("REFRESH TABLE %s", tableName); + validationCatalog.loadTable(tableIdent).refresh(); + + if (dataVerification != null) { + dataVerification.accept(tableName); + } else { + rowsWithOrdering.set(sql(query, args)); + } + }); + + sql( + "CALL %s.system.rollback_to_snapshot('%s', %dL)", + catalogName, tableName, snapshotBeforeExecution); + + sql("REFRESH TABLE %s", tableName); + validationCatalog.loadTable(tableIdent).refresh(); + + withSQLConf( + DISABLED_ORDERING_SQL_CONF, + () -> { + String plan = executeAndKeepPlan(query, args).toString(); + int actualNumSorts = StringUtils.countMatches(plan, "Sort ["); + assertThat(actualNumSorts) + .as("Number of sorts with disabled ordering must match") + .isEqualTo(expectedNumSortsWithoutOrdering); + + sql("REFRESH TABLE %s", tableName); + validationCatalog.loadTable(tableIdent).refresh(); + if (dataVerification != null) { + dataVerification.accept(tableName); + } else { + rowsWithoutOrdering.set(sql(query, args)); + } + }); + + if (dataVerification == null) { + assertEquals( + "Sort elimination should not change query output", + rowsWithoutOrdering.get(), + rowsWithOrdering.get()); + } + } +} From a6f020529fe65519e804ebec886827a914397cd5 Mon Sep 17 00:00:00 2001 From: Anurag Mantripragada Date: Thu, 2 Apr 2026 20:23:07 -0700 Subject: [PATCH 2/2] Avoid redundant checking of sort order --- .../iceberg/spark/SparkSQLProperties.java | 4 +- 
.../spark/source/SortOrderAnalyzer.java | 2 +- .../iceberg/spark/source/SparkBatch.java | 14 +-- .../spark/source/SparkChangelogScan.java | 3 +- .../source/SparkPartitioningAwareScan.java | 27 +++--- .../iceberg/spark/source/SparkScan.java | 9 +- .../source/TestSupportsReportOrdering.java | 94 +++++++++++++++++++ 7 files changed, 128 insertions(+), 25 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index e265e68e73f0..ea17f2c8db21 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -40,7 +40,9 @@ private SparkSQLProperties() {} "spark.sql.iceberg.planning.preserve-data-grouping"; public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false; - // Controls whether to preserve data ordering and report it to Spark + // Controls whether to report sort order to Spark. Requires + // spark.sql.iceberg.planning.preserve-data-grouping=true to have any effect. Ordering + // is only reported when KeyGroupedPartitioning is active (i.e. grouping is enabled). public static final String PRESERVE_DATA_ORDERING = "spark.sql.iceberg.planning.preserve-data-ordering"; public static final boolean PRESERVE_DATA_ORDERING_DEFAULT = false; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java index 26b3875d7913..9d85690d309c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java @@ -36,7 +36,7 @@ *

For sort ordering to be reported, ALL of these conditions must hold: * *