Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -41,12 +42,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DoFnRunners {
/** Information about how to create output receivers and output to them. */
public interface OutputManager {
/** Outputs a single element to the receiver indicated by the given {@link TupleTag}. */
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
}

/**
* Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
*
Expand All @@ -58,7 +53,7 @@ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
Expand Down Expand Up @@ -168,7 +163,7 @@ ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
PipelineOptions options,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader,
OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
*/
package org.apache.beam.runners.core;

import java.util.Collection;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.util.construction.TriggerTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/**
* A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link
Expand All @@ -51,7 +48,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
TimerInternalsFactory<K> timerInternalsFactory,
SideInputReader sideInputReader,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
DoFnRunners.OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<KV<K, OutputT>> mainTag) {
return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
strategy,
Expand All @@ -68,7 +65,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
private transient StateInternalsFactory<K> stateInternalsFactory;
private transient TimerInternalsFactory<K> timerInternalsFactory;
private transient SideInputReader sideInputReader;
private transient DoFnRunners.OutputManager outputManager;
private transient WindowedValueMultiReceiver outputManager;
private TupleTag<KV<K, OutputT>> mainTag;

public GroupAlsoByWindowViaWindowSetNewDoFn(
Expand All @@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
TimerInternalsFactory<K> timerInternalsFactory,
SideInputReader sideInputReader,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
DoFnRunners.OutputManager outputManager,
WindowedValueMultiReceiver outputManager,
TupleTag<KV<K, OutputT>> mainTag) {
this.timerInternalsFactory = timerInternalsFactory;
this.sideInputReader = sideInputReader;
Expand All @@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
}

private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
return new OutputWindowedValue<KV<K, OutputT>>() {
@Override
public void outputWindowedValue(
KV<K, OutputT> output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(mainTag, WindowedValue.of(output, timestamp, windows, pane));
}

@Override
public <AdditionalOutputT> void outputWindowedValue(
TupleTag<AdditionalOutputT> tag,
AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
}
};
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
Expand All @@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception {
TriggerStateMachines.stateMachineForTrigger(triggerProto)),
stateInternals,
timerInternals,
outputWindowedValue(),
windowedValue -> outputManager.output(mainTag, windowedValue),
sideInputReader,
reduceFn,
c.getPipelineOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.runners.core;

import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;

/**
* Interface that contains all the timers and elements associated with a specific work item.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Collections;
import java.util.Objects;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
Expand Down Expand Up @@ -140,7 +141,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
timerInternals.currentOutputWatermarkTime());
} else {
nonLateElements.add(
WindowedValue.of(
WindowedValues.of(
element.getValue(), element.getTimestamp(), window, element.getPane()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -72,19 +74,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
private final DoFn<InputT, OutputT> fn;
private final PipelineOptions pipelineOptions;
private final OutputWindowedValue<OutputT> output;
private final WindowedValueMultiReceiver outputReceiver;
private final SideInputReader sideInputReader;
private final ScheduledExecutorService executor;
private final int maxNumOutputs;
private final Duration maxDuration;
private final Supplier<BundleFinalizer> bundleFinalizer;
private final TupleTag<OutputT> mainOutputTag;

/**
* Creates a new invoker from components.
*
* @param fn The original {@link DoFn}.
* @param pipelineOptions {@link PipelineOptions} to include in the {@link DoFn.ProcessContext}.
* @param output Hook for outputting from the {@link DoFn.ProcessElement} method.
* @param outputReceiver Hook for outputting from the {@link DoFn.ProcessElement} method.
* @param sideInputReader Hook for accessing side inputs.
* @param executor Executor on which a checkpoint will be scheduled after the given duration.
* @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a
Expand All @@ -98,15 +101,17 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
public OutputAndTimeBoundedSplittableProcessElementInvoker(
DoFn<InputT, OutputT> fn,
PipelineOptions pipelineOptions,
OutputWindowedValue<OutputT> output,
WindowedValueMultiReceiver outputReceiver,
TupleTag<OutputT> mainOutputTag,
SideInputReader sideInputReader,
ScheduledExecutorService executor,
int maxNumOutputs,
Duration maxDuration,
Supplier<BundleFinalizer> bundleFinalizer) {
this.fn = fn;
this.pipelineOptions = pipelineOptions;
this.output = output;
this.outputReceiver = outputReceiver;
this.mainOutputTag = mainOutputTag;
this.sideInputReader = sideInputReader;
this.executor = executor;
this.maxNumOutputs = maxNumOutputs;
Expand Down Expand Up @@ -403,7 +408,7 @@ public void outputWindowedValue(
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(value, timestamp, windows, paneInfo);
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
}

@Override
Expand All @@ -413,7 +418,8 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
outputReceiver.output(
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPane()));
}

@Override
Expand All @@ -427,7 +433,7 @@ public <T> void outputWindowedValue(
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
output.outputWindowedValue(tag, value, timestamp, windows, paneInfo);
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
}

private void noteOutput() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;

/**
Expand Down
Loading
Loading