-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Capture element timestamp in DoFnRunners so they do not access mutable fields #36838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @kennknowles, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the reliability of timestamp validation within Beam's DoFnRunners by introducing a practice of eagerly capturing and storing element timestamps. This ensures that validation logic consistently operates on the original timestamp of an element, mitigating risks associated with mutable element states and potential inconsistencies in timestamp checks, especially within asynchronous or deferred execution contexts like lambdas. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
R: @kellen |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
|
Aren't elements expected to be immutable? DirectRunner even has mutability checks to fail pipelines that mutate elements. |
|
Yes, the elements are immutable. But we mutate a local field beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java Line 624 in b369dfa
So I think if the lambda is called asynchronously after another element has started processing this field could be set to a different element. |
|
But now that I look closer, |
|
Thanks @kennknowles, @clairemcginty is planning to do some testing today. |
Ran our tests with a locally built snapshot of this branch, still seeing the following error: is it possible the precision of the timestamp checks changed, or something like that? |
I don't think so. There's https://s.apache.org/beam-timestamp-strategy but I don't think it is related to this. Please note that this failure is in a different place than the original report so my commentary may not apply to both. I looked at the Scio code at https://github.com/spotify/scio/blob/041ae06b881dc142ec7e5e6392af7bf627a5d75f/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java#L105 and I do see that the DoFn is pulling elements from In discussion with @scwhittle he points out that this would be perfectly legal if done from
I suggest this:
|
|
I'm not convinced that checking against the watermark is correct for
several reasons. However I agree that the current check is a bit
restrictive in cases like this.
…On Thu, Nov 20, 2025 at 7:42 AM Kenn Knowles ***@***.***> wrote:
*kennknowles* left a comment (apache/beam#36838)
<#36838 (comment)>
Thanks @kennknowles <https://github.com/kennknowles>, @clairemcginty
<https://github.com/clairemcginty> is planning to do some testing today.
Ran our tests
<https://github.com/spotify/scio/actions/runs/19513763696/job/55860003717>
with a locally built snapshot of this branch, still seeing the following
error:
[info] Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.010Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:464)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:222)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:467)
[info] at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:222)
[info] at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info] at com.spotify.scio.transforms.FileDownloadDoFn.lambda$processElement$0(FileDownloadDoFn.java:105)
[info] at com.spotify.scio.transforms.FileDownloadDoFn.flush(FileDownloadDoFn.java:141)
[info] ...
is it possible the precision of the timestamp checks changed, or something
like that? 1970-01-01T00:00:00.001Z and 1970-01-01T00:00:00.010Z are only
off by a millisecond
I don't think so. There's https://s.apache.org/beam-timestamp-strategy
but I don't think it is related to this.
Please note that this failure is in a different place than the original
report so my commentary may not apply to both. I looked at the Scio code at
https://github.com/spotify/scio/blob/041ae06b881dc142ec7e5e6392af7bf627a5d75f/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java#L105
and I do see that the DoFn is pulling elements from batch and outputting
them in the context of a different element. So this will definitely look to
the runtime like outputting an element with a timestamp that is older than
the current input element. I think the fact that it worked before may have
just been luck. Though I can see how unlikely it is for it to have worked
for as long as it has on that basis. Nonetheless the fact that the
FileDownloadDoFn would hit this error is pretty plain.
In discussion with @scwhittle <https://github.com/scwhittle> he points
out that this would be perfectly legal if done from @FinishBundle. So it
is a bit restrictive to disallow it in a process element. The use case of
accumulating a batch in a local variable and flushing periodically as in
FileDownloadDoFn is clearly something we want to allow. To do so we have
a few options:
- Eliminate the timestamp skew check. This might be fine, especially
if it is done intentionally. The check is just to protect DoFn authors from
error.
- Track all timestamps in the bundle and allow skew back to the
earliest in the bundle. This seems pretty ad hoc. It allows this use case
but without sound reasoning.
- Check timestamps against the watermark rather than the current
element. I think this is principled but I'm not sure if we have plumbed the
needed watermark to the right place for this to be easy.
I suggest this:
- Immediately, you can just set allowed skew to a large number and it
will effectively disable the check.
- Short term, we can add a method to just disable the check entirely
for a DoFn that has this computation pattern.
- Medium term, we can revise it to check against the watermark, after
thinking a bit more to ensure this is going to be ok.
—
Reply to this email directly, view it on GitHub
<#36838 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVP2PIHWBTR6TH64DDT35XOPZAVCNFSM6AAAAACMLTMENKVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZTKNJYG4ZTMMBSGI>
.
You are receiving this because you commented.Message ID:
***@***.***>
|
Reported in https://github.com/apache/beam/pull/34902/files#r2528479326 that there was a problem with the timestamp validation. A possible cause is validation against the timestamp of "current element" at a moment when the element has been mutated to hold a different value. By accessing the field once and storing the retrieved immutable value in the closure, that possibility should be eliminated.
Note: this PR is better practice even if it doesn't fix the problem, so it can be merged either way.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.