Discussion thread
Vote thread
JIRA: FLINK-17025
Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

The WITH option in a table DDL defines the properties needed by a specific connector to create a source/sink. The connector property structure was designed for the SQL CLI config YAML a long time ago, which is why we have a "connector.*" prefix and a "format.*" prefix for defining the hierarchy. However, from the DDL point of view, the "connector.*" keys are verbose, because all the options in WITH are connector specific, including the formats. Besides, FLIP-107 may introduce more connector-specific properties that allow specifying which fields should end up in which parts of the record, e.g. "key.fields" and "timestamp.field". Prefixing them with "connector." would make them verbose, while omitting the "connector." prefix would make them inconsistent.

This FLIP aims to simplify the connector properties further, in order to make them more concise and readable and to cooperate better with FLIP-107. With the introduction of the new TableSource/TableSink/Factory interfaces, this is a good opportunity to refactor the connector properties along with the new factories.

Proposed Changes

In the new properties, there are four main changes:

  • "connector.type" is renamed to "connector", and "format.type" is renamed to "format".
  • "connector.version" is merged into "connector", i.e. into the identifier. We suggest using "-" as the version delimiter (this should be documented as a guideline in the Javadoc of Factory). We propose the following identifiers for existing connectors:
    • kafka => kafka for 0.11+ versions; we don't suffix "-universal", because the meaning of "universal" is not easy to understand.
    • kafka-0.11 => kafka for the 0.11 version
    • kafka-0.10 => kafka for the 0.10 version
    • elasticsearch-6 => elasticsearch for 6.x versions
    • elasticsearch-7 => elasticsearch for 7.x versions
    • hbase-1.4 => hbase for 1.4.x versions
    • jdbc
    • filesystem
  • All remaining "connector." prefixes are removed, but the "format." prefix is kept.
  • Some property keys are refactored to be prefixed with "scan", "lookup", or "sink". This separates source options from sink options. We use these terms because they align with the FLIP-95 interfaces: ScanTableSource, LookupTableSource, and DynamicTableSink.

The reason why we still keep the "format." prefix is property hierarchy. We can easily extract all the format-prefixed properties via a utility provided by the framework, and then use the extracted properties to discover, validate, and instantiate the format, e.g. FactoryUtil.find(TableFormatFactory.class, context, FORMAT_OPTION), as sketched below.
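To illustrate the idea, here is a minimal, self-contained Java sketch of such prefix extraction. The helper name FormatOptions.projectPrefix is purely hypothetical (the FLIP proposes a FactoryUtil for this); it only shows how format-scoped keys could be projected out before handing them to the format factory, e.g. "json.fail-on-missing-field" becoming "fail-on-missing-field".

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not the actual Flink API: project out all options
// under a given prefix so the format factory only sees its own keys.
final class FormatOptions {
    static Map<String, String> projectPrefix(Map<String, String> options, String prefix) {
        Map<String, String> projected = new HashMap<>();
        String qualified = prefix + ".";
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(qualified)) {
                // e.g. "json.fail-on-missing-field" -> "fail-on-missing-field"
                projected.put(entry.getKey().substring(qualified.length()), entry.getValue());
            }
        }
        return projected;
    }
}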

The "connector" property key is framework-level key and can't be renamed to other keys by connectors. That means all the connectors have the same "connector" property key.  A factory is located by Factory#factoryIdentifier(), it reflects to the "connector" property for source/sink factories. Framework will extract the value of "connector" to discover DynamicTableSourceFactory/DynamicTableSinkFactory.

Remove Factory#factoryVersion() method

As proposed above, we would like to use a single identifier to discover factories, so the Factory#factoryVersion() method is not needed anymore. We should remove this method, which was introduced in FLIP-95, and update the related Javadocs accordingly.
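With the version folded into the identifier, a versioned connector factory reduces to a single method. The class name below is hypothetical and follows the simplified Factory interface sketched above:

// Hypothetical versioned factory: the "-" delimiter carries the version,
// so 'connector' = 'kafka-0.11' resolves to this factory.
final class Kafka011TableFactory implements Factory {
    @Override
    public String factoryIdentifier() {
        return "kafka-0.11";
    }
}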

New Property Keys

Some of the renamings are not simply a removal of the 'connector.' prefix or the addition of a 'scan', 'lookup', or 'sink' prefix; those cases are explained in the Note column of the tables below.

Kafka:

Old key | New key | Note
connector.type | connector |
connector.version | N/A | merged into the 'connector' key
connector.topic | topic |
connector.properties.zookeeper.connect | properties.zookeeper.connect |
connector.properties.bootstrap.servers | properties.bootstrap.servers |
connector.properties.group.id | properties.group.id |
connector.startup-mode | scan.startup.mode |
connector.specific-offsets | scan.startup.specific-offsets |
connector.startup-timestamp-millis | scan.startup.timestamp-millis |
connector.sink-partitioner | sink.partitioner | "fixed", "round-robin", or a class name such as "org.mycompany.MyPartitioner"
connector.sink-partitioner-class | N/A | merged into 'sink.partitioner', not needed anymore
format.type | format |


Elasticsearch:

Old key | New key | Note
connector.type | connector |
connector.version | N/A | merged into the 'connector' key
connector.hosts | hosts |
connector.index | index |
connector.document-type | document-type |
connector.failure-handler | failure-handler |
connector.connection-max-retry-timeout | connection.max-retry-timeout |
connector.connection-path-prefix | connection.path-prefix |
connector.key-delimiter | document-id.key-delimiter | these keys can also be used by sources in the future; the 'document-id' prefix makes the meaning more understandable
connector.key-null-literal | document-id.key-null-literal |
connector.flush-on-checkpoint | sink.flush-on-checkpoint |
connector.bulk-flush.max-actions | sink.bulk-flush.max-actions | we keep 'bulk-flush' because it is Elasticsearch terminology
connector.bulk-flush.max-size | sink.bulk-flush.max-size |
connector.bulk-flush.interval | sink.bulk-flush.interval |
connector.bulk-flush.back-off.type | sink.bulk-flush.back-off.strategy |
connector.bulk-flush.back-off.max-retries | sink.bulk-flush.back-off.max-retries |
connector.bulk-flush.back-off.delay | sink.bulk-flush.back-off.delay |


HBase:

Old key | New key | Note
connector.type | connector |
connector.version | N/A | merged into the 'connector' key
connector.table-name | table-name |
connector.zookeeper.quorum | zookeeper.quorum |
connector.zookeeper.znode.parent | zookeeper.znode-parent |
connector.write.buffer-flush.max-size | sink.buffer-flush.max-size |
connector.write.buffer-flush.max-rows | sink.buffer-flush.max-rows |
connector.write.buffer-flush.interval | sink.buffer-flush.interval |


JDBC:

Old key | New key
connector.type | connector
connector.url | url
connector.table | table-name
connector.driver | driver
connector.username | username
connector.password | password
connector.read.partition.column | scan.partition.column
connector.read.partition.num | scan.partition.num
connector.read.partition.lower-bound | scan.partition.lower-bound
connector.read.partition.upper-bound | scan.partition.upper-bound
connector.read.fetch-size | scan.fetch-size
connector.lookup.cache.max-rows | lookup.cache.max-rows
connector.lookup.cache.ttl | lookup.cache.ttl
connector.lookup.max-retries | lookup.max-retries
connector.write.flush.max-rows | sink.buffer-flush.max-rows
connector.write.flush.interval | sink.buffer-flush.interval
connector.write.max-retries | sink.max-retries


Filesystem:

Old key | New key
connector.type | connector
connector.path | path
format.type | format


Csv Format:

Old key | New key
format.type | format
format.field-delimiter | csv.field-delimiter
format.disable-quote-character | csv.disable-quote-character
format.quote-character | csv.quote-character
format.allow-comments | csv.allow-comments
format.ignore-parse-errors | csv.ignore-parse-errors
format.array-element-delimiter | csv.array-element-delimiter
format.escape-character | csv.escape-character
format.null-literal | csv.null-literal


Json Format:

Old key | New key
format.type | format
format.fail-on-missing-field | json.fail-on-missing-field
format.ignore-parse-errors | json.ignore-parse-errors


Examples

CREATE TABLE kafka_table (
  ...
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'test-topic',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
);

CREATE TABLE fs_table (
  ...
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'csv',
  'csv.allow-comments' = 'true',
  'csv.ignore-parse-errors' = 'true'
);

CREATE TABLE es_table (
  ...
) WITH (
  'connector' = 'elasticsearch-7',
  'index' = 'myuser',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '42 mb',
  'sink.bulk-flush.back-off.max-retries' = '10'
);

CREATE TABLE jdbc_table (
  ...
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/flink-test',
  'table-name' = 'jdbc_table_name',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = '...',
  'password' = '...',
  'sink.buffer-flush.interval' = '5s'
);


You can refer to the documentation for the current connector property keys: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

In FLIP-107, we may introduce more connector properties that allow specifying which fields should end up in which parts of the record. So a Kafka table may look like this:


CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  log_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'test-topic',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',

  'timestamp.field' = 'log_ts',
  'key.fields' = 'id,name',
  'key.format' = 'json',
  'key.json.fail-on-missing-field' = 'false',
  'value.format' = 'avro'
);

Compatibility, Deprecation, and Migration Plan

Because we introduce a new set of connector property keys for the new factories, we propose not to touch the old factories: there are too many moving parts in this release, and changing too much may affect the stability and happiness of our users. For compatibility, the Kafka connector jar will therefore contain two factories in the end. If the connector properties use "connector", the framework will find the new factory instance; otherwise ("connector.type"), the framework will find the old factory instance, as sketched below.
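As a rough sketch of this dispatch (reusing the hypothetical FactoryDiscovery from above; discoverOldFactory stands in for the pre-existing discovery path and is not a real method):

import java.util.Map;

// Hypothetical compatibility dispatch: new keys route to the new factories,
// old keys keep routing to the untouched old factories.
final class CompatibleDiscovery {
    static Object findFactory(Map<String, String> tableOptions) {
        if (tableOptions.containsKey("connector")) {
            return FactoryDiscovery.find(tableOptions.get("connector"));
        } else if (tableOptions.containsKey("connector.type")) {
            return discoverOldFactory(tableOptions.get("connector.type"));
        }
        throw new IllegalArgumentException("Missing 'connector' or 'connector.type' key");
    }

    private static Object discoverOldFactory(String type) {
        // Stand-in for the old factory discovery path (sketch only).
        throw new UnsupportedOperationException("sketch only");
    }
}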

For future evolution, we propose to implicitly add "property-version=1" to the properties when processing DDLs.

Rejected Alternatives

Option#1: Use factoryIdentifier as the prefix

'connector' = 'kafka',
'kafka.topic' = 'test-topic',
'format' = 'json', -- the <factoryIdentifier> value would be the default prefix, as described in FLIP-95
'json.ignore-parse-errors' = 'true'

However, all the properties in the WITH clause are connector specific, including the formats. For example, Kafka may support "key.format" and "value.format". So in order to make the keys consistent, we would have to prefix all the properties with the connector identifier name, which leads to Option#2.

Option#2: Prefix all with specific connector identifier name

'connector' = 'kafka',
'kafka.version' = '0.10',
'kafka.topic' = 'test-topic',
'kafka.key.fields' = 'id, name',
'kafka.key.format' = 'csv',
'kafka.key.csv.field-delimiter' = ';',
'kafka.value.format' = 'json',
'kafka.value.json.ignore-parse-errors' = 'true',
'kafka.timestamp.field' = 'log_ts'

However, all the "kafka" prefix are verbose, because all the properties are of course kafka specific (connecotr=kafka).