Skip to content

Commit 6c80b72

Browse files
committed
Introduce ValueWithMetadata and its receivers
When we introduce OutputBuilder, it will implement ValueWithMetadata, and every time we create an OutputBuilder we will want to provide (1) the default metadata for the output and (2) the receiver for the output. This change unblocks (2) and reduces the size of the final change.
1 parent 60307b4 commit 6c80b72

File tree

37 files changed

+273
-319
lines changed

37 files changed

+273
-319
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
3030
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
3131
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
32+
import org.apache.beam.sdk.util.ValueWithMetadataMultiReceiver;
3233
import org.apache.beam.sdk.util.WindowedValue;
3334
import org.apache.beam.sdk.values.KV;
3435
import org.apache.beam.sdk.values.PCollectionView;
@@ -41,12 +42,6 @@
4142
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
4243
})
4344
public class DoFnRunners {
44-
/** Information about how to create output receivers and output to them. */
45-
public interface OutputManager {
46-
/** Outputs a single element to the receiver indicated by the given {@link TupleTag}. */
47-
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
48-
}
49-
5045
/**
5146
* Returns an implementation of {@link DoFnRunner} for a {@link DoFn}.
5247
*
@@ -58,7 +53,7 @@ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
5853
PipelineOptions options,
5954
DoFn<InputT, OutputT> fn,
6055
SideInputReader sideInputReader,
61-
OutputManager outputManager,
56+
ValueWithMetadataMultiReceiver outputManager,
6257
TupleTag<OutputT> mainOutputTag,
6358
List<TupleTag<?>> additionalOutputTags,
6459
StepContext stepContext,
@@ -168,7 +163,7 @@ ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
168163
PipelineOptions options,
169164
Collection<PCollectionView<?>> views,
170165
ReadyCheckingSideInputReader sideInputReader,
171-
OutputManager outputManager,
166+
ValueWithMetadataMultiReceiver outputManager,
172167
TupleTag<OutputT> mainOutputTag,
173168
List<TupleTag<?>> additionalOutputTags,
174169
StepContext stepContext,

runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
*/
1818
package org.apache.beam.runners.core;
1919

20-
import java.util.Collection;
2120
import org.apache.beam.model.pipeline.v1.RunnerApi;
2221
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
2322
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
2423
import org.apache.beam.sdk.transforms.DoFn;
2524
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
26-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2725
import org.apache.beam.sdk.util.SystemDoFnInternal;
28-
import org.apache.beam.sdk.util.WindowedValue;
26+
import org.apache.beam.sdk.util.ValueWithMetadataMultiReceiver;
2927
import org.apache.beam.sdk.util.construction.TriggerTranslation;
3028
import org.apache.beam.sdk.values.KV;
3129
import org.apache.beam.sdk.values.TupleTag;
3230
import org.apache.beam.sdk.values.WindowingStrategy;
33-
import org.joda.time.Instant;
3431

3532
/**
3633
* A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link
@@ -51,7 +48,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
5148
TimerInternalsFactory<K> timerInternalsFactory,
5249
SideInputReader sideInputReader,
5350
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
54-
DoFnRunners.OutputManager outputManager,
51+
ValueWithMetadataMultiReceiver outputManager,
5552
TupleTag<KV<K, OutputT>> mainTag) {
5653
return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
5754
strategy,
@@ -68,7 +65,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
6865
private transient StateInternalsFactory<K> stateInternalsFactory;
6966
private transient TimerInternalsFactory<K> timerInternalsFactory;
7067
private transient SideInputReader sideInputReader;
71-
private transient DoFnRunners.OutputManager outputManager;
68+
private transient ValueWithMetadataMultiReceiver outputManager;
7269
private TupleTag<KV<K, OutputT>> mainTag;
7370

7471
public GroupAlsoByWindowViaWindowSetNewDoFn(
@@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
7774
TimerInternalsFactory<K> timerInternalsFactory,
7875
SideInputReader sideInputReader,
7976
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
80-
DoFnRunners.OutputManager outputManager,
77+
ValueWithMetadataMultiReceiver outputManager,
8178
TupleTag<KV<K, OutputT>> mainTag) {
8279
this.timerInternalsFactory = timerInternalsFactory;
8380
this.sideInputReader = sideInputReader;
@@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
9188
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
9289
}
9390

94-
private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
95-
return new OutputWindowedValue<KV<K, OutputT>>() {
96-
@Override
97-
public void outputWindowedValue(
98-
KV<K, OutputT> output,
99-
Instant timestamp,
100-
Collection<? extends BoundedWindow> windows,
101-
PaneInfo pane) {
102-
outputManager.output(mainTag, WindowedValue.of(output, timestamp, windows, pane));
103-
}
104-
105-
@Override
106-
public <AdditionalOutputT> void outputWindowedValue(
107-
TupleTag<AdditionalOutputT> tag,
108-
AdditionalOutputT output,
109-
Instant timestamp,
110-
Collection<? extends BoundedWindow> windows,
111-
PaneInfo pane) {
112-
outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
113-
}
114-
};
115-
}
116-
11791
@ProcessElement
11892
public void processElement(ProcessContext c) throws Exception {
11993
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
@@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception {
130104
TriggerStateMachines.stateMachineForTrigger(triggerProto)),
131105
stateInternals,
132106
timerInternals,
133-
outputWindowedValue(),
107+
outputManager.forTag(mainTag),
134108
sideInputReader,
135109
reduceFn,
136110
c.getPipelineOptions());

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
4646
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4747
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
48+
import org.apache.beam.sdk.util.ValueWithMetadataMultiReceiver;
4849
import org.apache.beam.sdk.util.WindowedValue;
4950
import org.apache.beam.sdk.values.KV;
5051
import org.apache.beam.sdk.values.PCollectionView;
@@ -72,19 +73,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
7273
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
7374
private final DoFn<InputT, OutputT> fn;
7475
private final PipelineOptions pipelineOptions;
75-
private final OutputWindowedValue<OutputT> output;
76+
private final ValueWithMetadataMultiReceiver outputReceiver;
7677
private final SideInputReader sideInputReader;
7778
private final ScheduledExecutorService executor;
7879
private final int maxNumOutputs;
7980
private final Duration maxDuration;
8081
private final Supplier<BundleFinalizer> bundleFinalizer;
82+
private final TupleTag<OutputT> mainOutputTag;
8183

8284
/**
8385
* Creates a new invoker from components.
8486
*
8587
* @param fn The original {@link DoFn}.
8688
* @param pipelineOptions {@link PipelineOptions} to include in the {@link DoFn.ProcessContext}.
87-
* @param output Hook for outputting from the {@link DoFn.ProcessElement} method.
89+
* @param outputReceiver Hook for outputting from the {@link DoFn.ProcessElement} method.
8890
* @param sideInputReader Hook for accessing side inputs.
8991
* @param executor Executor on which a checkpoint will be scheduled after the given duration.
9092
* @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a
@@ -98,15 +100,17 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
98100
public OutputAndTimeBoundedSplittableProcessElementInvoker(
99101
DoFn<InputT, OutputT> fn,
100102
PipelineOptions pipelineOptions,
101-
OutputWindowedValue<OutputT> output,
103+
ValueWithMetadataMultiReceiver outputReceiver,
104+
TupleTag<OutputT> mainOutputTag,
102105
SideInputReader sideInputReader,
103106
ScheduledExecutorService executor,
104107
int maxNumOutputs,
105108
Duration maxDuration,
106109
Supplier<BundleFinalizer> bundleFinalizer) {
107110
this.fn = fn;
108111
this.pipelineOptions = pipelineOptions;
109-
this.output = output;
112+
this.outputReceiver = outputReceiver;
113+
this.mainOutputTag = mainOutputTag;
110114
this.sideInputReader = sideInputReader;
111115
this.executor = executor;
112116
this.maxNumOutputs = maxNumOutputs;
@@ -403,7 +407,9 @@ public void outputWindowedValue(
403407
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
404408
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
405409
}
406-
output.outputWindowedValue(value, timestamp, windows, paneInfo);
410+
outputReceiver
411+
.forTag(mainOutputTag)
412+
.output(WindowedValue.of(value, timestamp, windows, paneInfo));
407413
}
408414

409415
@Override
@@ -413,7 +419,9 @@ public <T> void output(TupleTag<T> tag, T value) {
413419

414420
@Override
415421
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
416-
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
422+
outputReceiver
423+
.forTag(tag)
424+
.output(WindowedValue.of(value, timestamp, element.getWindows(), element.getPane()));
417425
}
418426

419427
@Override
@@ -427,7 +435,7 @@ public <T> void outputWindowedValue(
427435
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
428436
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
429437
}
430-
output.outputWindowedValue(tag, value, timestamp, windows, paneInfo);
438+
outputReceiver.forTag(tag).output(WindowedValue.of(value, timestamp, windows, paneInfo));
431439
}
432440

433441
private void noteOutput() {

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.beam.sdk.transforms.windowing.Window;
4747
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
4848
import org.apache.beam.sdk.transforms.windowing.WindowFn;
49+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
4950
import org.apache.beam.sdk.util.WindowTracing;
5051
import org.apache.beam.sdk.util.WindowedValue;
5152
import org.apache.beam.sdk.values.KV;
@@ -106,7 +107,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
106107
*/
107108
private final WindowingStrategy<Object, W> windowingStrategy;
108109

109-
private final OutputWindowedValue<KV<K, OutputT>> outputter;
110+
private final ValueWithMetadataReceiver<KV<K, OutputT>> outputter;
110111

111112
private final StateInternals stateInternals;
112113

@@ -214,7 +215,7 @@ public ReduceFnRunner(
214215
ExecutableTriggerStateMachine triggerStateMachine,
215216
StateInternals stateInternals,
216217
TimerInternals timerInternals,
217-
OutputWindowedValue<KV<K, OutputT>> outputter,
218+
ValueWithMetadataReceiver<KV<K, OutputT>> outputter,
218219
@Nullable SideInputReader sideInputReader,
219220
ReduceFn<K, InputT, OutputT, W> reduceFn,
220221
@Nullable PipelineOptions options) {
@@ -1055,7 +1056,8 @@ private void prefetchOnTrigger(
10551056
}
10561057

10571058
// Output the actual value.
1058-
outputter.outputWindowedValue(KV.of(key, toOutput), outputTimestamp, windows, pane);
1059+
outputter.output(
1060+
WindowedValue.of(KV.of(key, toOutput), outputTimestamp, windows, pane));
10591061
});
10601062

