Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-16913
Motivation
When using a Connector that requires a schema (such as JDBC connectors) with JSON messages, the current JSONConverter requires a copy of the schema to be included in every message.
This increases the size of messages significantly.
It is also inconvenient, as few applications will produce messages that look like this:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id", "type": "string", "optional": false },
      { "field": "name", "type": "string", "optional": false }
    ]
  },
  "payload": { "id": "emp_001", "name": "Kevin" }
}
The ideal alternative (using a different JSON converter, paired with a schema registry) is difficult to adopt in some situations, such as where a schema registry is not available, or the Kafka message producers cannot be modified to include a reference or ID for the appropriate schema.
Public Interfaces
This proposal introduces a new optional config option for org.apache.kafka.connect.json.JsonConverter:
schema.content
If a value for this option is not provided, the current behaviour (reading schemas from the message payload) is unchanged.
If a value is provided, it will be used as the schema for every message.
For example, you could use the config:
schemas.enable=true
schema.content={"type": "struct", "fields": [{ "field": "id", "type": "string", "optional": false },{ "field": "name", "type": "string", "optional": false }]}
and the Kafka message payload would contain only:
{ "id": "emp_001", "name": "Kevin" }
Existing parsing and interpretation of schemas will be identical to today.
The only difference is the ability to provide schemas externally to individual messages.
This option will only be used for toConnectData calls and will be ignored for fromConnectData. In practice, this means the converter is intended for sink connectors rather than source connectors.
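The intended toConnectData behaviour can be sketched as follows. This is a simplified Python simulation for illustration only, not the actual Java implementation; the function and variable names here are hypothetical:

```python
import json

# Simplified simulation of the proposed behaviour: when schema.content is
# configured, toConnectData pairs the externally supplied schema with each
# plain JSON payload instead of expecting a schema/payload envelope.

def to_connect_data(configured_schema, payload_bytes):
    """Return a (schema, value) pair, mimicking Connect's SchemaAndValue."""
    value = json.loads(payload_bytes)
    if configured_schema is not None:
        # schema.content is set: the payload is plain JSON,
        # and the external schema is used for every message.
        return (configured_schema, value)
    # schema.content is unset: fall back to the current envelope format.
    return (value["schema"], value["payload"])

schema = json.loads(
    '{"type": "struct", "fields": ['
    '{"field": "id", "type": "string", "optional": false},'
    '{"field": "name", "type": "string", "optional": false}]}'
)

# Plain payload, external schema:
print(to_connect_data(schema, b'{"id": "emp_001", "name": "Kevin"}'))
```

With the schema supplied externally, the same (schema, value) pair is produced as today, but the message on the wire carries only the payload.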
Users would be able to use ConfigProviders to supply the contents of a file as the schema.
For example:

# Config for Kafka Connect
config.providers=directory
config.providers.directory.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider

# Config for the JSON converter
schema.content=${directory:/schema:schema.json}
KIP-993: Allow restricting files accessed by File and Directory ConfigProviders will allow users to restrict which files can be used.
When using the Connect REST API to create a connector with this schema configuration, the schema.content field will need to include escape sequences, since the REST API payload must itself be valid JSON.
For example:
{
  "name": "api-file-source-json",
  "config": {
    "value.converter.schemas.enable": true,
    "value.converter.schema.content": "{\"type\": \"struct\", \"fields\": [{ \"field\": \"id\", \"type\": \"string\", \"optional\": false },{ \"field\": \"name\", \"type\": \"string\", \"optional\": false }]}"
  }
}
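The escaped string does not need to be written by hand. As an illustration (not part of the proposal), any JSON serializer produces the required escaping when the schema is embedded as a string value; in Python:

```python
import json

# The schema exactly as it would appear in a plain properties file:
schema_content = ('{"type": "struct", "fields": ['
                  '{"field": "id", "type": "string", "optional": false}]}')

# Serializing the schema string adds the backslash escaping needed
# inside a JSON REST payload:
escaped = json.dumps(schema_content)
print(escaped)

# Embedding it in the connector-creation payload escapes it automatically:
payload = json.dumps({
    "name": "api-file-source-json",
    "config": {
        "value.converter.schemas.enable": True,
        "value.converter.schema.content": schema_content,
    },
})
print(payload)
```

The resulting payload can then be POSTed to the Connect REST API as usual.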
Proposed Changes
Functional changes will be made to org.apache.kafka.connect.json.JsonConverter
configure
The configure method will be extended.
If the schema.content option is not set, the current behaviour will remain as-is.
If the schema.content option is set, its contents will be read and parsed, and an instance variable will store the resulting Connect schema.
byte[] schemaContent = config.getSchemaContents();
JsonNode schemaNode = jsonDeserializer.deserialize("", schemaContent);
schema = asConnectSchema(schemaNode);
If the file is not found, not readable, or the contents cannot be parsed as a Connect schema, this will throw an exception to alert the user to the configuration problem.
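The fail-fast validation described above can be sketched as follows. This is an illustrative Python simulation of the configure-time logic, not the actual Java implementation; the function name and error type are stand-ins:

```python
import json

# Illustrative sketch of the proposed configure-time validation:
# parse schema.content eagerly so that a bad configuration fails at
# startup rather than when the first message arrives.

def configure(configs):
    """Return the parsed schema, or None when schema.content is unset."""
    content = configs.get("schema.content")
    if content is None:
        return None  # current behaviour: schemas read from each message
    try:
        # Stand-in for deserializing and calling asConnectSchema(...)
        return json.loads(content)
    except json.JSONDecodeError as e:
        # Surface the configuration problem to the user immediately.
        raise ValueError(f"schema.content is not a valid schema: {e}")
```

Raising at configure time means a missing or unparseable schema is reported as a connector configuration error rather than a per-message conversion failure.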
Compatibility, Deprecation, and Migration Plan
There is no compatibility impact from this proposal.
The new config option is optional, and if not provided, the existing behaviour remains unmodified.
Test Plan
Unit tests will cover successful cases, including a variety of schema types.
Unit tests will cover error cases, such as missing schema files, or schemas that do not match the message format.
Rejected Alternatives
We are not proposing to automatically infer or generate schemas from message contents, as discussed in KIP-301.
We are not proposing support for multiple different schemas (such as different schemas for different messages). Where this level of complexity is required, it would be better to use a Converter that integrates with a schema registry, such as io.confluent.connect.json.JsonSchemaConverter or io.apicurio.registry.utils.converter.ExtJsonConverter.
We are not proposing support for JSON Schema itself, as we prefer to maintain compatibility with the existing schemas used by JsonConverter.
We are not proposing to implement changes in fromConnectData, such as to write the schema found in the message to a file.
We are not proposing to create a separate custom JsonConverter, since that would require duplicating a large amount of code (private methods, etc.) from the existing JsonConverter for the new converter to work.