diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 8aa3edafa8fc7..54d6bcb983fc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -96,6 +96,7 @@ trait RocksDBStateStoreChangelogCheckpointingTestUtil { case Array(version) => version.toLong } .sorted + .distinct .toImmutableArraySeq } @@ -274,7 +275,7 @@ trait AlsoTestWithRocksDBFeatures Seq(true, false).foreach { enableStateStoreCheckpointIds => val newTestName = s"$testName - with enableStateStoreCheckpointIds = " + s"$enableStateStoreCheckpointIds" - test(newTestName, testTags: _*) { enableStateStoreCheckpointIds => + test(newTestName, testTags: _*) { testBody(enableStateStoreCheckpointIds) } } @@ -287,8 +288,8 @@ trait AlsoTestWithRocksDBFeatures Seq(true, false).foreach { enableStateStoreCheckpointIds => val newTestName = s"$testName - with enableStateStoreCheckpointIds = " + s"$enableStateStoreCheckpointIds" - testWithChangelogCheckpointingDisabled(newTestName, testTags: _*) { - enableStateStoreCheckpointIds => testBody(enableStateStoreCheckpointIds) + testWithChangelogCheckpointingEnabled(newTestName, testTags: _*) { + testBody(enableStateStoreCheckpointIds) } } } @@ -2813,7 +2814,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession // upload snapshot 4.zip db.doMaintenance() } - withDB(remoteDir, version = 4, conf = conf) { db => + withDB(remoteDir, version = 4, conf = conf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId + ) { db => } }) } @@ -2842,7 +2846,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.doMaintenance() } - withDB(remoteDir, version = 4, conf = conf) { db => + withDB(remoteDir, version = 4, conf = conf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId + ) { db => } } @@ -2872,8 +2879,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } // reload version 2 - should succeed - withDB(remoteDir, version = 2, conf = conf) { db => - } + withDB(remoteDir, version = 2, conf = conf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId + ) { db => } } testWithStateStoreCheckpointIdsAndChangelogEnabled("time travel 5 - validate successful " + @@ -2906,7 +2915,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.commit() // create snapshot again // load version 1 - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId) { db => } // upload recently created snapshot @@ -2914,8 +2925,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession assert(snapshotVersionsPresent(remoteDir) == Seq(1)) // load version 1 again - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => - } + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId) { db => } } } } @@ -3025,7 +3037,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession versionToUniqueId = versionToUniqueId) { db2 => val random = new Random(randomSeed) var curVer: Int = 0 - for (i <- 1 to 100) { + for (i <- 1 to 10) { db.load(curVer, versionToUniqueId.get(curVer)) db.put("foo", "bar") db.commit()