10611063
reduceFn.onTrigger(renamedTriggerContext);

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29-
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
3029
import org.apache.beam.sdk.coders.Coder;
3130
import org.apache.beam.sdk.options.PipelineOptions;
3231
import org.apache.beam.sdk.schemas.SchemaCoder;
@@ -54,6 +53,7 @@
5453
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
5554
import org.apache.beam.sdk.util.SystemDoFnInternal;
5655
import org.apache.beam.sdk.util.UserCodeException;
56+
import org.apache.beam.sdk.util.ValueWithMetadataMultiReceiver;
5757
import org.apache.beam.sdk.util.WindowedValue;
5858
import org.apache.beam.sdk.values.PCollectionView;
5959
import org.apache.beam.sdk.values.Row;
@@ -93,7 +93,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
9393
private final DoFnInvoker<InputT, OutputT> invoker;
9494

9595
private final SideInputReader sideInputReader;
96-
private final OutputManager outputManager;
96+
private final ValueWithMetadataMultiReceiver outputManager;
9797

9898
private final TupleTag<OutputT> mainOutputTag;
9999
/** The set of known output tags. */
@@ -123,7 +123,7 @@ public SimpleDoFnRunner(
123123
PipelineOptions options,
124124
DoFn<InputT, OutputT> fn,
125125
SideInputReader sideInputReader,
126-
OutputManager outputManager,
126+
ValueWithMetadataMultiReceiver outputManager,
127127
TupleTag<OutputT> mainOutputTag,
128128
List<TupleTag<?>> additionalOutputTags,
129129
StepContext stepContext,
@@ -273,7 +273,7 @@ private void checkTimestamp(Instant elemTimestamp, Instant timestamp) {
273273

274274
private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
275275
checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag);
276-
outputManager.output(tag, windowedElem);
276+
outputManager.forTag(tag).output(windowedElem);
277277
}
278278

