Status
Current state: Under discussion
Discussion thread: here
JIRA: KAFKA-9673
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Single Message Transformations (SMTs) in Kafka Connect provide a convenient, code-free way to modify records from source connectors before they are sent to a Kafka topic. In some more complex use cases it is desirable to apply a list of SMTs conditionally, depending on the topic the records are destined for. For example, Debezium produces some topics that represent schema changes and others that represent data changes, and users might want to apply transformations selectively based on the topic type. SMTs cannot currently do this, since they are applied to all records produced by a source connector, irrespective of their intended topic.
This KIP proposes to add a new SMT to Apache Kafka which can conditionally apply a list of other transformations based on a few simple predicates on records.
Public Interfaces
A new Conditional SMT will be added.
The existing Transformation interface will have a default method added in order to provide the more flexible SMT configuration that Conditional will require.
```java
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
    // ...

    /** Configuration specification for this transformation. **/
    ConfigDef config(); /* already exists */

    /**
     * Configuration specification for this transformation, which may depend on
     * the supplied configuration properties. By default this ignores props and
     * returns config().
     **/
    default ConfigDef config(Map<String, String> props) { /* new method */
        return config();
    }

    // ...
}
```
Proposed Changes
Conditional SMT
The Conditional SMT will apply an arbitrary transformation (or transformation chain) according to some predicate on the record.
- The Conditional SMT has its own list of transforms to apply. The transformation chain is configured in exactly the same way as top-level transforms are configured, just using the Conditional SMT's config prefix.
- The condition config defines the predicate for when the transforms are applied to a record, using a <condition-type>:<parameters> syntax.
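To make the intended runtime behavior concrete, here is a minimal, dependency-free sketch of how the Conditional SMT might apply its nested chain. It is not the proposed implementation: the class name ConditionalChain is hypothetical, a generic R stands in for ConnectRecord, and each UnaryOperator stands in for a configured Transformation. It assumes the nested chain behaves like a top-level chain, where a transformation returning null drops the record.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/**
 * Hypothetical model of the Conditional SMT's apply logic. R stands in for a
 * ConnectRecord; each UnaryOperator stands in for a configured Transformation.
 */
final class ConditionalChain<R> {
    private final Predicate<R> condition;
    private final List<UnaryOperator<R>> transforms;

    ConditionalChain(Predicate<R> condition, List<UnaryOperator<R>> transforms) {
        this.condition = condition;
        this.transforms = List.copyOf(transforms);
    }

    /** Applies the nested chain only when the condition holds; otherwise passes the record through. */
    R apply(R record) {
        if (!condition.test(record)) {
            return record; // condition is false: the record is left untouched
        }
        R current = record;
        for (UnaryOperator<R> transform : transforms) {
            current = transform.apply(current);
            if (current == null) {
                return null; // a transformation returning null filters the record out
            }
        }
        return current;
    }
}
```

Records for which the condition is false pass through unchanged, so the Conditional SMT can sit anywhere in a connector's top-level chain without affecting unrelated topics.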
Example configuration
The following shows the Conditional SMT applying the ExtractField$Key SMT on topics whose names match the Java regular expression my-topic.*:
```
transforms: conditionalExtract
transforms.conditionalExtract.type: Conditional
transforms.conditionalExtract.transforms: extractInt
transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.transforms.extractInt.field: c1
transforms.conditionalExtract.condition: topic-matches:my-topic.*
```
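Assuming that, as with top-level SMTs, Connect strips the transforms.conditionalExtract. prefix before configuring the Conditional SMT, the SMT would see keys such as transforms, transforms.extractInt.field and condition. The following sketch shows one way to split those keys into a per-alias config map for each nested transformation; the NestedTransforms helper is hypothetical and not part of this proposal's public interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits Conditional's own configs into one config map per nested alias. */
final class NestedTransforms {
    private NestedTransforms() {}

    static Map<String, Map<String, String>> byAlias(Map<String, String> props) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String aliases = props.get("transforms");
        if (aliases == null || aliases.trim().isEmpty()) {
            return result; // no nested chain configured
        }
        for (String rawAlias : aliases.split(",")) {
            String alias = rawAlias.trim();
            String prefix = "transforms." + alias + ".";
            Map<String, String> nested = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : props.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    // Strip the alias prefix, as is done for top-level SMTs.
                    nested.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            result.put(alias, nested);
        }
        return result;
    }
}
```

Each per-alias map could then be used to instantiate and configure the corresponding nested transformation, in the order the aliases are listed.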
Conditions
We would initially support three condition types:
topic-matches:<pattern>
The transformation would be applied if the record's topic name matches the given regular expression pattern. For example, the following would apply the transformation to records being sent to any topic whose name begins with "my-prefix-":
```
transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
```
has-header:<header-name>
The transformation would be applied if the record has at least one header with the given name. For example, the following would apply the transformation to records with at least one header named "my-header":
```
transforms.conditionalExtract.condition: has-header:my-header
```
not:<condition-name>
This would negate the result of another named condition, which is itself defined under the condition config prefix. For example, the following would apply the transformation to records that lack any header named "my-header":
```
transforms.conditionalExtract.condition: not:hasMyHeader
transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
```
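The following sketch shows how the three condition types could be evaluated. RecordView and Conditions are hypothetical names; RecordView stands in for the topic and header names of a ConnectRecord. It assumes topic-matches must match the whole topic name (as the "my-prefix-.*" example suggests) and that a not: reference is resolved from the condition.<name> key within the Conditional SMT's own configs.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the parts of a ConnectRecord that conditions inspect. */
final class RecordView {
    final String topic;
    final Set<String> headerNames;

    RecordView(String topic, Set<String> headerNames) {
        this.topic = topic;
        this.headerNames = headerNames;
    }
}

/** Sketch of evaluating the three proposed condition types against a record. */
final class Conditions {
    // <condition-type>:<parameters>, using the syntax given for the condition config
    private static final Pattern SYNTAX = Pattern.compile("([a-zA-Z0-9-]+)(:.*)?");

    private Conditions() {}

    /**
     * @param condition the value of a condition config, e.g. "has-header:my-header"
     * @param props     Conditional's own configs, used to resolve named conditions for not:
     */
    static boolean test(String condition, Map<String, String> props, RecordView record) {
        Matcher m = SYNTAX.matcher(condition);
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid condition: " + condition);
        }
        String type = m.group(1);
        // group(2) includes the leading ':', so drop it when present
        String params = m.group(2) == null ? "" : m.group(2).substring(1);
        switch (type) {
            case "topic-matches":
                // Whole-name match of the topic against the regular expression
                return Pattern.matches(params, record.topic);
            case "has-header":
                return record.headerNames.contains(params);
            case "not": {
                String named = props.get("condition." + params);
                if (named == null) {
                    throw new IllegalArgumentException("Undefined condition: " + params);
                }
                return !test(named, props, record);
            }
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

A real implementation would parse and compile each condition once at configuration time rather than per record, and would need to reject cyclic references such as a named condition that negates itself.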
The condition design is flexible enough to allow other conditions to be added in the future if necessary. For example, it could support conjunction and disjunction predicates via and:<condition-name-1>,<condition-name-2> and or:<condition-name-1>,<condition-name-2>, in a similar way to not:.
Changes to Transformation
Currently Transformation has a public ConfigDef config() method, which means that the ConfigDef is static and cannot depend on the configs actually provided to the transformation. To support a transforms config that lists SMT aliases, whose own configs are then prefixed by their alias, this interface needs to be more flexible. This will be done by adding a default ConfigDef config(Map<String, String> props) method to Transformation. The default implementation will just call the nullary version of the method.
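The backwards-compatibility argument can be illustrated with a dependency-free model of the change. This is only a sketch: TransformSpec, LegacyTransform and ConditionalSpec are hypothetical names, and a set of config names stands in for ConfigDef.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the proposed change; a set of config names stands in for a ConfigDef. */
interface TransformSpec {
    /** Static specification (the existing nullary method). */
    Set<String> config();

    /** New overload: may depend on the supplied props; defaults to the static spec. */
    default Set<String> config(Map<String, String> props) {
        return config();
    }
}

/** An existing SMT: compiles unchanged and inherits the default overload. */
final class LegacyTransform implements TransformSpec {
    @Override
    public Set<String> config() {
        return Set.of("field");
    }
}

/** A Conditional-like SMT whose spec depends on which aliases are listed. */
final class ConditionalSpec implements TransformSpec {
    @Override
    public Set<String> config() {
        return Set.of("transforms", "condition"); // always required
    }

    @Override
    public Set<String> config(Map<String, String> props) {
        Set<String> names = new LinkedHashSet<>(config());
        for (String alias : props.getOrDefault("transforms", "").split(",")) {
            if (!alias.trim().isEmpty()) {
                // Simplification: the real proposal would embed the nested SMT's whole
                // ConfigDef; this model only adds its prefixed "type" key.
                names.add("transforms." + alias.trim() + ".type");
            }
        }
        return Collections.unmodifiableSet(names);
    }
}
```

Existing implementations such as LegacyTransform need no changes because they inherit the default overload; only a transformation like Conditional, whose configs depend on the supplied props, overrides it.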
For the Conditional SMT, the ConfigDef built will depend on the configs present, though the following would always be required:
Name | Type | Validation |
---|---|---|
transforms | LIST | Non-null list of unique aliases. |
condition | STRING | A string matching the regular expression ([a-zA-Z0-9-]+)(:.*)?, where the first group is the condition type, as described above, and the optional second group holds the parameters for that predicate (preceded by a colon). |
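A minimal sketch of the validation described in the table, assuming the config names used in the example configuration (transforms and condition). ConditionalConfigValidator is a hypothetical name; in the actual SMT these checks would presumably be expressed as ConfigDef validators.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical validator for the two configs the Conditional SMT always requires. */
final class ConditionalConfigValidator {
    // Regular expression from the table: condition type, then optional ":<parameters>"
    private static final Pattern CONDITION = Pattern.compile("([a-zA-Z0-9-]+)(:.*)?");

    private ConditionalConfigValidator() {}

    /** Returns a list of problems; an empty list means both configs are valid. */
    static List<String> validate(Map<String, String> props) {
        List<String> errors = new ArrayList<>();

        String transforms = props.get("transforms");
        if (transforms == null) {
            errors.add("transforms must not be null");
        } else {
            Set<String> seen = new HashSet<>();
            for (String raw : transforms.split(",")) {
                String alias = raw.trim();
                // Set.add returns false when the alias was already present
                if (!alias.isEmpty() && !seen.add(alias)) {
                    errors.add("Duplicate alias in transforms: " + alias);
                }
            }
        }

        String condition = props.get("condition");
        if (condition == null || !CONDITION.matcher(condition).matches()) {
            errors.add("condition must match " + CONDITION.pattern() + ": " + condition);
        }
        return errors;
    }
}
```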
The returned ConfigDef would include the configs for the given transformations, using the existing ConfigDef.embed() mechanism.
Compatibility, Deprecation, and Migration Plan
The change to Transformation uses a default method, so it is backwards compatible with existing implementations.
Rejected Alternatives
- Adding a new config to all connectors to allow conditionality based on a topic name regex. This was rejected for several reasons: the new config name could collide with a config used by some existing SMT; it is less flexible (or multiple configs are required to add flexibility); and it can be awkward to configure conditionality for a whole chain of SMTs.