
Status

Current state: Accepted

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-86-Improve-Connector-Properties-td34922.html

JIRA: FLINK-14645, FLINK-14649, FLINK-14824, FLINK-15137

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation


Connector properties are a basic component used to construct a connector table via YAML, the descriptor API, and DDL. They are also the serialized representation of a table when it is persisted into a catalog. However, we have encountered several problems when using connector properties, especially in DDL.

The following problems are listed in priority order:

  1. FLINK-14645: Data types defined in DDL lose precision and nullability when converted to properties.
  2. FLINK-14649: Some property structures are hard to define in DDL, e.g. list and map structures.
  3. FLINK-14824: Schema derivation for formats needs to be improved.

Therefore, this document proposes some improvements to the connector properties API. However, the current design still does not address all the goals and use cases that the community wants to support.


Proposed Changes

Evolve Schema Definition

Currently, data types defined in DDL are converted to TypeInformation and serialized/deserialized with TypeStringUtils, which loses precision and nullability information. The following is an example of the current schema properties (the schema is converted into these properties when it is persisted into a catalog).


"schema.0.name" = "a"
"schema.0.type" = "INT"
"schema.1.name" = "b"
"schema.1.type" = "BIGINT"
"schema.2.name" = "c"
"schema.2.type" = "TIMESTAMP"


The proposal is to use LogicalType#asSerializableString and LogicalTypeParser to serialize/deserialize data types, which preserves precision, nullability, and other information. However, we need to work out how to stay compatible with previous versions.

The proposed compatible approach is to use a different property key for the schema data type. There have been discussions in the community that the “type” keyword is overused and that its meaning is confusing (e.g. `connector.type`, `format.type`), so I propose “data-type” as the property key.


"schema.0.name" = "a"
"schema.0.data-type" = "INT"
"schema.1.name" = "b"
"schema.1.data-type" = "BIGINT NOT NULL"
"schema.2.name" = "c"
"schema.2.data-type" = "TIMESTAMP(3)"


When deriving a TableSchema from properties, if the property key is “data-type”, LogicalTypeParser is used to parse the value. If the property key is “type”, the old parser is used as a fallback. When writing a TableSchema to properties, the new serializer LogicalType#asSerializableString is always used. Note that “data-type” and “type” must not both appear in the same schema properties; if they do, an exception should be thrown.
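
The derivation rule above can be sketched as follows. This is a minimal Python illustration, not Flink code: `derive_schema` is a hypothetical helper, and the labels "new" and "legacy" merely stand in for the LogicalTypeParser path and the old parser path. The sketch only decides which parser would be chosen and rejects mixed keys.

```python
import re

# Matches keys such as 'schema.0.name', 'schema.1.type', 'schema.2.data-type'.
SCHEMA_KEY = re.compile(r"^schema\.(\d+)\.(name|type|data-type)$")


def derive_schema(props):
    """Return (column name, type string, parser label) triples.

    Hypothetical helper: 'new' stands for the LogicalTypeParser path,
    'legacy' for the old type-string parser.
    """
    columns = {}
    for key, value in props.items():
        match = SCHEMA_KEY.match(key)
        if match:
            index, attribute = int(match.group(1)), match.group(2)
            columns.setdefault(index, {})[attribute] = value

    fields = [columns[i] for i in sorted(columns)]
    uses_new = any("data-type" in field for field in fields)
    uses_old = any("type" in field for field in fields)
    if uses_new and uses_old:
        # Mixing both keys in one schema is treated as an error.
        raise ValueError("'data-type' and 'type' must not be mixed in one schema")

    type_key, parser = ("data-type", "new") if uses_new else ("type", "legacy")
    return [(field["name"], field[type_key], parser) for field in fields]
```

For example, properties that use only “data-type” would all be routed to the new parser, while a schema that still uses “type” would be read through the legacy path.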

For DDL or descriptor users, the change is transparent. YAML users are advised to upgrade to the newly proposed properties. For example, suppose a user has the following YAML:


tables:
  - name: MyUserTable    
    type: source           
    connector:
      type: kafka
      ...
    schema:
      - name: a
        type: INT
      - name: b
        type: BIGINT
      - name: c
        type: TIMESTAMP

The new proposed YAML would be:

tables:
  - name: MyUserTable      
    type: source           
    connector:
      type: kafka
      ...
    schema:
      - name: a
        data-type: INT
      - name: b
        data-type: BIGINT NOT NULL
      - name: c
        data-type: TIMESTAMP(3)


With the “data-type” property key, users can define nullability and precision.

Flatten Property Keys

Some connector properties are list or map structures. They are easy to define in YAML but not in DDL. For example, Kafka connector-specific properties currently have to be set in the following way:


'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'testGroup',


Defining properties this way has several disadvantages:

  • Users need to keep track of the indices.
  • The key space is not constant. Validating keys requires prefix magic and wildcards, as in TableFactories: `connector.properties.#.key`.
  • It is complex and unintuitive to define and document.

To work better with DDL, we propose to flatten all the property keys.



The following lists all the map- or list-structured properties in existing connectors, together with the proposed new properties.

Connector: Kafka

Old Properties:

'connector.properties.0.key'='zookeeper.connect',
'connector.properties.0.value'='localhost:2181',
'connector.properties.1.key'='bootstrap.servers',
'connector.properties.1.value'='localhost:9092',
'connector.properties.2.key'='group.id',
'connector.properties.2.value'='testGroup'

New Properties:

'connector.properties.zookeeper.connect'='localhost:2181,localhost:2182',
'connector.properties.bootstrap.servers'='localhost:9092,localhost:9093',
'connector.properties.group.id'='testGroup'

Connector: Kafka

Old Properties:

'connector.specific-offsets.0.partition'='0',
'connector.specific-offsets.0.offset'='42',
'connector.specific-offsets.1.partition'='1',
'connector.specific-offsets.1.offset'='300'

New Properties:

'connector.specific-offsets'='partition:0,offset:42;partition:1,offset:300'

Connector: ElasticSearch

Old Properties:

'connector.hosts.0.hostname'='host_name',
'connector.hosts.0.port'='9092',
'connector.hosts.0.protocol'='http'

New Properties:

'connector.hosts'='http://host_name:9092;http://host_name:9093'


NOTE: The flattened `connector.properties.*` properties can accept arbitrary properties defined and documented by Kafka itself. We will strip the `connector.properties` key prefix, convert the remaining keys and values into Properties, and pass them to the underlying KafkaClient. The values of the `connector.properties.*` properties follow the external system’s format. We can also list the required or important properties in the documentation.
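
The prefix handling described in the note can be sketched as below. This is a minimal Python illustration under stated assumptions: `extract_client_properties` is a hypothetical name, a plain dict stands in for the Properties object, and only new-style flattened keys are assumed to be present (old indexed keys such as 'connector.properties.0.key' are not handled here).

```python
PREFIX = "connector.properties."


def extract_client_properties(table_props):
    """Return the entries under 'connector.properties.' with the prefix removed.

    Hypothetical helper; values are passed through untouched because they
    follow the external system's own format.
    """
    return {
        key[len(PREFIX):]: value
        for key, value in table_props.items()
        if key.startswith(PREFIX)
    }
```

With the new Kafka properties listed above, this would yield the keys `zookeeper.connect`, `bootstrap.servers`, and `group.id`, ready to be handed to the client.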

The resolution order is: if the new properties are defined, use them; otherwise, fall back to the old properties. We can try to use the new ConfigOption to extract the configuration values.
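
As an illustration of this resolution order, the following Python sketch resolves the Kafka specific offsets from either format. All function names are hypothetical and the code is not based on ConfigOption; it only shows the "new key first, old keys as fallback" rule, assuming the value syntax shown in the examples above.

```python
import re

NEW_KEY = "connector.specific-offsets"
OLD_PARTITION_KEY = re.compile(r"^connector\.specific-offsets\.(\d+)\.partition$")


def parse_flat_offsets(value):
    """Parse 'partition:0,offset:42;partition:1,offset:300' into {0: 42, 1: 300}."""
    offsets = {}
    for entry in value.split(";"):
        fields = {}
        for pair in entry.split(","):
            name, _, raw = pair.partition(":")
            fields[name.strip()] = raw.strip()
        offsets[int(fields["partition"])] = int(fields["offset"])
    return offsets


def parse_indexed_offsets(props):
    """Parse the old 'connector.specific-offsets.#.partition/offset' keys."""
    offsets = {}
    for key, value in props.items():
        match = OLD_PARTITION_KEY.match(key)
        if match:
            offset_key = f"connector.specific-offsets.{match.group(1)}.offset"
            offsets[int(value)] = int(props[offset_key])
    return offsets


def resolve_specific_offsets(props):
    """Prefer the new flattened key; fall back to the old indexed keys."""
    flat = props.get(NEW_KEY)
    if flat is not None:
        return parse_flat_offsets(flat)
    return parse_indexed_offsets(props)
```

If both forms are present, the new flattened key takes precedence and the old indexed keys are ignored.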


Format Schema Derivation


Currently, users also need to define a schema for formats in DDL, the descriptor API, or YAML. For the JSON, CSV and OldCsv formats, we already support 'format.derive-schema'='true' to derive the format schema from the table schema automatically. For the Avro format, however, a user has to pass an Avro schema file or define the format schema explicitly via 'format.avro-schema'. Avro is the only format that doesn’t support schema derivation. Defining the schema twice is a bad experience for users, especially when the schema is very large.

The proposed changes are:

  1. Support deriving the schema from the table schema for the Avro format.
  2. Deprecate 'format.avro-schema'.
  3. Make 'format.derive-schema'='true' the default behavior for all formats.
  4. Deprecate 'format.derive-schema' in general.
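
To give a feel for what deriving an Avro schema from a table schema could look like, here is a minimal Python sketch. It is an assumption-laden illustration, not the converter this proposal would ship: `derive_avro_schema` and the `AVRO_TYPES` mapping are hypothetical, only a handful of type strings are covered, and nullable columns are modelled as an Avro union with "null".

```python
# Illustrative mapping only; the real type mapping would be defined by the Avro format.
AVRO_TYPES = {
    "INT": "int",
    "BIGINT": "long",
    "STRING": "string",
    "BOOLEAN": "boolean",
    "DOUBLE": "double",
    "TIMESTAMP(3)": {"type": "long", "logicalType": "timestamp-millis"},
}

NOT_NULL_SUFFIX = " NOT NULL"


def derive_avro_schema(record_name, columns):
    """Build an Avro record schema (as a dict) from (name, data-type) pairs.

    Hypothetical helper: columns without 'NOT NULL' become a union with 'null'.
    """
    fields = []
    for name, data_type in columns:
        not_null = data_type.endswith(NOT_NULL_SUFFIX)
        base = data_type[: -len(NOT_NULL_SUFFIX)] if not_null else data_type
        avro_type = AVRO_TYPES[base]
        fields.append({"name": name, "type": avro_type if not_null else ["null", avro_type]})
    return {"type": "record", "name": record_name, "fields": fields}
```

The returned dict can be serialized with a JSON library to obtain the schema text that users currently have to supply by hand via 'format.avro-schema'.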

Compatibility, Deprecation, and Migration Plan


The proposed changes keep the original property keys and values and introduce new keys that support them in a better way. The new keys have a higher priority than the old keys, so compatibility with previous versions is preserved. This is similar to ConfigOption#fallbackKeys.

We should also update the documentation to use the new keys, add notes about the deprecated keys, and possibly drop them in a future version.

