Issues Address
https://github.com/apache/seatunnel/issues/5395
Motivation
There are two problems in the current multi-table writing solution:
1. It relies on the shuffle capability, and the current shuffle implementation significantly degrades performance.
2. It opens too many links (connections) to the target system.
The purpose of this proposal is to solve both of these problems.
Implementation plan
Implement a MultiTableSink. This sink implements the current SeaTunnelSink interface and manages multiple internal SeaTunnelSinks at the same time.
In the same way, implement MultiTableSinkWriter, MultiTableSinkCommitter, and MultiTableSinkAggCommitter.
When data is written into the MultiTableSink, it finds the SeaTunnelSink to write to according to the tableId carried on the row. This mapping is maintained in the MultiTableSink.
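The routing step can be sketched as a map from tableId to the writer for that table. This is a minimal sketch, assuming a simplified row type that carries its table identifier and a simplified writer interface; `SimpleRow`, `TableWriter`, `BufferingWriter`, and `RoutingMultiTableWriter` are hypothetical names for illustration, not SeaTunnel's actual `SeaTunnelRow` or `SinkWriter` API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified row: a table identifier plus field values.
record SimpleRow(String tableId, Object[] fields) {}

// Hypothetical simplified per-table writer (stands in for a SinkWriter).
interface TableWriter {
    void write(SimpleRow row);
}

// Hypothetical writer that keeps rows in memory so routing can be observed.
class BufferingWriter implements TableWriter {
    final List<SimpleRow> written = new ArrayList<>();

    @Override
    public void write(SimpleRow row) {
        written.add(row);
    }
}

// Hypothetical multi-table writer: routes each row to the writer registered for its tableId.
class RoutingMultiTableWriter implements TableWriter {
    private final Map<String, TableWriter> writersByTable;

    RoutingMultiTableWriter(Map<String, ? extends TableWriter> writersByTable) {
        this.writersByTable = new HashMap<>(writersByTable);
    }

    @Override
    public void write(SimpleRow row) {
        TableWriter target = writersByTable.get(row.tableId());
        if (target == null) {
            // A row for an unknown table indicates a planning error, so fail loudly.
            throw new IllegalStateException("No sink registered for table " + row.tableId());
        }
        target.write(row);
    }
}
```

Because the map lives inside the multi-table writer, the engine only sees one writer per parallel instance, and adding a table only adds a map entry.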
Create multiple threads that poll row data and slice the rows across the different SeaTunnelSinks.
Custom slicers are supported.
Create N SeaTunnelSinks per thread.
The number of threads in the thread pool equals the number of connections in the connection pool.
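The threading model above can be sketched with one queue per worker thread and a pluggable slicer that picks the queue for each row. This is a sketch under stated assumptions: each worker owns its own set of per-table writers (modeled as a `Consumer<Row>`), the worker count is meant to match the connection pool size, and the default slicer hashes the tableId so that rows of one table keep their order. `Row`, `RowSlicer`, `TableHashSlicer`, and `ShardedDispatcher` are hypothetical names; the real implementation may assign rows differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical row carrying its table identifier and a key.
record Row(String tableId, String key) {}

// Hypothetical pluggable slicer: chooses the worker index for a row.
interface RowSlicer {
    int slice(Row row, int workerCount);
}

// Hypothetical default slicer: every row of a table goes to the same worker,
// so per-table order is preserved by that worker's FIFO queue.
class TableHashSlicer implements RowSlicer {
    @Override
    public int slice(Row row, int workerCount) {
        return Math.floorMod(row.tableId().hashCode(), workerCount);
    }
}

// Hypothetical dispatcher: one queue and one thread per worker; each worker drains its queue into its own writers.
class ShardedDispatcher implements AutoCloseable {
    // Sentinel compared by reference to stop a worker.
    private static final Row POISON = new Row("", "");
    private final List<BlockingQueue<Row>> queues = new ArrayList<>();
    private final ExecutorService pool;
    private final RowSlicer slicer;

    // writerFactory builds the per-thread writer set (e.g. N per-table sinks) for worker i.
    ShardedDispatcher(int workerCount, RowSlicer slicer, IntFunction<Consumer<Row>> writerFactory) {
        this.slicer = slicer;
        this.pool = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            BlockingQueue<Row> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Consumer<Row> writer = writerFactory.apply(i);
            pool.execute(() -> {
                try {
                    for (Row row = queue.take(); row != POISON; row = queue.take()) {
                        writer.accept(row);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    // Hands the row to the queue chosen by the slicer.
    void write(Row row) throws InterruptedException {
        queues.get(slicer.slice(row, queues.size())).put(row);
    }

    // Stops every worker after it has drained the rows already queued.
    @Override
    public void close() throws InterruptedException {
        for (BlockingQueue<Row> queue : queues) {
            queue.put(POISON);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

A custom slicer could, for example, hash a primary key instead of the tableId to spread one hot table across threads; this is why each thread would hold sinks for all N tables rather than only its own subset.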
When the engine calls the interfaces implemented by the Sink, MultiTableSink forwards each call to the wrapped SeaTunnelSinks and summarizes their results.
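Forwarding and summarization can be illustrated with a commit-preparation call: the multi-table writer asks each wrapped writer for its commit info and merges the results into one value keyed by tableId. This is a minimal sketch; `SubWriter` and `ForwardingMultiTableWriter` are hypothetical simplified types, and keying the merged result by tableId is an assumption rather than the confirmed SeaTunnel design.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplified sub-writer: prepareCommit may or may not produce commit info.
interface SubWriter<C> {
    Optional<C> prepareCommit();

    void close();
}

// Hypothetical wrapper that forwards lifecycle calls to every sub-writer and merges their results.
class ForwardingMultiTableWriter<C> {
    private final Map<String, SubWriter<C>> writers;

    ForwardingMultiTableWriter(Map<String, ? extends SubWriter<C>> writers) {
        this.writers = new LinkedHashMap<>(writers);
    }

    // Collects each table's commit info under its tableId; tables with nothing to commit are skipped.
    Optional<Map<String, C>> prepareCommit() {
        Map<String, C> merged = new LinkedHashMap<>();
        writers.forEach((tableId, writer) ->
                writer.prepareCommit().ifPresent(info -> merged.put(tableId, info)));
        return merged.isEmpty() ? Optional.empty() : Optional.of(merged);
    }

    // Closes every sub-writer even if one fails, then rethrows the first failure.
    void close() {
        RuntimeException first = null;
        for (SubWriter<C> writer : writers.values()) {
            try {
                writer.close();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

Closing all sub-writers before rethrowing keeps one failing table from leaking the resources of the others.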
When resources can be shared, such as a Connection or any other resource usable by multiple SeaTunnelSinks, they can be managed through SharedResourceManager.
Public Interfaces
SupportMultiTableSink
Marker interface for SeaTunnelSink implementations that support multiple tables. For sinks that implement it, the corresponding SeaTunnelSinks are packaged during logical plan generation: for each degree of parallelism, a MultiTableSink wraps multiple SeaTunnelSinks and replaces them.
```java
/**
 * Sink connectors that support multiple tables should implement this interface.
 */
public interface SupportMultiTableSink<T extends SharedResource> extends SupportResourceShare<T> {
}
```
MultiTableResourceManager
It is designed for a single MultiTableSink, MultiTableSinkWriter, MultiTableSinkCommitter, or MultiTableSinkAggCommitter to share resources among the multiple SeaTunnelSinks or SinkWriters it manages. Users can implement a custom resource sharer by implementing this interface.
```java
import java.util.Optional;

/**
 * The multi-table resource manager.
 */
public interface MultiTableResourceManager<T extends SharedResource> {
    default Optional<T> getSharedResource() {
        return Optional.empty();
    }
}
```
Shared Resource
Marker interface that identifies resources which can be shared.
```java
public interface SharedResource {
}
```
SupportMultiTableSinkWriter
Marker interface for SinkWriter implementations that support multiple tables. It is currently used only for identification.
```java
/**
 * Sink connector writers that support multiple tables should implement this interface.
 */
public interface SupportMultiTableSinkWriter<T extends SharedResource> extends SupportResourceShare<T> {
}
```
SupportResourceShare
Used to initialize the MultiTableResourceManager. MultiTableSink calls the initMultiTableResourceManager method once to initialize the shared resources, and then distributes them to the different SeaTunnelSinks through setMultiTableResourceManager.
```java
import java.util.Optional;

public interface SupportResourceShare<T extends SharedResource> {
    default Optional<MultiTableResourceManager<T>> initMultiTableResourceManager() {
        return Optional.empty();
    }

    default void setMultiTableResourceManager(MultiTableResourceManager<T> multiTableResourceManager) {}
}
```
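The one-time initialization and distribution can be sketched with the interfaces proposed in this section (repeated so the example compiles on its own). The sketch assumes that MultiTableSink asks the first wrapped sink for a manager and hands that same manager to every sink; `ConnectionPoolResource`, `PooledSink`, and `ResourceDistributor` are hypothetical names, and the resource is only a stand-in for a real connection pool.

```java
import java.util.List;
import java.util.Optional;

// Interfaces as proposed in this section, repeated so the sketch is self-contained.
interface SharedResource {}

interface MultiTableResourceManager<T extends SharedResource> {
    default Optional<T> getSharedResource() {
        return Optional.empty();
    }
}

interface SupportResourceShare<T extends SharedResource> {
    default Optional<MultiTableResourceManager<T>> initMultiTableResourceManager() {
        return Optional.empty();
    }

    default void setMultiTableResourceManager(MultiTableResourceManager<T> multiTableResourceManager) {}
}

// Hypothetical shared resource standing in for a connection pool.
class ConnectionPoolResource implements SharedResource {}

// Hypothetical per-table sink: can build the shared pool, and otherwise uses the manager it is given.
class PooledSink implements SupportResourceShare<ConnectionPoolResource> {
    MultiTableResourceManager<ConnectionPoolResource> manager;

    @Override
    public Optional<MultiTableResourceManager<ConnectionPoolResource>> initMultiTableResourceManager() {
        ConnectionPoolResource pool = new ConnectionPoolResource();
        return Optional.of(new MultiTableResourceManager<ConnectionPoolResource>() {
            @Override
            public Optional<ConnectionPoolResource> getSharedResource() {
                return Optional.of(pool);
            }
        });
    }

    @Override
    public void setMultiTableResourceManager(MultiTableResourceManager<ConnectionPoolResource> m) {
        this.manager = m;
    }
}

// Hypothetical helper: initialize once via the first sink, then hand the manager to every sink.
class ResourceDistributor {
    static <T extends SharedResource> void distribute(List<? extends SupportResourceShare<T>> sinks) {
        if (sinks.isEmpty()) {
            return;
        }
        sinks.get(0).initMultiTableResourceManager()
                .ifPresent(manager -> sinks.forEach(sink -> sink.setMultiTableResourceManager(manager)));
    }
}
```

Because the default `initMultiTableResourceManager` returns `Optional.empty()`, sinks that share nothing are left untouched by this step.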
Proposed Changes
A configuration option, `seatunnel.optimizer.multitablesink`, will be added to MultiTableJobConfigParser to enable or disable MultiTableSink generation.
Add a SeaTunnelOptimizer for logical plan optimization. It currently supports converting multiple SeaTunnelSinks into a MultiTableSink.
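The optimizer pass can be sketched as a function over the sink nodes of the logical plan that is guarded by the `seatunnel.optimizer.multitablesink` switch. This is a hedged sketch: `SinkNode`, `PlannedSink`, and `MultiTableSinkOptimizer` are hypothetical types, the `supportsMultiTable` flag stands in for an `instanceof SupportMultiTableSink` check, and grouping eligible sinks by connector name (so that a group can share one resource manager) is an assumption. The proposal does not state the option's default value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical logical-plan node for one sink writing one table.
// supportsMultiTable stands in for "implements SupportMultiTableSink".
record SinkNode(String connector, String table, boolean supportsMultiTable) {}

// Hypothetical planned sink: a plain sink, or a MultiTableSink when it wraps more than one node.
record PlannedSink(String connector, List<SinkNode> wrapped) {
    boolean isMultiTable() {
        return wrapped.size() > 1;
    }
}

class MultiTableSinkOptimizer {
    // Mirrors the seatunnel.optimizer.multitablesink switch.
    private final boolean enabled;

    MultiTableSinkOptimizer(boolean enabled) {
        this.enabled = enabled;
    }

    List<PlannedSink> optimize(List<SinkNode> sinks) {
        List<PlannedSink> planned = new ArrayList<>();
        Map<String, List<SinkNode>> mergeable = new LinkedHashMap<>();
        for (SinkNode sink : sinks) {
            if (enabled && sink.supportsMultiTable()) {
                // Group eligible sinks of the same connector into one candidate MultiTableSink.
                mergeable.computeIfAbsent(sink.connector(), c -> new ArrayList<>()).add(sink);
            } else {
                // Disabled or unsupported: keep the sink as-is.
                planned.add(new PlannedSink(sink.connector(), List.of(sink)));
            }
        }
        mergeable.forEach((connector, group) -> planned.add(new PlannedSink(connector, List.copyOf(group))));
        return planned;
    }
}
```

With the switch off, the plan is unchanged, which matches the intent that the feature can be disabled without affecting existing jobs.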
Compatibility
This is a new feature and is compatible with existing functionality.