[SDP] Validate streaming-ness of DFs returned by SDP table and standalone flow definitions #51208
base: master
Conversation
…definitions (apache#122)

### What changes were proposed in this pull request?

Several tests are broken with these changes; run `dev/declarative-pipelines-ci.sh` to see the failures. Most tests should be easy to fix by calling the correct [proposed] `createTable` vs `createMaterializedView` interfaces from `TestGraphRegistrationContext` and/or by making sure streaming table definitions in tests _actually_ return streaming DFs, but some test failures may require deeper thought.

### How was this patch tested?

`./dev/declarative-pipelines-ci.sh` passes
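A rough sketch of how the proposed harness split might look (the method names come from this PR's description; the signatures and query shapes here are assumptions for illustration only):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical shape of the proposed TestGraphRegistrationContext split
// (signatures assumed, not the actual test API):
trait TestGraphRegistrationContext {
  def createTable(name: String)(query: => DataFrame): Unit            // expects a streaming DF
  def createMaterializedView(name: String)(query: => DataFrame): Unit // expects a batch DF
}

// Usage in a test would then make the intent explicit, e.g.:
//   ctx.createTable("events") { spark.readStream.table("raw_events") }
//   ctx.createMaterializedView("daily") { spark.read.table("events") }
```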
Nice
val destTableOpt = table.get(destTableIdentifier)

// If the destination identifier does not correspond to a table, it must be a view.
val destViewOpt = destTableOpt.fold(view.get(destTableIdentifier))(_ => None)
Why not just use `view.get(destTableIdentifier)` here?
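For what it's worth, the `fold` isn't quite equivalent: it skips the view lookup whenever a table matched, so the two options can never both be defined. A minimal stand-alone illustration, with plain `Map`s standing in for the real lookups (hypothetical data):

```scala
// Plain Maps standing in for the real table/view lookups.
val table = Map("t1" -> "tableDef")
val view  = Map("t1" -> "viewDef", "v1" -> "viewDef")

val destTableOpt = table.get("t1")                             // Some("tableDef")

// fold: only consult the view map when no table matched.
val destViewOpt = destTableOpt.fold(view.get("t1"))(_ => None) // None

// A plain lookup could leave both options defined for the same identifier.
val naiveViewOpt = view.get("t1")                              // Some("viewDef")
```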
@@ -55,6 +55,75 @@ trait GraphValidations extends Logging {
    multiQueryTables
  }

  protected[graph] def validateFlowStreamingness(): Unit = {
This could use some docstring. It doesn't need to go into the individual cases (because that could get out of sync easily), but could be at the level of "Some dataset types require that the flows that target them are backed by streaming queries. Others require batch queries. This validates that the dataset types match whether the flow query is streaming or batch".
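A sketch of what that could look like (wording lifted from the comment above, not the final doc):

```scala
/**
 * Validates that each flow's query matches the execution mode its destination
 * requires: some dataset types require that the flows targeting them are
 * backed by streaming queries, while others require batch queries.
 */
protected[graph] def validateFlowStreamingness(): Unit = {
```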
flows.foreach { |
Nitpick you can feel free to ignore: this is pretty heavily nested. Could remove a level of nesting with:
val resolvedFlows: Seq[ResolvedFlow] = flows.collect { case rf: ResolvedFlow => rf }
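For illustration, with hypothetical stand-in types, the flattened version would read:

```scala
sealed trait Flow
final case class ResolvedFlow(name: String, isStreaming: Boolean) extends Flow
final case class UnresolvedFlow(name: String) extends Flow

val flows: Seq[Flow] = Seq(ResolvedFlow("f1", isStreaming = true), UnresolvedFlow("f2"))

// Collect once; the per-flow body no longer needs a nested pattern match.
val resolvedFlows: Seq[ResolvedFlow] = flows.collect { case rf: ResolvedFlow => rf }
resolvedFlows.foreach { flow =>
  println(s"${flow.name} streaming=${flow.isStreaming}")
}
```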
@@ -2719,6 +2719,33 @@
    ],
    "sqlState" : "42000"
  },
  "INVALID_FLOW_RELATION_TYPE" : {
Any particular reason to use "relation" instead of "query"? I think the latter is more common in docs.
"Flow <flowIdentifier> returns an invalid relation type." | ||
], | ||
"subClass" : { | ||
"FOR_MATERIALIZED_VIEW" : { |
Thoughts on being more descriptive with these, like STREAMING_RELATION_FOR_MATERIALIZED_VIEW / BATCH_RELATION_FOR_STREAMING_TABLE?
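For reference, subclassed conditions surface at throw sites as `CLASS.SUBCLASS`; a hypothetical sketch using the more descriptive names suggested above (the exception choice and names are assumptions, not what this PR actually does):

```scala
import org.apache.spark.sql.AnalysisException

// Hypothetical throw site for a streaming table whose flow returned a batch DF.
throw new AnalysisException(
  errorClass = "INVALID_FLOW_RELATION_TYPE.BATCH_RELATION_FOR_STREAMING_TABLE",
  messageParameters = Map("flowIdentifier" -> "my_flow"))
```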
Let's file a JIRA.
What changes were proposed in this pull request?
Validate that streaming flows are actually backed by streaming sources, and batch flows are actually backed by batch sources. Also improve SDP test harnesses to be explicit about whether a streaming table or materialized view is being created, to match the Python/SQL API.
Why are the changes needed?
This change helps prevent incorrect usage of streaming/batch flows, such as a streaming table's flow reading directly from a batch source. In that case, for example, the `STREAM` keyword should be used to mark a SQL batch source as streaming, or `readStream` should be used in Python to perform a streaming read from an otherwise non-streaming file source.
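To illustrate with plain Spark APIs (the path and schema here are made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val eventSchema = StructType(Seq(StructField("event", StringType)))

// Batch read: isStreaming is false, so returning this from a streaming
// table's flow would now fail validation.
val batchDf = spark.read.schema(eventSchema).json("/tmp/events")

// Streaming read (Python's readStream / SQL's STREAM keyword analogue):
val streamingDf = spark.readStream.schema(eventSchema).json("/tmp/events")

assert(!batchDf.isStreaming && streamingDf.isStreaming)
```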
Does this PR introduce any user-facing change?
No, as this impacts SDP, which has not been released in any Spark version yet.
How was this patch tested?
Existing suites, plus tests added to `ConnectInvalidPipelineSuite`.
Was this patch authored or co-authored using generative AI tooling?
No