Skip to content

Commit 1c2d953

Browse files
authored
[FLINK-36625] Add lineage helper class for connector integration
1 parent 0022ff1 commit 1c2d953

File tree

9 files changed

+260
-12
lines changed

9 files changed

+260
-12
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public DefaultLineageVertex() {
3232
this.lineageDatasets = new ArrayList<>();
3333
}
3434

35+
public DefaultLineageVertex(List<LineageDataset> lineageDatasets) {
36+
this.lineageDatasets = lineageDatasets;
37+
}
38+
3539
public void addLineageDataset(LineageDataset lineageDataset) {
3640
this.lineageDatasets.add(lineageDataset);
3741
}

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements SourceLineageVertex {
3232
private List<LineageDataset> lineageDatasets;
3333

3434
public DefaultSourceLineageVertex(Boundedness boundedness) {
35-
this.lineageDatasets = new ArrayList<>();
35+
this(boundedness, new ArrayList<>());
36+
}
37+
38+
public DefaultSourceLineageVertex(
39+
Boundedness boundedness, List<LineageDataset> lineageDatasets) {
3640
this.boundedness = boundedness;
41+
this.lineageDatasets = lineageDatasets;
3742
}
3843

3944
public void addDataset(LineageDataset lineageDataset) {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.api.lineage;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
24+
import java.util.Objects;
25+
26+
/** Default implementation of {@link TypeDatasetFacet}. */
27+
@PublicEvolving
28+
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
29+
30+
public static final String TYPE_FACET_NAME = "type";
31+
32+
private final TypeInformation typeInformation;
33+
34+
public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
35+
this.typeInformation = typeInformation;
36+
}
37+
38+
public TypeInformation getTypeInformation() {
39+
return typeInformation;
40+
}
41+
42+
public boolean equals(Object o) {
43+
if (this == o) {
44+
return true;
45+
}
46+
if (o == null || getClass() != o.getClass()) {
47+
return false;
48+
}
49+
DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
50+
return Objects.equals(typeInformation, that.typeInformation);
51+
}
52+
53+
@Override
54+
public int hashCode() {
55+
return Objects.hash(typeInformation);
56+
}
57+
58+
@Override
59+
public String name() {
60+
return TYPE_FACET_NAME;
61+
}
62+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.api.lineage;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.connector.source.Boundedness;
23+
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.stream.Collectors;
27+
28+
/** Util class for creating default LineageDataset and LineageVertex. */
29+
@Internal
30+
public class LineageUtils {
31+
32+
public static LineageDataset datasetOf(
33+
String name, String namespace, TypeDatasetFacet typeDatasetFacet) {
34+
return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet));
35+
}
36+
37+
public static LineageDataset datasetOf(
38+
String name, String namespace, List<LineageDatasetFacet> facets) {
39+
return new DefaultLineageDataset(
40+
name,
41+
namespace,
42+
facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, item -> item)));
43+
}
44+
45+
public static SourceLineageVertex sourceLineageVertexOf(
46+
Boundedness boundedness, LineageDataset dataset) {
47+
return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset));
48+
}
49+
50+
public static LineageVertex lineageVertexOf(LineageDataset dataset) {
51+
return new DefaultLineageVertex(Collections.singletonList(dataset));
52+
}
53+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.api.lineage;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
24+
import javax.annotation.Nonnull;
25+
26+
/** Facet definition to contain type information of source and sink. */
27+
@PublicEvolving
28+
public interface TypeDatasetFacet extends LineageDatasetFacet {
29+
30+
@Nonnull
31+
TypeInformation getTypeInformation();
32+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.api.lineage;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
23+
import java.util.Optional;
24+
25+
/** Contains method to extract {@link TypeDatasetFacet}. */
26+
@PublicEvolving
27+
public interface TypeDatasetFacetProvider {
28+
29+
/**
30+
* Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to
31+
* resolve type.
32+
*/
33+
Optional<TypeDatasetFacet> getTypeDatasetFacet();
34+
}

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,7 @@ public LineageVertex getLineageVertex() {
194194
LineageDataset lineageDataset =
195195
new DefaultLineageDataset(
196196
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
197-
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
198-
lineageVertex.addLineageDataset(lineageDataset);
199-
return lineageVertex;
197+
return LineageUtils.lineageVertexOf(lineageDataset);
200198
}
201199
}
202200

@@ -212,10 +210,7 @@ public LineageVertex getLineageVertex() {
212210
LineageDataset lineageDataset =
213211
new DefaultLineageDataset(
214212
SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
215-
DefaultSourceLineageVertex lineageVertex =
216-
new DefaultSourceLineageVertex(Boundedness.BOUNDED);
217-
lineageVertex.addDataset(lineageDataset);
218-
return lineageVertex;
213+
return LineageUtils.sourceLineageVertexOf(Boundedness.BOUNDED, lineageDataset);
219214
}
220215
}
221216

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.api.lineage;
20+
21+
import org.apache.flink.api.common.typeinfo.Types;
22+
import org.apache.flink.api.connector.source.Boundedness;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
/** Testing for lineage util. */
29+
public class LineageUtilsTest {
30+
private static final String TEST_NAME = "testName";
31+
private static final String TEST_NAMESPACE = "testNameSpace";
32+
33+
@Test
34+
public void testDataSetOf() {
35+
DefaultTypeDatasetFacet typeDatasetFacet = new DefaultTypeDatasetFacet(Types.BIG_INT);
36+
LineageDataset dataset =
37+
LineageUtils.datasetOf(TEST_NAME, TEST_NAMESPACE, typeDatasetFacet);
38+
39+
assertThat(dataset.name()).isEqualTo(TEST_NAME);
40+
assertThat(dataset.namespace()).isEqualTo(TEST_NAMESPACE);
41+
assertThat(dataset.facets()).size().isEqualTo(1);
42+
assertThat(dataset.facets().get(typeDatasetFacet.name())).isEqualTo(typeDatasetFacet);
43+
}
44+
45+
@Test
46+
public void testSourceLineageVertexOf() {
47+
LineageDataset dataset =
48+
LineageUtils.datasetOf(
49+
TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
50+
SourceLineageVertex sourceLineageVertex =
51+
LineageUtils.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, dataset);
52+
53+
assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
54+
assertThat(sourceLineageVertex.datasets()).containsExactly(dataset);
55+
}
56+
57+
@Test
58+
public void testLineageVertexOf() {
59+
LineageDataset dataset =
60+
LineageUtils.datasetOf(
61+
TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
62+
LineageVertex lineageVertex = LineageUtils.lineageVertexOf(dataset);
63+
assertThat(lineageVertex.datasets()).containsExactly(dataset);
64+
}
65+
}

flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@
3737
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
3838
import org.apache.flink.streaming.api.graph.StreamGraph;
3939
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
40-
import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
4140
import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
4241
import org.apache.flink.streaming.api.lineage.LineageDataset;
4342
import org.apache.flink.streaming.api.lineage.LineageGraph;
43+
import org.apache.flink.streaming.api.lineage.LineageUtils;
4444
import org.apache.flink.streaming.api.lineage.LineageVertex;
4545
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4646
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
@@ -322,9 +322,7 @@ public LineageVertex getLineageVertex() {
322322
LineageDataset lineageDataset =
323323
new DefaultLineageDataset(
324324
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
325-
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
326-
lineageVertex.addLineageDataset(lineageDataset);
327-
return lineageVertex;
325+
return LineageUtils.lineageVertexOf(lineageDataset);
328326
}
329327
}
330328
}

0 commit comments

Comments
 (0)