# [SPARK-51187][SQL][SS] Implement the graceful deprecation of incorrect config introduced in SPARK-49699 (#49983)

`docs/streaming/ss-migration-guide.md` (1 addition, 0 deletions)
@@ -28,6 +28,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-gui
- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This avoids possible correctness, duplication, and data loss issues due to incompatibility between the source and the wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
- Since Spark 4.0, the new configuration `spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint` (default: `0.3`) controls the amount of additional space allowed in the checkpoint directory to store stale version files for batch deletion inside the maintenance task. This amortizes the cost of listing in cloud stores. Setting this to `0` restores the old behavior. (See [SPARK-48931](https://issues.apache.org/jira/browse/SPARK-48931) for more details.)
- Since Spark 4.0, when a relative path is used to output data in `DataStreamWriter`, the resolution to an absolute path is done in the Spark driver and is no longer deferred to the Spark executor. This makes Structured Streaming behavior consistent with the DataFrame API (`DataFrameWriter`). (See [SPARK-50854](https://issues.apache.org/jira/browse/SPARK-50854) for more details.)
- Since Spark 4.0, the deprecated config `spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan` has been removed; its value, if recorded in an existing offset log, is carried over to the fixed config. (See [SPARK-51187](https://issues.apache.org/jira/browse/SPARK-51187) for more details, and the sketch after this list.)
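
For users pinning the pre-4.0 behavior, here is a minimal sketch of setting these configs at session build time, runnable in `spark-shell`. The fixed replacement key for the removed config is assumed from the `SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN` constant used later in this diff; its exact spelling is an assumption, not something this PR confirms:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; keys are taken from the migration notes above.
val spark = SparkSession.builder()
  .appName("ss-migration-demo")
  .master("local[*]")
  // 0 restores the pre-4.0 checkpoint cleanup behavior (no extra space).
  .config("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint", "0")
  // Assumed spelling of the fixed key that replaces the removed
  // "spark.databricks."-prefixed config.
  .config("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan", "false")
  .getOrCreate()
```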

## Upgrading from Structured Streaming 3.3 to 3.4

@@ -180,5 +180,16 @@ object OffsetSeqMetadata extends Logging {
}
}
}

// SPARK-51187: This incorrect config is not added to relevantSQLConfs, but the
// metadata in the offset log may contain it if the batch ran on Spark 3.5.4.
// We need to pick up the value from the metadata and set it in the new config.
// This also leads further batches to record the correct config in the offset log.
metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan") match {
> Contributor (author): I'll add a TODO mentioning removal when we want to file a JIRA ticket for immediate removal.

case Some(value) =>
sessionConf.setConfString(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)

case _ =>
}
}
}
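
Outside Spark internals, the same "pick up the legacy key, rewrite to the fixed key" pattern can be shown as a self-contained sketch. `ConfKeyMigrationDemo` and `migrateConf` are illustrative names, not part of this PR, and the fixed key spelling is assumed from the `SQLConf` constant above:

```scala
object ConfKeyMigrationDemo {
  // The key accidentally shipped in Spark 3.5.4 and its assumed replacement.
  val legacyKey = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
  val fixedKey = "spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

  // If a conf map written by Spark 3.5.4 carries the legacy key, move its
  // value to the fixed key so subsequent writers emit only the fixed key.
  def migrateConf(conf: Map[String, String]): Map[String, String] =
    conf.get(legacyKey) match {
      case Some(value) => (conf - legacyKey) + (fixedKey -> value)
      case None => conf
    }

  def main(args: Array[String]): Unit = {
    val from354 = Map(legacyKey -> "false", "spark.sql.shuffle.partitions" -> "5")
    val migrated = migrateConf(from354)
    assert(migrated(fixedKey) == "false" && !migrated.contains(legacyKey))
  }
}
```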
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
@@ -0,0 +1 @@
{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
1
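
These fixture files appear to be the commit log entries, the query metadata, and the two offset log entries of the bundled 3.5.4 checkpoint. Each offset file follows the layout visible above: a `v1` version line, a JSON metadata line whose `conf` map carries the problematic `spark.databricks.`-prefixed key, and one line per source offset. A rough reader, inferred purely from these fixtures (`readOffsetFile` is a hypothetical helper, not Spark API):

```scala
import scala.io.Source

// Reads a v1 offset log fixture: line 1 is the version header, line 2 is
// the JSON metadata (including the conf map), and the remaining lines are
// one committed offset per source (e.g. "0" for the MemoryStream source).
def readOffsetFile(path: String): (String, Seq[String]) = {
  val source = Source.fromFile(path)
  try {
    val lines = source.getLines().toList
    require(lines.headOption.contains("v1"), s"unexpected version in $path")
    (lines(1), lines.drop(2))
  } finally source.close()
}
```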
@@ -1471,6 +1471,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
"effect when restarting from Spark 3.5.4") {
// Spark 3.5.4 is the only release we accidentally introduced the incorrect config.
// We just need to confirm that current Spark version will apply the fix of SPARK-49699 when
// the streaming query started from Spark 3.5.4. We should consistently apply the fix, instead
// of "on and off", because that may expose more possibility to break.

val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

withTempDir { dir =>
val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
assert(input != null, "cannot find test resource")
val inputDir = new File(input.toURI)

// Copy test files to tempDir so that we won't modify the original data.
FileUtils.copyDirectory(inputDir, dir)

// Below is the code we used to extract the checkpoint from Spark 3.5.4. We need to make sure
// the offset advancement continues from the last run.
val inputData = MemoryStream[Int]
val df = inputData.toDF()

inputData.addData(1, 2, 3, 4)
inputData.addData(5, 6, 7, 8)

testStream(df)(
StartStream(checkpointLocation = dir.getCanonicalPath),
AddData(inputData, 9, 10, 11, 12),
ProcessAllAvailable(),
AssertOnQuery { q =>
val confValue = q.lastExecution.sparkSession.conf.get(
SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
assert(confValue === false,
"The value for the incorrect config in offset metadata should be respected as the " +
"value of the fixed config")

val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
> Contributor (author): I know this is a bit hard to understand for folks who aren't streaming experts. Please let me know if you need further explanation to justify the logic.

val offsetLogForBatch = offsetLog.get(batchId).get
val confInMetadata = offsetLogForBatch.metadata.get.conf
if (expectCorrectConfig) {
assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
Some("false"),
"The new offset log should have the fixed config instead of the incorrect one."
)
assert(!confInMetadata.contains(problematicConfName),
"The new offset log should not have the incorrect config.")
} else {
assert(
confInMetadata.get(problematicConfName) === Some("false"),
"The offset log in test resource should have the incorrect config to test properly."
)
assert(
!confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
"The offset log in test resource should not have the fixed config."
)
}
}

assert(offsetLog.getLatestBatchId() === Some(2))
checkConfigFromMetadata(0, expectCorrectConfig = false)
checkConfigFromMetadata(1, expectCorrectConfig = false)
checkConfigFromMetadata(2, expectCorrectConfig = true)
true
}
)
}
}
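
For context, here is one way the bundled checkpoint could plausibly have been produced on a Spark 3.5.4 shell. The `noop` sink and the output path are assumptions; the test only states that the fixture came from 3.5.4 using the `MemoryStream` setup above:

```scala
// Run against a Spark 3.5.4 spark-shell; `spark` is the shell's session.
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val ctx: org.apache.spark.sql.SQLContext = spark.sqlContext
val inputData = MemoryStream[Int]
val query = inputData.toDF().writeStream
  .format("noop") // sink choice is an assumption; only the checkpoint matters here
  .option("checkpointLocation", "/tmp/checkpoint-version-3.5.4")
  .start()

inputData.addData(1, 2, 3, 4) // becomes batch 0 (offsets/0, commits/0)
query.processAllAvailable()
inputData.addData(5, 6, 7, 8) // becomes batch 1 (offsets/1, commits/1)
query.processAllAvailable()
query.stop()
```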

private def checkAppendOutputModeException(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>