279279
/** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle @StartBundle}. */

runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import static org.junit.Assert.assertNull;
2828
import static org.junit.Assert.assertTrue;
2929

30-
import java.util.Collection;
3130
import java.util.Collections;
3231
import java.util.concurrent.Executors;
3332
import java.util.concurrent.TimeUnit;
@@ -38,9 +37,10 @@
3837
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
3938
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
4039
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
41-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4240
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
4341
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
42+
import org.apache.beam.sdk.util.ValueWithMetadataMultiReceiver;
43+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
4444
import org.apache.beam.sdk.util.WindowedValue;
4545
import org.apache.beam.sdk.values.TupleTag;
4646
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
@@ -112,22 +112,13 @@ private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.R
112112
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
113113
fn,
114114
PipelineOptionsFactory.create(),
115-
new OutputWindowedValue<String>() {
115+
new ValueWithMetadataMultiReceiver() {
116116
@Override
117-
public void outputWindowedValue(
118-
String output,
119-
Instant timestamp,
120-
Collection<? extends BoundedWindow> windows,
121-
PaneInfo pane) {}
122-
123-
@Override
124-
public <AdditionalOutputT> void outputWindowedValue(
125-
TupleTag<AdditionalOutputT> tag,
126-
AdditionalOutputT output,
127-
Instant timestamp,
128-
Collection<? extends BoundedWindow> windows,
129-
PaneInfo pane) {}
117+
public <OutputT> ValueWithMetadataReceiver<OutputT> forTag(TupleTag<OutputT> tag) {
118+
return output -> {};
119+
}
130120
},
121+
null,
131122
NullSideInputReader.empty(),
132123
Executors.newSingleThreadScheduledExecutor(),
133124
1000,

0 commit comments

Comments
 (0)