Motivation
The WITH option in table DDL defines the properties needed by a specific connector to create a source/sink. The connector properties structure was designed for the SQL CLI config YAML a long time ago; that's why we have a "connector.*" prefix and a "format.*" prefix to define the hierarchy. However, from the DDL point of view, the "connector.*" prefix is verbose, because all the options in WITH are connector specific, including the formats. Besides, FLIP-107 may introduce more connector-specific properties that allow specifying which fields should end up in which parts of the record, e.g. "key.fields" and "timestamp.field". Prefixing them with "connector." would make them verbose, but omitting the "connector." prefix would make them inconsistent.
This FLIP aims to simplify connector properties further, in order to make the properties more concise and readable and to cooperate better with FLIP-107. With the introduction of the new TableSource/TableSink/Factory interfaces (FLIP-95), this is a good opportunity to refactor the connector properties along with the new factories.
Proposed Changes
The new properties involve four main changes:
- "connector.type" will be renamed to "connector", "format.type" will be renamed to "format".
- put "connector.version" into "connector", i.e. the identifier. We suggest to use "-" as version delimiter (this should be a guide in the Javadoc of Factory). We propose to use the following identifiers for existing connectors:
- kafka => kafka for 0.11+ versions, we don't suffix "-universal", because the meaning of "universal" not easy to understand.
- kafka-0.11 => kafka for 0.11 version
- kafka-0.10 => kafka for 0.10 version
- elasticsearch-6 => elasticsearch for 6.x versions
- elasticsearch-7 => elasticsearch for 7.x versions
- hbase-1.4 => hbase for 1.4.x versions
- jdbc
- filesystem
- all the remaining "connector." prefixes will be removed, while the "format." prefix will be kept.
- some property keys will be refactored to carry a "scan", "lookup", or "sink" prefix. This separates source options from sink options. We use these terms because they align with the FLIP-95 interfaces: ScanTableSource, LookupTableSource, and DynamicTableSink.
The reason why we still keep the "format." prefix is the property hierarchy. The framework can easily extract all "format"-prefixed properties via a utility, and then use the extracted properties to discover, validate, and instantiate the format, e.g. FactoryUtil.find(TableFormatFactory.class, context, FORMAT_OPTION).
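The prefix-based extraction can be illustrated with a small, self-contained sketch. Note that the helper below is hypothetical and only mirrors what a framework utility might do; it is not the actual Flink API. It uses the format identifier (the value of the 'format' key) as the prefix, matching the new keys in the tables below:

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixExtraction {
    // Hypothetical helper: collects all options under "<prefix>." and strips
    // the prefix, mirroring how the framework could hand the format-specific
    // options to the discovered format factory.
    static Map<String, String> extractPrefixed(Map<String, String> options, String prefix) {
        Map<String, String> extracted = new HashMap<>();
        String p = prefix + ".";
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith(p)) {
                extracted.put(e.getKey().substring(p.length()), e.getValue());
            }
        }
        return extracted;
    }

    public static void main(String[] args) {
        // Options as they would appear in a WITH clause.
        Map<String, String> with = new HashMap<>();
        with.put("connector", "kafka");
        with.put("topic", "test-topic");
        with.put("format", "json");
        with.put("json.fail-on-missing-field", "false");

        // The value of 'format' selects the format factory...
        String formatId = with.get("format");
        // ...and the "json."-prefixed keys are what that factory sees, unprefixed.
        Map<String, String> formatOptions = extractPrefixed(with, formatId);

        System.out.println(formatId);                                    // json
        System.out.println(formatOptions.get("fail-on-missing-field"));  // false
    }
}
```

The point of the hierarchy is that the connector factory never needs to know the individual format option keys; it only needs the prefix convention.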
The "connector" property key is framework-level key and can't be renamed to other keys by connectors. That means all the connectors have the same "connector" property key. A factory is located by Factory#factoryIdentifier(), it reflects to the "connector" property for source/sink factories. Framework will extract the value of "connector" to discover DynamicTableSourceFactory/DynamicTableSinkFactory.
Remove Factory#factoryVersion() method
As proposed above, we would like to use a single identifier to discover factories, so the Factory#factoryVersion() method is not needed anymore. We should remove this method, which was introduced in FLIP-95, and update the related Javadocs.
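A simplified sketch of identifier-only discovery (the Factory interface and discover helper below are illustrative stand-ins, not the actual FLIP-95 signatures; real discovery would go through Java's ServiceLoader):

```java
import java.util.Arrays;
import java.util.List;

public class FactoryDiscovery {
    // Simplified stand-in for the Factory interface after removing
    // factoryVersion(): a single identifier is enough to locate a factory.
    interface Factory {
        String factoryIdentifier(); // matches the value of the 'connector' option
    }

    // Hypothetical lookup: match the 'connector' value against all candidates.
    static Factory discover(String connector, List<Factory> candidates) {
        return candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No factory found for identifier: " + connector));
    }

    public static void main(String[] args) {
        // The version is now baked into the identifier, e.g. "kafka-0.11" vs "kafka",
        // so no separate factoryVersion() lookup step is needed.
        List<Factory> factories = Arrays.asList(
                () -> "kafka",
                () -> "kafka-0.11",
                () -> "elasticsearch-7");

        System.out.println(discover("kafka-0.11", factories).factoryIdentifier()); // kafka-0.11
    }
}
```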
New Property Keys
Some naming changes are more than simply removing the "connector." prefix or adding a "scan", "lookup", or "sink" prefix; these are explained in the Note column of the tables below.
Kafka:
Old key | New key | Note |
---|---|---|
connector.type | connector | |
connector.version | N/A | merged into 'connector' key |
connector.topic | topic | |
connector.properties.zookeeper.connect | properties.zookeeper.connect | |
connector.properties.bootstrap.servers | properties.bootstrap.servers | |
connector.properties.group.id | properties.group.id | |
connector.startup-mode | scan.startup.mode | |
connector.specific-offsets | scan.startup.specific-offsets | |
connector.startup-timestamp-millis | scan.startup.timestamp-millis | |
connector.sink-partitioner | sink.partitioner | "fixed", or "round-robin", or a class name "org.mycompany.MyPartitioner" |
connector.sink-partitioner-class | N/A | merged into 'sink.partitioner', not needed anymore |
format.type | format |
Elasticsearch:
Old key | New key | Note |
---|---|---|
connector.type | connector | |
connector.version | N/A | merged into 'connector' key |
connector.hosts | hosts | |
connector.index | index | |
connector.document-type | document-type | |
connector.failure-handler | failure-handler | |
connector.connection-max-retry-timeout | connection.max-retry-timeout | |
connector.connection-path-prefix | connection.path-prefix | |
connector.key-delimiter | document-id.key-delimiter | They can also be used by sources in the future. In addition, we prefix 'document-id' to make the meaning more understandable. |
connector.key-null-literal | document-id.key-null-literal | |
connector.flush-on-checkpoint | sink.flush-on-checkpoint | |
connector.bulk-flush.max-actions | sink.bulk-flush.max-actions | we still use "bulk-flush" because it is an Elasticsearch term. |
connector.bulk-flush.max-size | sink.bulk-flush.max-size | |
connector.bulk-flush.interval | sink.bulk-flush.interval | |
connector.bulk-flush.back-off.type | sink.bulk-flush.back-off.strategy | |
connector.bulk-flush.back-off.max-retries | sink.bulk-flush.back-off.max-retries | |
connector.bulk-flush.back-off.delay | sink.bulk-flush.back-off.delay |
HBase:
Old key | New key | Note |
---|---|---|
connector.type | connector | |
connector.version | N/A | merged into 'connector' key |
connector.table-name | table-name | |
connector.zookeeper.quorum | zookeeper.quorum | |
connector.zookeeper.znode.parent | zookeeper.znode-parent | |
connector.write.buffer-flush.max-size | sink.buffer-flush.max-size | |
connector.write.buffer-flush.max-rows | sink.buffer-flush.max-rows | |
connector.write.buffer-flush.interval | sink.buffer-flush.interval |
JDBC:
Old key | New key |
---|---|
connector.type | connector |
connector.url | url |
connector.table | table-name |
connector.driver | driver |
connector.username | username |
connector.password | password |
connector.read.partition.column | scan.partition.column |
connector.read.partition.num | scan.partition.num |
connector.read.partition.lower-bound | scan.partition.lower-bound |
connector.read.partition.upper-bound | scan.partition.upper-bound |
connector.read.fetch-size | scan.fetch-size |
connector.lookup.cache.max-rows | lookup.cache.max-rows |
connector.lookup.cache.ttl | lookup.cache.ttl |
connector.lookup.max-retries | lookup.max-retries |
connector.write.flush.max-rows | sink.buffer-flush.max-rows |
connector.write.flush.interval | sink.buffer-flush.interval |
connector.write.max-retries | sink.max-retries |
Filesystem:
Old key | New key |
---|---|
connector.type | connector |
connector.path | path |
format.type | format |
Csv Format:
Old key | New key |
---|---|
format.type | format |
format.field-delimiter | csv.field-delimiter |
format.disable-quote-character | csv.disable-quote-character |
format.quote-character | csv.quote-character |
format.allow-comments | csv.allow-comments |
format.ignore-parse-errors | csv.ignore-parse-errors |
format.array-element-delimiter | csv.array-element-delimiter |
format.escape-character | csv.escape-character |
format.null-literal | csv.null-literal |
Json Format:
Old key | New key |
---|---|
format.type | format |
format.fail-on-missing-field | json.fail-on-missing-field |
format.ignore-parse-errors | json.ignore-parse-errors |
Examples
CREATE TABLE kafka_table (
...
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'test-topic',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
);
CREATE TABLE fs_table (
...
) WITH (
'connector' = 'filesystem',
'path' = 'file:///path/to/whatever',
'format' = 'csv',
'csv.allow-comments' = 'true',
'csv.ignore-parse-errors' = 'true'
);
CREATE TABLE es_table (
...
) WITH (
'connector' = 'elasticsearch-7',
'index' = 'myuser',
'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '42 mb',
'sink.bulk-flush.back-off.max-retries' = '10'
);
CREATE TABLE jdbc_table (
...
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink-test',
  'table-name' = 'jdbc_table_name',
'driver' = 'com.mysql.jdbc.Driver',
'username' = '...',
  'password' = '...',
'sink.buffer-flush.interval' = '5s'
);
You can refer to the documentation for the current connector property keys: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
In FLIP-107, we may introduce more connector properties that allow specifying which fields should end up in which parts of the record. So a Kafka table may look like this:
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
col1 STRING,
col2 STRING,
log_ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'test-topic',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'timestamp.field' = 'log_ts',
'key.fields' = 'id,name',
'key.format' = 'json',
'key.json.fail-on-missing-field' = 'false',
  'value.format' = 'avro'
);
Compatibility, Deprecation, and Migration Plan
Because we introduce a new set of connector property keys for the new factories, we propose not to touch the old factories: there are too many moving parts in this release, and changing too much may affect the stability and happiness of our users. So for compatibility, the Kafka connector jar will in the end contain two factories. If the connector properties use "connector", the framework will find the new factory instance; otherwise (with "connector.type"), the framework will find the old factory instance.
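The dispatch rule can be sketched as follows. This is a hypothetical illustration of the compatibility behavior described above, not the framework's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class CompatDispatch {
    // Hypothetical illustration of the compatibility rule: the presence of the
    // new 'connector' key routes to the new factory stack, while the legacy
    // 'connector.type' key keeps routing to the old one.
    static String selectFactoryStack(Map<String, String> options) {
        if (options.containsKey("connector")) {
            return "new";     // e.g. DynamicTableSourceFactory / DynamicTableSinkFactory
        } else if (options.containsKey("connector.type")) {
            return "legacy";  // the pre-existing factory shipped in the same jar
        }
        throw new IllegalArgumentException(
                "Table options must contain either 'connector' or 'connector.type'");
    }

    public static void main(String[] args) {
        Map<String, String> newStyle = new HashMap<>();
        newStyle.put("connector", "kafka");
        Map<String, String> oldStyle = new HashMap<>();
        oldStyle.put("connector.type", "kafka");

        System.out.println(selectFactoryStack(newStyle));  // new
        System.out.println(selectFactoryStack(oldStyle));  // legacy
    }
}
```

Because the two key sets are disjoint, an existing DDL keeps working unchanged while new DDLs opt into the new factories simply by using the new keys.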
For future evolution, we propose to implicitly add "property-version=1" to the properties when processing DDLs.
Rejected Alternatives
Option#1: Use factoryIdentifier as the prefix
'connector' = 'kafka',
'kafka.topic' = 'test-topic',
'format' = 'json', // the <factoryIdentifier> value would be a default prefix, as described in FLIP-95
'json.ignore-parse-errors' = 'true'
However, all the properties in the WITH clause are connector specific, including the formats. For example, Kafka may support "key.format" and "value.format". So in order to make them consistent, we would have to prefix all the properties with the connector identifier name.
Option#2: Prefix all with specific connector identifier name
'connector' = 'kafka',
'kafka.version'='0.10',
'kafka.topic' = 'test-topic',
'kafka.key.fields' = 'id, name',
'kafka.key.format' = 'csv',
'kafka.key.csv.field-delimiter' = ';',
'kafka.value.format' = 'json',
'kafka.value.json.ignore-parse-errors' = 'true',
'kafka.timestamp.field' = 'log_ts'
However, all the "kafka" prefix are verbose, because all the properties are of course kafka specific (connecotr=kafka).