FLIP 27 integration

Brian Zhou edited this page Jul 24, 2020 · 7 revisions

Background

Flink 1.11 introduced a new Data Source API as part of FLIP-27. The API introduction document is here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html

Constraints

  • We will focus first on the streaming API; batch is not considered here because it will be implemented along a different path.
  • Since the Split abstraction is at the reader level, end-to-end auto-scaling support is out of scope. Increasing the reader count when a scale-up event is detected in the stream will be a separate task.
  • Pravega watermark support is synchronized at the reader-group level, which differs from Flink's assumption that watermarks can be tracked at the split level. Therefore we cannot carry over Pravega watermark support with this new API.

Concept mapping

Source

The Source is a factory class that creates instances of the concepts below. It manages a reader group, constructed in builder style from a user-provided ReaderGroupConfig. We can reuse the current AbstractStreamingReaderBuilder to build such a source. We will keep a list of PravegaSplit in Flink state for recovery; this differs from the current FlinkPravegaReader API, which triggers Pravega checkpoints instead.

public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, List<PravegaSplit>> {}
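As an illustrative sketch, Flink 1.11's Source interface asks this factory to provide the enumerator, the reader, and the split/checkpoint serializers. The bodies below are placeholders, not the actual implementation:

```java
// Sketch against Flink 1.11's org.apache.flink.api.connector.source.Source;
// method bodies are placeholders.
public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, List<PravegaSplit>> {

    @Override
    public Boundedness getBoundedness() {
        // streaming only, per the constraints above
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<T, PravegaSplit> createReader(SourceReaderContext readerContext) {
        // wire up a PravegaSourceReader backed by a PravegaSplitReader
        throw new UnsupportedOperationException("sketch");
    }

    @Override
    public SplitEnumerator<PravegaSplit, List<PravegaSplit>> createEnumerator(
            SplitEnumeratorContext<PravegaSplit> enumContext) {
        // create the reader group from the user-provided ReaderGroupConfig
        throw new UnsupportedOperationException("sketch");
    }

    @Override
    public SplitEnumerator<PravegaSplit, List<PravegaSplit>> restoreEnumerator(
            SplitEnumeratorContext<PravegaSplit> enumContext, List<PravegaSplit> checkpoint) {
        // rebuild the enumerator from the checkpointed split list
        throw new UnsupportedOperationException("sketch");
    }

    @Override
    public SimpleVersionedSerializer<PravegaSplit> getSplitSerializer() {
        throw new UnsupportedOperationException("sketch");
    }

    @Override
    public SimpleVersionedSerializer<List<PravegaSplit>> getEnumeratorCheckpointSerializer() {
        throw new UnsupportedOperationException("sketch");
    }
}
```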

Split

A Split corresponds to an EventStreamReader together with a Position. To keep it serializable, we will not hold the reader itself inside the split; it will contain only the position and the reader ID, which doubles as the split ID. The position is recorded for recovery. Dependency on the Pravega team: Position should implement Serializable.

public class PravegaSplit implements SourceSplit, Serializable {}
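Since Position does not yet implement Serializable, here is a simplified, self-contained stand-in that shows the intended shape: the split carries only the reader ID (used as the split ID) and an opaque serialized position, and round-trips through Java serialization as Flink state snapshots would require. The byte[] position and the roundTrip helper are illustrative, not the real connector API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Simplified stand-in for the proposed PravegaSplit: the real split would
// hold a Pravega Position once Position implements Serializable.
class SplitSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int readerId;     // split ID == reader ID
    private final byte[] position;  // recorded for recovery (opaque here)

    SplitSketch(int readerId, byte[] position) {
        this.readerId = readerId;
        this.position = position;
    }

    String splitId() {
        return String.valueOf(readerId);
    }

    byte[] position() {
        return position;
    }

    // Round-trips the split through Java serialization, the property
    // the design requires for Flink state recovery.
    static SplitSketch roundTrip(SplitSketch split) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(split);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (SplitSketch) ois.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```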

Alternative: a split could also be modeled as a segment; we already have such an implementation using the Pravega Batch Client in FlinkPravegaInputFormat. However, ordering is not guaranteed there, so this approach was abandoned.

SplitEnumerator

The SplitEnumerator is a single instance on the Flink JobManager. It connects a Pravega reader group to the Pravega stream. It is the "brain" of the source: it initializes the reader group on startup and creates the readers. The number of readers equals the number of subtasks (the parallelism).

public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, List<PravegaSplit>> {}

The addSplitsBack(List<PravegaSplit> splits, int subtaskId) method handles data recovery. The addReader(int subtaskId) method checks the current parallelism, creates the corresponding reader, and updates the assignment (always a one-to-one mapping).
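A sketch of those two callbacks against Flink 1.11's SplitEnumerator contract; the PravegaSplit constructor and the readerGroupName field shown here are hypothetical:

```java
@Override
public void addSplitsBack(List<PravegaSplit> splits, int subtaskId) {
    // a subtask failed: remember its splits so they can be reassigned
    // to the restarted reader, preserving the one-to-one mapping
}

@Override
public void addReader(int subtaskId) {
    // a reader registered: create the Pravega reader for this subtask
    // and assign its (single) split through the enumerator context;
    // the PravegaSplit constructor here is illustrative
    context.assignSplit(new PravegaSplit(readerGroupName, subtaskId), subtaskId);
}
```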

SourceReader

The SourceReader has a recommended default Flink implementation, SourceReaderBase, which is constructed from three major components: a SplitReader, a SplitFetcherManager, and a RecordEmitter. With this recommended API, we only need to provide a SplitReader implementation to make the whole reader work.

public class PravegaSourceReader<T>
        extends SourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplit> {}

SplitReader

The SplitReader is effectively a wrapper around an EventStreamReader. Its fetch() method reads one or more EventRead<T> records from the EventStreamReader.

public class PravegaSplitReader<T> implements SplitReader<EventRead<T>, PravegaSplit> {}
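A sketch of fetch(), assuming Flink 1.11's connector-base API where each call returns a RecordsWithSplitIds batch; the pravegaReader, splitId, and READ_TIMEOUT_MS fields are assumed for illustration:

```java
@Override
public RecordsWithSplitIds<EventRead<T>> fetch() throws IOException {
    // RecordsBySplits usage here follows Flink 1.11's connector-base API;
    // exact signatures may differ in later versions
    RecordsBySplits<EventRead<T>> records = new RecordsBySplits<>();
    EventRead<T> eventRead = pravegaReader.readNextEvent(READ_TIMEOUT_MS);
    if (eventRead.getEvent() != null) {
        records.add(splitId, eventRead);
    }
    return records;
}
```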

We can use Flink's SingleThreadFetcherManager as the SplitFetcherManager. This fetcher is single-threaded, and all the readers will be added to it.

The RecordEmitter is essentially a function that turns an EventRead<T> into a T. We should offer a default implementation that simply extracts the event, but we can also let users customize it, especially when they want to work with Pravega metadata. We have a similar implementation in https://github.com/pravega/flink-connectors/issues/180
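To illustrate the default mapping, here is a self-contained stand-in: Event mimics Pravega's EventRead<T>.getEvent(), and a plain Consumer stands in for Flink's SourceOutput. Both stand-ins are simplifications, not the real interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EmitterSketch {
    // Holder mimicking EventRead<T>.getEvent().
    static class Event<T> {
        final T value;
        Event(T value) { this.value = value; }
        T getEvent() { return value; }
    }

    // Default emitter: unwrap the event and hand it downstream, mirroring
    // the shape of RecordEmitter.emitRecord(element, output, splitState).
    static <T> void emitRecord(Event<T> element, Consumer<T> output) {
        output.accept(element.getEvent());
    }
}
```

A user-supplied emitter would replace the unwrap step with logic that also inspects Pravega metadata before emitting.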