diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index aaf5ab50160a..bba1872a33e8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -1,5 +1,6 @@ { "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 1, diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index d266aa094efa..78b2bdb93e2b 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,5 +1,6 @@ { "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 3, diff --git a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json index ac06b8aaf7ba..cdc04bcd331a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json @@ -1,4 +1,5 @@ { "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "revision": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json index e328a4f4bba1..ffdd1b908f46 100644 --- a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json +++ b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json @@ -1,5 +1,6 @@ { "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 2 } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 85482285d1ae..2d05fc1b5d19 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 2, "https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index c695f7cb67b7..24fc17d4c74a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json index c695f7cb67b7..24fc17d4c74a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json index 96e098eb7f97..7dab8be7160a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json index 42959ad85255..7e7462c0b059 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json index 3ce625b167aa..afda4087adf8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json index 1fd497f4748d..db03186ab405 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json index 6062b83a322d..f0c7c2ae3cfd 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Twister2.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Twister2.json index b970762c8397..2ec5e41ecf4a 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Twister2.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Twister2.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json index 26d472693709..6e2f429dd24e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 103405a57931..ec90dd7adfbb 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1194,6 +1194,7 @@ class BeamModulePlugin implements Plugin { List skipDefRegexes = [] skipDefRegexes << "AutoValue_.*" + skipDefRegexes << "AutoBuilder_.*" skipDefRegexes << "AutoOneOf_.*" skipDefRegexes << ".*\\.jmh_generated\\..*" skipDefRegexes += configuration.generatedClassPatterns @@ -1287,7 +1288,8 @@ class BeamModulePlugin implements Plugin { '**/org/apache/beam/gradle/**', '**/org/apache/beam/model/**', '**/org/apache/beam/runners/dataflow/worker/windmill/**', - '**/AutoValue_*' + '**/AutoValue_*', + '**/AutoBuilder_*', ] def jacocoEnabled = project.hasProperty('enableJacocoReport') diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index ea7989873712..9f24ce39b974 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation library.java.slf4j_api implementation library.java.jackson_core implementation library.java.jackson_databind + implementation library.java.hamcrest testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.junit testImplementation library.java.mockito_core diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index fbb7b315c3b1..65084120f922 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -81,7 +81,9 @@ public static Iterable> dropExpiredWindows( if (input == null) { return null; } - return input.explodeWindows(); + // The generics in this chain of calls line up best if we drop the covariance + // in the return value of explodeWindows() + return (Iterable>) input.explodeWindows(); }) .filter( input -> { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index b16dad86df18..9bda4dd2cbca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.OutputBuilderSuppliers; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; @@ -180,7 +181,8 @@ public TimeDomain timeDomain(DoFn doFn) { @Override public OutputReceiver outputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedReceiver(processContext, null); + return DoFnOutputReceivers.windowedReceiver( + processContext, OutputBuilderSuppliers.supplierForElement(element), null); } @Override @@ -190,7 +192,8 @@ public OutputReceiver outputRowReceiver(DoFn doFn) { @Override public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedMultiReceiver(processContext, null); + return DoFnOutputReceivers.windowedMultiReceiver( + processContext, OutputBuilderSuppliers.supplierForElement(element)); } @Override @@ -385,12 +388,12 @@ public PaneInfo pane() { @Override public String currentRecordId() { - return element.getCurrentRecordId(); + return element.getRecordId(); } @Override public Long currentRecordOffset() { - return element.getCurrentRecordOffset(); + return element.getRecordOffset(); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 4e10dd471b40..b08bd42b0b22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -1057,8 +1057,13 @@ private void prefetchOnTrigger( } // Output the actual value. - outputter.output( - WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, paneInfo)); + WindowedValues.>builder() + .setValue(KV.of(key, toOutput)) + .setTimestamp(outputTimestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setReceiver(outputter) + .output(); }); reduceFn.onTrigger(renamedTriggerContext); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 217c06c56fe5..3af90ea9a0a1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -51,6 +51,8 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.OutputBuilderSupplier; +import org.apache.beam.sdk.util.OutputBuilderSuppliers; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; @@ -113,7 +115,7 @@ public class SimpleDoFnRunner implements DoFnRunner mainOutputSchemaCoder; - private @Nullable Map, Coder> outputCoders; + private final @Nullable Map, Coder> outputCoders; private final @Nullable DoFnSchemaInformation doFnSchemaInformation; @@ -395,6 +397,8 @@ private class DoFnProcessContext extends DoFn.ProcessContext /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ private @Nullable StateNamespace namespace; + private final OutputBuilderSupplier builderSupplier; + /** * The state namespace for this context. * @@ -412,6 +416,7 @@ private StateNamespace getNamespace() { private DoFnProcessContext(WindowedValue elem) { fn.super(); this.elem = elem; + this.builderSupplier = OutputBuilderSuppliers.supplierForElement(elem); } @Override @@ -494,8 +499,17 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - SimpleDoFnRunner.this.outputWindowedValue( - tag, WindowedValues.of(output, timestamp, windows, paneInfo)); + builderSupplier + .builder(output) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setReceiver( + wv -> { + checkTimestamp(elem.getTimestamp(), wv.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, wv); + }) + .output(); } @Override @@ -520,12 +534,12 @@ public Instant timestamp() { @Override public String currentRecordId() { - return elem.getCurrentRecordId(); + return elem.getRecordId(); } @Override public Long currentRecordOffset() { - return elem.getCurrentRecordOffset(); + return elem.getRecordOffset(); } public Collection windows() { @@ -604,17 +618,18 @@ public TimeDomain timeDomain(DoFn doFn) { @Override public OutputReceiver outputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag); + return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag); } @Override public OutputReceiver outputRowReceiver(DoFn doFn) { - return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder); + return DoFnOutputReceivers.rowReceiver( + this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); + return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders); } @Override @@ -710,6 +725,7 @@ private class OnTimerArgumentProvider extends DoFn.OnTime private final TimeDomain timeDomain; private final String timerId; private final KeyT key; + private final OutputBuilderSupplier builderSupplier; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ private @Nullable StateNamespace namespace; @@ -742,6 +758,13 @@ private OnTimerArgumentProvider( this.timestamp = timestamp; this.timeDomain = timeDomain; this.key = key; + this.builderSupplier = + OutputBuilderSuppliers.supplierForElement( + WindowedValues.builder() + .setValue(null) + .setTimestamp(timestamp) + .setWindow(window) + .setPaneInfo(PaneInfo.NO_FIRING)); } @Override @@ -828,17 +851,19 @@ public TimeDomain timeDomain(DoFn doFn) { @Override public OutputReceiver outputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag); + return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag); } @Override public OutputReceiver outputRowReceiver(DoFn doFn) { - return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder); + return DoFnOutputReceivers.rowReceiver( + this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); + // ... what to doooo 0... + return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders); } @Override @@ -978,8 +1003,14 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) { checkTimestamp(timestamp(), timestamp); - SimpleDoFnRunner.this.outputWindowedValue( - tag, WindowedValues.of(output, timestamp, windows, paneInfo)); + + builderSupplier + .builder(output) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .output(); } @Override @@ -1015,6 +1046,8 @@ private class OnWindowExpirationArgumentProvider private final BoundedWindow window; private final Instant timestamp; private final KeyT key; + private final OutputBuilderSupplier builderSupplier; + /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ private @Nullable StateNamespace namespace; @@ -1037,6 +1070,13 @@ private OnWindowExpirationArgumentProvider(BoundedWindow window, Instant timesta this.window = window; this.timestamp = timestamp; this.key = key; + this.builderSupplier = + OutputBuilderSuppliers.supplierForElement( + WindowedValues.builder() + .setValue(null) + .setWindow(window) + .setTimestamp(timestamp) + .setPaneInfo(PaneInfo.NO_FIRING)); } @Override @@ -1109,17 +1149,18 @@ public KeyT key() { @Override public OutputReceiver outputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag); + return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag); } @Override public OutputReceiver outputRowReceiver(DoFn doFn) { - return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder); + return DoFnOutputReceivers.rowReceiver( + this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); + return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders); } @Override @@ -1241,8 +1282,13 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) { checkTimestamp(this.timestamp, timestamp); - SimpleDoFnRunner.this.outputWindowedValue( - tag, WindowedValues.of(output, timestamp, windows, paneInfo)); + builderSupplier + .builder(output) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .output(); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowMatchers.java similarity index 91% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/WindowMatchers.java index 33ae2f68b48f..463cb9320237 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowMatchers.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.WindowedValue; @@ -31,6 +32,9 @@ import org.joda.time.Instant; /** Matchers that are useful for working with Windowing, Timestamps, etc. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) public class WindowMatchers { public static Matcher> isWindowedValue( @@ -99,6 +103,15 @@ public static Matcher> isSingleWindowedValue( Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); } + public static Matcher> isSingleWindowedValue( + T value, BoundedWindow window) { + return WindowMatchers.isSingleWindowedValue( + Matchers.equalTo(value), + Matchers.anything(), + Matchers.equalTo(window), + Matchers.anything()); + } + public static Matcher> isSingleWindowedValue( Matcher valueMatcher, long timestamp, long windowStart, long windowEnd) { IntervalWindow intervalWindow = @@ -166,6 +179,15 @@ protected void describeMismatchSafely( }; } + public static Matcher> isValueInGlobalWindow(T value) { + return isSingleWindowedValue(value, GlobalWindow.INSTANCE); + } + + public static Matcher> isValueInGlobalWindow( + T value, Instant timestamp) { + return isSingleWindowedValue(value, timestamp, GlobalWindow.INSTANCE); + } + @SuppressWarnings({"unchecked", "rawtypes"}) @SafeVarargs public static final Matcher> ofWindows( diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java index 9dd8ac502fde..06995a515fcf 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.WindowedValues; @@ -75,4 +76,29 @@ public void testIsWindowedValueReorderedWindows() { new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))), PaneInfo.NO_FIRING)); } + + @Test + public void test_IsValueInGlobalWindow_TimestampedValueInGlobalWindow() { + assertThat( + WindowedValues.timestampedValueInGlobalWindow("foo", new Instant(7)), + WindowMatchers.isValueInGlobalWindow("foo", new Instant(7))); + + assertThat( + WindowedValues.timestampedValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE), + WindowMatchers.isValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE)); + + assertThat( + WindowedValues.timestampedValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE), + WindowMatchers.isValueInGlobalWindow("foo")); + } + + @Test + public void test_IsValueInGlobalWindow_ValueInGlobalWindow() { + assertThat( + WindowedValues.valueInGlobalWindow("foo"), WindowMatchers.isValueInGlobalWindow("foo")); + + assertThat( + WindowedValues.valueInGlobalWindow("foo"), + WindowMatchers.isValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE)); + } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 0e011aa5cd9b..c6726fb3463f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -246,8 +246,8 @@ private BundleWindowedValueReceiver(UncommittedBundle>> bundle } @Override - public void output(WindowedValue>> valueWithMetadata) { - bundle.add(valueWithMetadata); + public void output(WindowedValue>> windowedValue) { + bundle.add(windowedValue); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 27de46bf102b..2724312c99a7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -90,9 +90,7 @@ public WindowIntoEvaluator( public void processElement(WindowedValue compressedElement) throws Exception { for (WindowedValue element : compressedElement.explodeWindows()) { Collection windows = assignWindows(windowFn, element); - outputBundle.add( - WindowedValues.of( - element.getValue(), element.getTimestamp(), windows, element.getPaneInfo())); + WindowedValues.builder(element).setWindows(windows).setReceiver(outputBundle::add).output(); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 19ccdb76af58..79a90c554027 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -1403,20 +1403,16 @@ private SourceContextWrapper(SourceContext> ctx) { @Override public void collect(WindowedValue> element) { OutputT originalValue = element.getValue().getValue(); - WindowedValue output = - WindowedValues.of( - originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo()); - ctx.collect(output); + WindowedValues.builder(element).withValue(originalValue).setReceiver(ctx::collect).output(); } @Override public void collectWithTimestamp( WindowedValue> element, long timestamp) { OutputT originalValue = element.getValue().getValue(); - WindowedValue output = - WindowedValues.of( - originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo()); - ctx.collectWithTimestamp(output, timestamp); + WindowedValues.builder(element) + .withValue(originalValue) + .setReceiver(wv -> ctx.collectWithTimestamp(wv, timestamp)); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index a707e366c8a5..882e7dfe46b1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -223,12 +223,10 @@ public void setCollector(Collector> collector) { @Override public void output(TupleTag tag, WindowedValue output) { checkStateNotNull(collector); - collector.collect( - WindowedValues.of( - new RawUnionValue(0 /* single output */, output.getValue()), - output.getTimestamp(), - output.getWindows(), - output.getPaneInfo())); + WindowedValues.builder(output) + .withValue(new RawUnionValue(0 /* single output */, output.getValue())) + .setReceiver(collector::collect) + .output(); } } @@ -257,13 +255,10 @@ public void setCollector(Collector> collector) { @Override public void output(TupleTag tag, WindowedValue output) { checkStateNotNull(collector); - - collector.collect( - WindowedValues.of( - new RawUnionValue(outputMap.get(tag), output.getValue()), - output.getTimestamp(), - output.getWindows(), - output.getPaneInfo())); + WindowedValues.builder(output) + .withValue(new RawUnionValue(outputMap.get(tag), output.getValue())) + .setReceiver(collector::collect) + .output(); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java index bcc5a244d3b1..38c6ad27cf12 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java @@ -101,11 +101,11 @@ public void reduce( (WindowedValue> wv) -> Objects.requireNonNull(wv).getValue().getValue())); } - coll.collect( - WindowedValues.of( - KV.of(first.getValue().getKey(), values), - combinedTimestamp, - first.getWindows(), - PaneInfo.ON_TIME_AND_ONLY_FIRING)); + WindowedValues.builder(first) + .withValue(KV.of(first.getValue().getKey(), values)) + .setReceiver(coll::collect) + .setPaneInfo(PaneInfo.ON_TIME_AND_ONLY_FIRING) + .setTimestamp(combinedTimestamp) + .output(); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 15627534411c..10b41bb5b5ba 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1394,12 +1394,12 @@ public PaneInfo getPaneInfo() { } @Override - public @Nullable String getCurrentRecordId() { + public @Nullable String getRecordId() { return null; } @Override - public @Nullable Long getCurrentRecordOffset() { + public @Nullable Long getRecordOffset() { return null; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java index cea195ed2013..4375cc5adcfe 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow; -import java.util.Collections; import org.apache.beam.runners.dataflow.internal.DataflowGroupByKey; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -134,12 +133,14 @@ public Duration getAllowedTimestampSkew() { @ProcessElement public void processElement( - @Element KV> kv, OutputReceiver> r) { - r.outputWindowedValue( - KV.of(kv.getKey(), kv.getValue().getValue()), - kv.getValue().getTimestamp(), - Collections.singleton(kv.getValue().getWindow()), - kv.getValue().getPaneInfo()); + @Element KV> kv, + OutputReceiver> outputReceiver) { + outputReceiver + .builder(KV.of(kv.getKey(), kv.getValue().getValue())) + .setTimestamp(kv.getValue().getTimestamp()) + .setWindow(kv.getValue().getWindow()) + .setPaneInfo(kv.getValue().getPaneInfo()) + .output(); } })); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index d45e1f3a4e46..83cbc3aa62c7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -111,9 +111,7 @@ public BoundedWindow window() { } }); - WindowedValue res = - WindowedValues.of(elem.getValue(), elem.getTimestamp(), windows, elem.getPaneInfo()); - receiver.process(res); + WindowedValues.builder(elem).setWindows(windows).setReceiver(receiver::process).output(); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index 05f537948072..399258d7dbb9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -243,8 +243,12 @@ public WindowingCoderGroupingKeyCreator(Coder coder) { public Object createGroupingKey(WindowedValue key) throws Exception { // Ignore timestamp for grouping purposes. // The PGBK output will inherit the timestamp of one of its inputs. - return WindowedValues.of( - coder.structuralValue(key.getValue()), ignored, key.getWindows(), key.getPaneInfo()); + return WindowedValues.builder(key) + .withValue(coder.structuralValue(key.getValue())) + .setTimestamp(ignored) + .setWindows(key.getWindows()) + .setPaneInfo(key.getPaneInfo()) + .build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index 31d846d1102d..248ed34e8c40 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -70,18 +70,17 @@ public void startBundle(Receiver... receivers) throws Exception { public void processElement(Object untypedElem) throws Exception { WindowedValue> typedElem = (WindowedValue>) untypedElem; - receiver.process( - WindowedValues.of( + WindowedValues.builder(typedElem) + .withValue( KV.of( typedElem.getValue().getKey(), WindowedValues.of( typedElem.getValue().getValue(), typedElem.getTimestamp(), typedElem.getWindows(), - typedElem.getPaneInfo())), - typedElem.getTimestamp(), - typedElem.getWindows(), - typedElem.getPaneInfo())); + typedElem.getPaneInfo()))) + .setReceiver(receiver::process) + .output(); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index 0cbff31c8de2..7cb6f2223472 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -255,8 +255,8 @@ public long add(WindowedValue data) throws IOException { } byte[] rawId = null; - if (data.getCurrentRecordId() != null) { - rawId = data.getCurrentRecordId().getBytes(StandardCharsets.UTF_8); + if (data.getRecordId() != null) { + rawId = data.getRecordId().getBytes(StandardCharsets.UTF_8); } else { rawId = context.getCurrentRecordId(); } @@ -267,8 +267,8 @@ public long add(WindowedValue data) throws IOException { id = ByteString.copyFrom(rawId); byte[] rawOffset = null; - if (data.getCurrentRecordOffset() != null) { - rawOffset = Longs.toByteArray(data.getCurrentRecordOffset()); + if (data.getRecordOffset() != null) { + rawOffset = Longs.toByteArray(data.getRecordOffset()); } else { rawOffset = context.getCurrentRecordOffset(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java index 1a66f4484292..c028ed4c58d7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java @@ -190,7 +190,8 @@ private void closeWindow( W window, Map accumulators, Map accumulatorOutputTimes, - WindowedValueReceiver> output) { + WindowedValueReceiver> output) + throws Exception { AccumT accum = accumulators.remove(window); Instant timestamp = accumulatorOutputTimes.remove(window); checkState(accum != null && timestamp != null); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 1119617a068e..a51c9ed419e1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -50,12 +50,12 @@ public PaneInfo getPaneInfo() { } @Override - public @Nullable String getCurrentRecordId() { + public @Nullable String getRecordId() { return null; } @Override - public @Nullable Long getCurrentRecordOffset() { + public @Nullable Long getRecordOffset() { return null; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java index 06206de92e49..aec6b474e7d5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java @@ -610,8 +610,8 @@ private static class TestOutput implements WindowedValueReceiver>> output = new ArrayList<>(); @Override - public void output(WindowedValue> valueWithMetadata) { - this.output.add(valueWithMetadata); + public void output(WindowedValue> windowedValue) { + this.output.add(windowedValue); } public List>> getOutput() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index d03914a256ca..03735355de51 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -111,12 +111,12 @@ public PaneInfo getPaneInfo() { } @Override - public @Nullable String getCurrentRecordId() { + public @Nullable String getRecordId() { return null; } @Override - public @Nullable Long getCurrentRecordOffset() { + public @Nullable Long getRecordOffset() { return null; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java new file mode 100644 index 000000000000..83d2af7b66bb --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.OutputBuilder; +import org.apache.beam.sdk.values.WindowedValues; +import org.joda.time.Instant; + +/** + * An implementation of {@link DoFn.OutputReceiver} that naively collects all output values. + * + *

Because this API is crude and not designed to be very general, it is for internal use only and + * will be changed arbitrarily. + */ +@Internal +public class TestOutputReceiver implements DoFn.OutputReceiver { + private final List records = new ArrayList<>(); + + // To simplify testing of a DoFn, we want to be able to collect their outputs even + // when no window is provided (because processElement is called with only a value in testing). + private static final BoundedWindow fakeWindow = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } + }; + + @Override + public OutputBuilder builder(T value) { + return WindowedValues.builder() + .setValue(value) + .setWindow(fakeWindow) + .setPaneInfo(PaneInfo.NO_FIRING) + .setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE) + .setReceiver(windowedValue -> records.add(windowedValue.getValue())); + } + + public List getOutputs() { + return records; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 88e3780384ff..a2f32b8b3dd3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -913,12 +913,14 @@ private WindowedValues( private static class ConvertWindowedValues extends DoFn, T> { @ProcessElement - public void processElement(@Element WindowedValue element, OutputReceiver r) { - r.outputWindowedValue( - element.getValue(), - element.getTimestamp(), - element.getWindows(), - element.getPaneInfo()); + public void processElement( + @Element WindowedValue element, OutputReceiver outputReceiver) { + outputReceiver + .builder(element.getValue()) + .setTimestamp(element.getTimestamp()) + .setWindows(element.getWindows()) + .setPaneInfo(element.getPaneInfo()) + .output(); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 10904b2aa393..d0714de60328 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -428,17 +429,22 @@ public TypeDescriptor getOutputTypeDescriptor() { /** Receives values of the given type. */ public interface OutputReceiver { - void output(T output); + OutputBuilder builder(T value); - void outputWithTimestamp(T output, Instant timestamp); + default void output(T value) { + builder(value).output(); + } + + default void outputWithTimestamp(T value, Instant timestamp) { + builder(value).setTimestamp(timestamp).output(); + } default void outputWindowedValue( - T output, + T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - throw new UnsupportedOperationException( - String.format("Not implemented: %s.outputWindowedValue", this.getClass().getName())); + builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index 2c8f7468ebb6..fee19810c15c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -21,117 +21,146 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.OutputBuilderSupplier; +import org.apache.beam.sdk.util.WindowedValueReceiver; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Instant; /** Common {@link OutputReceiver} and {@link MultiOutputReceiver} classes. */ @Internal public class DoFnOutputReceivers { + private static class RowOutputReceiver implements OutputReceiver { - WindowedContextOutputReceiver outputReceiver; + private final @Nullable TupleTag tag; + private final DoFn.WindowedContext context; + private final OutputBuilderSupplier builderSupplier; SchemaCoder schemaCoder; - public RowOutputReceiver( + private RowOutputReceiver( DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, @Nullable TupleTag outputTag, SchemaCoder schemaCoder) { - outputReceiver = new WindowedContextOutputReceiver<>(context, outputTag); - this.schemaCoder = checkNotNull(schemaCoder); - } - - @Override - public void output(Row output) { - outputReceiver.output(schemaCoder.getFromRowFunction().apply(output)); + this.context = context; + this.builderSupplier = builderSupplier; + this.tag = outputTag; + this.schemaCoder = schemaCoder; } @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - outputReceiver.outputWithTimestamp(schemaCoder.getFromRowFunction().apply(output), timestamp); - } + public OutputBuilder builder(Row value) { + // assigning to final variable allows static analysis to know it + // will not change between now and when receiver is invoked + final TupleTag tag = this.tag; + if (tag == null) { + return builderSupplier + .builder(value) + .setValue(value) + .setReceiver( + rowWithMetadata -> { + ((DoFn.WindowedContext) context) + .outputWindowedValue( + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), + rowWithMetadata.getTimestamp(), + rowWithMetadata.getWindows(), + rowWithMetadata.getPaneInfo()); + }); - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - outputReceiver.outputWindowedValue( - schemaCoder.getFromRowFunction().apply(output), timestamp, windows, paneInfo); + } else { + checkStateNotNull(tag); + return builderSupplier + .builder(value) + .setReceiver( + rowWithMetadata -> { + context.outputWindowedValue( + tag, + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), + rowWithMetadata.getTimestamp(), + rowWithMetadata.getWindows(), + rowWithMetadata.getPaneInfo()); + }); + } } } - private static class WindowedContextOutputReceiver implements OutputReceiver { + /** + * OutputReceiver that delegates all its core functionality to DoFn.WindowedContext which predates + * OutputReceiver and has most of the same methods. + */ + private static class WindowedContextOutputReceiver + implements OutputReceiver, WindowedValueReceiver { + private final OutputBuilderSupplier builderSupplier; DoFn.WindowedContext context; @Nullable TupleTag outputTag; public WindowedContextOutputReceiver( - DoFn.WindowedContext context, @Nullable TupleTag outputTag) { + DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, + @Nullable TupleTag outputTag) { this.context = context; + this.builderSupplier = builderSupplier; this.outputTag = outputTag; } @Override - public void output(T output) { - if (outputTag != null) { - context.output(outputTag, output); - } else { - ((DoFn.WindowedContext) context).output(output); - } - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - if (outputTag != null) { - context.outputWithTimestamp(outputTag, output, timestamp); - } else { - ((DoFn.WindowedContext) context).outputWithTimestamp(output, timestamp); - } + public OutputBuilder builder(T value) { + return WindowedValues.builder(builderSupplier.builder(value)).setReceiver(this); } @Override - public void outputWindowedValue( - T output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { + public void output(WindowedValue windowedValue) { if (outputTag != null) { - context.outputWindowedValue(outputTag, output, timestamp, windows, paneInfo); + context.outputWindowedValue( + outputTag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo()); } else { ((DoFn.WindowedContext) context) - .outputWindowedValue(output, timestamp, windows, paneInfo); + .outputWindowedValue( + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo()); } } } private static class WindowedContextMultiOutputReceiver implements MultiOutputReceiver { - DoFn.WindowedContext context; + private final OutputBuilderSupplier builderSupplier; + private final DoFn.WindowedContext context; @Nullable Map, Coder> outputCoders; public WindowedContextMultiOutputReceiver( - DoFn.WindowedContext context, @Nullable Map, Coder> outputCoders) { + DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, + @Nullable Map, Coder> outputCoders) { this.context = context; + this.builderSupplier = builderSupplier; this.outputCoders = outputCoders; } // This exists for backwards compatibility with the Dataflow runner, and will be removed. - public WindowedContextMultiOutputReceiver(DoFn.WindowedContext context) { + public WindowedContextMultiOutputReceiver( + DoFn.WindowedContext context, OutputBuilderSupplier builderSupplier) { this.context = context; + this.builderSupplier = builderSupplier; } @Override public OutputReceiver get(TupleTag tag) { - return DoFnOutputReceivers.windowedReceiver(context, tag); + return DoFnOutputReceivers.windowedReceiver(context, builderSupplier, tag); } @Override @@ -141,20 +170,25 @@ public OutputReceiver getRowReceiver(TupleTag tag) { checkState( outputCoder instanceof SchemaCoder, "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); - return DoFnOutputReceivers.rowReceiver(context, tag, (SchemaCoder) outputCoder); + return DoFnOutputReceivers.rowReceiver( + context, builderSupplier, tag, (SchemaCoder) outputCoder); } } /** Returns a {@link OutputReceiver} that delegates to a {@link DoFn.WindowedContext}. */ public static OutputReceiver windowedReceiver( - DoFn.WindowedContext context, @Nullable TupleTag outputTag) { - return new WindowedContextOutputReceiver<>(context, outputTag); + DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, + @Nullable TupleTag outputTag) { + return new WindowedContextOutputReceiver<>(context, builderSupplier, outputTag); } /** Returns a {@link MultiOutputReceiver} that delegates to a {@link DoFn.WindowedContext}. */ public static MultiOutputReceiver windowedMultiReceiver( - DoFn.WindowedContext context, @Nullable Map, Coder> outputCoders) { - return new WindowedContextMultiOutputReceiver(context, outputCoders); + DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, + @Nullable Map, Coder> outputCoders) { + return new WindowedContextMultiOutputReceiver(context, builderSupplier, outputCoders); } /** @@ -162,8 +196,9 @@ public static MultiOutputReceiver windowedMultiReceiver( * *

This exists for backwards-compatibility with the Dataflow runner, and will be removed. */ - public static MultiOutputReceiver windowedMultiReceiver(DoFn.WindowedContext context) { - return new WindowedContextMultiOutputReceiver(context); + public static MultiOutputReceiver windowedMultiReceiver( + DoFn.WindowedContext context, OutputBuilderSupplier builderSupplier) { + return new WindowedContextMultiOutputReceiver(context, builderSupplier); } /** @@ -172,8 +207,9 @@ public static MultiOutputReceiver windowedMultiReceiver(DoFn.WindowedConte */ public static OutputReceiver rowReceiver( DoFn.WindowedContext context, + OutputBuilderSupplier builderSupplier, @Nullable TupleTag outputTag, SchemaCoder schemaCoder) { - return new RowOutputReceiver<>(context, outputTag, schemaCoder); + return new RowOutputReceiver<>(context, builderSupplier, outputTag, schemaCoder); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index f4670a4d0e94..c59d6b528c3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -47,12 +47,16 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.OutputBuilderSupplier; +import org.apache.beam.sdk.util.OutputBuilderSuppliers; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -211,9 +215,14 @@ public void processWindowedElement(InputT element, Instant timestamp, final Boun startBundle(); } try { + ValueInSingleWindow templateElement = + ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING); + WindowedValue templateWv = + WindowedValues.of(element, timestamp, window, PaneInfo.NO_FIRING); final DoFn.ProcessContext processContext = - createProcessContext( - ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING, null, null)); + createProcessContext(templateElement); + final OutputBuilderSupplier builderSupplier = + OutputBuilderSuppliers.supplierForElement(templateWv); fnInvoker.invokeProcessElement( new DoFnInvoker.BaseArgumentProvider() { @@ -286,12 +295,13 @@ public TimeDomain timeDomain(DoFn doFn) { @Override public OutputReceiver outputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedReceiver(processContext, null); + return DoFnOutputReceivers.windowedReceiver(processContext, builderSupplier, null); } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { - return DoFnOutputReceivers.windowedMultiReceiver(processContext, null); + return DoFnOutputReceivers.windowedMultiReceiver( + processContext, builderSupplier, null); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index ea55cbd88b36..a01b5f570a57 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import com.google.auto.service.AutoService; -import java.util.Collections; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -178,12 +177,14 @@ public Duration getAllowedTimestampSkew() { @ProcessElement public void processElement( - @Element KV> kv, OutputReceiver> r) { - r.outputWindowedValue( - KV.of(kv.getKey(), kv.getValue().getValue()), - kv.getValue().getTimestamp(), - Collections.singleton(kv.getValue().getWindow()), - kv.getValue().getPaneInfo()); + @Element KV> kv, + OutputReceiver> outputReceiver) { + outputReceiver + .builder(KV.of(kv.getKey(), kv.getValue().getValue())) + .setTimestamp(kv.getValue().getTimestamp()) + .setWindow(kv.getValue().getWindow()) + .setPaneInfo(kv.getValue().getPaneInfo()) + .output(); } })); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 2a301d0480c0..b2de48342d7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -183,12 +182,14 @@ public Duration getAllowedTimestampSkew() { @ProcessElement public void processElement( - @Element KV> kv, OutputReceiver> r) { - r.outputWindowedValue( - KV.of(kv.getKey(), kv.getValue().getValue()), - kv.getValue().getTimestamp(), - Collections.singleton(kv.getValue().getWindow()), - kv.getValue().getPaneInfo()); + @Element KV> kv, + OutputReceiver> outputReceiver) { + outputReceiver + .builder(KV.of(kv.getKey(), kv.getValue().getValue())) + .setTimestamp(kv.getValue().getTimestamp()) + .setWindow(kv.getValue().getWindow()) + .setPaneInfo(kv.getValue().getPaneInfo()) + .output(); } })); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSupplier.java new file mode 100644 index 000000000000..cee7fc5f607d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSupplier.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.values.WindowedValues; + +@Internal +@FunctionalInterface +public interface OutputBuilderSupplier { + // Returns WindowedValues.Builder so that downstream can setReceiver (when tag is specified) + // but we need the value at a minimum in order to fix the type variable + WindowedValues.Builder builder(OutputT value); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSuppliers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSuppliers.java new file mode 100644 index 000000000000..e766982e295b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OutputBuilderSuppliers.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; + +/** Implementations of {@link OutputBuilderSupplier}. */ +@Internal +public class OutputBuilderSuppliers { + private OutputBuilderSuppliers() {} + + public static OutputBuilderSupplier supplierForElement(WindowedValue templateValue) { + return new OutputBuilderSupplier() { + @Override + public WindowedValues.Builder builder(T value) { + return WindowedValues.builder(templateValue).withValue(value); + } + }; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java index 8c5b2434ae5a..a6c11d5a2798 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java @@ -25,5 +25,5 @@ @FunctionalInterface public interface WindowedValueReceiver { /** Outputs a value with windowing information. */ - void output(WindowedValue output); + void output(WindowedValue output) throws Exception; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDo.java index 8dd19528db4e..74af80d6feee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDo.java @@ -60,12 +60,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.NameUtils; +import org.apache.beam.sdk.util.OutputBuilderSupplier; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.util.construction.ParDoTranslation.ParDoLike; import org.apache.beam.sdk.util.construction.ParDoTranslation.ParDoLikeTimerFamilySpecs; import org.apache.beam.sdk.util.construction.ReadTranslation.BoundedReadPayloadTranslator; import org.apache.beam.sdk.util.construction.ReadTranslation.UnboundedReadPayloadTranslator; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -74,6 +76,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -609,7 +612,19 @@ public void setup(PipelineOptions options) { } @ProcessElement - public void processElement(final ProcessContext c, BoundedWindow w) { + public void processElement( + final ProcessContext c, + BoundedWindow w, + OutputReceiver> outputReceiver) { + + OutputBuilderSupplier outputBuilderSupplier = + new OutputBuilderSupplier() { + @Override + public WindowedValues.Builder builder(OutputT value) { + return WindowedValues.builder(outputReceiver.builder(null)).withValue(value); + } + }; + invoker.invokeSplitRestriction( (ArgumentProvider) new BaseArgumentProvider() { @@ -662,13 +677,16 @@ public OutputReceiver outputReceiver( DoFn doFn) { return new OutputReceiver() { @Override - public void output(RestrictionT part) { - c.output(KV.of(c.element().getKey(), part)); - } - - @Override - public void outputWithTimestamp(RestrictionT part, Instant timestamp) { - throw new UnsupportedOperationException(); + public OutputBuilder builder(RestrictionT restriction) { + // technically the windows and other aspects should not actually matter on a + // restriction, + // but it is better to propagate them and leave the checks in place than not + // to + return outputBuilderSupplier + .builder(restriction) + .setReceiver( + windowedValue -> + c.output(KV.of(c.element().getKey(), windowedValue.getValue()))); } }; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index d462d422446c..e6394b8810a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -46,13 +46,16 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.OutputBuilderSupplier; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -188,7 +191,7 @@ public String getErrorContext() { } @ProcessElement - public void process(ProcessContext c, BoundedWindow w) { + public void process(ProcessContext c, BoundedWindow w, OutputReceiver outputReceiver) { WatermarkEstimatorStateT initialWatermarkEstimatorState = (WatermarkEstimatorStateT) invoker.invokeGetInitialWatermarkEstimatorState( @@ -356,10 +359,26 @@ public String getErrorContext() { return NaiveProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator"; } }); + + OutputBuilderSupplier outputBuilderSupplier = + new OutputBuilderSupplier() { + @Override + public WindowedValues.Builder builder(X value) { + return WindowedValues.builder(outputReceiver.builder(null)).withValue(value); + } + }; + ProcessContinuation continuation = invoker.invokeProcessElement( new NestedProcessContext<>( - fn, c, c.element().getKey(), w, tracker, watermarkEstimator, sideInputMapping)); + fn, + c, + outputBuilderSupplier, + c.element().getKey(), + w, + tracker, + watermarkEstimator, + sideInputMapping)); if (continuation.shouldResume()) { // Fetch the watermark before splitting to ensure that the watermark applies to both // the primary and the residual. @@ -461,10 +480,12 @@ private static class NestedProcessContext< private final TrackerT tracker; private final WatermarkEstimatorT watermarkEstimator; private final Map> sideInputMapping; + private final OutputBuilderSupplier outputBuilderSupplier; private NestedProcessContext( DoFn fn, DoFn, OutputT>.ProcessContext outerContext, + OutputBuilderSupplier outputBuilderSupplier, InputT element, BoundedWindow window, TrackerT tracker, @@ -472,6 +493,7 @@ private NestedProcessContext( Map> sideInputMapping) { fn.super(); this.window = window; + this.outputBuilderSupplier = outputBuilderSupplier; this.outerContext = outerContext; this.element = element; this.tracker = tracker; @@ -547,22 +569,16 @@ public String timerId(DoFn doFn) { public OutputReceiver outputReceiver(DoFn doFn) { return new OutputReceiver() { @Override - public void output(OutputT output) { - outerContext.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - outerContext.outputWithTimestamp(output, timestamp); - } - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); + public OutputBuilder builder(OutputT value) { + return outputBuilderSupplier + .builder(value) + .setReceiver( + windowedValue -> + outerContext.outputWindowedValue( + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } @@ -574,22 +590,17 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn doFn) { public OutputReceiver get(TupleTag tag) { return new OutputReceiver() { @Override - public void output(T output) { - outerContext.output(tag, output); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - outerContext.outputWithTimestamp(tag, output, timestamp); - } - - @Override - public void outputWindowedValue( - T output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + public OutputBuilder builder(T value) { + return outputBuilderSupplier + .builder(value) + .setReceiver( + windowedValue -> + outerContext.outputWindowedValue( + tag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java new file mode 100644 index 000000000000..a7f8bc8e03b1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * A builder for an output, to set all the fields and extended metadata of a Beam value. + * + *

Which fields are required or allowed to be set depends on the context of the builder. + * + *

It is allowed to modify an instance and then call {@link #output()} again. + * + *

Not intended to be implemented by Beam users. This interface will be expanded in ways that are + * backwards-incompatible, by requiring implementors to add methods. + */ +public interface OutputBuilder extends WindowedValue { + OutputBuilder setValue(T value); + + OutputBuilder setTimestamp(Instant timestamp); + + OutputBuilder setWindow(BoundedWindow window); + + OutputBuilder setWindows(Collection windows); + + OutputBuilder setPaneInfo(PaneInfo paneInfo); + + OutputBuilder setRecordId(@Nullable String recordId); + + OutputBuilder setRecordOffset(@Nullable Long recordOffset); + + void output(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index 0512be524b91..ea6be129ecb4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -47,17 +47,17 @@ public interface WindowedValue { PaneInfo getPaneInfo(); @Nullable - String getCurrentRecordId(); + String getRecordId(); @Nullable - Long getCurrentRecordOffset(); + Long getRecordOffset(); /** * A representation of each of the actual values represented by this compressed {@link * WindowedValue}, one per window. */ @Pure - Iterable> explodeWindows(); + Iterable> explodeWindows(); /** * A {@link WindowedValue} with identical metadata to the current one, but with the provided diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 4bbab33a8936..9b079b8699b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.values; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -45,14 +47,17 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; import org.joda.time.Instant; /** - * Implementations of {@link WindowedValue} and static utility methods. + * Implementations of {@link org.apache.beam.sdk.values.WindowedValue} and static utility methods. * *

These are primarily intended for internal use by Beam SDK developers and runner developers. * Backwards incompatible changes will likely occur. @@ -61,6 +66,172 @@ public class WindowedValues { private WindowedValues() {} // non-instantiable utility class + public static Builder builder() { + return new Builder<>(); + } + + /** Create a Builder that takes element metadata from the provideed delegate. */ + public static Builder builder(WindowedValue template) { + return new Builder() + .setValue(template.getValue()) + .setTimestamp(template.getTimestamp()) + .setWindows(template.getWindows()) + .setPaneInfo(template.getPaneInfo()); + } + + public static class Builder implements OutputBuilder { + + // Because T itself can be nullable, checking `maybeValue == null` cannot determine if it is set + // or + // not. + // + // Note also that JDK Optional class is written in such a way that it cannot have a nullable + // type + // for T (rendering it largely useless for its one reason for existing - composable + // presence/absence). + private @Nullable T maybeValue; + private boolean hasValue = false; + + private @MonotonicNonNull WindowedValueReceiver receiver; + private @MonotonicNonNull PaneInfo paneInfo; + private @MonotonicNonNull Instant timestamp; + private @MonotonicNonNull Collection windows; + private @Nullable String recordId; + private @Nullable Long recordOffset; + + @Override + public Builder setValue(T value) { + this.hasValue = true; + this.maybeValue = value; + return this; + } + + @Override + public Builder setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + return this; + } + + @Override + public Builder setWindows(Collection windows) { + this.windows = windows; + return this; + } + + @Override + public Builder setPaneInfo(PaneInfo paneInfo) { + this.paneInfo = paneInfo; + return this; + } + + @Override + public Builder setWindow(BoundedWindow window) { + return setWindows(Collections.singleton(window)); + } + + @Override + public Builder setRecordId(@Nullable String recordId) { + this.recordId = recordId; + return this; + } + + @Override + public Builder setRecordOffset(@Nullable Long recordOffset) { + this.recordOffset = recordOffset; + return this; + } + + public Builder setReceiver(WindowedValueReceiver receiver) { + this.receiver = receiver; + return this; + } + + @Override + public T getValue() { + // If T is itself a nullable type, then this checkState ensures it is set, whether or not it + // is null. + // If T is a non-nullable type, this checkState ensures it is not null. + checkState(hasValue, "Value not set"); + return getValueIgnoringNullness(); + } + + // This method is a way to @Nullable T to polymorphic-in-nullness T + @SuppressWarnings("nullness") + T getValueIgnoringNullness() { + return maybeValue; + } + + @Override + public Instant getTimestamp() { + checkStateNotNull(timestamp, "Timestamp not set"); + return timestamp; + } + + @Override + public Collection getWindows() { + checkStateNotNull(windows, "Windows not set"); + return windows; + } + + @Override + public PaneInfo getPaneInfo() { + checkStateNotNull(paneInfo, "PaneInfo not set"); + return paneInfo; + } + + @Override + public @Nullable String getRecordId() { + return recordId; + } + + @Override + public @Nullable Long getRecordOffset() { + return recordOffset; + } + + @Override + public Collection> explodeWindows() { + throw new UnsupportedOperationException( + "Cannot explodeWindows() on WindowedValue builder; use build().explodeWindows()"); + } + + @Override + @Pure + public Builder withValue(OtherT newValue) { + // because of erasure, this type system lie is safe + return ((Builder) builder(this)).setValue(newValue); + } + + @Override + public void output() { + try { + checkStateNotNull(receiver, "A WindowedValueReceiver must be set via setReceiver()") + .output(build()); + } catch (Exception exc) { + if (exc instanceof RuntimeException) { + throw (RuntimeException) exc; + } else { + throw new RuntimeException("Exception thrown when outputting WindowedValue", exc); + } + } + } + + public WindowedValue build() { + return WindowedValues.of(getValue(), getTimestamp(), getWindows(), getPaneInfo()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("value", getValue()) + .add("timestamp", getTimestamp()) + .add("windows", getWindows()) + .add("paneInfo", getPaneInfo()) + .add("receiver", receiver) + .toString(); + } + } + public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { return of(value, timestamp, windows, paneInfo, null, null); @@ -164,8 +335,8 @@ public static WindowedValue withValue( windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPaneInfo(), - windowedValue.getCurrentRecordId(), - windowedValue.getCurrentRecordOffset()); + windowedValue.getRecordId(), + windowedValue.getRecordOffset()); } public static boolean equals( @@ -218,12 +389,12 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final @Nullable Long currentRecordOffset; @Override - public @Nullable String getCurrentRecordId() { + public @Nullable String getRecordId() { return currentRecordId; } @Override - public @Nullable Long getCurrentRecordOffset() { + public @Nullable Long getRecordOffset() { return currentRecordOffset; } @@ -260,6 +431,20 @@ public Iterable> explodeWindows() { } return windowedValues.build(); } + + @Override + public boolean equals(@Nullable Object other) { + if (!(other instanceof WindowedValue)) { + return false; + } + + return WindowedValues.equals(this, (WindowedValue) other); + } + + @Override + public int hashCode() { + return WindowedValues.hashCode(this); + } } /** The abstract superclass of WindowedValue representations where timestamp == MIN. */ @@ -303,8 +488,7 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>( - newValue, getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); + return new ValueInGlobalWindow<>(newValue, getPaneInfo(), getRecordId(), getRecordOffset()); } @Override @@ -381,7 +565,7 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInGlobalWindow<>( - newValue, getTimestamp(), getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); + newValue, getTimestamp(), getPaneInfo(), getRecordId(), getRecordOffset()); } @Override @@ -438,12 +622,7 @@ public TimestampedValueInSingleWindow( @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInSingleWindow<>( - newValue, - getTimestamp(), - window, - getPaneInfo(), - getCurrentRecordId(), - getCurrentRecordOffset()); + newValue, getTimestamp(), window, getPaneInfo(), getRecordId(), getRecordOffset()); } @Override @@ -512,12 +691,7 @@ public Collection getWindows() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInMultipleWindows<>( - newValue, - getTimestamp(), - getWindows(), - getPaneInfo(), - getCurrentRecordId(), - getCurrentRecordOffset()); + newValue, getTimestamp(), getWindows(), getPaneInfo(), getRecordId(), getRecordOffset()); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index c25677ef98ac..299c5d5c5906 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; @@ -41,7 +40,6 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; @@ -78,6 +76,8 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.OutputBuilder; +import org.apache.beam.sdk.values.WindowedValues; import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; @@ -549,25 +549,16 @@ public Object restriction() { } @Override - public OutputReceiver outputReceiver(DoFn doFn) { + public OutputReceiver outputReceiver(DoFn doFn) { return new OutputReceiver() { @Override - public void output(SomeRestriction output) { - outputs.add(output); - } - - @Override - public void outputWithTimestamp(SomeRestriction output, Instant timestamp) { - fail("Unexpected output with timestamp"); - } - - @Override - public void outputWindowedValue( - SomeRestriction output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - fail("Unexpected outputWindowedValue"); + public OutputBuilder builder(SomeRestriction value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(mockTimestamp) + .setWindow(mockWindow) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver(windowedValue -> outputs.add(windowedValue.getValue())); } }; } @@ -801,28 +792,18 @@ public OutputReceiver outputReceiver(DoFn doFn) { private boolean invoked; @Override - public void output(String output) { - assertFalse(invoked); - invoked = true; - assertEquals("foo", output); - } - - @Override - public void outputWithTimestamp(String output, Instant instant) { - assertFalse(invoked); - invoked = true; - assertEquals("foo", output); - } - - @Override - public void outputWindowedValue( - String output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - assertFalse(invoked); - invoked = true; - assertEquals("foo", output); + public OutputBuilder builder(String value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(mockTimestamp) + .setWindow(mockWindow) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> { + assertFalse(invoked); + invoked = true; + assertEquals("foo", windowedValue.getValue()); + }); } }; } diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index b213a716dcf9..00a8fa8a5ac5 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -34,6 +34,7 @@ dependencies { provided library.java.jackson_databind provided library.java.joda_time provided library.java.slf4j_api + provided library.java.hamcrest provided library.java.vendored_grpc_1_69_0 provided library.java.vendored_guava_32_1_2_jre @@ -79,4 +80,5 @@ dependencies { shadowTest project(path: ":sdks:java:core", configuration: "shadowTest") shadowTestRuntimeClasspath library.java.slf4j_jdk14 permitUnusedDeclared library.java.avro + permitUnusedDeclared library.java.hamcrest } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java index 0b3c677bb54d..48b87c270807 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import java.io.IOException; -import java.util.Collection; import java.util.Map; import org.apache.beam.fn.harness.MapFnRunners.WindowedValueMapFnFactory; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; @@ -92,7 +91,7 @@ private AssignWindowsRunner(WindowFn windowFn) { WindowedValue assignWindows(WindowedValue input) throws Exception { // TODO: https://github.com/apache/beam/issues/18870 consider allocating only once and updating // the current value per call. - WindowFn.AssignContext ctxt = + WindowFn.AssignContext assignContext = windowFn.new AssignContext() { @Override public T element() { @@ -109,7 +108,7 @@ public BoundedWindow window() { return Iterables.getOnlyElement(input.getWindows()); } }; - Collection windows = windowFn.assignWindows(ctxt); - return WindowedValues.of(input.getValue(), input.getTimestamp(), windows, input.getPaneInfo()); + + return WindowedValues.builder(input).setWindows(windowFn.assignWindows(assignContext)).build(); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 580de80c5da3..0388d3c03f00 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -103,6 +103,7 @@ import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.util.construction.Timer; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -1757,6 +1758,13 @@ public T sideInput(PCollectionView view) { private class WindowObservingProcessBundleContext extends WindowObservingProcessBundleContextBase { + @Override + public OutputBuilder builder(OutputT value) { + return WindowedValues.builder() + .setValue(value) + .setReceiver(windowedValue -> outputTo(mainOutputConsumer, windowedValue)); + } + @Override public void output(OutputT output) { // Don't need to check timestamp since we can always output using the input timestamp. @@ -1924,6 +1932,17 @@ public TimerMap timerFamily(String timerFamilyId) { private class NonWindowObservingProcessBundleContext extends NonWindowObservingProcessBundleContextBase { + @Override + public OutputBuilder builder(OutputT value) { + return WindowedValues.builder(currentElement) + .withValue(value) + .setReceiver( + windowedValue -> { + checkTimestamp(windowedValue.getTimestamp()); + outputTo(mainOutputConsumer, windowedValue); + }); + } + @Override public void output(OutputT output) { // Don't need to check timestamp since we can always output using the input timestamp. @@ -1947,11 +1966,7 @@ public void output(TupleTag tag, T output) { @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); - outputTo( - mainOutputConsumer, - WindowedValues.of( - output, timestamp, currentElement.getWindows(), currentElement.getPaneInfo())); + builder(output).setValue(output).setTimestamp(timestamp).output(); } @Override @@ -1960,8 +1975,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - checkTimestamp(timestamp); - outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + builder(output).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); } @Override @@ -2141,6 +2155,12 @@ public OutputReceiver outputReceiver(DoFn doFn) { return this; } + @Override + // OutputT == RestrictionT + public void output(OutputT output) { + OutputReceiver.super.output(output); + } + private final OutputReceiver mainRowOutputReceiver = mainOutputSchemaCoder == null ? null @@ -2149,24 +2169,16 @@ public OutputReceiver outputReceiver(DoFn doFn) { mainOutputSchemaCoder.getFromRowFunction(); @Override - public void output(Row output) { - ProcessBundleContextBase.this.output(fromRowFunction.apply(output)); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp( - fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - ProcessBundleContextBase.this.outputWindowedValue( - fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder(currentElement) + .withValue(value) + .setReceiver( + windowedRow -> + ProcessBundleContextBase.this.outputWindowedValue( + fromRowFunction.apply(windowedRow.getValue()), + windowedRow.getTimestamp(), + windowedRow.getWindows(), + windowedRow.getPaneInfo())); } }; @@ -2195,23 +2207,17 @@ private OutputReceiver createTaggedOutputReceiver(TupleTag tag) { } return new OutputReceiver() { @Override - public void output(T output) { - ProcessBundleContextBase.this.output(tag, output); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp(tag, output, timestamp); - } - - @Override - public void outputWindowedValue( - T output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - ProcessBundleContextBase.this.outputWindowedValue( - tag, output, timestamp, windows, paneInfo); + public OutputBuilder builder(T value) { + return WindowedValues.builder(currentElement) + .withValue(value) + .setReceiver( + windowedValue -> + ProcessBundleContextBase.this.outputWindowedValue( + tag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } @@ -2239,24 +2245,17 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { ((SchemaCoder) outputCoder).getFromRowFunction(); @Override - public void output(Row output) { - ProcessBundleContextBase.this.output(tag, fromRowFunction.apply(output)); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp( - tag, fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - ProcessBundleContextBase.this.outputWindowedValue( - tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder(currentElement) + .withValue(value) + .setReceiver( + windowedRow -> + ProcessBundleContextBase.this.outputWindowedValue( + tag, + fromRowFunction.apply(windowedRow.getValue()), + windowedRow.getTimestamp(), + windowedRow.getWindows(), + windowedRow.getPaneInfo())); } }; } @@ -2321,12 +2320,12 @@ public Instant timestamp() { @Override public String currentRecordId() { - return currentElement.getCurrentRecordId(); + return currentElement.getRecordId(); } @Override public Long currentRecordOffset() { - return currentElement.getCurrentRecordOffset(); + return currentElement.getRecordOffset(); } @Override @@ -2352,6 +2351,7 @@ public WatermarkEstimator watermarkEstimator() { private class OnWindowExpirationContext extends BaseArgumentProvider { private class Context extends DoFn.OnWindowExpirationContext implements OutputReceiver { + private Context() { doFn.super(); } @@ -2361,28 +2361,14 @@ public PipelineOptions getPipelineOptions() { return pipelineOptions; } - @Override - public BoundedWindow window() { - return currentWindow; - } - @Override public void output(OutputT output) { - outputTo( - mainOutputConsumer, - WindowedValues.of( - output, - currentTimer.getHoldTimestamp(), - currentWindow, - currentTimer.getPaneInfo())); + OutputReceiver.super.output(output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkOnWindowExpirationTimestamp(timestamp); - outputTo( - mainOutputConsumer, - WindowedValues.of(output, timestamp, currentWindow, currentTimer.getPaneInfo())); + OutputReceiver.super.outputWithTimestamp(output, timestamp); } @Override @@ -2391,8 +2377,26 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - checkOnWindowExpirationTimestamp(timestamp); - outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); + } + + @Override + public BoundedWindow window() { + return currentWindow; + } + + @Override + public OutputBuilder builder(OutputT value) { + return WindowedValues.builder() + .setValue(value) + .setWindow(currentWindow) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setPaneInfo(currentTimer.getPaneInfo()) + .setReceiver( + windowedValue -> { + checkOnWindowExpirationTimestamp(windowedValue.getTimestamp()); + outputTo(mainOutputConsumer, windowedValue); + }); } @Override @@ -2530,23 +2534,18 @@ public OutputReceiver outputReceiver(DoFn doFn) { mainOutputSchemaCoder.getFromRowFunction(); @Override - public void output(Row output) { - context.output(fromRowFunction.apply(output)); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue( - fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + fromRowFunction.apply(windowedValue.getValue()), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; @@ -2572,22 +2571,19 @@ private OutputReceiver createTaggedOutputReceiver(TupleTag tag) { } return new OutputReceiver() { @Override - public void output(T output) { - context.output(tag, output); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, timestamp); - } - - @Override - public void outputWindowedValue( - T output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + public OutputBuilder builder(T value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + tag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } @@ -2612,23 +2608,19 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { ((SchemaCoder) outputCoder).getFromRowFunction(); @Override - public void output(Row output) { - context.output(tag, fromRowFunction.apply(output)); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue( - tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + tag, + fromRowFunction.apply(windowedValue.getValue()), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } @@ -2698,24 +2690,28 @@ public BoundedWindow window() { return currentWindow; } + @Override + public OutputBuilder builder(OutputT value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setPaneInfo(currentTimer.getPaneInfo()) + .setReceiver( + windowedValue -> { + checkTimerTimestamp(windowedValue.getTimestamp()); + outputTo(mainOutputConsumer, windowedValue); + }); + } + @Override public void output(OutputT output) { - checkTimerTimestamp(currentTimer.getHoldTimestamp()); - outputTo( - mainOutputConsumer, - WindowedValues.of( - output, - currentTimer.getHoldTimestamp(), - currentWindow, - currentTimer.getPaneInfo())); + OutputReceiver.super.output(output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimerTimestamp(timestamp); - outputTo( - mainOutputConsumer, - WindowedValues.of(output, timestamp, currentWindow, currentTimer.getPaneInfo())); + OutputReceiver.super.outputWithTimestamp(output, timestamp); } @Override @@ -2724,8 +2720,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - checkTimerTimestamp(timestamp); - outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); } @Override @@ -2868,24 +2863,16 @@ public OutputReceiver outputReceiver(DoFn doFn) { mainOutputSchemaCoder.getFromRowFunction(); @Override - public void output(Row output) { - context.outputWithTimestamp( - fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue( - fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder(currentElement) + .withValue(value) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + fromRowFunction.apply(windowedValue.getValue()), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; @@ -2911,22 +2898,19 @@ private OutputReceiver createTaggedOutputReceiver(TupleTag tag) { } return new OutputReceiver() { @Override - public void output(T output) { - context.output(tag, output); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, timestamp); - } - - @Override - public void outputWindowedValue( - T output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + public OutputBuilder builder(T value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setPaneInfo(currentTimer.getPaneInfo()) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + windowedValue.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } @@ -2951,23 +2935,19 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { ((SchemaCoder) outputCoder).getFromRowFunction(); @Override - public void output(Row output) { - context.output(tag, fromRowFunction.apply(output)); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); - } - - @Override - public void outputWindowedValue( - Row output, - Instant timestamp, - Collection windows, - PaneInfo paneInfo) { - context.outputWindowedValue( - tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + public OutputBuilder builder(Row value) { + return WindowedValues.builder() + .withValue(value) + .setTimestamp(currentTimer.getHoldTimestamp()) + .setWindow(currentWindow) + .setPaneInfo(currentTimer.getPaneInfo()) + .setReceiver( + windowedValue -> + context.outputWindowedValue( + fromRowFunction.apply(windowedValue.getValue()), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo())); } }; } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java index 0fd03447f6e5..e42cbdaf6435 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -338,22 +339,24 @@ public Object sideInput(String tagId) { } @Override - public void output(RestrictionT subrestriction) { - // This OutputReceiver is only for being passed to SplitRestriction OutputT == RestrictionT - double size = getSize(subrestriction); - - // Don't need to check timestamp since we can always output using the input timestamp. - outputTo( - mainOutputConsumer, - WindowedValues.of( - KV.of( - KV.of( - getCurrentElement().getValue(), - KV.of(subrestriction, getCurrentWatermarkEstimatorState())), - size), - getCurrentElement().getTimestamp(), - getCurrentWindow(), - getCurrentElement().getPaneInfo())); + public OutputBuilder builder(RestrictionT subrestriction) { + return WindowedValues.builder(getCurrentElement()) + .withValue(subrestriction) + .setWindow(getCurrentWindow()) + .setReceiver( + windowedValue -> { + double size = getSize(windowedValue.getValue()); + + outputTo( + mainOutputConsumer, + windowedValue.withValue( + KV.of( + KV.of( + getCurrentElement().getValue(), + KV.of( + windowedValue.getValue(), getCurrentWatermarkEstimatorState())), + size))); + }); } } @@ -361,19 +364,23 @@ public void output(RestrictionT subrestriction) { private class SizedRestrictionNonWindowObservingArgumentProvider extends SplitRestrictionArgumentProvider { @Override - public void output(RestrictionT subrestriction) { - double size = getSize(subrestriction); - - // Don't need to check timestamp since we can always output using the input timestamp. - outputTo( - mainOutputConsumer, - getCurrentElement() - .withValue( - KV.of( - KV.of( - getCurrentElement().getValue(), - KV.of(subrestriction, getCurrentWatermarkEstimatorState())), - size))); + public OutputBuilder builder(RestrictionT subrestriction) { + return WindowedValues.builder(getCurrentElement()) + .withValue(subrestriction) + .setReceiver( + windowedValue -> { + double size = getSize(windowedValue.getValue()); + + outputTo( + mainOutputConsumer, + windowedValue.withValue( + KV.of( + KV.of( + getCurrentElement().getValue(), + KV.of( + windowedValue.getValue(), getCurrentWatermarkEstimatorState())), + size))); + }); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java index f7e2efdbcf35..6c300295eb6d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java @@ -62,6 +62,7 @@ import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -777,19 +778,23 @@ private class TruncateSizedRestrictionWindowObservingArgumentProvider extends TruncateSizedRestrictionArgumentProvider { @Override - public void output(RestrictionT output) { - double size = getSize(output); - outputTo( - mainOutputConsumer, - WindowedValues.of( - KV.of( - KV.of( - getCurrentElement().getValue(), - KV.of(output, getCurrentWatermarkEstimatorState())), - size), - getCurrentElement().getTimestamp(), - getCurrentWindow(), - getCurrentElement().getPaneInfo())); + public OutputBuilder builder(RestrictionT value) { + return WindowedValues.builder(getCurrentElement()) + .withValue(value) + .setWindow(getCurrentWindow()) + .setReceiver( + windowedValue -> { + double size = getSize(windowedValue.getValue()); + outputTo( + mainOutputConsumer, + windowedValue.withValue( + KV.of( + KV.of( + getCurrentElement().getValue(), + KV.of( + windowedValue.getValue(), getCurrentWatermarkEstimatorState())), + size))); + }); } @Override @@ -812,17 +817,24 @@ private class TruncateSizedRestrictionNonWindowObservingArgumentProvider extends TruncateSizedRestrictionArgumentProvider { @Override - public void output(RestrictionT truncatedRestriction) { - double size = getSize(truncatedRestriction); - outputTo( - mainOutputConsumer, - getCurrentElement() - .withValue( - KV.of( - KV.of( - getCurrentElement().getValue(), - KV.of(truncatedRestriction, getCurrentWatermarkEstimatorState())), - size))); + public OutputBuilder builder(RestrictionT value) { + return WindowedValues.builder(getCurrentElement()) + .withValue(value) + .setReceiver( + windowedValue -> { + double size = getSize(windowedValue.getValue()); + outputTo( + mainOutputConsumer, + getCurrentElement() + .withValue( + KV.of( + KV.of( + getCurrentElement().getValue(), + KV.of( + windowedValue.getValue(), + getCurrentWatermarkEstimatorState())), + size))); + }); } } @@ -911,6 +923,16 @@ public void outputWithTimestamp(RestrictionT output, Instant timestamp) { throw new UnsupportedOperationException( "Cannot outputWithTimestamp from TruncateRestriction"); } + + @Override + public void outputWindowedValue( + RestrictionT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException( + "Cannot outputWindowedValue from TruncateRestriction"); + } } /** diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index edd9c4654646..ef19b7c18804 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.fn.harness; import static java.util.Arrays.asList; +import static org.apache.beam.runners.core.WindowMatchers.isValueInGlobalWindow; import static org.apache.beam.sdk.options.ExperimentalOptions.addExperiment; import static org.apache.beam.sdk.values.WindowedValues.timestampedValueInGlobalWindow; import static org.apache.beam.sdk.values.WindowedValues.valueInGlobalWindow; @@ -1002,36 +1003,36 @@ public void testTimers() throws Exception { dynamicTimerInGlobalWindow( "Y", "processing-timer2", new Instant(2100L), new Instant(3100L))); + assertThat( + mainOutputValues.get(0), isValueInGlobalWindow("key:X mainX[X0]", new Instant(1000L))); + assertThat( mainOutputValues, - contains( - timestampedValueInGlobalWindow("key:X mainX[X0]", new Instant(1000L)), - timestampedValueInGlobalWindow("key:Y mainY[]", new Instant(1100L)), - timestampedValueInGlobalWindow("key:X mainX[X0, X1]", new Instant(1200L)), - timestampedValueInGlobalWindow("key:Y mainY[Y1]", new Instant(1300L)), - timestampedValueInGlobalWindow("key:A event[A0]", new Instant(1400L)), - timestampedValueInGlobalWindow("key:B event[]", new Instant(1500L)), - timestampedValueInGlobalWindow("key:A event[A0, event]", new Instant(1400L)), - timestampedValueInGlobalWindow("key:A event[A0, event, event]", new Instant(1400L)), - timestampedValueInGlobalWindow( - "key:A event[A0, event, event, event]", new Instant(1400L)), - timestampedValueInGlobalWindow( + containsInAnyOrder( + isValueInGlobalWindow("key:X mainX[X0]", new Instant(1000L)), + isValueInGlobalWindow("key:Y mainY[]", new Instant(1100L)), + isValueInGlobalWindow("key:X mainX[X0, X1]", new Instant(1200L)), + isValueInGlobalWindow("key:Y mainY[Y1]", new Instant(1300L)), + isValueInGlobalWindow("key:A event[A0]", new Instant(1400L)), + isValueInGlobalWindow("key:B event[]", new Instant(1500L)), + isValueInGlobalWindow("key:A event[A0, event]", new Instant(1400L)), + isValueInGlobalWindow("key:A event[A0, event, event]", new Instant(1400L)), + isValueInGlobalWindow("key:A event[A0, event, event, event]", new Instant(1400L)), + isValueInGlobalWindow( "key:A event[A0, event, event, event, event]", new Instant(1400L)), - timestampedValueInGlobalWindow( + isValueInGlobalWindow( "key:A event[A0, event, event, event, event, event]", new Instant(1400L)), - timestampedValueInGlobalWindow( + isValueInGlobalWindow( "key:A event[A0, event, event, event, event, event, event]", new Instant(1400L)), - timestampedValueInGlobalWindow("key:C processing[C0]", new Instant(1800L)), - timestampedValueInGlobalWindow("key:B processing[event]", new Instant(1500L)), - timestampedValueInGlobalWindow("key:B event[event, processing]", new Instant(1500)), - timestampedValueInGlobalWindow( - "key:B event[event, processing, event]", new Instant(1500)), - timestampedValueInGlobalWindow( + isValueInGlobalWindow("key:C processing[C0]", new Instant(1800L)), + isValueInGlobalWindow("key:B processing[event]", new Instant(1500L)), + isValueInGlobalWindow("key:B event[event, processing]", new Instant(1500)), + isValueInGlobalWindow("key:B event[event, processing, event]", new Instant(1500)), + isValueInGlobalWindow( "key:B event[event, processing, event, event]", new Instant(1500)), - timestampedValueInGlobalWindow( + isValueInGlobalWindow( "key:B event-family[event, processing, event, event, event]", new Instant(2000L)), - timestampedValueInGlobalWindow( - "key:Y processing-family[Y1, Y2]", new Instant(2100L)))); + isValueInGlobalWindow("key:Y processing-family[Y1, Y2]", new Instant(2100L)))); mainOutputValues.clear(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java index 1336d2f4ba9f..34ef3e95b191 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java @@ -17,9 +17,11 @@ */ package org.apache.beam.fn.harness; +import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isValueInGlobalWindow; +import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; import static org.apache.beam.sdk.values.WindowedValues.valueInGlobalWindow; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertTrue; @@ -214,20 +216,20 @@ public void testProcessElementForSplitAndSizeRestriction() throws Exception { KV.of("2", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)))); assertThat( mainOutputValues, - contains( - valueInGlobalWindow( + containsInAnyOrder( + isValueInGlobalWindow( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0)), - valueInGlobalWindow( + isValueInGlobalWindow( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0)), - valueInGlobalWindow( + isValueInGlobalWindow( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0)), - valueInGlobalWindow( + isValueInGlobalWindow( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0)))); @@ -325,59 +327,60 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except // Since the DoFn observes the window and it may affect the output, each input is processed // separately and each // output is per-window. + assertThat( mainOutputValues, - contains( - WindowedValues.of( + containsInAnyOrder( + isSingleWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0), firstValue.getTimestamp(), window1, firstValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), window1, firstValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0), firstValue.getTimestamp(), window2, firstValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), window2, firstValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window1, secondValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window1, secondValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window2, secondValue.getPaneInfo()), - WindowedValues.of( + isSingleWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), @@ -470,29 +473,29 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except // Ensure that each output element is in all the windows and not one per window. assertThat( mainOutputValues, - contains( - WindowedValues.of( + containsInAnyOrder( + isWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), firstValue.getPaneInfo()), - WindowedValues.of( + isWindowedValue( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), firstValue.getPaneInfo()), - WindowedValues.of( + isWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), firstValue.getPaneInfo()), - WindowedValues.of( + isWindowedValue( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index cbcd70753aca..0d483367f7b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -73,12 +73,15 @@ import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; @@ -1175,35 +1178,59 @@ public void process( numPendingRecordBytes += element.getValue().getPayload().length; } + private OutputReceiver makeSuccessfulRowsreceiver( + FinishBundleContext context, TupleTag successfulRowsTag) { + return new OutputReceiver() { + @Override + public OutputBuilder builder(TableRow value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(GlobalWindow.INSTANCE.maxTimestamp()) + .setWindow(GlobalWindow.INSTANCE) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> { + for (BoundedWindow window : windowedValue.getWindows()) { + context.output( + successfulRowsTag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + window); + } + }); + } + }; + } + @FinishBundle public void finishBundle(FinishBundleContext context) throws Exception { + OutputReceiver failedRowsReceiver = new OutputReceiver() { @Override - public void output(BigQueryStorageApiInsertError output) { - outputWithTimestamp(output, GlobalWindow.INSTANCE.maxTimestamp()); - } - - @Override - public void outputWithTimestamp( - BigQueryStorageApiInsertError output, org.joda.time.Instant timestamp) { - context.output(failedRowsTag, output, timestamp, GlobalWindow.INSTANCE); + public OutputBuilder builder( + BigQueryStorageApiInsertError value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(GlobalWindow.INSTANCE.maxTimestamp()) + .setWindow(GlobalWindow.INSTANCE) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> { + for (BoundedWindow window : windowedValue.getWindows()) { + context.output( + failedRowsTag, + windowedValue.getValue(), + windowedValue.getTimestamp(), + window); + } + }); } }; + @Nullable OutputReceiver successfulRowsReceiver = null; if (successfulRowsTag != null) { - successfulRowsReceiver = - new OutputReceiver() { - @Override - public void output(TableRow output) { - outputWithTimestamp(output, GlobalWindow.INSTANCE.maxTimestamp()); - } - - @Override - public void outputWithTimestamp(TableRow output, org.joda.time.Instant timestamp) { - context.output(successfulRowsTag, output, timestamp, GlobalWindow.INSTANCE); - } - }; + successfulRowsReceiver = makeSuccessfulRowsreceiver(context, successfulRowsTag); } flushAll(failedRowsReceiver, successfulRowsReceiver); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 90a91d10694f..e060766cbd22 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -113,13 +113,17 @@ import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.transforms.WithTimestamps; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.OutputBuilderSupplier; import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -132,6 +136,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; @@ -2308,20 +2313,34 @@ public int compareTo(MutationGroupContainer o) { private static class OutputReceiverForFinishBundle implements OutputReceiver> { - private final FinishBundleContext c; - - OutputReceiverForFinishBundle(FinishBundleContext c) { - this.c = c; - } - - @Override - public void output(Iterable output) { - outputWithTimestamp(output, Instant.now()); + private final OutputBuilderSupplier outputBuilderSupplier; + private final DoFn>.FinishBundleContext context; + + OutputReceiverForFinishBundle(FinishBundleContext context) { + this.context = context; + this.outputBuilderSupplier = + new OutputBuilderSupplier() { + @Override + public WindowedValues.Builder builder(OutputT value) { + return WindowedValues.builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setPaneInfo(PaneInfo.NO_FIRING) + .setWindow(GlobalWindow.INSTANCE); + } + }; } @Override - public void outputWithTimestamp(Iterable output, Instant timestamp) { - c.output(output, timestamp, GlobalWindow.INSTANCE); + public OutputBuilder> builder(Iterable value) { + return outputBuilderSupplier + .builder(value) + .setReceiver( + wv -> { + for (BoundedWindow window : wv.getWindows()) { + context.output(wv.getValue(), wv.getTimestamp(), window); + } + }); } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 4d22b1d6ea96..5e3e08a60664 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestOutputReceiver; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; @@ -337,10 +338,10 @@ public synchronized void seek(TopicPartition partition, long offset) {} private static class MockMultiOutputReceiver implements MultiOutputReceiver { - MockOutputReceiver>> mockOutputReceiver = - new MockOutputReceiver<>(); + TestOutputReceiver>> mockOutputReceiver = + new TestOutputReceiver<>(); - MockOutputReceiver badOutputReceiver = new MockOutputReceiver<>(); + TestOutputReceiver badOutputReceiver = new TestOutputReceiver<>(); @Override public @UnknownKeyFor @NonNull @Initialized OutputReceiver get( @@ -370,26 +371,6 @@ public List getBadRecords() { } } - private static class MockOutputReceiver implements OutputReceiver { - - private final List records = new ArrayList<>(); - - @Override - public void output(T output) { - records.add(output); - } - - @Override - public void outputWithTimestamp( - T output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) { - records.add(output); - } - - public List getOutputs() { - return this.records; - } - } - private List>> createExpectedRecords( KafkaSourceDescriptor descriptor, long startOffset, diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index adfcbc98c56c..5b58c9511170 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -20,18 +20,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.List; import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.testing.TestOutputReceiver; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.internal.DefaultImplementation; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -97,7 +93,7 @@ public void testInitialRestrictionWithConsumerEndPosition() throws Exception { @Test public void testProcessElement() throws Exception { - MockOutputReceiver receiver = new MockOutputReceiver(); + TestOutputReceiver receiver = new TestOutputReceiver<>(); long startOffset = fakePulsarReader.getStartTimestamp(); long endOffset = fakePulsarReader.getEndTimestamp(); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset)); @@ -112,7 +108,7 @@ public void testProcessElement() throws Exception { @Test public void testProcessElementWhenEndMessageIdIsDefined() throws Exception { - MockOutputReceiver receiver = new MockOutputReceiver(); + TestOutputReceiver receiver = new TestOutputReceiver<>(); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)); MessageId endMessageId = DefaultImplementation.getDefaultImplementation().newMessageId(50L, 50L, 50); @@ -125,7 +121,7 @@ public void testProcessElementWhenEndMessageIdIsDefined() throws Exception { @Test public void testProcessElementWithEmptyRecords() throws Exception { - MockOutputReceiver receiver = new MockOutputReceiver(); + TestOutputReceiver receiver = new TestOutputReceiver<>(); fakePulsarReader.emptyMockRecords(); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)); DoFn.ProcessContinuation result = @@ -137,7 +133,7 @@ public void testProcessElementWithEmptyRecords() throws Exception { @Test public void testProcessElementWhenHasReachedEndTopic() throws Exception { - MockOutputReceiver receiver = new MockOutputReceiver(); + TestOutputReceiver receiver = new TestOutputReceiver<>(); fakePulsarReader.setReachedEndOfTopic(true); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)); DoFn.ProcessContinuation result = @@ -145,22 +141,4 @@ public void testProcessElementWhenHasReachedEndTopic() throws Exception { PulsarSourceDescriptor.of(TOPIC, null, null, null), tracker, null, receiver); assertEquals(DoFn.ProcessContinuation.stop(), result); } - - private static class MockOutputReceiver implements DoFn.OutputReceiver { - - private final List records = new ArrayList<>(); - - @Override - public void output(PulsarMessage output) {} - - @Override - public void outputWithTimestamp( - PulsarMessage output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) { - records.add(output); - } - - public List getOutputs() { - return records; - } - } } diff --git a/sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java b/sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java index 33827164c6b7..6ab5d8393def 100644 --- a/sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java +++ b/sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java @@ -24,13 +24,11 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.testing.TestOutputReceiver; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Instant; import org.junit.Test; @@ -51,24 +49,6 @@ private SparkReceiverIO.Read makeReadTransform() { .withTimestampFn(Instant::parse); } - private static class MockOutputReceiver implements DoFn.OutputReceiver { - - private final List records = new ArrayList<>(); - - @Override - public void output(String output) {} - - @Override - public void outputWithTimestamp( - String output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) { - records.add(output); - } - - public List getOutputs() { - return this.records; - } - } - private final ManualWatermarkEstimator mockWatermarkEstimator = new ManualWatermarkEstimator() { @@ -131,7 +111,7 @@ public void testRestrictionTrackerSplit() { @Test public void testProcessElement() { - MockOutputReceiver receiver = new MockOutputReceiver(); + TestOutputReceiver receiver = new TestOutputReceiver<>(); DoFn.ProcessContinuation result = dofnInstance.processElement( TEST_ELEMENT,