Skip to content

Commit b7e7b0e

Browse files
committed
Make WindowedValue a public interface and receivers for it
When we introduce OutputBuilder, it will implement WindowedValue, and every time we create an OutputBuilder we will want to provide the following: 1. the default metadata for the output 2. the receiver for the output This change unblocks 2 and reduces the size of the final change.
1 parent 60307b4 commit b7e7b0e

File tree

474 files changed

+2622
-2643
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

474 files changed

+2622
-2643
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,7 @@ class BeamModulePlugin implements Plugin<Project> {
12331233
"-AskipUses=${skipUsesCombinedRegex}",
12341234
"-AnoWarnMemoryConstraints",
12351235
"-AsuppressWarnings=annotation.not.completed,keyfor",
1236+
"-implicit:none"
12361237
]
12371238

12381239
project.dependencies {

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.beam.sdk.state.TimeDomain;
2121
import org.apache.beam.sdk.transforms.DoFn;
2222
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
23-
import org.apache.beam.sdk.util.WindowedValue;
23+
import org.apache.beam.sdk.values.WindowedValue;
2424
import org.checkerframework.checker.nullness.qual.Nullable;
2525
import org.joda.time.Instant;
2626

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@
2929
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
3030
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
3131
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
32-
import org.apache.beam.sdk.util.WindowedValue;
32+
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
3333
import org.apache.beam.sdk.values.KV;
3434
import org.apache.beam.sdk.values.PCollectionView;
3535
import org.apache.beam.sdk.values.TupleTag;
36+
import org.apache.beam.sdk.values.WindowedValue;
3637
import org.apache.beam.sdk.values.WindowingStrategy;
3738
import org.checkerframework.checker.nullness.qual.Nullable;
3839

@@ -41,12 +42,6 @@
4142
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
4243
})
4344
public class DoFnRunners {
44-
/** Information about how to create output receivers and output to them. */
45-
public interface OutputManager {
46-
/** Outputs a single element to the receiver indicated by the given {@link TupleTag}. */
47-
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
48-
}
49-
5045
/**
5146
* Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
5247
*
@@ -58,7 +53,7 @@ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
5853
PipelineOptions options,
5954
DoFn<InputT, OutputT> fn,
6055
SideInputReader sideInputReader,
61-
OutputManager outputManager,
56+
WindowedValueMultiReceiver outputManager,
6257
TupleTag<OutputT> mainOutputTag,
6358
List<TupleTag<?>> additionalOutputTags,
6459
StepContext stepContext,
@@ -168,7 +163,7 @@ ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
168163
PipelineOptions options,
169164
Collection<PCollectionView<?>> views,
170165
ReadyCheckingSideInputReader sideInputReader,
171-
OutputManager outputManager,
166+
WindowedValueMultiReceiver outputManager,
172167
TupleTag<OutputT> mainOutputTag,
173168
List<TupleTag<?>> additionalOutputTags,
174169
StepContext stepContext,

runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
*/
1818
package org.apache.beam.runners.core;
1919

20-
import java.util.Collection;
2120
import org.apache.beam.model.pipeline.v1.RunnerApi;
2221
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
2322
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
2423
import org.apache.beam.sdk.transforms.DoFn;
2524
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
26-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2725
import org.apache.beam.sdk.util.SystemDoFnInternal;
28-
import org.apache.beam.sdk.util.WindowedValue;
26+
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
2927
import org.apache.beam.sdk.util.construction.TriggerTranslation;
3028
import org.apache.beam.sdk.values.KV;
3129
import org.apache.beam.sdk.values.TupleTag;
3230
import org.apache.beam.sdk.values.WindowingStrategy;
33-
import org.joda.time.Instant;
3431

3532
/**
3633
* A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link
@@ -51,7 +48,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
5148
TimerInternalsFactory<K> timerInternalsFactory,
5249
SideInputReader sideInputReader,
5350
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
54-
DoFnRunners.OutputManager outputManager,
51+
WindowedValueMultiReceiver outputManager,
5552
TupleTag<KV<K, OutputT>> mainTag) {
5653
return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
5754
strategy,
@@ -68,7 +65,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
6865
private transient StateInternalsFactory<K> stateInternalsFactory;
6966
private transient TimerInternalsFactory<K> timerInternalsFactory;
7067
private transient SideInputReader sideInputReader;
71-
private transient DoFnRunners.OutputManager outputManager;
68+
private transient WindowedValueMultiReceiver outputManager;
7269
private TupleTag<KV<K, OutputT>> mainTag;
7370

7471
public GroupAlsoByWindowViaWindowSetNewDoFn(
@@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
7774
TimerInternalsFactory<K> timerInternalsFactory,
7875
SideInputReader sideInputReader,
7976
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
80-
DoFnRunners.OutputManager outputManager,
77+
WindowedValueMultiReceiver outputManager,
8178
TupleTag<KV<K, OutputT>> mainTag) {
8279
this.timerInternalsFactory = timerInternalsFactory;
8380
this.sideInputReader = sideInputReader;
@@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
9188
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
9289
}
9390

94-
private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
95-
return new OutputWindowedValue<KV<K, OutputT>>() {
96-
@Override
97-
public void outputWindowedValue(
98-
KV<K, OutputT> output,
99-
Instant timestamp,
100-
Collection<? extends BoundedWindow> windows,
101-
PaneInfo pane) {
102-
outputManager.output(mainTag, WindowedValue.of(output, timestamp, windows, pane));
103-
}
104-
105-
@Override
106-
public <AdditionalOutputT> void outputWindowedValue(
107-
TupleTag<AdditionalOutputT> tag,
108-
AdditionalOutputT output,
109-
Instant timestamp,
110-
Collection<? extends BoundedWindow> windows,
111-
PaneInfo pane) {
112-
outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
113-
}
114-
};
115-
}
116-
11791
@ProcessElement
11892
public void processElement(ProcessContext c) throws Exception {
11993
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
@@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception {
130104
TriggerStateMachines.stateMachineForTrigger(triggerProto)),
131105
stateInternals,
132106
timerInternals,
133-
outputWindowedValue(),
107+
windowedValue -> outputManager.output(mainTag, windowedValue),
134108
sideInputReader,
135109
reduceFn,
136110
c.getPipelineOptions());

runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import org.apache.beam.sdk.transforms.GroupByKey;
3030
import org.apache.beam.sdk.transforms.PTransform;
3131
import org.apache.beam.sdk.transforms.ParDo;
32-
import org.apache.beam.sdk.util.WindowedValue;
33-
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
3432
import org.apache.beam.sdk.values.KV;
3533
import org.apache.beam.sdk.values.PCollection;
34+
import org.apache.beam.sdk.values.WindowedValue;
35+
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
3636
import org.apache.beam.sdk.values.WindowingStrategy;
3737

3838
/**

runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.beam.runners.core;
1919

2020
import org.apache.beam.runners.core.TimerInternals.TimerData;
21-
import org.apache.beam.sdk.util.WindowedValue;
21+
import org.apache.beam.sdk.values.WindowedValue;
2222

2323
/**
2424
* Interface that contains all the timers and elements associated with a specific work item.

runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.apache.beam.sdk.coders.IterableCoder;
2929
import org.apache.beam.sdk.coders.StructuredCoder;
3030
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
31-
import org.apache.beam.sdk.util.WindowedValue;
32-
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
31+
import org.apache.beam.sdk.values.WindowedValue;
32+
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
3333
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3434

3535
/** A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. */

runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.Objects;
2222
import org.apache.beam.runners.core.TimerInternals.TimerData;
23-
import org.apache.beam.sdk.util.WindowedValue;
23+
import org.apache.beam.sdk.values.WindowedValue;
2424
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
2525
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2626
import org.checkerframework.checker.nullness.qual.Nullable;

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@
2525
import org.apache.beam.sdk.transforms.DoFn;
2626
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2727
import org.apache.beam.sdk.util.WindowTracing;
28-
import org.apache.beam.sdk.util.WindowedValue;
2928
import org.apache.beam.sdk.values.KV;
29+
import org.apache.beam.sdk.values.WindowedValue;
30+
import org.apache.beam.sdk.values.WindowedValues;
3031
import org.apache.beam.sdk.values.WindowingStrategy;
3132
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
3233
import org.joda.time.Instant;
@@ -140,7 +141,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
140141
timerInternals.currentOutputWatermarkTime());
141142
} else {
142143
nonLateElements.add(
143-
WindowedValue.of(
144+
WindowedValues.of(
144145
element.getValue(), element.getTimestamp(), window, element.getPane()));
145146
}
146147
}

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2222
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
2323
import org.apache.beam.sdk.util.WindowTracing;
24-
import org.apache.beam.sdk.util.WindowedValue;
24+
import org.apache.beam.sdk.values.WindowedValue;
2525
import org.apache.beam.sdk.values.WindowingStrategy;
2626
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
2727
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

0 commit comments

Comments
 (0)