Status

Discussion thread: https://lists.apache.org/thread/wtkl4yn847tdd0wrqm5xgv9wc0cb0kr8

Vote thread:

JIRA:

Release: 1.20 or beyond

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, if users have Avro schemas in an Apicurio Registry (an open source, Apache 2 licensed schema registry), the natural way to work with those Avro events is to use the schemas held in the registry. This FLIP proposes a new Kafka-oriented Avro Apicurio format, allowing Flink users to work with Avro schemas stored in the Apicurio Registry.

 

Messages in the Apicurio format carry a schema ID (usually the global ID, held in a Kafka header), which identifies the schema in the Apicurio Registry. The new format will:

  • For inbound messages, use the ID to look up the Avro schema
  • For outbound messages, register the schema in Apicurio and include the ID in the message

In this way Apache Flink can be used to consume and produce Apicurio events.

Public Interfaces

The changes associated with this FLIP are in the Flink Kafka connector:

  • Create a new Avro Apicurio format that is used with the Kafka connector.
  • Change the Kafka deserialization logic to discover a new consumer-record-based implementation, so that the Kafka headers can be passed to the format. The format's implementation uses the header information to look up the associated schema in the Apicurio Registry.
  • Change the Kafka serialization logic to discover a new record-based implementation after serialization; the Avro Apicurio format implementation then provides headers to add to the ProducerRecord.

The serialization factory is:

/**
 * Record based serialization factory. Implementations supply the payload and the
 * Kafka headers to be added to the ProducerRecord.
 */
public interface RecordBasedSerializationFactory {
    RecordBasedSerialization create();
}


The record based serialization is:

public interface RecordBasedSerialization {

    byte[] getPayload(byte[] customSerialisation) throws IOException;

    Headers getKeyHeaders(byte[] customSerialisation) throws IOException;

    Headers getValueHeaders(byte[] customSerialisation) throws IOException;

    boolean canProcess(byte[] customSerialisation);
}
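
To make the contract concrete, here is a minimal sketch of an implementation. The wire layout (a magic number, a header count, length-prefixed header key/value pairs, then the payload) and the class name are hypothetical examples for illustration, not the layout the Apicurio format will actually use:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

/**
 * Illustrative sketch only. The wire layout (magic number, header count,
 * length-prefixed header key/value pairs, then the payload) is hypothetical.
 */
public class ExampleRecordBasedSerialization implements RecordBasedSerialization {

    private static final int MAGIC = 0xCAFE0001; // hypothetical marker

    @Override
    public boolean canProcess(byte[] customSerialisation) {
        return customSerialisation != null
                && customSerialisation.length >= 4
                && ByteBuffer.wrap(customSerialisation).getInt() == MAGIC;
    }

    @Override
    public byte[] getPayload(byte[] customSerialisation) throws IOException {
        ByteBuffer buf = walkPrefix(customSerialisation, null);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    @Override
    public Headers getKeyHeaders(byte[] customSerialisation) throws IOException {
        // This sketch uses the same layout for keys and values.
        return getValueHeaders(customSerialisation);
    }

    @Override
    public Headers getValueHeaders(byte[] customSerialisation) throws IOException {
        RecordHeaders headers = new RecordHeaders();
        walkPrefix(customSerialisation, headers);
        return headers;
    }

    // Walks the prefix, optionally collecting headers, and returns the
    // buffer positioned at the start of the payload.
    private static ByteBuffer walkPrefix(byte[] bytes, RecordHeaders collect) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        buf.getInt(); // skip magic
        int headerCount = buf.getInt();
        for (int i = 0; i < headerCount; i++) {
            byte[] key = new byte[buf.getInt()];
            buf.get(key);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            if (collect != null) {
                collect.add(new String(key, StandardCharsets.UTF_8), value);
            }
        }
        return buf;
    }
}

A length-prefixed layout like this keeps the connector code format-agnostic: only the format jar needs to understand the prefix.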


The deserialization factory is:

/**
 * Record based deserialization factory. Implementations can access the
 * ConsumerRecord, including Kafka headers.
 */
public interface RecordBasedDeserializationFactory {
    RecordBasedDeserialization create();
}

The record based deserialization is:

public interface RecordBasedDeserialization {

    byte[] serializeConsumerRecordForKey(ConsumerRecord<byte[], byte[]> record) throws IOException;

    byte[] serializeConsumerRecordForValue(ConsumerRecord<byte[], byte[]> record) throws IOException;

    boolean canProcess(ConsumerRecord<byte[], byte[]> record) throws IOException;
}
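
A matching sketch for the deserialization side, packing the ConsumerRecord headers and payload into a single byte array using the same hypothetical layout as above (again, the class name and layout are illustrative assumptions):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

/**
 * Illustrative sketch only, writing the same hypothetical prefix layout that
 * the serialization example above reads.
 */
public class ExampleRecordBasedDeserialization implements RecordBasedDeserialization {

    private static final int MAGIC = 0xCAFE0001; // hypothetical marker

    @Override
    public boolean canProcess(ConsumerRecord<byte[], byte[]> record) throws IOException {
        // A real implementation might look for Apicurio's schema ID headers here.
        return record != null && record.headers() != null;
    }

    @Override
    public byte[] serializeConsumerRecordForKey(ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        return pack(record, record.key());
    }

    @Override
    public byte[] serializeConsumerRecordForValue(ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        return pack(record, record.value());
    }

    // Packs the record headers and the given payload into one byte array.
    // Assumes non-null header values for brevity.
    private static byte[] pack(ConsumerRecord<byte[], byte[]> record, byte[] payload)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(MAGIC);
        int headerCount = 0;
        for (Header ignored : record.headers()) {
            headerCount++;
        }
        out.writeInt(headerCount);
        for (Header header : record.headers()) {
            byte[] key = header.key().getBytes(StandardCharsets.UTF_8);
            out.writeInt(key.length);
            out.write(key);
            out.writeInt(header.value().length);
            out.write(header.value());
        }
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }
}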


Here is a class diagram for deserialization

In the spirit of core Flink's FactoryUtils.loadAndInvokeFactory, the service loader discovers the factories, each of which is used to instantiate an implementation. The canProcess method is then called to check whether that implementation can process the message; if it can, that implementation is used.
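
A minimal sketch of this discovery flow follows; the class name and the null fallback are illustrative, not the final API:

import java.io.IOException;
import java.util.ServiceLoader;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Illustrative only: discovers a RecordBasedDeserialization via the service loader. */
public final class RecordBasedDeserializationDiscovery {

    private RecordBasedDeserializationDiscovery() {}

    public static RecordBasedDeserialization discover(ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        for (RecordBasedDeserializationFactory factory :
                ServiceLoader.load(RecordBasedDeserializationFactory.class)) {
            RecordBasedDeserialization candidate = factory.create();
            // canProcess decides whether this implementation understands the record.
            if (candidate.canProcess(record)) {
                return candidate;
            }
        }
        // No implementation matched; fall back to the existing payload-only path.
        return null;
    }
}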

These discovered implementations can find the Apicurio format class, which adds extra prefix information to the serialization / deserialization to facilitate communication between the Kafka connector code and the Avro Apicurio format jar.

The extra prefix information added to the serialization

The extra prefix information added to the deserialization


The above prefix information is added to the serialization and deserialization in the Avro Apicurio code, so another format implementation could choose a different encoding.

 

Proposed Changes

In this section, we describe in detail all the configurations that need updating.

Apicurio Avro Format Options

Option | Required | Forwarded | Default | Type | Sink only | Description
------ | -------- | --------- | ------- | ---- | --------- | -----------
format | required | no | (none) | String |  | Specify what format to use; use 'avro-apicurio'.
properties | optional | yes | (none) | Map |  | The apicurio-registry client configuration properties. Any Flink properties take precedence.
apicurio.registry.request.ssl.truststore.location | optional | yes | (none) | String |  | Location/file of the SSL truststore.
apicurio.registry.request.ssl.truststore.type | optional | yes | (none) | String |  | Type of the SSL truststore.
apicurio.registry.request.ssl.truststore.password | optional | yes | (none) | String |  | Password of the SSL truststore.
apicurio.registry.request.ssl.keystore.location | optional | yes | (none) | String |  | Location/file of the SSL keystore.
apicurio.registry.request.ssl.keystore.type | optional | yes | (none) | String |  | Type of the SSL keystore.
apicurio.registry.request.ssl.keystore.password | optional | yes | (none) | String |  | Password of the SSL keystore.
apicurio-auth-basic-userid | optional | yes | (none) | String |  | Basic auth user ID.
apicurio-auth-basic-password | optional | yes | (none) | String |  | Basic auth password.
apicurio-auth-oidc-url | optional | yes | (none) | String |  | The auth URL to use for OIDC.
apicurio-auth-oidc-clientID | optional | yes | (none) | String |  | Client ID to use for OIDC.
apicurio.auth.oidc.clientSecret | optional | yes | (none) | String |  | Client secret to use for OIDC.
apicurio-auth-oidc-scope | optional | yes | (none) | String |  | Client scope to use for OIDC.
apicurio-auth-oidc-client-token-expiration-reduction | optional | yes | 1 | String |  | The token expiration reduction to use for OIDC, as a duration in seconds: the amount of time before the token expires at which Apicurio requests a new token.
apicurio-avro.use-headers | optional | yes | true | Boolean |  | Configures reading/writing the artifact identifier to Kafka message headers instead of in the message payload.
avro-apicurio.artifactId | optional | yes | (none) | String | Y | The artifactId of the artifact to be registered. This is unique, so the same Apicurio schema is always associated with this format.
avro-apicurio.artifactName | optional | yes | "generated-schema" | String | Y | The name of the artifact to be registered. This is not unique.
avro-apicurio.artifactDescription | optional | yes | "Schema registered by Apache Flink." | String | Y | The description of the artifact to be registered.
avro-apicurio.artifactVersion | optional | yes | "1" | String | Y | The version of the artifact to be registered.
avro-apicurio.schema | optional | yes | (none) | String |  | The schema registered or to be registered in the Apicurio Registry. If no schema is provided, Flink converts the table schema to an Avro schema. The schema provided must match the table schema.
avro-apicurio.groupId | optional | yes | "default" | String | Y | The ID of the Apicurio group to use when creating a schema.
avro-apicurio.register-schema | optional | yes | false | Boolean | Y | When true the schema is registered; otherwise the schema must already be defined in Apicurio.

Compatibility, Deprecation, and Migration Plan

The Apicurio format jar file should be used with the corresponding Kafka connector jar; a back-level Kafka connector should not be used with the new Apicurio Avro format.

Test Plan

Existing UT/IT can ensure compatibility with old options, and new tests will cover the new options.

Unit tests will verify that serialization and deserialization work as expected.
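
As a sketch of what such a unit test could look like, the following JUnit 5 round trip is built on the hypothetical example implementations above (the example classes are not part of this FLIP; the header name follows Apicurio's usual global ID header convention):

import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

/** Illustrative round-trip test over the hypothetical sketches above. */
class RecordBasedRoundTripTest {

    @Test
    void payloadAndHeadersSurvivePackAndUnpack() throws Exception {
        ConsumerRecord<byte[], byte[]> record =
                new ConsumerRecord<>("orders", 0, 0L, "key".getBytes(), "payload".getBytes());
        // Apicurio typically carries the global ID in a header like this.
        record.headers().add("apicurio.value.globalId", new byte[] {0, 0, 0, 0, 0, 0, 0, 42});

        byte[] packed =
                new ExampleRecordBasedDeserialization().serializeConsumerRecordForValue(record);
        ExampleRecordBasedSerialization serde = new ExampleRecordBasedSerialization();

        assertArrayEquals("payload".getBytes(), serde.getPayload(packed));
        assertArrayEquals(
                new byte[] {0, 0, 0, 0, 0, 0, 0, 42},
                serde.getValueHeaders(packed).lastHeader("apicurio.value.globalId").value());
    }
}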

 

Rejected Alternatives

  • Running Apicurio in Confluent compatibility mode, so that the existing Confluent Avro format could be used. This would work for some users, but this FLIP provides function to work naturally with Apicurio schemas, where the global/content IDs are in the Kafka headers.
  • Initially we passed maps of header information from the Kafka connector to Flink for deserialization, and similarly for serialization. This was not great: maps are not ideal, and it was a big change because it needed core Flink interface changes.
  • We then moved the Avro Apicurio format to the Kafka connector and looked to discover a new record-based implementation using the existing de/serialization interfaces, so we could pass down the record (containing the headers) rather than just the payload. This did not work, because there is a dependency on the Avro format that is not aware of the new interface.
  • We considered using thread-local storage to pass the headers. We did not like this, as there was a risk of memory leaks if the thread was not managed well, and the contract is hidden.






