diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index 156f95fbeb1c..ca1b701693f8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 1, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 085929db9af8..3f4759213f78 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 3, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json index 27b4484dbd23..befe325aff7b 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test", "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json index 6572defd2538..b6d8417412d5 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/32440": "testing datastream optimizations", "runFor": "#33606", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json index f838c61661aa..1fd497f4748d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json index 9649514a5df5..5c8755813ac7 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", 
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test", diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json index 77f63217b86d..8ce9bcbb62c4 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json @@ -1,4 +1,5 @@ { + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test", diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index abe8b99b1a26..2cab758d36e0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -41,12 +42,6 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class DoFnRunners { - /** Information about how to create output receivers and output to them. */ - public interface OutputManager { - /** Outputs a single element to the receiver indicated by the given {@link TupleTag}. 
*/ - void output(TupleTag tag, WindowedValue output); - } - /** * Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}. * @@ -58,7 +53,7 @@ public static DoFnRunner simpleRunner( PipelineOptions options, DoFn fn, SideInputReader sideInputReader, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag mainOutputTag, List> additionalOutputTags, StepContext stepContext, @@ -168,7 +163,7 @@ ProcessFnRunner newProcessFnRunner( PipelineOptions options, Collection> views, ReadyCheckingSideInputReader sideInputReader, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag mainOutputTag, List> additionalOutputTags, StepContext stepContext, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 9e2f4f4efce1..f242c7d10003 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -17,20 +17,17 @@ */ package org.apache.beam.runners.core; -import java.util.Collection; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowedValues; import 
org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Instant; /** * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link @@ -51,7 +48,7 @@ DoFn, KV> create( TimerInternalsFactory timerInternalsFactory, SideInputReader sideInputReader, SystemReduceFn reduceFn, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag> mainTag) { return new GroupAlsoByWindowViaWindowSetNewDoFn<>( strategy, @@ -68,7 +65,7 @@ DoFn, KV> create( private transient StateInternalsFactory stateInternalsFactory; private transient TimerInternalsFactory timerInternalsFactory; private transient SideInputReader sideInputReader; - private transient DoFnRunners.OutputManager outputManager; + private transient WindowedValueMultiReceiver outputManager; private TupleTag> mainTag; public GroupAlsoByWindowViaWindowSetNewDoFn( @@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn( TimerInternalsFactory timerInternalsFactory, SideInputReader sideInputReader, SystemReduceFn reduceFn, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag> mainTag) { this.timerInternalsFactory = timerInternalsFactory; this.sideInputReader = sideInputReader; @@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn( this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger()); } - private OutputWindowedValue> outputWindowedValue() { - return new OutputWindowedValue>() { - @Override - public void outputWindowedValue( - KV output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(mainTag, WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane)); - } - }; - } - @ProcessElement public void 
processElement(ProcessContext c) throws Exception { KeyedWorkItem keyedWorkItem = c.element(); @@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception { TriggerStateMachines.stateMachineForTrigger(triggerProto)), stateInternals, timerInternals, - outputWindowedValue(), + windowedValue -> outputManager.output(mainTag, windowedValue), sideInputReader, reduceFn, c.getPipelineOptions()); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index b7ba99e32f55..7a2d47be8b0b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -45,11 +45,13 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures; import org.checkerframework.checker.nullness.qual.Nullable; @@ -72,19 +74,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> { private final DoFn fn; private final PipelineOptions pipelineOptions; - private final OutputWindowedValue 
output; + private final WindowedValueMultiReceiver outputReceiver; private final SideInputReader sideInputReader; private final ScheduledExecutorService executor; private final int maxNumOutputs; private final Duration maxDuration; private final Supplier bundleFinalizer; + private final TupleTag mainOutputTag; /** * Creates a new invoker from components. * * @param fn The original {@link DoFn}. * @param pipelineOptions {@link PipelineOptions} to include in the {@link DoFn.ProcessContext}. - * @param output Hook for outputting from the {@link DoFn.ProcessElement} method. + * @param outputReceiver Hook for outputting from the {@link DoFn.ProcessElement} method. * @param sideInputReader Hook for accessing side inputs. * @param executor Executor on which a checkpoint will be scheduled after the given duration. * @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a @@ -98,7 +101,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< public OutputAndTimeBoundedSplittableProcessElementInvoker( DoFn fn, PipelineOptions pipelineOptions, - OutputWindowedValue output, + WindowedValueMultiReceiver outputReceiver, + TupleTag mainOutputTag, SideInputReader sideInputReader, ScheduledExecutorService executor, int maxNumOutputs, @@ -106,7 +110,8 @@ public OutputAndTimeBoundedSplittableProcessElementInvoker( Supplier bundleFinalizer) { this.fn = fn; this.pipelineOptions = pipelineOptions; - this.output = output; + this.outputReceiver = outputReceiver; + this.mainOutputTag = mainOutputTag; this.sideInputReader = sideInputReader; this.executor = executor; this.maxNumOutputs = maxNumOutputs; @@ -403,7 +408,7 @@ public void outputWindowedValue( if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } - output.outputWindowedValue(value, timestamp, windows, paneInfo); + outputReceiver.output(mainOutputTag, 
WindowedValues.of(value, timestamp, windows, paneInfo)); } @Override @@ -413,7 +418,8 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { - outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane()); + outputReceiver.output( + tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPane())); } @Override @@ -427,7 +433,7 @@ public void outputWindowedValue( if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } - output.outputWindowedValue(tag, value, timestamp, windows, paneInfo); + outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo)); } private void noteOutput() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java deleted file mode 100644 index 2ee8c1ec6e23..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.Collection; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * An object that can output a value with all of its windowing information to the main output or any - * tagged output. - */ -public interface OutputWindowedValue { - /** Outputs a value with windowing information to the main output. */ - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane); - - /** Outputs a value with windowing information to a tagged output. */ - void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane); -} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index ead53515558f..a40f4e1205c7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -47,9 +47,11 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ 
-106,7 +108,7 @@ public class ReduceFnRunner { */ private final WindowingStrategy windowingStrategy; - private final OutputWindowedValue> outputter; + private final WindowedValueReceiver> outputter; private final StateInternals stateInternals; @@ -214,7 +216,7 @@ public ReduceFnRunner( ExecutableTriggerStateMachine triggerStateMachine, StateInternals stateInternals, TimerInternals timerInternals, - OutputWindowedValue> outputter, + WindowedValueReceiver> outputter, @Nullable SideInputReader sideInputReader, ReduceFn reduceFn, @Nullable PipelineOptions options) { @@ -1055,7 +1057,8 @@ private void prefetchOnTrigger( } // Output the actual value. - outputter.outputWindowedValue(KV.of(key, toOutput), outputTimestamp, windows, pane); + outputter.output( + WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, pane)); }); reduceFn.onTrigger(renamedTriggerContext); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 1b9c4640b85e..d167bf63c8b7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.SchemaCoder; @@ -54,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -94,7 +94,7 @@ public class SimpleDoFnRunner 
implements DoFnRunner invoker; private final SideInputReader sideInputReader; - private final OutputManager outputManager; + private final WindowedValueMultiReceiver outputManager; private final TupleTag mainOutputTag; /** The set of known output tags. */ @@ -124,7 +124,7 @@ public SimpleDoFnRunner( PipelineOptions options, DoFn fn, SideInputReader sideInputReader, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag mainOutputTag, List> additionalOutputTags, StepContext stepContext, diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 13a6709a43fd..8b2b83b4d9a3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.Collection; import java.util.Collections; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -38,10 +37,11 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; @@ -112,22 +112,13 @@ private SplittableProcessElementInvoker.R new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, PipelineOptionsFactory.create(), - new OutputWindowedValue() { + new WindowedValueMultiReceiver() { @Override - public void outputWindowedValue( - String output, - Instant timestamp, - Collection windows, - PaneInfo pane) {} - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) {} + public void output(TupleTag tag, WindowedValue output) { + // discard + } }, + null, NullSideInputReader.empty(), Executors.newSingleThreadScheduledExecutor(), 1000, diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 193ff51f2051..55791d582717 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -57,10 +57,10 @@ import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -95,7 +95,7 @@ public class ReduceFnTester { private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); private final WindowFn windowFn; - private final TestOutputWindowedValue testOutputter; + private 
final TestWindowedValueReceiver testOutputter; private final SideInputReader sideInputReader; private final Coder outputCoder; private final WindowingStrategy objectStrategy; @@ -277,7 +277,7 @@ private ReduceFnTester( this.objectStrategy = objectStrategy; this.reduceFn = reduceFn; this.windowFn = objectStrategy.getWindowFn(); - this.testOutputter = new TestOutputWindowedValue(); + this.testOutputter = new TestWindowedValueReceiver(); this.sideInputReader = sideInputReader; this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine); this.outputCoder = outputCoder; @@ -594,36 +594,19 @@ public void fireTimers(W window, TimestampedValue... timers) throws runner.persist(); } - /** - * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output - * elements. - */ - private class TestOutputWindowedValue implements OutputWindowedValue> { + private class TestWindowedValueReceiver implements WindowedValueReceiver> { private List>> outputs = new ArrayList<>(); @Override - public void outputWindowedValue( - KV output, - Instant timestamp, - Collection windows, - PaneInfo pane) { + public void output(WindowedValue> output) { // Copy the output value (using coders) before capturing it. 
KV copy = SerializableUtils.ensureSerializableByCoder( - KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow"); - WindowedValue> value = WindowedValues.of(copy, timestamp, windows, pane); + KvCoder.of(StringUtf8Coder.of(), outputCoder), output.getValue(), "outputForWindow"); + WindowedValue> value = + WindowedValues.of(copy, output.getTimestamp(), output.getWindows(), output.getPaneInfo()); outputs.add(value); } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs"); - } } private static class TestAssignContext diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index a0435fe3026d..40bcb70e32c6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.TimeDomain; @@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -790,11 +790,11 @@ public Duration getAllowedTimestampSkew() { } } - private static class ListOutputManager implements OutputManager { - private ListMultimap, 
WindowedValue> outputs = ArrayListMultimap.create(); + private static class ListOutputManager implements WindowedValueMultiReceiver { + private final ListMultimap, WindowedValue> outputs = ArrayListMultimap.create(); @Override - public void output(TupleTag tag, WindowedValue output) { + public void output(TupleTag tag, WindowedValue output) { outputs.put(tag, output); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 7820fd36a835..3aebe2840a9f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; @@ -61,6 +60,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -171,7 +171,8 @@ private static class ProcessFnTester< new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, tester.getPipelineOptions(), - new OutputWindowedValueToDoFnTester<>(tester), + new DoFnTesterWindowedValueReceiver(tester), + tester.getMainOutputTag(), new SideInputReader() { @Override public T get(PCollectionView view, BoundedWindow window) { @@ -256,32 +257,21 @@ public Instant getWatermarkHold() { } } - private static class OutputWindowedValueToDoFnTester - implements OutputWindowedValue { - private final 
DoFnTester tester; + private static class DoFnTesterWindowedValueReceiver implements WindowedValueMultiReceiver { + private final DoFnTester tester; - private OutputWindowedValueToDoFnTester(DoFnTester tester) { + private DoFnTesterWindowedValueReceiver(DoFnTester tester) { this.tester = tester; } @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputWindowedValue(tester.getMainOutputTag(), output, timestamp, windows, pane); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - for (BoundedWindow window : windows) { - tester.getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, window, pane)); + public void output(TupleTag tag, WindowedValue output) { + for (BoundedWindow window : output.getWindows()) { + tester + .getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output.getValue(), output.getTimestamp(), window, output.getPaneInfo())); } } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 11f3d244b37f..4ed4f09fe62b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.function.BiFunction; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -291,7 +291,8 @@ private void testOutput(boolean ordered) throws Exception { private void testOutput( boolean ordered, - BiFunction, Integer>> runnerFactory) + BiFunction, Integer>> + runnerFactory) throws Exception { timerInternals.advanceInputWatermark(new Instant(1L)); @@ -300,7 +301,7 @@ private void testOutput( StateTag> stateTag = StateTags.tagForSpec(MyDoFn.STATE_ID, fn.intState()); List, WindowedValue>> outputs = new ArrayList<>(); - OutputManager output = asOutputManager(outputs); + WindowedValueMultiReceiver output = asOutputManager(outputs); DoFnRunner, Integer> runner = runnerFactory.apply(fn, output); Instant elementTime = new Instant(5); @@ -405,13 +406,13 @@ private DoFnRunner createStatefulDoFnRunner(DoFn, Integer> f } private DoFnRunner createStatefulDoFnRunner( - DoFn, Integer> fn, OutputManager outputManager) { + DoFn, Integer> fn, WindowedValueMultiReceiver outputManager) { return createStatefulDoFnRunner(fn, outputManager, true); } private DoFnRunner createStatefulDoFnRunner( DoFn, Integer> fn, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, boolean supportTimeSortedInput) { return DoFnRunners.defaultStatefulDoFnRunner( fn, @@ -426,7 +427,7 @@ private DoFnRunner createStatefulDoFnRunner( } private DoFnRunner, Integer> getDoFnRunner( - DoFn, Integer> fn, @Nullable OutputManager outputManager) { + DoFn, Integer> fn, @Nullable WindowedValueMultiReceiver outputManager) { return new SimpleDoFnRunner<>( null, fn, @@ -442,10 +443,10 @@ private DoFnRunner, Integer> getDoFnRunner( Collections.emptyMap()); } - private OutputManager discardingOutputManager() { - return new OutputManager() { + private WindowedValueMultiReceiver discardingOutputManager() { + return new 
WindowedValueMultiReceiver() { @Override - public void output(TupleTag tag, WindowedValue output) { + public void output(TupleTag tag, WindowedValue output) { // discard } }; @@ -471,11 +472,13 @@ private static void advanceInputWatermark( } } - private static OutputManager asOutputManager(List, WindowedValue>> outputs) { - return new OutputManager() { + private static WindowedValueMultiReceiver asOutputManager( + List, WindowedValue>> outputs) { + return new WindowedValueMultiReceiver() { @Override - public void output(TupleTag tag, WindowedValue output) { - outputs.add(KV.of(tag, output)); + public void output(TupleTag tag, WindowedValue output) { + // silly cast needed to forget the inner type parameter + outputs.add((KV, WindowedValue>) (Object) KV.of(tag, output)); } }; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 45cb58b4a492..0e011aa5cd9b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -25,7 +25,6 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -42,18 +41,15 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowTracing; +import 
org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.joda.time.Instant; /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link @@ -180,7 +176,7 @@ public void processElement(WindowedValue> element) throws Ex TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)), stateInternals, timerInternals, - new OutputWindowedValueToBundle<>(bundle), + new BundleWindowedValueReceiver<>(bundle), new UnsupportedSideInputReader(DirectGroupAlsoByWindow.class.getSimpleName()), reduceFn, options); @@ -241,33 +237,17 @@ Iterable> dropExpiredWindows( } } - private static class OutputWindowedValueToBundle - implements OutputWindowedValue>> { + private static class BundleWindowedValueReceiver + implements WindowedValueReceiver>> { private final UncommittedBundle>> bundle; - private OutputWindowedValueToBundle(UncommittedBundle>> bundle) { + private BundleWindowedValueReceiver(UncommittedBundle>> bundle) { this.bundle = bundle; } @Override - public void outputWindowedValue( - KV> output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - bundle.add(WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException( - String.format( - "%s should not use tagged outputs", DirectGroupAlsoByWindow.class.getSimpleName())); + public void 
output(WindowedValue>> valueWithMetadata) { + bundle.add(valueWithMetadata); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 6be73b645c68..6e7763e4b796 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.ReadyCheckingSideInputReader; @@ -42,6 +41,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -61,7 +61,7 @@ PushbackSideInputDoFnRunner createRunner( DoFn fn, List> sideInputs, ReadyCheckingSideInputReader sideInputReader, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, TupleTag mainOutputTag, List> additionalOutputTags, DirectStepContext stepContext, @@ -287,7 +287,7 @@ public TransformResult finishBundle() { .build(); } - static class BundleOutputManager implements OutputManager { + static class BundleOutputManager implements WindowedValueMultiReceiver { private final Map, UncommittedBundle> bundles; public static BundleOutputManager create(Map, UncommittedBundle> outputBundles) { @@ -298,11 +298,10 @@ private BundleOutputManager(Map, UncommittedBundle> 
bundles) { this.bundles = bundles; } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override - public void output(TupleTag tag, WindowedValue output) { + public void output(TupleTag tag, WindowedValue output) { checkArgument(bundles.containsKey(tag), "Unknown output tag %s", tag); - ((UncommittedBundle) bundles.get(tag)).add(output); + (bundles.get(tag)).add((WindowedValue) output); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 373490f69c44..57ac8a4e73d2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -19,14 +19,11 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import java.util.Collection; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ProcessFnRunner; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; @@ -34,18 +31,13 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; -import org.joda.time.Instant; @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -144,30 +136,6 @@ private TransformEvaluator>> crea processFn.setStateInternalsFactory(key -> stepContext.stateInternals()); processFn.setTimerInternalsFactory(key -> stepContext.timerInternals()); - OutputWindowedValue outputWindowedValue = - new OutputWindowedValue() { - private final OutputManager outputManager = pde.getOutputManager(); - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output( - transform.getMainOutputTag(), WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane)); - } - }; SideInputReader sideInputReader = evaluationContext.createSideInputReader(transform.getSideInputs()); processFn.setSideInputReader(sideInputReader); @@ -175,7 +143,8 @@ public void outputWindowedValue( new OutputAndTimeBoundedSplittableProcessElementInvoker<>( transform.getFn(), options, - outputWindowedValue, + pde.getOutputManager(), + transform.getMainOutputTag(), sideInputReader, ses, // Setting small values here to stimulate frequent checkpointing and better exercise diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java index f153d9144be0..5b69cce0cd3f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.joda.time.Instant; @@ -35,7 +36,7 @@ class FlinkAssignContext throw new IllegalArgumentException( String.format( "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), + WindowedValues.class.getSimpleName(), Iterables.size(value.getWindows()), value.getWindows())); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index e12610dfe51c..a707e366c8a5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.util.List; import java.util.Map; import java.util.Objects; @@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import 
org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -47,6 +50,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -139,12 +143,12 @@ public void open(Configuration parameters) { // setup DoFnRunner final RuntimeContext runtimeContext = getRuntimeContext(); - final DoFnRunners.OutputManager outputManager; + final WindowedValueMultiReceiver outputManager; if (outputMap.size() == 1) { outputManager = new DoFnOutputManager(); } else { // it has some additional outputs - outputManager = new MultiDoFnOutputManager(outputMap); + outputManager = new MultiDoFnOutputManagerWindowed(outputMap); } final List> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); @@ -199,9 +203,9 @@ interface CollectorAware { void setCollector(Collector> collector); } - static class DoFnOutputManager implements DoFnRunners.OutputManager, CollectorAware { + static class DoFnOutputManager implements WindowedValueMultiReceiver, CollectorAware { - private @Nullable Collector> collector; + private @MonotonicNonNull Collector> collector; DoFnOutputManager() { this(null); @@ -218,26 +222,27 @@ public void setCollector(Collector> collector) { @Override public void output(TupleTag tag, WindowedValue output) { - Objects.requireNonNull(collector) - .collect( - WindowedValues.of( - new RawUnionValue(0 /* single output */, output.getValue()), - output.getTimestamp(), - output.getWindows(), - output.getPane())); + checkStateNotNull(collector); + collector.collect( + WindowedValues.of( + new RawUnionValue(0 /* single output */, output.getValue()), + 
output.getTimestamp(), + output.getWindows(), + output.getPaneInfo())); } } - static class MultiDoFnOutputManager implements DoFnRunners.OutputManager, CollectorAware { + static class MultiDoFnOutputManagerWindowed + implements WindowedValueMultiReceiver, CollectorAware { - private @Nullable Collector> collector; + private @MonotonicNonNull Collector> collector; private final Map, Integer> outputMap; - MultiDoFnOutputManager(Map, Integer> outputMap) { + MultiDoFnOutputManagerWindowed(Map, Integer> outputMap) { this.outputMap = outputMap; } - MultiDoFnOutputManager( + MultiDoFnOutputManagerWindowed( @Nullable Collector> collector, Map, Integer> outputMap) { this.collector = collector; @@ -251,13 +256,14 @@ public void setCollector(Collector> collector) { @Override public void output(TupleTag tag, WindowedValue output) { - Objects.requireNonNull(collector) - .collect( - WindowedValues.of( - new RawUnionValue(outputMap.get(tag), output.getValue()), - output.getTimestamp(), - output.getWindows(), - output.getPane())); + checkStateNotNull(collector); + + collector.collect( + WindowedValues.of( + new RawUnionValue(outputMap.get(tag), output.getValue()), + output.getTimestamp(), + output.getWindows(), + output.getPaneInfo())); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 95fc4e8ab8b6..3aa5a0802b38 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import 
org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -120,12 +121,12 @@ public void reduce( throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); - DoFnRunners.OutputManager outputManager; + WindowedValueMultiReceiver outputManager; if (outputMap.size() == 1) { outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); } else { // it has some additional Outputs - outputManager = new FlinkDoFnFunction.MultiDoFnOutputManager(out, outputMap); + outputManager = new FlinkDoFnFunction.MultiDoFnOutputManagerWindowed(out, outputMap); } final Iterator>> iterator = values.iterator(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 701df1c14267..13414682f8e6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -84,6 +84,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.NoopLock; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -1218,7 +1220,7 @@ BufferedOutputManager create( } /** - * A {@link DoFnRunners.OutputManager} that can buffer its outputs. Uses {@link + * A {@link WindowedValueReceiver} that can buffer its outputs. Uses {@link * PushedBackElementsHandler} to buffer the data. 
Buffering data is necessary because no elements * can be emitted during {@code snapshotState} which is called when the checkpoint barrier already * has been sent downstream. Emitting elements would break the flow of checkpoint barrier and @@ -1230,7 +1232,7 @@ BufferedOutputManager create( * prepareSnapshotPreBarrier}. When Flink supports unaligned checkpoints, this should become the * default and this class should be removed as in https://github.com/apache/beam/pull/9652. */ - public static class BufferedOutputManager implements DoFnRunners.OutputManager { + public static class BufferedOutputManager implements WindowedValueMultiReceiver { private final TupleTag mainTag; private final Map, OutputTag>> tagsToOutputTags; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 3f8758f610b1..3400e79b225e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -30,7 +30,6 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; @@ -42,8 +41,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -54,7 +52,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.state.StateInitializationContext; import org.joda.time.Duration; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,27 +142,13 @@ public void initializeState(StateInitializationContext context) throws Exception new OutputAndTimeBoundedSplittableProcessElementInvoker<>( doFn, serializedOptions.get(), - new OutputWindowedValue() { + new WindowedValueMultiReceiver() { @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output( - mainOutputTag, WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane)); + public void output(TupleTag tag, WindowedValue windowedValue) { + outputManager.output(tag, windowedValue); } }, + mainOutputTag, sideInputReader, executorService, 10000, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnRunnerFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnRunnerFactory.java index 93ecccafe3f4..7b2ac38c9d61 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnRunnerFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnRunnerFactory.java @@ -20,12 +20,12 @@ import java.util.List; import 
java.util.Map; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -43,7 +43,7 @@ DoFnRunner createRunner( WindowingStrategy windowingStrategy, DataflowExecutionContext.DataflowStepContext stepContext, DataflowExecutionContext.DataflowStepContext userStepContext, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFn.java index 8c467023d63a..3fa44b49a1b5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFn.java @@ -18,15 +18,15 @@ package org.apache.beam.runners.dataflow.worker; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValueReceiver; /** * An interface for functions that 
execute {@link GroupAlsoByWindow}-like machinery and need access - * to worker internals, such as {@link TimerInternals} and {@link OutputWindowedValue}. + * to worker internals, such as {@link TimerInternals} and {@link WindowedValueReceiver}. */ public abstract class GroupAlsoByWindowFn { /** Processes one input element. */ @@ -35,6 +35,6 @@ public abstract void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue output) + WindowedValueReceiver output) throws Exception; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java index d21ff3023914..1909a73dc8b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java @@ -17,20 +17,15 @@ */ package org.apache.beam.runners.dataflow.worker; -import java.util.Collection; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.joda.time.Instant; /** @@ 
-46,22 +41,19 @@ public class GroupAlsoByWindowFnRunner implements DoFnRunner fn; private final SideInputReader sideInputReader; - private final OutputManager outputManager; - private final TupleTag mainOutputTag; + private final WindowedValueReceiver outputManager; private final StepContext stepContext; public GroupAlsoByWindowFnRunner( PipelineOptions options, GroupAlsoByWindowFn fn, SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag mainOutputTag, + WindowedValueReceiver outputManager, StepContext stepContext) { this.options = options; this.fn = fn; this.sideInputReader = sideInputReader; this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; this.stepContext = stepContext; } @@ -97,29 +89,7 @@ public void onTimer( private void invokeProcessElement(WindowedValue elem) { // This can contain user code. Wrap it in case it throws an exception. try { - OutputWindowedValue output = - new OutputWindowedValue() { - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - WindowedValue windowed = WindowedValues.of(output, timestamp, windows, pane); - outputManager.output(mainOutputTag, windowed); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException(); - } - }; - fn.processElement(elem.getValue(), options, stepContext, sideInputReader, output); + fn.processElement(elem.getValue(), options, stepContext, sideInputReader, outputManager); } catch (Exception ex) { if (ex instanceof RuntimeException) { throw (RuntimeException) ex; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java index 
3424d3fe2380..8f84020f1329 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java @@ -193,7 +193,6 @@ public ParDoFn create( ((AppliedCombineFn) maybeMergingCombineFn).getSideInputViews(), gabwInputCoder, sideInputReader, - mainOutputTag, executionContext.getStepContext(operationContext)); } else { return new GroupAlsoByWindowsParDoFn( @@ -203,7 +202,6 @@ public ParDoFn create( null, gabwInputCoder, sideInputReader, - mainOutputTag, executionContext.getStepContext(operationContext)); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java index 203883f4a34e..9f5e96297e4b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.LateDataDroppingDoFnRunner; @@ -35,9 +34,9 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; 
-import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; @@ -50,7 +49,6 @@ public class GroupAlsoByWindowsParDoFn im private final PipelineOptions options; private final SideInputReader sideInputReader; - private final TupleTag>> mainOutputTag; private final DataflowExecutionContext.DataflowStepContext stepContext; private final GroupAlsoByWindowFn>> doFn; private final WindowingStrategy windowingStrategy; @@ -77,12 +75,10 @@ public class GroupAlsoByWindowsParDoFn im Iterable> sideInputViews, Coder inputCoder, SideInputReader sideInputReader, - TupleTag>> mainOutputTag, DataflowExecutionContext.DataflowStepContext stepContext) { this.options = options; this.sideInputReader = sideInputReader; - this.mainOutputTag = mainOutputTag; this.stepContext = stepContext; this.doFn = doFn; this.windowingStrategy = windowingStrategy; @@ -172,20 +168,12 @@ public void abort() throws Exception { * {@link StreamingGroupAlsoByWindowViaWindowSetFn}. 
*/ private DoFnRunner>> createRunner() { - OutputManager outputManager = - new OutputManager() { - @Override - public void output(TupleTag tag, WindowedValue output) { - checkState( - tag.equals(mainOutputTag), - "Must only output to main output tag (%s), but was %s", - tag, - mainOutputTag); - try { - receiver.process(output); - } catch (Throwable t) { - throw new RuntimeException(t); - } + WindowedValueReceiver>> outputReceiver = + output -> { + try { + receiver.process(output); + } catch (Throwable t) { + throw new RuntimeException(t); } }; @@ -194,7 +182,7 @@ public void output(TupleTag tag, WindowedValue output) { DoFnRunner>> basicRunner = new GroupAlsoByWindowFnRunner<>( - options, doFn, sideInputReader, outputManager, mainOutputTag, stepContext); + options, doFn, sideInputReader, outputReceiver, stepContext); if (doFn instanceof StreamingGroupAlsoByWindowViaWindowSetFn) { DoFnRunner, KV>> streamingGABWRunner = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java index 45fca54d58fd..52fcec439aaf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java @@ -21,13 +21,13 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.DoFn; import 
org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -51,7 +51,7 @@ public DoFnRunner createRunner( WindowingStrategy windowingStrategy, DataflowExecutionContext.DataflowStepContext stepContext, DataflowExecutionContext.DataflowStepContext userStepContext, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { DoFnRunner fnRunner = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 16b198704449..00c8192b1e40 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; @@ -53,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -249,8 +249,8 @@ public void startBundle(Receiver... 
receivers) throws Exception { private void reallyStartBundle() throws Exception { checkState(fnRunner == null, "bundle already started (or not properly finished)"); - OutputManager outputManager = - new OutputManager() { + WindowedValueMultiReceiver outputManager = + new WindowedValueMultiReceiver() { final Map, OutputReceiver> undeclaredOutputs = new HashMap<>(); private @Nullable Receiver getReceiverOrNull(TupleTag tag) { @@ -263,7 +263,7 @@ private void reallyStartBundle() throws Exception { } @Override - public void output(TupleTag tag, WindowedValue output) { + public void output(TupleTag tag, WindowedValue output) { outputsPerElementTracker.onOutput(); Receiver receiver = getReceiverOrNull(tag); if (receiver == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index d0461b4165af..4473dff8e94a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -24,18 +24,15 @@ import static org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; @@ -50,17 +47,14 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; -import org.joda.time.Instant; /** * A {@link ParDoFnFactory} to create instances of user {@link ProcessFn} according to @@ -134,7 +128,7 @@ public DoFnRunner>, OutputT> crea WindowingStrategy windowingStrategy, DataflowExecutionContext.DataflowStepContext stepContext, DataflowExecutionContext.DataflowStepContext userStepContext, - OutputManager outputManager, + WindowedValueMultiReceiver outputManager, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { if (this.ses.get() == null) { @@ -153,27 +147,8 @@ public DoFnRunner>, OutputT> crea new OutputAndTimeBoundedSplittableProcessElementInvoker<>( processFn.getFn(), options, - new OutputWindowedValue() { - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output( - mainOutputTag, WindowedValues.of(output, timestamp, windows, 
pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - T output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane)); - } - }, + outputManager, + mainOutputTag, sideInputReader, ses.get(), // Commit at least once every 10 seconds or 10k records. This keeps the watermark diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java index cf334d451ebc..b231540eb5f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java @@ -19,12 +19,12 @@ import java.util.Collections; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.dataflow.worker.util.StreamingGroupAlsoByWindowFn; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -49,16 +49,11 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue>> output) + WindowedValueReceiver>> output) throws Exception { - @SuppressWarnings("unchecked") K key = element.key(); for (WindowedValue item : element.elementsIterable()) { - 
output.outputWindowedValue( - KV.of(key, Collections.singletonList(item.getValue())), - item.getTimestamp(), - item.getWindows(), - item.getPane()); + output.output(item.withValue(KV.of(key, Collections.singletonList(item.getValue())))); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowViaWindowSetFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowViaWindowSetFn.java index bfe51b31b116..ec36644d1e68 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowViaWindowSetFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowViaWindowSetFn.java @@ -19,7 +19,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -32,6 +31,7 @@ import org.apache.beam.runners.dataflow.worker.util.StreamingGroupAlsoByWindowFn; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowingStrategy; @@ -75,7 +75,7 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue> output) + WindowedValueReceiver> output) throws Exception { K key = keyedWorkItem.key(); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java index 77f45eed23e3..1a66f4484292 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowAndCombineFn.java @@ -19,13 +19,11 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.sdk.options.PipelineOptions; @@ -36,9 +34,11 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -76,7 +76,7 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue> output) + WindowedValueReceiver> output) throws Exception { final 
PerKeyCombineFnRunner perKeyCombineFnRunner; if (perKeyCombineFn instanceof CombineFn) { @@ -190,15 +190,16 @@ private void closeWindow( W window, Map accumulators, Map accumulatorOutputTimes, - OutputWindowedValue> output) { + WindowedValueReceiver> output) { AccumT accum = accumulators.remove(window); Instant timestamp = accumulatorOutputTimes.remove(window); checkState(accum != null && timestamp != null); - output.outputWindowedValue( - KV.of(key, perKeyCombineFnRunner.extractOutput(accum, window)), - timestamp, - Arrays.asList(window), - PaneInfo.ON_TIME_AND_ONLY_FIRING); + output.output( + WindowedValues.of( + KV.of(key, perKeyCombineFnRunner.extractOutput(accum, window)), + timestamp, + window, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); } private interface PerKeyCombineFnRunner { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java index 8a1602a50eea..1ad2a7ee365f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.dataflow.worker.util; import java.util.Collections; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -49,15 +49,11 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue>> output) + WindowedValueReceiver>> output) throws Exception { K key = element.getKey(); for (WindowedValue item : element.getValue()) { - output.outputWindowedValue( - KV.>of(key, Collections.singletonList(item.getValue())), - item.getTimestamp(), - item.getWindows(), - item.getPane()); + output.output(item.withValue(KV.of(key, Collections.singletonList(item.getValue())))); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java index aabbf08be726..3fe6167634c2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java @@ -20,24 +20,24 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.PeekingReiterator; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import 
org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable; import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator; import org.apache.beam.sdk.util.common.Reiterable; import org.apache.beam.sdk.util.common.Reiterator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap; @@ -90,7 +90,7 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue>> output) + WindowedValueReceiver>> output) throws Exception { K key = element.getKey(); // This iterable is required to be in order of increasing timestamps @@ -125,11 +125,12 @@ public void processElement( // Iterating through the WindowReiterable may advance iterator as an optimization // for as long as it detects that there are no new windows. 
windows.put(window.maxTimestamp(), window); - output.outputWindowedValue( - KV.of(key, (Iterable) new WindowReiterable(iterator, window)), - strategy.getTimestampCombiner().assign(typedWindow, e.getTimestamp()), - Arrays.asList(window), - PaneInfo.ON_TIME_AND_ONLY_FIRING); + output.output( + WindowedValues.of( + KV.of(key, (Iterable) new WindowReiterable(iterator, window)), + strategy.getTimestampCombiner().assign(typedWindow, e.getTimestamp()), + window, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); } } // Copy the iterator in case the next DoFn cached its version of the iterator instead diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaOutputBufferFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaOutputBufferFn.java index 767063661149..12fc97ca9015 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaOutputBufferFn.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -33,6 +32,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.WindowedValue; @@ -65,7 +65,7 @@ public void processElement( PipelineOptions options, StepContext stepContext, SideInputReader sideInputReader, - OutputWindowedValue> output) + WindowedValueReceiver> output) throws Exception { K key = element.getKey(); // Used with Batch, we know that all the data is available for this key. We can't use the diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index c27b386a76c8..c89a031b3728 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -72,6 +72,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -124,7 +125,8 @@ public void testEmpty() throws Exception { ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV>> runner = makeRunner( - outputTag, outputManager, WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))); + output -> outputManager.output(outputTag, output), + WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))); runner.startBundle(); @@ -195,7 +197,8 @@ public void testFixedWindows() throws Exception { ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV>> runner = makeRunner( - outputTag, outputManager, WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))); + 
output -> outputManager.output(outputTag, output), + WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))); when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(0)); @@ -252,8 +255,7 @@ public void testSlidingWindows() throws Exception { ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV>> runner = makeRunner( - outputTag, - outputManager, + output -> outputManager.output(outputTag, output), WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) .withTimestampCombiner(TimestampCombiner.EARLIEST)); @@ -340,7 +342,8 @@ public void testSlidingWindowsAndLateData() throws Exception { new StepContextStateInternalsFactory(stepContext), StringUtf8Coder.of()); DoFnRunner, KV>> runner = - makeRunner(outputTag, outputManager, windowingStrategy, fn); + makeRunnerForGabwFn( + output -> outputManager.output(outputTag, output), windowingStrategy, fn); when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15)); @@ -426,8 +429,7 @@ public void testSessions() throws Exception { ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV>> runner = makeRunner( - outputTag, - outputManager, + output -> outputManager.output(outputTag, output), WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withTimestampCombiner(TimestampCombiner.EARLIEST)); @@ -543,9 +545,8 @@ public void testSessionsCombine() throws Exception { ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV> runner = - makeRunner( - outputTag, - outputManager, + makeRunnerForAppliedCombineFn( + output -> outputManager.output(outputTag, output), WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))), appliedCombineFn); @@ -601,8 +602,7 @@ public void testSessionsCombine() throws Exception { } private DoFnRunner, KV>> makeRunner( - TupleTag>> outputTag, - DoFnRunners.OutputManager outputManager, + WindowedValueReceiver>> outputReceiver, WindowingStrategy 
windowingStrategy) throws Exception { @@ -612,12 +612,11 @@ private DoFnRunner, KV>> new StepContextStateInternalsFactory(stepContext), StringUtf8Coder.of()); - return makeRunner(outputTag, outputManager, windowingStrategy, fn); + return makeRunnerForGabwFn(outputReceiver, windowingStrategy, fn); } - private DoFnRunner, KV> makeRunner( - TupleTag> outputTag, - DoFnRunners.OutputManager outputManager, + private DoFnRunner, KV> makeRunnerForAppliedCombineFn( + WindowedValueReceiver> outputManager, WindowingStrategy windowingStrategy, AppliedCombineFn combineFn) throws Exception { @@ -629,13 +628,12 @@ private DoFnRunner, KV> makeRunner( combineFn, StringUtf8Coder.of()); - return makeRunner(outputTag, outputManager, windowingStrategy, fn); + return makeRunnerForGabwFn(outputManager, windowingStrategy, fn); } private - DoFnRunner, KV> makeRunner( - TupleTag> outputTag, - DoFnRunners.OutputManager outputManager, + DoFnRunner, KV> makeRunnerForGabwFn( + WindowedValueReceiver> outputManager, WindowingStrategy windowingStrategy, GroupAlsoByWindowFn, KV> fn) throws Exception { @@ -645,7 +643,6 @@ DoFnRunner, KV> makeRunner( fn, NullSideInputReader.empty(), outputManager, - outputTag, stepContext); return DoFnRunners.lateDataDroppingRunner( doFnRunner, stepContext.timerInternals(), windowingStrategy); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java index 3574aa22b70d..c169c9b46a57 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java @@ -25,7 +25,6 @@ import 
java.util.Collection; import java.util.List; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -184,7 +184,7 @@ public void testFixedWindows() throws Exception { } private DoFnRunner, KV>> makeRunner( - TupleTag>> outputTag, DoFnRunners.OutputManager outputManager) { + TupleTag>> outputTag, WindowedValueMultiReceiver outputManager) { GroupAlsoByWindowFn, KV>> fn = new StreamingGroupAlsoByWindowReshuffleFn<>(); @@ -195,14 +195,13 @@ private DoFnRunner, KV>> private DoFnRunner, KV> makeRunner( TupleTag> outputTag, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, GroupAlsoByWindowFn, KV> fn) { return new GroupAlsoByWindowFnRunner<>( PipelineOptionsFactory.create(), fn, NullSideInputReader.empty(), - outputManager, - outputTag, + output -> outputManager.output(outputTag, output), stepContext); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index c400c686f764..de4ec8e5aa9e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; @@ -50,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -176,7 +176,7 @@ private IntervalWindow window(long start, long end) { @SuppressWarnings("unchecked") private StreamingKeyedWorkItemSideInputDoFnRunner< String, Integer, KV, IntervalWindow> - createRunner(DoFnRunners.OutputManager outputManager) throws Exception { + createRunner(WindowedValueMultiReceiver outputManager) throws Exception { CoderRegistry registry = CoderRegistry.createDefault(); Coder keyCoder = StringUtf8Coder.of(); Coder inputCoder = BigEndianIntegerCoder.of(); @@ -197,8 +197,8 @@ private IntervalWindow window(long start, long end) { PipelineOptionsFactory.create(), doFn.asDoFn(), mockSideInputReader, - outputManager, - mainOutputTag, + (WindowedValue> windowedValue) -> + outputManager.output(mainOutputTag, windowedValue), stepContext); return new StreamingKeyedWorkItemSideInputDoFnRunner< String, Integer, KV, IntervalWindow>( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java 
index 9a6544122c69..9b92d7d48431 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -383,19 +384,17 @@ public void testMultipleSideInputs() throws Exception { assertThat(sideInputFetcher.elementBag(createWindow(0)).read(), Matchers.emptyIterable()); } - @SuppressWarnings("unchecked") private StreamingSideInputDoFnRunner createRunner( - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, List> views, StreamingSideInputFetcher sideInputFetcher) throws Exception { return createRunner(WINDOW_FN, outputManager, views, sideInputFetcher); } - @SuppressWarnings("unchecked") private StreamingSideInputDoFnRunner createRunner( WindowFn windowFn, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, List> views, StreamingSideInputFetcher sideInputFetcher) throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleDoFnTest.java index 613fa619452b..79e2aee7fedd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleDoFnTest.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleDoFnTest.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -87,7 +87,8 @@ List>> runGABW( ListOutputManager outputManager = new ListOutputManager(); DoFnRunner>>, KV> runner = - makeRunner(gabwFactory, windowingStrategy, outputTag, outputManager); + makeRunner( + gabwFactory, windowingStrategy, output -> outputManager.output(outputTag, output)); runner.startBundle(); @@ -115,8 +116,7 @@ List>> runGABW( DoFnRunner>>, KV> makeRunner( GroupAlsoByWindowDoFnFactory fnFactory, WindowingStrategy windowingStrategy, - TupleTag> outputTag, - DoFnRunners.OutputManager outputManager) { + WindowedValueReceiver> outputManager) { final StepContext stepContext = new TestStepContext(STEP_NAME); @@ -130,7 +130,6 @@ DoFnRunner>>, KV> makeRunner( fn, NullSideInputReader.empty(), outputManager, - outputTag, stepContext); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java index 2ade098c9ef8..bca4efa518f2 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/GroupAlsoByWindowProperties.java @@ -30,7 +30,6 @@ import java.util.List; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.options.PipelineOptions; @@ -43,9 +42,9 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -608,30 +607,16 @@ private static TimestampedValue> getOnlyElementInWin return TimestampedValue.of(res.getValue(), res.getTimestamp()); } - private static class TestOutput implements OutputWindowedValue> { + private static class TestOutput implements WindowedValueReceiver> { private final List>> output = new ArrayList<>(); @Override - public void outputWindowedValue( - KV output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - this.output.add(WindowedValues.of(output, timestamp, windows, pane)); + public void output(WindowedValue> valueWithMetadata) { + this.output.add(valueWithMetadata); } public List>> getOutput() { return output; } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection 
windows, - PaneInfo pane) { - throw new UnsupportedOperationException(); - } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ListOutputManager.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ListOutputManager.java index eccd6ec41666..de9572e4b7f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ListOutputManager.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ListOutputManager.java @@ -17,10 +17,9 @@ */ package org.apache.beam.runners.dataflow.worker.util; -import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -30,38 +29,24 @@ * An implementation of {@code OutputManager} using simple lists, for testing and in-memory contexts * such as the {@code DirectRunner}. 
*/ -public class ListOutputManager implements OutputManager { +public class ListOutputManager implements WindowedValueMultiReceiver { private Map, List>> outputLists = Maps.newHashMap(); - @Override - public void output(TupleTag tag, WindowedValue output) { - @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "unchecked" - }) - List> outputList = (List) outputLists.get(tag); - - if (outputList == null) { + public List> getOutput(TupleTag tag) { + List> outputList = (List>) (Object) outputLists.get(tag); + if (outputList != null) { + return outputList; + } else { outputList = Lists.newArrayList(); - @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "unchecked" - }) - List> untypedList = (List) outputList; - outputLists.put(tag, untypedList); + outputLists.put(tag, (List>) (Object) outputList); + return outputList; } - - outputList.add(output); } - public List> getOutput(TupleTag tag) { - // Safe cast by design, inexpressible in Java without rawtypes - @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "unchecked" - }) - List> outputList = (List) outputLists.get(tag); - return (outputList != null) ? 
outputList : Collections.>emptyList(); + @Override + public void output(TupleTag tag, WindowedValue output) { + List> outputList = getOutput(tag); + outputList.add(output); } } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java index dcc4c39cd954..8727a7952ed2 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java @@ -42,7 +42,6 @@ import javax.annotation.CheckReturnValue; import javax.annotation.Nonnull; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputHandler; @@ -59,6 +58,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -325,7 +325,7 @@ private static Boolean isCooperativenessAllowed( * An output manager that stores the output in an ArrayList, one for each output ordinal, and a * way to drain to outbox ({@link #tryFlush()}). 
*/ - static class JetOutputManager implements DoFnRunners.OutputManager { + static class JetOutputManager implements WindowedValueMultiReceiver { private final Outbox outbox; private final Map, Coder> outputCoders; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java index da940a2b8cc7..04d843414525 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java @@ -111,7 +111,7 @@ public void setValue(WindowedValue value) { throw new IllegalArgumentException( String.format( "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), + WindowedValues.class.getSimpleName(), Iterables.size(value.getWindows()), value.getWindows())); } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java index 4aad7091f2da..333c2c74a127 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java @@ -37,7 +37,6 @@ import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -50,11 +49,10 @@ import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import 
org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -227,29 +225,13 @@ private class KeyManager { TriggerTranslation.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, - new OutputWindowedValue>>() { + new WindowedValueReceiver>>() { @Override - public void outputWindowedValue( - KV> output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - WindowedValue>> windowedValue = - WindowedValues.of(output, timestamp, windows, pane); + public void output(WindowedValue>> windowedValue) { byte[] encodedValue = Utils.encode(windowedValue, outputCoder); //noinspection ResultOfMethodCallIgnored appendableTraverser.append(encodedValue); } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("Grouping should not use side outputs"); - } }, NullSideInputReader.empty(), SystemReduceFn.buffering(inputValueValueCoder), diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 11c1e8e3955a..9c508f14e00e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -30,7 +30,6 @@ import java.util.function.Function; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import 
org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; @@ -52,6 +51,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -490,7 +490,7 @@ static CompletionStage> createOutputFuture( } /** - * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that + * Factory class to create an {@link org.apache.beam.sdk.util.WindowedValueMultiReceiver} that * emits values to the main output only, which is a single {@link * org.apache.beam.sdk.values.PCollection}. * @@ -498,19 +498,19 @@ static CompletionStage> createOutputFuture( */ public static class SingleOutputManagerFactory implements OutputManagerFactory { @Override - public DoFnRunners.OutputManager create(OpEmitter emitter) { + public WindowedValueMultiReceiver create(OpEmitter emitter) { return createOutputManager(emitter, null); } @Override - public DoFnRunners.OutputManager create( + public WindowedValueMultiReceiver create( OpEmitter emitter, FutureCollector collector) { return createOutputManager(emitter, collector); } - private DoFnRunners.OutputManager createOutputManager( + private WindowedValueMultiReceiver createOutputManager( OpEmitter emitter, FutureCollector collector) { - return new DoFnRunners.OutputManager() { + return new WindowedValueMultiReceiver() { @Override @SuppressWarnings("unchecked") public void output(TupleTag tupleTag, WindowedValue windowedValue) { @@ -530,7 +530,7 @@ public void output(TupleTag tupleTag, WindowedValue windowedValue) { } /** - * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} 
that + * Factory class to create an {@link org.apache.beam.sdk.util.WindowedValueMultiReceiver} that * emits values to the main output as well as the side outputs via union type {@link * RawUnionValue}. */ @@ -542,19 +542,19 @@ public MultiOutputManagerFactory(Map, Integer> tagToIndexMap) { } @Override - public DoFnRunners.OutputManager create(OpEmitter emitter) { + public WindowedValueMultiReceiver create(OpEmitter emitter) { return createOutputManager(emitter, null); } @Override - public DoFnRunners.OutputManager create( + public WindowedValueMultiReceiver create( OpEmitter emitter, FutureCollector collector) { return createOutputManager(emitter, collector); } - private DoFnRunners.OutputManager createOutputManager( + private WindowedValueMultiReceiver createOutputManager( OpEmitter emitter, FutureCollector collector) { - return new DoFnRunners.OutputManager() { + return new WindowedValueMultiReceiver() { @Override @SuppressWarnings("unchecked") public void output(TupleTag tupleTag, WindowedValue windowedValue) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index b77a9de56a00..a9fe11de4d92 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.TupleTag; @@ -119,7 +120,7 @@ public void open( SamzaStoreStateInternals.createNonKeyedStateInternalsFactory( transformId, context.getTaskContext(), pipelineOptions); - final
DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter); + final WindowedValueMultiReceiver outputManager = outputManagerFactory.create(emitter); this.stateInternalsFactory = new SamzaStoreStateInternals.Factory<>( diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java index 5d4047dc9672..c7bb7caa19aa 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java @@ -18,13 +18,13 @@ package org.apache.beam.runners.samza.runtime; import java.io.Serializable; -import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; -/** Factory class to create {@link DoFnRunners.OutputManager}. */ +/** Factory class to create {@link WindowedValueMultiReceiver}. 
*/ public interface OutputManagerFactory extends Serializable { - DoFnRunners.OutputManager create(OpEmitter emitter); + WindowedValueMultiReceiver create(OpEmitter emitter); - default DoFnRunners.OutputManager create( + default WindowedValueMultiReceiver create( OpEmitter emitter, FutureCollector collector) { return create(emitter); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index f1fd1b303044..7129ba9145eb 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.construction.Timer; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.util.construction.graph.PipelineNode; @@ -89,7 +90,7 @@ public static DoFnRunner create( SideInputHandler sideInputHandler, SamzaTimerInternalsFactory timerInternalsFactory, Coder keyCoder, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, Coder inputCoder, List> sideOutputTags, Map, Coder> outputCoders, @@ -210,7 +211,7 @@ public static DoFnRunner createPortable( SamzaStoreStateInternals.Factory nonKeyedStateInternalsFactory, SamzaTimerInternalsFactory timerInternalsFactory, SamzaPipelineOptions pipelineOptions, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, StageBundleFactory stageBundleFactory, SamzaExecutionContext samzaExecutionContext, TupleTag mainOutputTag, @@ -263,7 +264,7 @@ static class SdkHarnessDoFnRunner implements DoFnRunner> 
idToTupleTagMap; private final LinkedBlockingQueue> outputQueue = new LinkedBlockingQueue<>(); @@ -281,7 +282,7 @@ private SdkHarnessDoFnRunner( String stepName, SamzaTimerInternalsFactory timerInternalsFactory, WindowingStrategy windowingStrategy, - DoFnRunners.OutputManager outputManager, + WindowedValueMultiReceiver outputManager, StageBundleFactory stageBundleFactory, Map> idToTupleTagMap, BagState> bundledEventsBag, diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index e8d8b64c381a..f7886c95f123 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -27,7 +27,6 @@ import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; import org.apache.beam.runners.core.StateInternals; @@ -43,7 +42,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -121,7 +120,7 @@ public void open( SamzaStoreStateInternals.createNonKeyedStateInternalsFactory( transformId, 
context.getTaskContext(), pipelineOptions); - final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter); + final WindowedValueMultiReceiver outputManager = outputManagerFactory.create(emitter); this.stateInternalsFactory = new SamzaStoreStateInternals.Factory<>( @@ -162,26 +161,8 @@ public void open( new OutputAndTimeBoundedSplittableProcessElementInvoker<>( processElements.getFn(), pipelineOptions, - new OutputWindowedValue() { - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputWindowedValue(mainOutputTag, output, timestamp, windows, pane); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane)); - } - }, + outputManager, + mainOutputTag, NullSideInputReader.empty(), ses, 10000, diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.java index 8d289f6b870c..1696a5c81cb1 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.java @@ -26,10 +26,10 @@ import java.util.Map; import java.util.function.Supplier; import javax.annotation.CheckForNull; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnRunnerFactory.DoFnRunnerWithTeardown; import org.apache.beam.sdk.options.PipelineOptions; 
+import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator; @@ -40,7 +40,7 @@ /** * Abstract factory to create a {@link DoFnPartitionIt DoFn partition iterator} using a customizable - * {@link DoFnRunners.OutputManager}. + * {@link WindowedValueMultiReceiver}. */ abstract class DoFnPartitionIteratorFactory implements Function1>, Iterator>, Serializable { @@ -91,7 +91,7 @@ public Iterator apply(Iterator> it) { } /** Output manager emitting outputs of type {@link OutT} to the buffer. */ - abstract DoFnRunners.OutputManager outputManager(Deque buffer); + abstract WindowedValueMultiReceiver outputManager(Deque buffer); /** * {@link DoFnPartitionIteratorFactory} emitting a single output of type {@link WindowedValue} of @@ -107,8 +107,8 @@ private SingleOut( } @Override - DoFnRunners.OutputManager outputManager(Deque> buffer) { - return new DoFnRunners.OutputManager() { + WindowedValueMultiReceiver outputManager(Deque> buffer) { + return new WindowedValueMultiReceiver() { @Override public void output(TupleTag tag, WindowedValue output) { buffer.add((WindowedValue) output); @@ -136,8 +136,8 @@ public MultiOut( } @Override - DoFnRunners.OutputManager outputManager(Deque>> buffer) { - return new DoFnRunners.OutputManager() { + WindowedValueMultiReceiver outputManager(Deque>> buffer) { + return new WindowedValueMultiReceiver() { @Override public void output(TupleTag tag, WindowedValue output) { // Additional unused outputs can be skipped here. In that case columnIdx is null. 
diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java index a2f81019910f..350f7daa56cd 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.CachedSideInputReader; @@ -37,6 +36,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -66,7 +67,7 @@ interface DoFnRunnerWithTeardown extends DoFnRunner { * {@link DoFnRunner#startBundle()} are already invoked by the factory. 
*/ abstract DoFnRunnerWithTeardown create( - PipelineOptions options, MetricsAccumulator metrics, OutputManager output); + PipelineOptions options, MetricsAccumulator metrics, WindowedValueMultiReceiver output); /** * Fuses the factory for the following {@link DoFnRunner} into a single factory that processes @@ -124,13 +125,13 @@ DoFnRunnerFactory fuse(DoFnRunnerFactory next) { @Override DoFnRunnerWithTeardown create( - PipelineOptions options, MetricsAccumulator metrics, OutputManager output) { + PipelineOptions options, MetricsAccumulator metrics, WindowedValueMultiReceiver output) { DoFnRunner simpleRunner = DoFnRunners.simpleRunner( options, doFn, CachedSideInputReader.of(sideInputReader, sideInputs.values()), - filterMainOutput ? new FilteredOutput(output, mainOutput) : output, + filterMainOutput ? new FilteredOutput<>(output, mainOutput) : output, mainOutput, additionalOutputs, new NoOpStepContext(), @@ -153,22 +154,23 @@ DoFnRunnerWithTeardown create( } /** - * Delegate {@link OutputManager} that only forwards outputs matching the provided tag. This is - * used in cases where unused, obsolete outputs get dropped to avoid unnecessary caching. + * Delegate {@link WindowedValueMultiReceiver} that only forwards outputs matching the provided + * tag. This is used in cases where unused, obsolete outputs get dropped to avoid unnecessary + * caching. 
*/ - private static class FilteredOutput implements OutputManager { - final OutputManager outputManager; - final TupleTag tupleTag; + private static class FilteredOutput implements WindowedValueMultiReceiver { + final WindowedValueMultiReceiver outputManager; + final TupleTag tupleTag; - FilteredOutput(OutputManager outputManager, TupleTag tupleTag) { + FilteredOutput(WindowedValueMultiReceiver outputManager, TupleTag tupleTag) { this.outputManager = outputManager; this.tupleTag = tupleTag; } @Override - public void output(TupleTag tag, WindowedValue output) { - if (tupleTag.equals(tag)) { - outputManager.output(tag, output); + public void output(TupleTag tag, WindowedValue value) { + if (this.tupleTag.equals(tag)) { + outputManager.output(tag, value); } } } @@ -204,7 +206,7 @@ private static class FusedRunnerFactory extends DoFnRunnerFactory create( - PipelineOptions options, MetricsAccumulator metrics, OutputManager output) { + PipelineOptions options, MetricsAccumulator metrics, WindowedValueMultiReceiver output) { return new FusedRunner<>(options, metrics, output, factories); } @@ -220,7 +222,7 @@ private static class FusedRunner implements DoFnRunnerWithTeardown> factories) { runners = new DoFnRunnerWithTeardown[factories.size()]; runners[runners.length - 1] = @@ -230,8 +232,8 @@ private static class FusedRunner implements DoFnRunnerWithTeardown runner; FusedOutput(DoFnRunnerWithTeardown runner) { diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java index 61c568dc2585..ea436c24634b 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java +++ 
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java @@ -18,12 +18,10 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.function.Supplier; import org.apache.beam.runners.core.InMemoryTimerInternals; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; @@ -34,12 +32,10 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.joda.time.Instant; @@ -88,7 +84,7 @@ public Iterator>>> call( timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); - GABWOutputWindowedValue outputter = new GABWOutputWindowedValue<>(); + GABWWindowedValueReceiver outputter = new GABWWindowedValueReceiver<>(); ReduceFnRunner, W> reduceFnRunner = new ReduceFnRunner<>( @@ -145,27 +141,13 @@ private void fireEligibleTimers( } } - private static class GABWOutputWindowedValue - implements OutputWindowedValue>> { + private 
static class GABWWindowedValueReceiver + implements WindowedValueReceiver>> { private final List>>> outputs = new ArrayList<>(); @Override - public void outputWindowedValue( - KV> output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputs.add(WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); + public void output(WindowedValue>> value) { + outputs.add(value); } Iterable>>> getOutputs() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1b9290ca7175..2c54f90badbe 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.LateDataUtils; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -50,12 +49,10 @@ import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; -import 
org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; @@ -102,32 +99,17 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class); private static class OutputWindowedValueHolder - implements OutputWindowedValue>> { + implements WindowedValueReceiver>> { private final List>>> windowedValues = new ArrayList<>(); @Override - public void outputWindowedValue( - final KV> output, - final Instant timestamp, - final Collection windows, - final PaneInfo pane) { - windowedValues.add(WindowedValues.of(output, timestamp, windows, pane)); + public void output(final WindowedValue>> value) { + windowedValues.add(value); } private List>>> getWindowedValues() { return windowedValues; } - - @Override - public void outputWindowedValue( - final TupleTag tag, - final AdditionalOutputT output, - final Instant timestamp, - final Collection windows, - final PaneInfo pane) { - throw new UnsupportedOperationException( - "Tagged outputs are not allowed in GroupAlsoByWindow."); - } } private static class UpdateStateByKeyFunction diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index ea620bfe6605..eb1a6078ad87 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -18,12 +18,10 @@ package org.apache.beam.runners.spark.translation; import 
java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.InMemoryTimerInternals; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; @@ -34,12 +32,10 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; @@ -80,7 +76,7 @@ public Iterator>>> call( timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); - GABWOutputWindowedValue outputter = new GABWOutputWindowedValue<>(); + GABWWindowedValueReceiver outputter = new GABWWindowedValueReceiver<>(); ReduceFnRunner, W> reduceFnRunner = new ReduceFnRunner<>( @@ -137,27 +133,13 @@ private void fireEligibleTimers( } } - private static class GABWOutputWindowedValue - implements OutputWindowedValue>> { + private static class GABWWindowedValueReceiver + implements WindowedValueReceiver>> { private final List>>> outputs = new ArrayList<>(); @Override - public void outputWindowedValue( - KV> output, - Instant 
timestamp, - Collection windows, - PaneInfo pane) { - outputs.add(WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); + public void output(WindowedValue>> value) { + outputs.add(value); } Iterable>>> getOutputs() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessor.java index d202ac04cac0..8f24c4f0329f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessor.java @@ -28,12 +28,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; @@ -50,10 +50,10 @@ public interface SparkInputDataProcessor { /** - * @return {@link OutputManager} to be used by {@link org.apache.beam.runners.core.DoFnRunner} for - * emitting processing results + * @return {@link WindowedValueMultiReceiver} to be used by {@link + * org.apache.beam.runners.core.DoFnRunner} for emitting processing 
results */ - OutputManager getOutputManager(); + WindowedValueMultiReceiver getOutputManager(); /** * Creates a transformation which processes input partition data and returns output results as @@ -94,7 +94,7 @@ class UnboundedSparkInputDataProcessor private final UnboundedDoFnOutputManager outputManager = new UnboundedDoFnOutputManager(); @Override - public OutputManager getOutputManager() { + public WindowedValueMultiReceiver getOutputManager() { return outputManager; } @@ -105,7 +105,7 @@ public Iterator, WindowedValue>> createOutputIterator( } private static class UnboundedDoFnOutputManager - implements OutputManager, Iterable, WindowedValue>> { + implements WindowedValueMultiReceiver, Iterable, WindowedValue>> { private final ArrayDeque, WindowedValue>> outputs = new ArrayDeque<>(); @@ -209,7 +209,7 @@ class BoundedSparkInputDataProcessor private final BoundedDoFnOutputManager outputManager = new BoundedDoFnOutputManager(); @Override - public OutputManager getOutputManager() { + public WindowedValueMultiReceiver getOutputManager() { return outputManager; } @@ -224,7 +224,7 @@ public Iterator, WindowedValue>> createOutputIterator( * attempt to output more elements will block until some elements are consumed. 
*/ private static class BoundedDoFnOutputManager - implements OutputManager, Iterable, WindowedValue>> { + implements WindowedValueMultiReceiver, Iterable, WindowedValue>> { private final LinkedBlockingQueue, WindowedValue>> queue = new LinkedBlockingQueue<>(500); @@ -269,10 +269,10 @@ public Tuple2, WindowedValue> next() { } @Override - public void output(TupleTag tag, WindowedValue output) { + public void output(TupleTag tag, WindowedValue output) { try { Preconditions.checkState(!stopped, "Output called on already stopped manager"); - queue.put(new Tuple2<>(tag, output)); + queue.put(new Tuple2<>(tag, (WindowedValue) output)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java index c6ccec7bf451..2ff06b59f8eb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java @@ -29,12 +29,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.sdk.fn.test.TestExecutors; import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -210,7 +210,7 @@ public void testUnboundedProcessLifecycle() { 
@SuppressWarnings({"unchecked", "rawtypes"}) private SparkProcessContext setUpCtx( - DoFnRunners.OutputManager output, int desiredCount, AtomicInteger producedCount) { + WindowedValueMultiReceiver output, int desiredCount, AtomicInteger producedCount) { SparkProcessContext ctx = Mockito.mock(SparkProcessContext.class); TestDoFnRunner runner = new TestDoFnRunner(output, desiredCount, producedCount); @@ -224,13 +224,13 @@ private SparkProcessContext setUpCtx( private static class TestDoFnRunner implements DoFnRunner { - private final DoFnRunners.OutputManager output; + private final WindowedValueMultiReceiver output; private final AtomicInteger producedCount; private final int desiredCount; private final TestDoFn fn = new TestDoFn(); TestDoFnRunner( - DoFnRunners.OutputManager output, int desiredCount, AtomicInteger producedCount) { + WindowedValueMultiReceiver output, int desiredCount, AtomicInteger producedCount) { this.output = output; this.producedCount = producedCount; this.desiredCount = desiredCount; diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java index 8cef3618023e..4a63b717912e 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/DoFnFunction.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.DoFnWithExecutionInformation; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.construction.Environments; import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.RehydratedComponents; @@ -208,7 +209,7 @@ public void close() { 
Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown); } - private static class DoFnOutputManager implements DoFnRunners.OutputManager, Serializable { + private static class DoFnOutputManager implements WindowedValueMultiReceiver, Serializable { // todo need to figure out how this class types are handled private static final long serialVersionUID = 4967375172737408160L; private transient List outputs; diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java index 3880b8e907cd..ba8999c20c4c 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/GroupByWindowFunction.java @@ -23,14 +23,12 @@ import java.io.IOException; import java.io.ObjectStreamException; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.logging.Logger; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.InMemoryTimerInternals; -import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.SystemReduceFn; @@ -41,16 +39,14 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.Environments; import org.apache.beam.sdk.util.construction.RehydratedComponents; import 
org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TriggerTranslation; import org.apache.beam.sdk.util.construction.WindowingStrategyTranslation; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.joda.time.Instant; @@ -107,7 +103,7 @@ public void flatMap( timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); StateInternals stateInternals = InMemoryStateInternals.forKey(key); - GABWOutputWindowedValue outputter = new GABWOutputWindowedValue<>(); + GABWWindowedValueReceiver outputter = new GABWWindowedValueReceiver<>(); ReduceFnRunner, W> reduceFnRunner = new ReduceFnRunner<>( @@ -173,27 +169,13 @@ public void prepare(TSetContext context) { initTransient(); } - private static class GABWOutputWindowedValue - implements OutputWindowedValue>> { + private static class GABWWindowedValueReceiver + implements WindowedValueReceiver>> { private final List>>> outputs = new ArrayList<>(); @Override - public void outputWindowedValue( - KV> output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - outputs.add(WindowedValues.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); + public void output(WindowedValue>> output) { + outputs.add(output); } Iterable>>> getOutputs() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueMultiReceiver.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueMultiReceiver.java new file mode 100644 index 000000000000..266a410ae3a3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueMultiReceiver.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; + +/** + * Encapsulation of a method of output that can output a value with all of its windowing information + * to a tagged destination. + */ +@Internal +public interface WindowedValueMultiReceiver { + /** + * Outputs to the given {@link TupleTag}. + * + *

Sometimes it is useful to fix a tag to produce a {@link WindowedValueReceiver}. To do so, + * use a lambda to curry this method, as in {@code value -> receiver.output(tag, value)}. + */ + void output(TupleTag tag, WindowedValue value); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java new file mode 100644 index 000000000000..8c5b2434ae5a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValueReceiver.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.values.WindowedValue; + +/** An encapsulated method of output that can output a value with all of its metadata. */ +@Internal +@FunctionalInterface +public interface WindowedValueReceiver { + /** Outputs a value with windowing information. */ + void output(WindowedValue output); +}