
Commit 0996196

HeartSaVioR authored and cloud-fan committed
[SPARK-51187][SPARK-49699][SQL][SS][4.0] Implement the graceful deprecation of incorrect config introduced in SPARK-49699
### What changes were proposed in this pull request?

This PR proposes to implement the graceful deprecation of the incorrect config introduced in SPARK-49699.

SPARK-49699 was included in Spark 3.5.4, hence we can't simply rename the config to fix the issue. Also, since the incorrect config is logged in the offset log of a streaming query, the fix isn't as simple as adding `withAlternative` and being done. We need to manually handle the case where the offset log contains the incorrect config, and set the value of the incorrect config from the offset log into the new config.

Once a single microbatch has been planned after the restart (hence the above logic has been applied), the offset log will contain the "new" config and will no longer refer to the incorrect one. That said, we can remove the handling of the incorrect config in a future Spark version, once we are confident that no users will upgrade to that version directly from Spark 3.5.4.

### Why are the changes needed?

We released an incorrect config and we want to rename it properly. While renaming, we also don't want to break any existing streaming query.

### Does this PR introduce _any_ user-facing change?

No. That is what this PR is aiming for.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49984 from HeartSaVioR/SPARK-51187-4.0.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 909ea96 · commit 0996196
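For readers skimming the diffs below, here is a minimal, self-contained sketch of the graceful-deprecation pattern the commit message describes. This is not the actual Spark internals (those live in `OffsetSeqMetadata.setSessionConf`, see the `OffsetSeq.scala` diff below); the fixed key name is assumed from the `SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN` entry referenced in the test.

```scala
// Sketch only: a value recorded under the incorrect key in a checkpointed
// offset log is carried over to the fixed key when the query restarts, so
// the next offset log entry is written with the fixed key.
object GracefulDeprecationSketch {
  // The incorrect key shipped in Spark 3.5.4 (verbatim from the diff) and
  // the fixed key (assumed from the SQLConf entry name used in the test).
  val incorrectKey = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
  val fixedKey = "spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

  def restoreSessionConf(metadataConf: Map[String, String]): Map[String, String] =
    metadataConf.get(incorrectKey) match {
      // Respect the checkpointed value, but record it under the fixed key only.
      case Some(value) => (metadataConf - incorrectKey) + (fixedKey -> value)
      case None => metadataConf
    }

  def main(args: Array[String]): Unit = {
    // An offset log written by Spark 3.5.4 carries the incorrect key...
    val from354 = Map(incorrectKey -> "false")
    // ...and after restart, the session (and hence the next offset log
    // entry) carries the fixed key with the same value.
    assert(restoreSessionConf(from354) == Map(fixedKey -> "false"))
    println(restoreSessionConf(from354))
  }
}
```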

8 files changed: +92 −0 lines changed

docs/streaming/ss-migration-guide.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -28,6 +28,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-gui
 - Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
 - Since Spark 4.0, new configuration `spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint` (default: `0.3`) controls the amount of additional space allowed in the checkpoint directory to store stale version files for batch deletion inside maintenance task. This is to amortize the cost of listing in cloud store. Setting this to `0` defaults to the old behavior. (See [SPARK-48931](https://issues.apache.org/jira/browse/SPARK-48931) for more details.)
 - Since Spark 4.0, when relative path is used to output data in `DataStreamWriter` the resolution to absolute path is done in the Spark Driver and is not deferred to Spark Executor. This is to make Structured Streaming behavior similar to DataFrame API (`DataFrameWriter`). (See [SPARK-50854](https://issues.apache.org/jira/browse/SPARK-50854) for more details.)
+- Since Spark 4.0, the deprecated config `spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan` has been removed. (See [SPARK-51187](https://issues.apache.org/jira/browse/SPARK-51187) for more details.)
 
 ## Upgrading from Structured Streaming 3.3 to 3.4
 
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -180,5 +180,16 @@ object OffsetSeqMetadata extends Logging {
         }
       }
     }
+
+    // SPARK-51187: This incorrect config is not added in the relevantSQLConfs, but the
+    // metadata in the offset log may have this if the batch ran from Spark 3.5.4.
+    // We need to pick the value from the metadata and set it in the new config.
+    // This also leads the further batches to have a correct config in the offset log.
+    metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan") match {
+      case Some(value) =>
+        sessionConf.setConfString(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)
+
+      case _ =>
+    }
   }
 }
```
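The comment in the diff above refers to `relevantSQLConfs`: configs on that list are restored from the offset-log metadata key-for-key, so a key that is absent from the list, like the incorrect one, needs the explicit special case added here. A simplified sketch of that replay shape (hypothetical `restoreConfs` helper; the real logic lives in `OffsetSeqMetadata.setSessionConf` and is more involved):

```scala
// Simplified illustration; `set` stands in for sessionConf.setConfString.
def restoreConfs(
    relevantKeys: Seq[String],
    metadataConf: Map[String, String],
    set: (String, String) => Unit): Unit = {
  // Keys tracked in relevantSQLConfs are restored as-is.
  relevantKeys.foreach { key =>
    metadataConf.get(key).foreach(value => set(key, value))
  }
  // The incorrect key is not on the list, so its value must be mapped
  // onto the fixed key explicitly (the special case added in this diff).
  metadataConf
    .get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
    .foreach(value =>
      set("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan", value))
}
```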
The following five files are new test resources. Their paths were not captured in this view; judging from their content and the test below, they appear to be the commit log, metadata, and offset log files of a checkpoint written by Spark 3.5.4, under the `structured-streaming/checkpoint-version-3.5.4` test resource directory the test reads.

Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
```

Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
```

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
```

Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+0
```

Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+1
```
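As the two offset log files above show, a v1 offset log entry has three parts: a version line, the `OffsetSeqMetadata` JSON (whose `conf` map here carries the incorrect key written by Spark 3.5.4), and one offset line per source. Under that assumption, a rough standalone sketch for peeking at such a file outside Spark (hypothetical `OffsetLogPeek` helper, not a Spark API):

```scala
import scala.io.Source

// Rough sketch: inspect which config key a checkpointed offset log entry
// recorded. Assumes the simple layout seen above (version line, metadata
// JSON line, then one offset line per source).
object OffsetLogPeek {
  val incorrectKey = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

  def main(args: Array[String]): Unit = {
    val src = Source.fromFile(args(0))
    try {
      val lines = src.getLines().toList
      val version = lines.head               // e.g. "v1"
      val metadataJson = lines(1)            // OffsetSeqMetadata as JSON
      val sourceOffsets = lines.drop(2)      // one line per source
      println(s"version=$version, sources=${sourceOffsets.size}")
      // Naive substring check, good enough for a quick look.
      println(s"records incorrect key: ${metadataJson.contains(incorrectKey)}")
    } finally {
      src.close()
    }
  }
}
```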

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 69 additions & 0 deletions

```diff
@@ -1471,6 +1471,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
+    "effect when restarting from Spark 3.5.4") {
+    // Spark 3.5.4 is the only release we accidentally introduced the incorrect config.
+    // We just need to confirm that current Spark version will apply the fix of SPARK-49699 when
+    // the streaming query started from Spark 3.5.4. We should consistently apply the fix, instead
+    // of "on and off", because that may expose more possibility to break.
+
+    val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
+
+    withTempDir { dir =>
+      val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
+      assert(input != null, "cannot find test resource")
+      val inputDir = new File(input.toURI)
+
+      // Copy test files to tempDir so that we won't modify the original data.
+      FileUtils.copyDirectory(inputDir, dir)
+
+      // Below is the code we extract checkpoint from Spark 3.5.4. We need to make sure the offset
+      // advancement continues from the last run.
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(5, 6, 7, 8)
+
+      testStream(df)(
+        StartStream(checkpointLocation = dir.getCanonicalPath),
+        AddData(inputData, 9, 10, 11, 12),
+        ProcessAllAvailable(),
+        AssertOnQuery { q =>
+          val confValue = q.lastExecution.sparkSession.conf.get(
+            SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
+          assert(confValue === false,
+            "The value for the incorrect config in offset metadata should be respected as the " +
+              "value of the fixed config")
+
+          val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
+          def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
+            val offsetLogForBatch = offsetLog.get(batchId).get
+            val confInMetadata = offsetLogForBatch.metadata.get.conf
+            if (expectCorrectConfig) {
+              assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
+                Some("false"),
+                "The new offset log should have the fixed config instead of the incorrect one."
+              )
+              assert(!confInMetadata.contains(problematicConfName),
+                "The new offset log should not have the incorrect config.")
+            } else {
+              assert(
+                confInMetadata.get(problematicConfName) === Some("false"),
+                "The offset log in test resource should have the incorrect config to test properly."
+              )
+              assert(
+                !confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
+                "The offset log in test resource should not have the fixed config."
+              )
+            }
+          }
+
+          assert(offsetLog.getLatestBatchId() === Some(2))
+          checkConfigFromMetadata(0, expectCorrectConfig = false)
+          checkConfigFromMetadata(1, expectCorrectConfig = false)
+          checkConfigFromMetadata(2, expectCorrectConfig = true)
+          true
+        }
+      )
+    }
+  }
+
   private def checkAppendOutputModeException(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
```
