@@ -4108,6 +4108,7 @@ object SQLConf {
.doc("Allow PruneFilters to remove streaming subplans when we encounter a false filter. " +
"This flag is to restore prior buggy behavior for broken pipelines.")
.version("4.0.0")
.withAlternative("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
.booleanConf
.createWithDefault(false)
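
For context, withAlternative registers the misspelled key as an alias of the canonical entry. A minimal sketch of the expected behavior, assuming a SparkSession named `spark` and code running inside Spark's own test scope (where the package-private `sessionState` is accessible):

// Sketch only, not part of the patch: setting the legacy (misspelled) key is
// picked up when the canonical entry is read back, because withAlternative
// makes the entry's reader also consult the alternative key.
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(
  "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan", "true")
assert(spark.sessionState.conf.getConf(
  SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN))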

@@ -180,5 +180,16 @@ object OffsetSeqMetadata extends Logging {
}
}
}

// SPARK-51187: This incorrect config is not included in relevantSQLConfs, but the
// metadata in the offset log may contain it if a batch ran on Spark 3.5.4.
// We need to pick the value up from the metadata and set it under the new config.
// This also ensures that subsequent batches record the correct config in the offset log.
metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan") match {
case Some(value) =>
sessionConf.setConfString(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)

case _ =>
}
}
}
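
To illustrate the restore path above, a hypothetical harness (it assumes Spark's private[sql] streaming internals are on the classpath, and that the enclosing method in the diff is OffsetSeqMetadata.setSessionConf): metadata carrying only the misspelled key ends up populating the fixed key in the session conf.

// Hypothetical harness, not part of the patch.
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata
import org.apache.spark.sql.internal.SQLConf

val legacyMetadata = OffsetSeqMetadata(
  batchWatermarkMs = 0L,
  batchTimestampMs = 0L,
  conf = Map(
    "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan" -> "false"))

OffsetSeqMetadata.setSessionConf(legacyMetadata, spark.sessionState.conf)

// The value travels from the misspelled key to the fixed config entry.
assert(!spark.sessionState.conf.getConf(
  SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN))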
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
@@ -0,0 +1 @@
{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
1
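
For readers unfamiliar with the layout of these test resources: each offset log file is a version header, then the OffsetSeqMetadata JSON shown above, then one offset line per source (here, the MemoryStream batch id). A minimal, Spark-free sketch of pulling those pieces apart, with an illustrative path:

// Sketch: split one offset file into its three parts using only the
// standard library; "offsets/1" is an illustrative path.
import scala.io.Source

val src = Source.fromFile("offsets/1")
val lines = try src.getLines().toList finally src.close()

val version       = lines.head      // "v1"
val metadataJson  = lines(1)        // batch timestamps plus the conf map
val sourceOffsets = lines.drop(2)   // List("1"): the single MemoryStream offset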
@@ -1471,6 +1471,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
"effect when restarting from Spark 3.5.4") {
// Spark 3.5.4 is the only release in which we accidentally introduced the incorrect config.
// We just need to confirm that the current Spark version applies the fix of SPARK-49699 when
// the streaming query was started on Spark 3.5.4. We should apply the fix consistently, rather
// than toggling it on and off, since toggling opens up more opportunities for breakage.

val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

withTempDir { dir =>
val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
assert(input != null, "cannot find test resource")
val inputDir = new File(input.toURI)

// Copy the test files to the temp dir so that we don't modify the original data.
FileUtils.copyDirectory(inputDir, dir)

// Below is the code we used on Spark 3.5.4 to produce this checkpoint. We need to make sure
// that offset advancement continues from the last run.
val inputData = MemoryStream[Int]
val df = inputData.toDF()

inputData.addData(1, 2, 3, 4)
inputData.addData(5, 6, 7, 8)
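// Re-adding the same data that produced batches 0 and 1 in the copied
// 3.5.4 checkpoint keeps this fresh MemoryStream's offsets aligned with
// the offset log, so the restarted query resumes cleanly at batch 2.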

testStream(df)(
StartStream(checkpointLocation = dir.getCanonicalPath),
AddData(inputData, 9, 10, 11, 12),
ProcessAllAvailable(),
AssertOnQuery { q =>
val confValue = q.lastExecution.sparkSession.conf.get(
SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
assert(confValue === false,
"The value for the incorrect config in offset metadata should be respected as the " +
"value of the fixed config")

val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
val offsetLogForBatch = offsetLog.get(batchId).get
val confInMetadata = offsetLogForBatch.metadata.get.conf
if (expectCorrectConfig) {
assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
Some("false"),
"The new offset log should have the fixed config instead of the incorrect one."
)
assert(!confInMetadata.contains(problematicConfName),
"The new offset log should not have the incorrect config.")
} else {
assert(
confInMetadata.get(problematicConfName) === Some("false"),
"The offset log in test resource should have the incorrect config to test properly."
)
assert(
!confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
"The offset log in test resource should not have the fixed config."
)
}
}

assert(offsetLog.getLatestBatchId() === Some(2))
checkConfigFromMetadata(0, expectCorrectConfig = false)
checkConfigFromMetadata(1, expectCorrectConfig = false)
checkConfigFromMetadata(2, expectCorrectConfig = true)
true
}
)
}
}

private def checkAppendOutputModeException(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>