Background
At present, the Flink translation layer does not follow the Source API design. That design expects a single Enumerator that distributes splits to Readers through a pull/push model. Instead, each Reader instantiates its own Enumerator. As a result, the Source API implementation as a whole does not deliver the expected unified batch and stream processing.
Motivation
Correctly implement the pull/push model of the Source API, enable normal RPC communication between the Readers and the Enumerator, and ensure that CDC connectors run correctly on Flink.
Architecture design
Before introducing the architecture design, let's first review the runtime flow of the Source API:
1. The source creates the enumerator.
2. The source creates the readers.
3. The enumerator discovers and generates source splits.
4. Readers pull source splits from the enumerator, and the enumerator pushes splits to the readers.
5. Readers read data from the data source.
In this flow, the enumerator handles unified split distribution and the readers read in parallel. This clear division of responsibilities is what lets the source unify batch and stream processing.
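To make the five steps concrete, here is a minimal, engine-free sketch of one enumerator serving several readers. It is not SeaTunnel or Flink code. The classes `SplitAssignmentModel`, `Enumerator`, and `Reader` and the method `simulate` are hypothetical, split discovery is modeled as a fixed list, and "reading" a split just records it. The sketch shows the property the design relies on: because a single enumerator owns the split queue, every split is handed to exactly one reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: one enumerator owns all splits, readers pull, the enumerator pushes. */
class SplitAssignmentModel {

    static final class Reader {
        final int id;
        final List<String> readSplits = new ArrayList<>(); // step 5: "reading" = recording the split
        boolean noMoreSplits = false;

        Reader(int id) {
            this.id = id;
        }
    }

    /** Step 3 (discovery) is modeled by handing the single enumerator a fixed split list. */
    static final class Enumerator {
        private final Deque<String> pendingSplits;
        private final Map<Integer, Reader> readers = new LinkedHashMap<>();

        Enumerator(List<String> discoveredSplits) {
            this.pendingSplits = new ArrayDeque<>(discoveredSplits);
        }

        void registerReader(Reader reader) {
            readers.put(reader.id, reader);
        }

        /** Step 4: a reader pulls, and the enumerator pushes one split or signals the end. */
        void handleSplitRequest(int readerId) {
            Reader reader = readers.get(readerId);
            String split = pendingSplits.poll();
            if (split == null) {
                reader.noMoreSplits = true;
            } else {
                reader.readSplits.add(split);
            }
        }
    }

    /** Steps 1 and 2: create one enumerator and `parallelism` readers; each active reader pulls once per round. */
    static Map<Integer, List<String>> simulate(List<String> splits, int parallelism) {
        Enumerator enumerator = new Enumerator(splits);
        List<Reader> readers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Reader reader = new Reader(i);
            enumerator.registerReader(reader);
            readers.add(reader);
        }
        boolean anyActive = true;
        while (anyActive) {
            anyActive = false;
            for (Reader reader : readers) {
                if (!reader.noMoreSplits) {
                    enumerator.handleSplitRequest(reader.id);
                    anyActive = true;
                }
            }
        }
        Map<Integer, List<String>> assignment = new LinkedHashMap<>();
        for (Reader reader : readers) {
            assignment.put(reader.id, reader.readSplits);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Each split is read by exactly one reader, and none is read twice.
        System.out.println(simulate(Arrays.asList("s0", "s1", "s2", "s3", "s4"), 2));
        // prints {0=[s0, s2, s4], 1=[s1, s3]}
    }
}
```

If every reader instead built its own `Enumerator` over the full split list, as the current translation layer effectively does, each reader would read every split.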
Since our source design is entirely based on Flink's source design, the two APIs map onto each other naturally. Therefore, we only need to implement a set of Flink source connector wrappers in the translation layer that wrap our own Source API.
FlinkSource
```java
/**
 * The source implementation of {@link Source}, used to proxy all {@link SeaTunnelSource} in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializable>
        implements Source<Row, SplitWrapper<SplitT>, EnumStateT>, ResultTypeQueryable<Row> {

    private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;

    public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source) {
        this.source = source;
    }

    @Override
    public Boundedness getBoundedness() {
        // Map SeaTunnel's boundedness onto Flink's.
        org.apache.seatunnel.api.source.Boundedness boundedness = source.getBoundedness();
        return boundedness == org.apache.seatunnel.api.source.Boundedness.BOUNDED
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<Row, SplitWrapper<SplitT>> createReader(SourceReaderContext readerContext)
            throws Exception {
        // Adapt Flink's reader context to the SeaTunnel SourceReader.Context interface.
        org.apache.seatunnel.api.source.SourceReader.Context context =
                new FlinkSourceReaderContext(readerContext, source);
        org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> reader =
                source.createReader(context);
        return new FlinkSourceReader<>(reader, (SeaTunnelRowType) source.getProducedType());
    }

    @Override
    public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws Exception {
        // Flink invokes this once per source (in the source coordinator),
        // so all readers share the resulting enumerator.
        SourceSplitEnumerator.Context<SplitT> context =
                new FlinkSourceSplitEnumeratorContext<>(enumContext);
        SourceSplitEnumerator<SplitT, EnumStateT> enumerator = source.createEnumerator(context);
        return new FlinkSourceEnumerator<>(enumerator, enumContext);
    }

    @Override
    public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext, EnumStateT checkpoint)
            throws Exception {
        FlinkSourceSplitEnumeratorContext<SplitT> context =
                new FlinkSourceSplitEnumeratorContext<>(enumContext);
        SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
                source.restoreEnumerator(context, checkpoint);
        return new FlinkSourceEnumerator<>(enumerator, enumContext);
    }

    @Override
    public SimpleVersionedSerializer<SplitWrapper<SplitT>> getSplitSerializer() {
        return new SplitWrapperSerializer<>(source.getSplitSerializer());
    }

    @Override
    public SimpleVersionedSerializer<EnumStateT> getEnumeratorCheckpointSerializer() {
        Serializer<EnumStateT> enumeratorStateSerializer = source.getEnumeratorStateSerializer();
        return new FlinkSimpleVersionedSerializer<>(enumeratorStateSerializer);
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
    }
}
```
FlinkSourceEnumerator
```java
/**
 * The implementation of {@link SplitEnumerator}, used to proxy all {@link SourceSplitEnumerator}
 * in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);

    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;
    private final int parallelism;
    private final Object lock = new Object();
    private volatile boolean isRun = false;
    private volatile int currentRegisterReaders = 0;

    public FlinkSourceEnumerator(
            SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.sourceSplitEnumerator = enumerator;
        this.enumeratorContext = enumContext;
        this.parallelism = enumeratorContext.currentParallelism();
    }

    @Override
    public void start() {
        sourceSplitEnumerator.open();
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        sourceSplitEnumerator.handleSplitRequest(subtaskId);
    }

    @Override
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        // Unwrap the Flink-side splits before handing them back to the SeaTunnel enumerator.
        sourceSplitEnumerator.addSplitsBack(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
                subtaskId);
    }

    @Override
    public void addReader(int subtaskId) {
        sourceSplitEnumerator.registerReader(subtaskId);
        synchronized (lock) {
            currentRegisterReaders++;
            // Run the SeaTunnel enumerator only once, after every reader has registered.
            if (!isRun && currentRegisterReaders == parallelism) {
                try {
                    sourceSplitEnumerator.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                isRun = true;
            }
        }
    }

    @Override
    public EnumStateT snapshotState(long checkpointId) throws Exception {
        return sourceSplitEnumerator.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        sourceSplitEnumerator.close();
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            LOGGER.info(
                    "Received NoMoreElementEvent from reader [{}], current registered readers [{}]",
                    subtaskId,
                    currentRegisterReaders);
            // Echo the event back to the reader so that it can report END_OF_INPUT to Flink.
            enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent);
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            // Unwrap custom SeaTunnel events and forward them to the SeaTunnel enumerator.
            sourceSplitEnumerator.handleSourceEvent(
                    subtaskId, ((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointAborted(checkpointId);
    }
}
```
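The registration gate in `addReader` is the part of the enumerator that is easiest to get wrong. The wrapped enumerator's `run()` must be invoked exactly once, after the number of registered readers reaches the parallelism. The following is a minimal sketch of that gate in isolation, assuming nothing beyond the JDK. `ReaderRegistrationGate` and the `Runnable` standing in for `sourceSplitEnumerator.run()` are hypothetical, and the real class also forwards `subtaskId` to `registerReader` first.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the gating in FlinkSourceEnumerator#addReader: the wrapped
 * enumerator is run exactly once, when the registered reader count reaches the parallelism.
 */
class ReaderRegistrationGate {
    private final int parallelism;
    private final Runnable runEnumerator; // stands in for sourceSplitEnumerator.run()
    private final Object lock = new Object();
    private int registeredReaders = 0;
    private boolean started = false;

    ReaderRegistrationGate(int parallelism, Runnable runEnumerator) {
        this.parallelism = parallelism;
        this.runEnumerator = runEnumerator;
    }

    /** Called once per reader registration, possibly from several threads; subtaskId is unused here. */
    void addReader(int subtaskId) {
        synchronized (lock) {
            registeredReaders++;
            // Mirrors isRun in the original: start the wrapped enumerator only once.
            if (!started && registeredReaders == parallelism) {
                runEnumerator.run();
                started = true;
            }
        }
    }

    boolean isStarted() {
        synchronized (lock) {
            return started;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        ReaderRegistrationGate gate = new ReaderRegistrationGate(4, runs::incrementAndGet);
        Thread[] readers = new Thread[4];
        for (int i = 0; i < readers.length; i++) {
            final int subtaskId = i;
            readers[i] = new Thread(() -> gate.addReader(subtaskId));
            readers[i].start();
        }
        for (Thread reader : readers) {
            reader.join();
        }
        System.out.println("enumerator started " + runs.get() + " time(s)");
        // prints "enumerator started 1 time(s)"
    }
}
```

Holding the lock across both the increment and the check ensures that the comparison sees a consistent count. Otherwise two concurrent registrations could both observe the final count, or neither could.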
FlinkSourceReader
```java
/**
 * The implementation of {@link SourceReader}, used to proxy all {@link
 * org.apache.seatunnel.api.source.SourceReader} in Flink.
 *
 * @param <SplitT> The generic type of source split
 */
public class FlinkSourceReader<SplitT extends SourceSplit>
        implements SourceReader<Row, SplitWrapper<SplitT>> {

    private final org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader;
    private final FlinkRowCollector flinkRowCollector;
    private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;

    public FlinkSourceReader(
            org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader,
            SeaTunnelRowType seaTunnelRowType) {
        this.sourceReader = sourceReader;
        this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType);
    }

    @Override
    public void start() {
        try {
            sourceReader.open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public InputStatus pollNext(ReaderOutput<Row> output) throws Exception {
        sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
        // Stays MORE_AVAILABLE until the echoed NoMoreElementEvent is handled.
        return inputStatus;
    }

    @Override
    public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
        try {
            List<SplitT> splitTS = sourceReader.snapshotState(checkpointId);
            return splitTS.stream().map(SplitWrapper::new).collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        // Always reports data as available, so Flink keeps calling pollNext.
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void addSplits(List<SplitWrapper<SplitT>> splits) {
        sourceReader.addSplits(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()));
    }

    @Override
    public void notifyNoMoreSplits() {
        sourceReader.handleNoMoreSplits();
    }

    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            // The enumerator echoed our end signal: tell Flink this reader is finished.
            inputStatus = InputStatus.END_OF_INPUT;
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvent(((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    @Override
    public void close() throws Exception {
        sourceReader.close();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceReader.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceReader.notifyCheckpointAborted(checkpointId);
    }
}
```
For more code details, see: https://github.com/apache/seatunnel/pull/5100
Technical difficulties
Because the Zeta engine is tightly integrated with the SeaTunnel Source API, once a Reader calls signalNoMoreElement, the engine knows that the current Reader task has no more data to read and closes it.
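On the Flink engine, however, when the Reader calls signalNoMoreElement, the signal only reaches the ReaderContext wrapper that we implemented ourselves. It cannot reach the outer Flink Reader. Propagating the "this Reader has read all of its data" signal to the Flink engine therefore became a challenge. Thanks to the custom event RPC between the Reader and the Enumerator, the solution is simple. The overall process is as follows:
1. The SeaTunnel Reader calls signalNoMoreElement, and a NoMoreElementEvent is sent from the Reader side to the Enumerator.
2. FlinkSourceEnumerator receives the event in handleSourceEvent and sends it back to the same Reader through sendEventToSourceReader.
3. FlinkSourceReader receives the event in handleSourceEvents and sets its input status to InputStatus.END_OF_INPUT. pollNext then returns this status to Flink, so the Reader task can finish.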
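The end-of-input round trip through the Enumerator can be modeled without Flink. In the following sketch, `Status` and `NoMoreElementEvent` stand in for Flink's `InputStatus` and SeaTunnel's event. The two queues and the `deliverToEnumerator`/`deliverToReader` methods are hypothetical stand-ins for the RPC channel, and a single reader is assumed, so subtask IDs are omitted. The sketch shows why `pollNext` keeps reporting `MORE_AVAILABLE` while the event is still in flight.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, engine-free model of the NoMoreElementEvent round trip for one reader. */
class EndOfInputRoundTrip {

    /** Stand-in for Flink's InputStatus. */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Stand-in for the NoMoreElementEvent source event. */
    static final class NoMoreElementEvent {}

    // Simulated RPC mailboxes: an event waits here until the "network" delivers it.
    private final Queue<Object> readerToEnumerator = new ArrayDeque<>();
    private final Queue<Object> enumeratorToReader = new ArrayDeque<>();
    private Status readerStatus = Status.MORE_AVAILABLE;

    /** Step 1: the reader side signals the end by sending an event to the enumerator. */
    void signalNoMoreElement() {
        readerToEnumerator.add(new NoMoreElementEvent());
    }

    /** Step 2: the enumerator echoes each NoMoreElementEvent back to the reader. */
    void deliverToEnumerator() {
        Object event;
        while ((event = readerToEnumerator.poll()) != null) {
            if (event instanceof NoMoreElementEvent) {
                enumeratorToReader.add(event);
            }
        }
    }

    /** Step 3: the Flink-side reader switches to END_OF_INPUT when the echo arrives. */
    void deliverToReader() {
        Object event;
        while ((event = enumeratorToReader.poll()) != null) {
            if (event instanceof NoMoreElementEvent) {
                readerStatus = Status.END_OF_INPUT;
            }
        }
    }

    /** Like FlinkSourceReader#pollNext: reports whatever status the last event set. */
    Status pollNext() {
        return readerStatus;
    }

    public static void main(String[] args) {
        EndOfInputRoundTrip trip = new EndOfInputRoundTrip();
        trip.signalNoMoreElement();
        System.out.println(trip.pollNext()); // MORE_AVAILABLE: event still on its way
        trip.deliverToEnumerator();
        System.out.println(trip.pollNext()); // MORE_AVAILABLE: echo not yet delivered
        trip.deliverToReader();
        System.out.println(trip.pollNext()); // END_OF_INPUT
    }
}
```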
This model may look perfect, but in practice it does not work as well as we expected. Because the RPC round trip takes time, the engine keeps calling the reader's pollNext method while the event is in flight. In connectors that do not deduplicate splits, the reader may therefore emit the same data repeatedly before it receives the end signal.
As a workaround, the reader sleeps for 2 seconds after sending the end signal to reduce the risk of emitting duplicate data. This is not an optimal solution. The proper fix is to rewrite the reader logic of all sources.
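The duplicate-emission problem and the two remedies can be illustrated with a small model. `DuplicateEmissionDemo`, `latencyPolls`, and `trackFinishedSplit` are hypothetical. `latencyPolls` is the number of extra pollNext calls the engine makes while the end signal is still in flight. The model assumes a reader holding one split that re-emits that split's records on every poll unless it remembers that the split is finished. In this model, the 2-second sleep only tries to push `latencyPolls` toward zero, while tracking finished splits removes the duplicates regardless of latency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of duplicate emission while the end-of-input signal is in flight. */
class DuplicateEmissionDemo {

    /**
     * The first poll reads the split and the reader signals the end; the engine then
     * polls `latencyPolls` more times before END_OF_INPUT arrives.
     */
    static List<String> run(List<String> splitRecords, int latencyPolls, boolean trackFinishedSplit) {
        List<String> emitted = new ArrayList<>();
        boolean splitFinished = false;
        for (int poll = 0; poll <= latencyPolls; poll++) {
            if (trackFinishedSplit && splitFinished) {
                continue; // the split is already done: emit nothing
            }
            emitted.addAll(splitRecords); // a reader without tracking re-reads the split
            splitFinished = true;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2");
        System.out.println(run(records, 2, false)); // [r1, r2, r1, r2, r1, r2]
        System.out.println(run(records, 2, true)); // [r1, r2]
    }
}
```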
Refactoring plan
Stage 1: Refactor the Flink translation layer to achieve functional alignment, temporarily using a thread sleep to avoid emitting duplicate data.
Stage 2: Refactor the source reader and source enumerator logic to ensure the correctness of the model.
Stage 3: Support rate limiting on the Flink engine.
Stage 4: Support metric reporting on the Flink engine.
Stage 5: Support multi-table read/sink on the Flink engine.