Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The old JDBC connector, based on the XXXFormat interfaces (for example JdbcOutputFormat), is already complete. This FLIP ports the JDBC connector to FLIP-27 and FLIP-143 so that it adopts the unified connector abstraction and addresses the disadvantages of the old connector.

This proposal has two main parts: the new JDBC source and the new JDBC sink.

Disadvantages encountered in the old JDBC connector:

  • It cannot connect with a secret or other connection properties.
  • It does not support customized JDBC catalogs.
  • Other issues that have not been collected yet...

Public Interfaces

Please include the Flink interfaces that you will use when implementing your connector. You most likely will use one or more of these:

Proposed Changes

Describe the new connector you are proposing in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change. Please include details on implementation including what destinations your connector will support, batch and stream support, async or synchronous connector implementation, etc.


We need to extend SinkV2#InitContext so that the new sink can reuse functionality that the previous sinks already rely on but that SinkV2#InitContext does not currently expose.

  • JdbcOutputFormat relies on isObjectReuseEnabled from the ExecutionConfig in RuntimeContext, because the JDBC sink has to decide whether to buffer copies or the original records. When object reuse is enabled, it should buffer copies (the content of the objects may change before the flush); otherwise it should buffer the original records.

public interface Sink<InputT> extends Serializable {
    // ... current methods

    public interface InitContext {
        // ... current methods
        ExecutionConfig getExecutionConfig();
    }
}
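
The buffering decision described above can be sketched as follows. This is a minimal, self-contained illustration and not the connector's code: `RecordBuffer`, its `copier` argument, and `drain()` are hypothetical names, and the `objectReuseEnabled` flag stands in for the value that would be read from the ExecutionConfig.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical helper (not part of Flink): buffers records until the next flush. */
final class RecordBuffer<T> {
    private final boolean objectReuseEnabled;
    private final UnaryOperator<T> copier;
    private final List<T> buffer = new ArrayList<>();

    RecordBuffer(boolean objectReuseEnabled, UnaryOperator<T> copier) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
    }

    /** Buffers a copy when object reuse is enabled, otherwise the original record. */
    void add(T record) {
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
    }

    /** Returns the buffered records and clears the buffer, as a flush would. */
    List<T> drain() {
        List<T> drained = new ArrayList<>(buffer);
        buffer.clear();
        return drained;
    }
}
```

With object reuse enabled, a record that the runtime mutates after `add` does not affect what is flushed, because the buffer holds its own copy.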

New JDBC Source

DataStream Source


Is it necessary to introduce a connection pool or a data source?

/** Adjust the JDBC connection provider. org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider */
public interface JdbcConnectionProvider {

    // The old methods declaration placeholder....
    // Add the new method declaration.

    /**
     * Get existing connection properties.
     *
     * @return existing connection properties
     */
    default Properties getProperties() {
        return new Properties();
    }

    // Get or create sharding connections.
    default Collection<Connection> getOrCreateShardConnections(
            String remoteCluster, String remoteDataBase) throws SQLException {
        return Collections.emptyList();
    }

    // Get or create connection for one shard.
    Connection getOrCreateShardConnection(String url, String database) throws SQLException;

    // Get all shard urls.
    default List<String> getShardUrls(String remoteCluster) throws SQLException {
        return new ArrayList<>();
    }

    // Close all connections.
    void closeConnections();
}
/** Adjust JDBC connection options. org.apache.flink.connector.jdbc.JdbcConnectionOptions */
public class JdbcConnectionOptions implements Serializable {
    // Old placeholders....

    // Add the extendProps.
    @Nullable protected final Properties extendProps;

    protected JdbcConnectionOptions(
            String url,
            @Nullable String driverName,
            @Nullable String username,
            @Nullable String password,
            int connectionCheckTimeoutSeconds,
            @Nullable Properties extendProps) {}
    // placeholders......

    /** Builder for {@link JdbcConnectionOptions}. */
    public static class JdbcConnectionOptionsBuilder {
        private String url;
        private String driverName;
        private String username;
        private String password;
        private int connectionCheckTimeoutSeconds = 60;
        protected Properties extendProps;
        // placeholders......

        public JdbcConnectionOptions build() {
            return new JdbcConnectionOptions(
                    url,
                    driverName,
                    username,
                    password,
                    connectionCheckTimeoutSeconds,
                    extendProps);
        }
    }
}
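
One way the extra properties could reach the driver is to merge them with the credentials into a single `java.util.Properties` object. The sketch below is only an illustration under assumptions: `ConnectionProperties.merge` is a hypothetical helper, and letting the explicit username and password override entries in `extendProps` is an assumed policy, not something this proposal specifies.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the proposal): merges credentials and extra properties. */
final class ConnectionProperties {
    private ConnectionProperties() {}

    static Properties merge(String username, String password, Properties extendProps) {
        Properties merged = new Properties();
        if (extendProps != null) {
            merged.putAll(extendProps);
        }
        // "user" and "password" are the conventional JDBC connection property keys.
        // Assumption: explicitly configured credentials win over entries in extendProps.
        if (username != null) {
            merged.setProperty("user", username);
        }
        if (password != null) {
            merged.setProperty("password", password);
        }
        return merged;
    }
}
```

The merged object could then be handed to `DriverManager.getConnection(String url, Properties info)`.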


/**
 * A {@link SourceSplit} that represents a JDBC sqlTemplate with optional parameters.
 *
 * <p>The split has an offset, which defines the offset of the query's {@link java.sql.ResultSet}
 * for the split. The offset is the checkpointed position from a reader previously reading this
 * split. This position is typically zero when the split is assigned from the assigner to the
 * readers, and is positive when the readers checkpoint their state in a Jdbc source split.
 */
public class JdbcSourceSplit implements SourceSplit, Serializable {
    private final String id;
    private final String sqlTemplate;
    private final @Nullable Serializable[] parameters;
    // The offset of ResultSet on the sql rendered by the split
    private final int offset;
    // ....
}
/**
 * State of the JdbcSourceSplit.
 *
 * <p>The {@link JdbcSourceSplit} assigned to the reader or stored in the checkpoint points to the
 * position from where to start reading (after recovery), so the current offset always needs to
 * point to the record after the last emitted record.
 */
public class JdbcSourceSplitState<SplitT extends JdbcSourceSplit> implements Serializable {

    private final SplitT split;
    private int offset;

    // ...

    // methods ...
    public void setPosition(int offset);
    public SplitT toJdbcSourceSplit();
}
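
The offset rule stated in the Javadoc above can be illustrated with a small sketch. `SplitOffsetTracker` is a hypothetical, simplified stand-in for JdbcSourceSplitState, and the rows of a ResultSet are modelled as a plain list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for JdbcSourceSplitState. */
final class SplitOffsetTracker {
    private int offset;

    SplitOffsetTracker(int startOffset) {
        this.offset = startOffset;
    }

    /** Emits at most maxRecords rows starting at the current offset. */
    List<String> emit(List<String> rows, int maxRecords) {
        List<String> emitted = new ArrayList<>();
        while (offset < rows.size() && emitted.size() < maxRecords) {
            emitted.add(rows.get(offset));
            // After each emission the offset points to the record after the last emitted one.
            offset++;
        }
        return emitted;
    }

    int getOffset() {
        return offset;
    }
}
```

A tracker restored with the checkpointed offset resumes at the first record that was not yet emitted, provided the query returns the same rows in the same order.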
/** A serializer for the {@link JdbcSourceSplit}. */
public class JdbcSourceSplitSerializer implements SimpleVersionedSerializer<JdbcSourceSplit> {}


/** The enumerator class for Jdbc source. */
public class JdbcSourceEnumerator
        implements SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> {

    private final SplitEnumeratorContext<JdbcSourceSplit> context;
    private final Boundedness boundedness;

    // The map to store the info of split-request.
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;

    private final List<JdbcSourceSplit> unassigned;

    // The jdbcSqlSplitAssigner decides whether a JdbcSourceSplit can be handed out in response to a split request.
    private final JdbcSqlSplitAssigner<JdbcSourceSplit> jdbcSqlSplitAssigner;

    // ....
}
/** The state of Jdbc source enumerator. */
public class JdbcSourceEnumeratorState implements Serializable {

    private @Nonnull List<JdbcSourceSplit> remainingSplits;
    private final @Nullable Serializable optionalUserDefinedSplitAssignerState;
    // ... placeholders...
}

public class JdbcSourceEnumStateSerializer
        implements SimpleVersionedSerializer<JdbcSourceEnumeratorState> {}
/**
 * The {@code OptionalUserDefinedState} is responsible for getting and setting the
 * user-defined-state.
 */
public interface OptionalUserDefinedState<T extends Serializable> extends Serializable {

    /**
     * Get the optional user-defined-state. The default value is null.
     *
     * @return {@code null} or the user-defined-state.
     */
    default T getOptionalUserDefinedState() {
        return null;
    }

    /**
     * Set the optional user-defined-state.
     *
     * @param optionalUserDefinedState user-defined-state
     */
    default void setOptionalUserDefinedState(@Nullable T optionalUserDefinedState) {}
}
/**
 * The {@code JdbcSqlSplitAssigner} is responsible for returning the split that should be processed
 * next.
 */
public interface JdbcSqlSplitAssigner<SplitT extends JdbcSourceSplit>
        extends Serializable, OptionalUserDefinedState<Serializable> {

    /**
     * Called to open the assigner to acquire any resources, like threads or network connections.
     * Integer splitCapacity,
     */
    void open(ReadableConfig config);

    /**
     * Gets the next split.
     *
     * <p>When this method returns an empty {@code Optional}, then the set of splits is assumed to
     * be done and the source will finish once the readers finished their current splits.
     */
    Optional<JdbcSourceSplit> getNextSplit();

    /**
     * Adds a set of splits to this assigner. This happens for example when some split processing
     * failed and the splits need to be re-added.
     */
    void addSplits(Collection<JdbcSourceSplit> splits);

    /**
     * Notifies the listener that the checkpoint with the given {@code checkpointId} completed and
     * was committed.
     *
     * @see CheckpointListener#notifyCheckpointComplete(long)
     */
    void notifyCheckpointComplete(long checkpointId);

    default Serializable snapshotState(long checkpointId) {
        return getOptionalUserDefinedState();
    }

    /**
     * Called to close the assigner, in case it holds on to any resources, like threads or network
     * connections.
     */
    void close();

    char[] getCurrentSplitId();
}
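
To make the contract of getNextSplit and addSplits concrete, here is a minimal sketch of a queue-backed assigner. It is deliberately simplified and hypothetical: `QueueSplitAssigner` does not implement the proposed interface, and a split is represented by its SQL string instead of a JdbcSourceSplit.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical, simplified assigner; a split is represented by its SQL string. */
final class QueueSplitAssigner {
    private final Queue<String> pending = new ArrayDeque<>();

    QueueSplitAssigner(Collection<String> initialSplits) {
        pending.addAll(initialSplits);
    }

    /** Returns the next split, or an empty Optional once all splits are handed out. */
    Optional<String> getNextSplit() {
        return Optional.ofNullable(pending.poll());
    }

    /** Re-adds splits, for example after the reader that owned them failed. */
    void addSplits(Collection<String> splits) {
        pending.addAll(splits);
    }
}
```

An empty result from `getNextSplit()` signals that enumeration is done; splits returned through `addSplits` become available again on the next call.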


Stream execution mode
  • JdbcSourceSplit-based for 'At least once':
    • If any exception is encountered, reprocess the JdbcSourceSplit from the default offset value 0.
  • ResultSet-offset-based for 'Exactly once':
    • If any exception is encountered, reprocess the JdbcSourceSplit starting from the offset value stored in JdbcSourceSplitState.
    • If the offset value is greater than the number of records in the result set, skip the current JdbcSourceSplit.
    • If the offset value is less than or equal to the number of records in the result set, continue processing from the offset position.
    • Disadvantage: 'Exactly once' only holds if the ResultSet corresponding to this SQL (JdbcSourceSplit) remains unchanged during the whole lifecycle of the JdbcSourceSplit processing. Unfortunately, most databases and data scenarios do not meet this condition.
  • JdbcSourceSplit-based for 'At most once':
    • If the offset value of the current JdbcSourceSplit is not 0, the split has already been processed in part, so skip it. In short, once processing of the current JdbcSourceSplit fails, the split is ignored and processing continues with the next JdbcSourceSplit.

Batch execution mode

If any exception is encountered, reprocess the JdbcSourceSplit from the default offset value 0.
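
The recovery rules above can be summarised as a small decision function. This is a sketch under assumptions: `ReadSemantic` is a hypothetical enum introduced only for this illustration (it is not Flink's DeliveryGuarantee), and `resultSetSize` stands for the number of records the split's query returns.

```java
import java.util.OptionalInt;

/** Hypothetical enum for the three semantics discussed above; not Flink's DeliveryGuarantee. */
enum ReadSemantic { AT_LEAST_ONCE, EXACTLY_ONCE, AT_MOST_ONCE }

final class StartOffsetResolver {
    private StartOffsetResolver() {}

    /** Returns the offset to resume reading from, or empty when the split should be skipped. */
    static OptionalInt resolve(ReadSemantic semantic, int checkpointedOffset, int resultSetSize) {
        switch (semantic) {
            case AT_LEAST_ONCE:
                // Always reprocess the whole split.
                return OptionalInt.of(0);
            case EXACTLY_ONCE:
                // Resume from the checkpointed offset unless it lies beyond the result set.
                return checkpointedOffset > resultSetSize
                        ? OptionalInt.empty()
                        : OptionalInt.of(checkpointedOffset);
            case AT_MOST_ONCE:
                // A non-zero offset means the split was already started, so it is skipped.
                return checkpointedOffset != 0 ? OptionalInt.empty() : OptionalInt.of(0);
            default:
                throw new IllegalArgumentException("Unknown semantic: " + semantic);
        }
    }
}
```

Batch execution mode behaves like the `AT_LEAST_ONCE` branch: the split is always reprocessed from offset 0.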

JdbcSourceReader & JdbcSourceSplitReader 
/** The source reader for Jdbc splits. */
public class JdbcSourceReader<OUT>
        extends SingleThreadMultiplexSourceReaderBase<
                RecordAndOffset<OUT>, OUT, JdbcSourceSplit, JdbcSourceSplitState<JdbcSourceSplit>> {}

/** The {@link SplitReader} implementation for the jdbc source. */
public class JdbcSourceSplitReader<T>
        implements SplitReader<RecordAndOffset<T>, JdbcSourceSplit>, ResultTypeQueryable<T> {

    private final Configuration config;
    protected TypeInformation<T> typeInformation;
    private final ResultExtractor<T> resultExtractor;
    private final DeliveryGuarantee deliveryGuarantee;
    protected JdbcConnectionProvider connectionProvider;
    private final SourceReaderContext context;
    @Nullable private JdbcSourceSplit currentSplit;
    private final Queue<JdbcSourceSplit> splits;

    protected transient Connection connection;
    protected transient PreparedStatement statement;
    protected transient ResultSet resultSet;
    protected boolean hasNextRecordCurrentSplit;

    // config for ResultSet 
    private final int splitReaderFetchBatchSize;
    protected int resultSetType;
    protected int resultSetConcurrency;
    protected int resultSetFetchSize;
    // Boolean to distinguish between default value and explicitly set autoCommit mode.
    protected Boolean autoCommit;

    //... placeholder.
}

public class RecordAndOffset<E> {
    final E record;
    final int offset;
    // ------------------------------------------------------------------------
    // methods list
    public E getRecord();
    public int getOffset();
}

public interface ResultExtractor<T> extends Serializable {
    T extract(ResultSet resultSet) throws SQLException;
}
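
A ResultExtractor could be supplied as a lambda that maps the current row to a user type. The sketch below repeats the interface so that it is self-contained; the `User` class, the `UserExtractors` helper, and the column names "id" and "name" are assumptions made only for illustration.

```java
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Repeats the ResultExtractor interface proposed above so that the sketch is self-contained. */
interface ResultExtractor<T> extends Serializable {
    T extract(ResultSet resultSet) throws SQLException;
}

/** Hypothetical record type; the columns "id" and "name" are assumptions. */
final class User {
    final int id;
    final String name;

    User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

final class UserExtractors {
    private UserExtractors() {}

    /** Maps the current row of the ResultSet to a User; it does not advance the cursor. */
    static ResultExtractor<User> byColumnName() {
        return rs -> new User(rs.getInt("id"), rs.getString("name"));
    }
}
```

Such an extractor would be passed to `setResultExtractor(...)` on the builder; advancing the cursor stays the responsibility of the split reader.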


/**
 * The Source implementation of JdbcSource. Please use a {@link JdbcSourceBuilder} to construct a
 * {@link JdbcSource}. The following example shows how to create a JdbcSource emitting records of
 * {@code Row} type.
 *
 * <pre>{@code
 * JdbcSource<Row> source = JdbcSource
 *     .<Row>builder()
 *     .setResultExtractor(...)
 *     .setJdbcSqlSplitAssignerProvider(...)
 *     .setConnectionProvider(...)
 *     .build();
 * }</pre>
 *
 * <p>See {@link JdbcSourceBuilder} for more details.
 *
 * @param <OUT> the output type of the source.
 */
public class JdbcSource<OUT>
        implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
                ResultTypeQueryable<OUT> {

    private final Boundedness boundedness;
    private final TypeInformation<OUT> typeInformation;

    private final JdbcSqlSplitAssigner.Provider<JdbcSourceSplit> jdbcSqlSplitAssignerProvider;

    protected JdbcConnectionProvider connectionProvider;
    private final ResultExtractor<OUT> resultExtractor;
    private final DeliveryGuarantee deliveryGuarantee;
    // methods placeholders.......
}
/**
 * The builder class for {@link JdbcSource} to make it easier for the users to construct a {@link
 * JdbcSource}.
 */
public class JdbcSourceBuilder<OUT> {

    private final Configuration configuration;
    private int splitReaderFetchBatchSize;
    private int resultSetType;
    private int resultSetConcurrency;
    private int resultSetFetchSize;
    // Boolean to distinguish between default value and explicitly set autoCommit mode.
    private Boolean autoCommit;
    private DeliveryGuarantee deliveryGuarantee;
    private TypeInformation<OUT> typeInformation;
    private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
    private ContinuousEnumerationSettings continuousEnumerationSettings;
    private JdbcSqlSplitAssigner.Provider<JdbcSourceSplit> jdbcSqlSplitAssignerProvider;
    private ResultExtractor<OUT> resultExtractor;
    private JdbcConnectionProvider connectionProvider;

    public JdbcSourceBuilder() {
        // placeholders ....
    }
    // builder build methods placeholders ....

    // ----------------methods list.-----------------------
    // ----- required methods to call -------------------------------
    public JdbcSourceBuilder<OUT> setResultExtractor(ResultExtractor<OUT> resultExtractor);
    public JdbcSourceBuilder<OUT> setJdbcSqlSplitAssignerProvider(JdbcSqlSplitAssigner.Provider<JdbcSourceSplit> jdbcSqlSplitAssignerProvider);
    public JdbcSourceBuilder<OUT> setUsername(String username);
    public JdbcSourceBuilder<OUT> setPassword(String password);
    public JdbcSourceBuilder<OUT> setDriverName(String driverName);
    public JdbcSourceBuilder<OUT> setDBUrl(String dbURL);
    public JdbcSource<OUT> build();

    // ------ Optional methods to call------------------------------

    public JdbcSourceBuilder<OUT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee);
    public JdbcSourceBuilder<OUT> setTypeInformation(TypeInformation<OUT> typeInformation);
    public JdbcSourceBuilder<OUT> setSplitReaderFetchBatchSize(int splitReaderFetchBatchSize);
    public JdbcSourceBuilder<OUT> setResultSetType(int resultSetType);
    public JdbcSourceBuilder<OUT> setResultSetConcurrency(int resultSetConcurrency);
    public JdbcSourceBuilder<OUT> setAutoCommit(boolean autoCommit);
    public JdbcSourceBuilder<OUT> setResultSetFetchSize(int resultSetFetchSize);
    public JdbcSourceBuilder<OUT> setConnectionProvider(JdbcConnectionProvider connectionProvider);
    public JdbcSourceBuilder<OUT> setJdbcConnectionOptionsBuilder(JdbcConnectionOptionsBuilder jdbcConnectionOptionsBuilder);
}

Table Source

/** A {@link DynamicTableSource} for New JDBC Source. */
public class JdbcDynamicTableSourceV2
        implements ScanTableSource,
                SupportsWatermarkPushDown {

    private static final String JDBC_TRANSFORMATION = "jdbc";

    private final JdbcConnectorOptions options;
    private final JdbcReadOptions readOptions;
    private final JdbcLookupOptions lookupOptions;
    private DataType physicalRowDataType;
    private final String dialectName;
    private long limit = -1;
    private @Nullable WatermarkStrategy<RowData> watermarkStrategy;
    private final String tableIdentifier;

    private final ReadableConfig readableConfig;

    private final DynamicTableFactory.Context context;
    // placeholders.....
    // need to use the JdbcSourceBuilder to implement the method.
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
}


/** A lookup function for {@link JdbcDynamicTableSourceV2}. */
public class JdbcRowDataLookupFunction extends TableFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
    private static final long serialVersionUID = 2L;

    private final String query;
    private final JdbcConnectionProvider connectionProvider;
    private final DataType[] keyTypes;
    private final String[] keyNames;
    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private final boolean cacheMissingKey;
    private final JdbcDialect jdbcDialect;
    private final JdbcRowConverter jdbcRowConverter;
    private final JdbcRowConverter lookupKeyRowConverter;

    private transient FieldNamedPreparedStatement statement;
    private transient Cache<RowData, List<RowData>> cache;
}

New JDBC Sink

DataStream Sink

Non-XA Sink (At Least Once semantic)

  • JdbcSink

    // JdbcSink class.
    public class JdbcSink<IN> implements Sink<IN> {
        private final JdbcConnectionProvider connectionProvider;
        private final JdbcExecutionOptions executionOptions;
        private final JdbcQueryStatement<IN> queryStatement;
        public JdbcWriter<IN> createWriter(InitContext context) throws IOException;
    }
  • JdbcWriter

    class JdbcWriter<IN> implements SinkWriter<IN> {
        private final JdbcOutputFormat<IN, IN, JdbcBatchStatementExecutor<IN>> jdbcOutput;
        public void write(IN element, Context context) throws IOException, InterruptedException;
        public void flush(boolean endOfInput) throws IOException, InterruptedException;
        public void close() throws Exception;
    }
  • JdbcQueryStatement

    /**
     * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.
     */
    public interface JdbcQueryStatement<T> extends Serializable {
        String query();
        void map(PreparedStatement ps, T data) throws SQLException;
    }

XA Sink (Exactly Once semantic)

  • JdbcSink

    // JdbcSink class.
    public class JdbcSink<IN>
            implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommittable> {
        public JdbcWriter<IN> createWriter(InitContext context) throws IOException;
        public Committer<JdbcCommittable> createCommitter() throws IOException;
        public SimpleVersionedSerializer<JdbcCommittable> getCommittableSerializer();
        public StatefulSinkWriter<IN, JdbcWriterState> restoreWriter(
                InitContext context, Collection<JdbcWriterState> recoveredState);
        public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer();
    }
  • JdbcWriter

    class JdbcWriter<IN>
            implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
                    TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommittable> {
        private final JdbcOutputFormat<IN, IN, JdbcBatchStatementExecutor<IN>> jdbcOutput;
        private final XaFacade xaFacade;
        private final XaGroupOps xaGroupOps;
        private final XidGenerator xidGenerator;
        private final XaSinkStateHandler stateHandler;
        private final JdbcExactlyOnceOptions options;
        private transient List<CheckpointAndXid> preparedXids = new ArrayList<>();
        private transient Deque<Xid> hangingXids = new LinkedList<>();
        private transient Xid currentXid;
    }
  • JdbcWriterState

    class JdbcWriterState implements Serializable {
        private final Collection<CheckpointAndXid> prepared;
        private final Collection<Xid> hanging;
    }
  • JdbcWriterStateSerializer

    class JdbcWriterStateSerializer
            implements SimpleVersionedSerializer<JdbcWriterState> {
        public static final int VERSION = 1;
        // methods list.
        public int getVersion();
        public byte[] serialize(@Nonnull JdbcWriterState writerState) throws IOException;
        public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException;
    }
  • JdbcCommitter

    class JdbcCommitter implements Committer<JdbcCommittable> {
        public void commit(Collection<CommitRequest<JdbcCommittable>> committables)
                throws IOException, InterruptedException {}
        public void close() throws Exception {}
    }
  • JdbcCommittable

    class JdbcCommittable implements Serializable {}
  • JdbcCommittableSerializer

    class JdbcCommittableSerializer implements SimpleVersionedSerializer<JdbcCommittable> {
        // method list.
        public int getVersion();
        public byte[] serialize(JdbcCommittable obj) throws IOException;
        public JdbcCommittable deserialize(int version, byte[] serialized) throws IOException;
    }
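
The versioned serializers listed above could follow a simple length-prefixed layout. The sketch below round-trips a list of transaction ids and is only an illustration: `SimpleXid` and `XidListSerializer` are hypothetical names, and the byte layout is an assumption, not the format the connector uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value holder with the three parts of an XA transaction id. */
final class SimpleXid {
    final int formatId;
    final byte[] globalTransactionId;
    final byte[] branchQualifier;

    SimpleXid(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
        this.formatId = formatId;
        this.globalTransactionId = globalTransactionId;
        this.branchQualifier = branchQualifier;
    }
}

/** Hypothetical serializer; the byte layout is an assumption made for this sketch. */
final class XidListSerializer {
    static final int VERSION = 1;

    private XidListSerializer() {}

    static byte[] serialize(List<SimpleXid> xids) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(xids.size());
            for (SimpleXid xid : xids) {
                out.writeInt(xid.formatId);
                writeBytes(out, xid.globalTransactionId);
                writeBytes(out, xid.branchQualifier);
            }
        }
        return bytes.toByteArray();
    }

    static List<SimpleXid> deserialize(int version, byte[] serialized) throws IOException {
        // The version is passed separately, so older layouts could be handled here later.
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int count = in.readInt();
            List<SimpleXid> xids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                int formatId = in.readInt();
                byte[] globalTransactionId = readBytes(in);
                byte[] branchQualifier = readBytes(in);
                xids.add(new SimpleXid(formatId, globalTransactionId, branchQualifier));
            }
            return xids;
        }
    }

    private static void writeBytes(DataOutputStream out, byte[] value) throws IOException {
        out.writeInt(value.length);
        out.write(value);
    }

    private static byte[] readBytes(DataInputStream in) throws IOException {
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        return value;
    }
}
```

Rejecting unknown versions explicitly keeps a restore from silently misreading state written by a different layout.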

Table Sink

  • JdbcDynamicTableSinkV2
    /** A {@link DynamicTableSink} for JDBC. */
    public class JdbcDynamicTableSinkV2 implements DynamicTableSink {
        // old code placeholder...
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            // ...
            return SinkV2Provider.of(
                    new JdbcSink<>(...), jdbcOptions.getParallelism());
        }
    }

Enhance Catalog

Support customized JDBC catalogs and connections

// Introduce context & config fields
public abstract class AbstractJdbcCatalog extends org.apache.flink.table.catalog.AbstractCatalog {

    // old fields lines placeholder.......
    // Introduced new fields.
    protected final Context context;
    protected final ReadableConfig config;

    public AbstractJdbcCatalog(Context context, ReadableConfig config) {
        // ...
    }

    // Introduced new methods.
    public Context getContext();
    public ReadableConfig getConfig();
    public Connection getConnection();
    // the old code lines placeholder...
}

// Introduce the 'createCatalog' method into package org.apache.flink.connector.jdbc.dialect.JdbcDialect
/**
 * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be stateless.
 *
 * @see JdbcDialectFactory
 */
public interface JdbcDialect extends Serializable {
    // The old methods declaration placeholder.....

    // The new method declaration.
    /**
     * Constructs a catalog.
     *
     * @return A catalog based on the current SQL dialect.
     */
    AbstractJdbcCatalog createCatalog(CatalogFactory.Context context, ReadableConfig config);
}

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    • Users need to complete the upgrade before the community drops the old JDBC connector API.
  • If we are changing behavior how will we phase out the older behavior?
    • Until the new JDBC connector source/sink reaches a stable state, the old JDBC connector implementation will not be removed; it will only be marked '@Deprecated', with comments on how to migrate to the new JDBC connector.
  • When will we remove the existing behavior?
    • After the new JDBC connector source/sink reaches a stable state, for example one or more major versions after the feature is released.

Test Plan

Add integration tests based on Testcontainers, an approach already proven by the integration tests of most Flink connectors.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

Related References

  1. [FLINK-24349] Support customized Calalogs via JDBC
  2. [Flink JDBC connect with secret] discuss thread.