Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
Expand Down
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
"runFor": "#33606",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
Expand All @@ -41,12 +42,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DoFnRunners {
/** Information about how to create output receivers and output to them. */
public interface OutputManager {
/** Outputs a single element to the receiver indicated by the given {@link TupleTag}. */
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
}

/**
* Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
*
Expand All @@ -58,7 +53,7 @@ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
Expand Down Expand Up @@ -168,7 +163,7 @@ ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
PipelineOptions options,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader,
OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
*/
package org.apache.beam.runners.core;

import java.util.Collection;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.util.construction.TriggerTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/**
* A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link
Expand All @@ -51,7 +48,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
TimerInternalsFactory<K> timerInternalsFactory,
SideInputReader sideInputReader,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
DoFnRunners.OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<KV<K, OutputT>> mainTag) {
return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
strategy,
Expand All @@ -68,7 +65,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
private transient StateInternalsFactory<K> stateInternalsFactory;
private transient TimerInternalsFactory<K> timerInternalsFactory;
private transient SideInputReader sideInputReader;
private transient DoFnRunners.OutputManager outputManager;
private transient WindowedValueMultiReceiver outputManager;
private TupleTag<KV<K, OutputT>> mainTag;

public GroupAlsoByWindowViaWindowSetNewDoFn(
Expand All @@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
TimerInternalsFactory<K> timerInternalsFactory,
SideInputReader sideInputReader,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
DoFnRunners.OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<KV<K, OutputT>> mainTag) {
this.timerInternalsFactory = timerInternalsFactory;
this.sideInputReader = sideInputReader;
Expand All @@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
}

private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
return new OutputWindowedValue<KV<K, OutputT>>() {
@Override
public void outputWindowedValue(
KV<K, OutputT> output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(mainTag, WindowedValues.of(output, timestamp, windows, pane));
}

@Override
public <AdditionalOutputT> void outputWindowedValue(
TupleTag<AdditionalOutputT> tag,
AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane));
}
};
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
Expand All @@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception {
TriggerStateMachines.stateMachineForTrigger(triggerProto)),
stateInternals,
timerInternals,
outputWindowedValue(),
windowedValue -> outputManager.output(mainTag, windowedValue),
sideInputReader,
reduceFn,
c.getPipelineOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -72,19 +74,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
private final DoFn<InputT, OutputT> fn;
private final PipelineOptions pipelineOptions;
private final OutputWindowedValue<OutputT> output;
private final WindowedValueMultiReceiver outputReceiver;
private final SideInputReader sideInputReader;
private final ScheduledExecutorService executor;
private final int maxNumOutputs;
private final Duration maxDuration;
private final Supplier<BundleFinalizer> bundleFinalizer;
private final TupleTag<OutputT> mainOutputTag;

/**
* Creates a new invoker from components.
*
* @param fn The original {@link DoFn}.
* @param pipelineOptions {@link PipelineOptions} to include in the {@link DoFn.ProcessContext}.
* @param output Hook for outputting from the {@link DoFn.ProcessElement} method.
* @param outputReceiver Hook for outputting from the {@link DoFn.ProcessElement} method.
* @param sideInputReader Hook for accessing side inputs.
* @param executor Executor on which a checkpoint will be scheduled after the given duration.
* @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a
Expand All @@ -98,15 +101,17 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
public OutputAndTimeBoundedSplittableProcessElementInvoker(
DoFn<InputT, OutputT> fn,
PipelineOptions pipelineOptions,
OutputWindowedValue<OutputT> output,
WindowedValueMultiReceiver outputReceiver,
TupleTag<OutputT> mainOutputTag,
SideInputReader sideInputReader,
ScheduledExecutorService executor,
int maxNumOutputs,
Duration maxDuration,
Supplier<BundleFinalizer> bundleFinalizer) {
this.fn = fn;
this.pipelineOptions = pipelineOptions;
this.output = output;
this.outputReceiver = outputReceiver;
this.mainOutputTag = mainOutputTag;
this.sideInputReader = sideInputReader;
this.executor = executor;
this.maxNumOutputs = maxNumOutputs;
Expand Down Expand Up @@ -403,7 +408,7 @@ public void outputWindowedValue(
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(value, timestamp, windows, paneInfo);
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
}

@Override
Expand All @@ -413,7 +418,8 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
outputReceiver.output(
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPane()));
}

@Override
Expand All @@ -427,7 +433,7 @@ public <T> void outputWindowedValue(
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(tag, value, timestamp, windows, paneInfo);
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
}

private void noteOutput() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValueReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -106,7 +108,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
*/
private final WindowingStrategy<Object, W> windowingStrategy;

private final OutputWindowedValue<KV<K, OutputT>> outputter;
private final WindowedValueReceiver<KV<K, OutputT>> outputter;

private final StateInternals stateInternals;

Expand Down Expand Up @@ -214,7 +216,7 @@ public ReduceFnRunner(
ExecutableTriggerStateMachine triggerStateMachine,
StateInternals stateInternals,
TimerInternals timerInternals,
OutputWindowedValue<KV<K, OutputT>> outputter,
WindowedValueReceiver<KV<K, OutputT>> outputter,
@Nullable SideInputReader sideInputReader,
ReduceFn<K, InputT, OutputT, W> reduceFn,
@Nullable PipelineOptions options) {
Expand Down Expand Up @@ -1055,7 +1057,8 @@ private void prefetchOnTrigger(
}

// Output the actual value.
outputter.outputWindowedValue(KV.of(key, toOutput), outputTimestamp, windows, pane);
outputter.output(
WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, pane));
});

reduceFn.onTrigger(renamedTriggerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
Expand Down Expand Up @@ -54,6 +53,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
Expand Down Expand Up @@ -94,7 +94,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
private final DoFnInvoker<InputT, OutputT> invoker;

private final SideInputReader sideInputReader;
private final OutputManager outputManager;
private final WindowedValueMultiReceiver outputManager;

private final TupleTag<OutputT> mainOutputTag;
/** The set of known output tags. */
Expand Down Expand Up @@ -124,7 +124,7 @@ public SimpleDoFnRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
Expand Down
Loading
Loading