Status
Discussion thread | https://lists.apache.org/thread/wtkl4yn847tdd0wrqm5xgv9wc0cb0kr8 |
Vote thread | |
JIRA | |
Release | 1.20 or beyond |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
- Motivation
- Public Interfaces
- Proposed Changes
- Ensure all the ConfigOptions are properly annotated
- Ensure all user-facing configurations are included in the documentation generation process
- Make the existing ConfigOptions use the proper type
- Mark all internally used ConfigOptions with the @Internal annotation
- Compatibility, Deprecation, and Migration Plan
- Test Plan
Motivation
Currently, if users have Avro schemas in an Apicurio Registry (an open source Apache 2 licensed schema registry), then the natural way to work with those Avro events is to use the schemas in the Apicurio Repository. This FLIP proposes a new Kafka oriented Avro Apicurio format, to allow Flink users to work with Avro schemas stored in the Apicurio Registry.
Messages in the Apicurio format have a schema ID (usually the global ID in a Kafka header), which identifies the schema in the Apicurio Registry. The new format will:
- For inbound messages, use the ID to find the Avro schema
- For outbound messages, register a schema in Apicurio and include the ID in the message
In this way Apache Flink can be used to consume and produce Apicurio events.
Public Interfaces
The changes associated with this Flip are in the Flink Kafka connector:
- Create a new Avro Apicurio format that is used with the Kafka connector
- Change the Kafka deserialization logic to discover a new consumer record based implementation, so that the Kafka headers can be passed to the format. The formats implementation uses the header information to look up of the associate schema in the Apicurio registry
- Change the Kafka serialization logic after the serialization, to discover a new record based implementation; the Avro Apicurio format implementation then provides headers to add to the Producer record.
The serialization Factory is
/** * Record based Serialization Factory. Implementations can access the ConsumerRecord including Kafka headers. */ public interface RecordBasedSerializationFactory { RecordBasedSerialization create(); } The record deserialization is public interface RecordBasedSerialization { byte[] getPayload(byte[] customSerialisation) throws IOException; Headers getKeyHeaders(byte[] customSerialisation) throws IOException; Headers getValueHeaders(byte[] customSerialisation) throws IOException; public boolean canProcess(byte[] customSerialisation); }
The deserialization Factory is
public interface RecordBasedDeserializationFactory { RecordBasedDeserialization create(); } The deserialization is public interface RecordBasedDeserialization { byte[] serializeConsumerRecordForKey(ConsumerRecord<byte[], byte[]> record) throws IOException; byte[] serializeConsumerRecordForValue(ConsumerRecord<byte[], byte[]> record) throws IOException; boolean canProcess(ConsumerRecord<byte[], byte[]> record) throws IOException; }
Here is a class diagram for deserialization
In the spirit of core Flink’s FactoryUtils.loadAndInvokeFactory service loaders are discovered then instantiated, the canProcess method is called, to check if the format can process the message, if it can then we proceed with the factory instantiation.
These discovered implementation can find the Apicurio format class, which will add extra prefix information to the serialization / deserialization to facilitate communication between the Kafka connector code and the Avro Apicurio format jar.
The extra information prefix information added to the serialization
The extra information prefix information added to the deserialization
The above prefix information is added to the serialization and deserialization in the Avro Apicurio code. So this could be done differently for another format implementation.
Proposed Changes
In this section, we describe in detail all the configurations that need updating.
Apicurio avro Format Options
Apicurio avro Format Options
Option | Required | Forwarded | Default | Type | Sink only | Description |
Format | required | no | (none) | String | Specify what format to use, use 'avro-apicurio'. | |
properties | optional | yes | (none) | Map | This is the apicurio-registry client configuration properties. Any Flink properties take precedence, | |
apicurio.registry.request.ssl.truststore.location | optional | yes | (none) | String | Location / File of SSL truststore | |
apicurio.registry.request.ssl.truststore.type | optional | yes | (none) | String | Type of SSL truststore | |
apicurio.registry.request.ssl.truststore.password | optional | yes | (none) | String | Password of SSL truststore | |
apicurio.registry.request.ssl.keystore.location | optional | yes | (none) | String | Location / File of SSL keystore | |
apicurio.registry.request.ssl.keystore.type | optional | yes | (none) | String | Type of SSL keystore | |
apicurio.registry.request.ssl.keystore.password | optional | yes | (none) | String | Password of SSL keystore | |
apicurio-auth-basic-userid | optional | yes | (none) | String | Basic auth userId | |
apicurio-auth-basic-password | optional | yes | (none) | String | Basic auth password | |
apicurio-auth-oidc-url | optional | yes | (none)l | String | The auth URL to use for OIDC | |
apicurio-auth-oidc-clientID | optional | yes | (none) | String | Client ID to use for OIDC | |
apicurio.auth.oidc.clientSecret | optional | yes | (none) | String | Client secret to use for OIDC | |
apicurio-auth-oidc-scope | optional | yes | (none) | String | Client scope to use for OIDC | |
apicurio-auth-oidc-client-token-expiration-reduction | optional | yes | 1 | String | The token expiration to use for OIDC. This is a Duration in seconds. This is the amount of time before the token expires that Apicurio requests a new token. | |
apicurio-avro.use-headers | optional | yes | true | boolean | Configures to read/write the artifact identifier to Kafka message headers instead of in the message payload. | |
avro-apicurio.artifactId | optional | yes | (none) | String | Y | Specifies the artifactId of the artifact to be registered. This is unique and so the same Apicurio schema is always associated with this format. |
avro-apicurio.artifactName | optional | yes | “generated-schema”
| String | Y | This specifies the name of the artifact to be registered. This is not unique.
|
avro-apicurio.artifactDescription | optional | yes | “Schema registered by Apache Flink.”
| String | Y | This specifies the description of the artifact to be registered. |
avro-apicurio.artifactVersion | optional | yes | “1” | String | Y | This specifies the version of the artifact to be registered. |
avro-apicurio.schema | optional | yes | (none) | String | The schema registered or to be registered in the Apicurio Registry. If no schema is provided Flink converts the table schema to avro schema. The schema provided must match the table schema. | |
avro-apicurio.groupId | optional | yes | “default” | String | Y | The id of the Apicurio group to use when creating a schema. |
avro-apicurio.register-schema | optional | yes | false | Boolean | Y | When true the schema is registered, otherwise the schema is not registered. In this case the schema must be already defined in Apicurio. |
Compatibility, Deprecation, and Migration Plan
The Apicurio format jar file should be used with the corresponding Kafka connector jar. A back level Kafka connector should not be used with the new Apicurio Avro format.
Test Plan
Existing UT/IT can ensure compatibility with old options. New tests will cover the new options.
UT to test serialization and deserialization work as expected.
Rejected Alternatives
- Running Apicurio in Confluent mode, so that the Confluent Avro format could be used. This would be great for some users , but this FLIP is provides function to facilitate working naturally with Apicurio schemas where the global/content ID are in the Kafka headers.
- Initially we passed maps for header information from the kafka connector to Flink for deserialization. Similar for serialize. This was not great, because maps are not ideal and it was a big change as it needed core Flink interface changes
- We then moved the Avro Apicurio format to the Kafka connector and looked to discover a new record using the existing de/serialization interfaces. So we could pass down the record (containing the headers) rather than the payload. This did not work, because there is a dependence on the Avro connector that is not aware of the new interface.
- We considered using Thread local storage to pass the headers, we did not like this as there was a risk of memory leaks if we did not manage the thread well, also the contract is hidden.