diff --git a/.github/actions/setup-spark-local-jar/action.yaml b/.github/actions/setup-spark-local-jar/action.yaml
index 5334bf1ea6..35b89858a2 100644
--- a/.github/actions/setup-spark-local-jar/action.yaml
+++ b/.github/actions/setup-spark-local-jar/action.yaml
@@ -44,5 +44,5 @@ runs:
         cd apache-spark
         git apply ../dev/diffs/${{inputs.spark-version}}.diff
         ./dev/change-scala-version.sh ${{inputs.scala-version}}
-        ./build/mvn versions:set -DnewVersion=${{inputs.spark-version}}-SNAPSHOT
+        ./build/mvn versions:set -DnewVersion=${{inputs.spark-version}}-SNAPSHOT -DgenerateBackupPoms=false
         ./build/mvn -Pscala-${{inputs.scala-version}} -Phive -Phive-thriftserver -DskipTests -Denforcer.skip=true clean install
diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index f90141dc7b..60312ca9a4 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -45,7 +45,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
         spark-version: [{short: '3.5', full: '3.5.6'}]
         scala-version: ['2.13']
       fail-fast: false
diff --git a/dev/diffs/iceberg/1.9.1.diff b/dev/diffs/iceberg/1.9.1.diff
new file mode 100644
index 0000000000..0be66ff169
--- /dev/null
+++ b/dev/diffs/iceberg/1.9.1.diff
@@ -0,0 +1,296 @@
+diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
+index c50991c..1d6afac 100644
+--- a/gradle/libs.versions.toml
++++ b/gradle/libs.versions.toml
+@@ -80,7 +80,7 @@ scala-collection-compat = "2.13.0"
+ slf4j = "2.0.17"
+ snowflake-jdbc = "3.23.2"
+ spark34 = "3.4.4"
+-spark35 = "3.5.5"
++spark35 = "3.5.6-SNAPSHOT"
+ sqlite-jdbc = "3.49.1.0"
+ testcontainers = "1.20.6"
+ tez08 = { strictly = "0.8.4"} # see rich version usage explanation above
+diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
+index 2cd8666..c1893b1 100644
+--- a/spark/v3.4/build.gradle
++++ b/spark/v3.4/build.gradle
+@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
+       exclude group: 'org.roaringbitmap'
+     }
+
+-    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
++    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     implementation libs.parquet.column
+     implementation libs.parquet.hadoop
+@@ -186,7 +186,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
+     testImplementation libs.parquet.hadoop
+     testImplementation libs.awaitility
+     testImplementation libs.junit.vintage.engine
+-    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
++    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
+     runtimeOnly libs.antlr.runtime
+@@ -267,6 +267,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+     integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
+     integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+     integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
++    integrationImplementation project(path: ':iceberg-parquet')
++    integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     // runtime dependencies for running Hive Catalog based integration test
+     integrationRuntimeOnly project(':iceberg-hive-metastore')
+@@ -304,8 +306,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
+     relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
+     relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
+-    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+-    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
++//    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
++//    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
+     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
+     relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
+diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+index 0ca1236..87daef4 100644
+--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
++++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+@@ -29,7 +29,7 @@ public class SparkSQLProperties {
+
+   // Controls which Parquet reader implementation to use
+   public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
+-  public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
++  public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;
+
+   // Controls whether reading/writing timestamps without timezones is allowed
+   @Deprecated
+diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+index 16159dc..f6c6c95 100644
+--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
++++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+@@ -19,11 +19,11 @@
+ package org.apache.iceberg.spark.data.vectorized;
+
+ import java.io.IOException;
++import org.apache.comet.CometSchemaImporter;
+ import org.apache.comet.parquet.AbstractColumnReader;
+ import org.apache.comet.parquet.ColumnReader;
+ import org.apache.comet.parquet.TypeUtil;
+ import org.apache.comet.parquet.Utils;
+-import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
+ import org.apache.comet.shaded.arrow.memory.RootAllocator;
+ import org.apache.iceberg.parquet.VectorizedReader;
+ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+@@ -92,7 +92,7 @@ class CometColumnReader implements VectorizedReader {
+     }
+
+     this.importer = new CometSchemaImporter(new RootAllocator());
+-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
++    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
+     this.initialized = true;
+   }
+
+diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+index a361a7f..b21859b 100644
+--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
++++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+@@ -24,6 +24,7 @@ import java.util.Objects;
+ import java.util.Set;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
++import org.apache.comet.parquet.SupportsComet;
+ import org.apache.iceberg.DeleteFile;
+ import org.apache.iceberg.FileContent;
+ import org.apache.iceberg.FileScanTask;
+@@ -63,7 +64,7 @@ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ class SparkBatchQueryScan extends SparkPartitioningAwareScan
+-    implements SupportsRuntimeV2Filtering {
++    implements SupportsRuntimeV2Filtering, SupportsComet {
+
+   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);
+
+@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan
+         runtimeFilterExpressions,
+         caseSensitive());
+   }
++
++  @Override
++  public boolean isCometEnabled() {
++    return true;
++  }
+ }
+diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+index 47a0e87..531b7ce 100644
+--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
++++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf;
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Before;
++import org.junit.Ignore;
+ import org.junit.Test;
+
+ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
+@@ -214,7 +215,7 @@
+     Assert.assertEquals(4, fields.size());
+   }
+
+-  @Test
++  @Ignore
+   public void testMergeSchemaIgnoreCastingLongToInt() throws Exception {
+     sql(
+         "ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')",
+@@ -254,7 +255,7 @@
+     assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG);
+   }
+
+-  @Test
++  @Ignore
+   public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception {
+     removeTables();
+     sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName);
+diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
+index 572c32f..88ef87d 100644
+--- a/spark/v3.5/build.gradle
++++ b/spark/v3.5/build.gradle
+@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
+       exclude group: 'org.roaringbitmap'
+     }
+
+-    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
++    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     implementation libs.parquet.column
+     implementation libs.parquet.hadoop
+@@ -184,7 +184,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
+     testImplementation libs.avro.avro
+     testImplementation libs.parquet.hadoop
+     testImplementation libs.awaitility
+-    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
++    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
+     runtimeOnly libs.antlr.runtime
+@@ -265,6 +265,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+     integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
+     integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+     integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
++    integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"
+
+     // runtime dependencies for running Hive Catalog based integration test
+     integrationRuntimeOnly project(':iceberg-hive-metastore')
+@@ -302,8 +303,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
+     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
+     relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
+     relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
+-    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+-    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
++//    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
++//    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
+     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
+     relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+index d6c16bb..123a300 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+@@ -29,7 +29,7 @@ public class SparkSQLProperties {
+
+   // Controls which Parquet reader implementation to use
+   public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
+-  public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
++  public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;
+   // Controls whether to perform the nullability check during writes
+   public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
+   public static final boolean CHECK_NULLABILITY_DEFAULT = true;
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+index 16159dc..f6c6c95 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+@@ -19,11 +19,11 @@
+ package org.apache.iceberg.spark.data.vectorized;
+
+ import java.io.IOException;
++import org.apache.comet.CometSchemaImporter;
+ import org.apache.comet.parquet.AbstractColumnReader;
+ import org.apache.comet.parquet.ColumnReader;
+ import org.apache.comet.parquet.TypeUtil;
+ import org.apache.comet.parquet.Utils;
+-import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
+ import org.apache.comet.shaded.arrow.memory.RootAllocator;
+ import org.apache.iceberg.parquet.VectorizedReader;
+ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+@@ -92,7 +92,7 @@ class CometColumnReader implements VectorizedReader {
+     }
+
+     this.importer = new CometSchemaImporter(new RootAllocator());
+-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
++    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
+     this.initialized = true;
+   }
+
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+index a361a7f..9021cd5 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+@@ -24,6 +24,7 @@ import java.util.Objects;
+ import java.util.Set;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
++import org.apache.comet.parquet.SupportsComet;
+ import org.apache.iceberg.DeleteFile;
+ import org.apache.iceberg.FileContent;
+ import org.apache.iceberg.FileScanTask;
+@@ -63,7 +64,7 @@ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ class SparkBatchQueryScan extends SparkPartitioningAwareScan
+-    implements SupportsRuntimeV2Filtering {
++    implements SupportsRuntimeV2Filtering, SupportsComet {
+
+   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);
+
+@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan
+         runtimeFilterExpressions,
+         caseSensitive());
+   }
++
++  @Override
++  public boolean isCometEnabled() {
++    return true;
++  }
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+index 7404b18..6ce9485 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
+ import org.apache.spark.sql.internal.SQLConf;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.BeforeEach;
++import org.junit.jupiter.api.Disabled;
+ import org.junit.jupiter.api.TestTemplate;
+
+ public class TestDataFrameWriterV2 extends TestBaseWithCatalog {
+@@ -248,7 +249,7 @@
+         sql("select * from %s order by id", tableName));
+   }
+
+-  @TestTemplate
++  @Disabled
+   public void testMergeSchemaIgnoreCastingLongToInt() throws Exception {
+     sql(
+         "ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')",
+@@ -288,7 +289,7 @@
+     assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG);
+   }
+
+-  @TestTemplate
++  @Disabled
+   public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception {
+     removeTables();
+     sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName);