Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-16913


Motivation

When using a connector that requires a schema (such as JDBC connectors) with JSON messages, the current JsonConverter requires a copy of the schema to be included in every message.

This increases the size of messages significantly.

It is also inconvenient, as few applications will produce messages that look like this:

record with schema
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "id",
        "type": "string",
        "optional": false
      },
      {
        "field": "name",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": "emp_001",
    "name": "Kevin"
  }
}


The ideal alternative (using a different JSON converter, paired with a schema registry) is difficult to adopt in some situations, such as where a schema registry is not available, or the Kafka message producers cannot be modified to include a reference or ID for the appropriate schema.

Public Interfaces

This proposal introduces a new optional config option for org.apache.kafka.connect.json.JsonConverter.

new config
schema.content

If a value for this is not provided, the current behaviour (reading schemas from the message payload) is followed.

If a value is provided, it will be used as the schema for all messages.

For example, you could use the config:

schemas.enable=true
schema.content={"type": "struct", "fields": [{ "field": "id", "type": "string", "optional": false },{ "field": "name", "type": "string", "optional": false }]}

and the Kafka message payload contains:

{
    "id": "emp_001",
    "name": "Kevin"
}

Existing parsing and interpretation of schemas will be identical to today.

The only difference is the ability to provide schemas externally to individual messages.

This option will only be used for toConnectData calls; it will be ignored for fromConnectData. This means that the converter configured with this option is intended for use with sink connectors, not source connectors.
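
For example, a sink connector could be configured to use this converter with an external schema. The sketch below is illustrative: the connector class, topic name, and schema are placeholders, not part of this proposal.

```properties
# Sketch of a sink connector configuration (names are illustrative)
name=jdbc-sink-json
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=employees

# Use JsonConverter with an externally supplied schema
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter.schema.content={"type": "struct", "fields": [{ "field": "id", "type": "string", "optional": false },{ "field": "name", "type": "string", "optional": false }]}
```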

Users would be able to use ConfigProviders to provide the contents of a file as the schema.

For example:

# Config for Kafka Connect
config.providers=directory
config.providers.directory.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider

# Config for the converter
schema.content=${directory:/schema:schema.json}

KIP-993: Allow restricting files accessed by File and Directory ConfigProviders will allow users to restrict which files can be used.


When using the Connect REST API to create a connector with this schema configuration, the schema.content value will need escape sequences, since the REST API payload must itself be valid JSON.

For example:

{
  "name": "api-file-source-json",
  "config": {
    "value.converter.schemas.enable": "true",
    "value.converter.schema.content": "{\"type\": \"struct\", \"fields\": [{ \"field\": \"id\", \"type\": \"string\", \"optional\": false },{ \"field\": \"name\", \"type\": \"string\", \"optional\": false }]}"
  }
}
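
Rather than hand-escaping the schema, the request body can be built programmatically. The sketch below uses Python's json module; the connector name is the one from the example above, and everything else is illustrative.

```python
import json

# The schema, written as a normal dict rather than a hand-escaped string.
schema = {
    "type": "struct",
    "fields": [
        {"field": "id", "type": "string", "optional": False},
        {"field": "name", "type": "string", "optional": False},
    ],
}

# json.dumps() produces the single-string form required for schema.content.
connector_config = {
    "name": "api-file-source-json",
    "config": {
        "value.converter.schemas.enable": "true",
        "value.converter.schema.content": json.dumps(schema),
    },
}

# Serializing the full payload escapes the nested quotes automatically.
payload = json.dumps(connector_config, indent=2)
print(payload)
```

This payload can then be POSTed to the Connect REST API as usual.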


Proposed Changes

Functional changes will be made to org.apache.kafka.connect.json.JsonConverter: the configure method will be extended.

If the schema.content  option is not set, the current behaviour will remain as-is.

If the schema.content option is set, its value will be parsed and the resulting Connect schema stored in an instance variable for use in subsequent conversions.

// Parse the configured schema content into a Connect schema
// (getSchemaContents() is the accessor proposed for the new config)
byte[] schemaData = config.getSchemaContents();
JsonNode schemaNode = jsonDeserializer.deserialize("", schemaData);
schema = asConnectSchema(schemaNode);

If the configured value (or the file it is read from via a ConfigProvider) is missing, unreadable, or cannot be parsed as a Connect schema, configure will throw an exception to alert the user to the configuration problem.
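
The fail-fast behaviour described above can be illustrated with a small sketch. This is Python for brevity (the actual implementation is Java), and the function name and the minimal validity check are illustrative, not part of the proposal.

```python
import json

def parse_schema_content(content: str) -> dict:
    """Illustrative sketch: parse schema.content, failing at configure
    time rather than per message when the content is unusable."""
    try:
        node = json.loads(content)
    except json.JSONDecodeError as e:
        raise ValueError(f"schema.content is not valid JSON: {e}") from e
    # A Connect schema is a JSON object with at least a "type" field.
    if not isinstance(node, dict) or "type" not in node:
        raise ValueError("schema.content is not a valid Connect schema")
    return node

# A well-formed schema parses successfully...
schema = parse_schema_content(
    '{"type": "struct", "fields": '
    '[{"field": "id", "type": "string", "optional": false}]}'
)
```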

Compatibility, Deprecation, and Migration Plan

There is no compatibility impact from this proposal.

The new config option is optional, and if not provided, the existing behaviour remains unmodified.

Test Plan

Unit tests will cover successful cases, including a variety of schema types.

Unit tests will cover error cases, such as missing schema files, or schemas that do not match the message format.

Rejected Alternatives

We are not proposing to automatically infer or generate schemas from message contents, as discussed in KIP-301.

We are not proposing support for multiple different schemas (such as to allow different schemas for different messages). Where this level of complexity is required, it would be better to use a Converter that integrates with a schema registry, such as io.confluent.connect.json.JsonSchemaConverter or io.apicurio.registry.utils.converter.ExtJsonConverter.

We are not proposing support for JSON Schema itself, as we prefer to maintain compatibility with the existing schemas used by JsonConverter.

We are not proposing to implement changes in fromConnectData, such as to write the schema found in the message to a file.

We are not proposing to create a custom JsonConverter, since we would have to duplicate a lot of code from the existing JsonConverter (private methods etc.) for the new converter to work.

