Status
Current state: Under discussion
Discussion thread: here
JIRA: KAFKA-9673
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Single Message Transformations (SMTs) in Kafka Connect provide a convenient, code-free way to modify records from source connectors before they are sent to a Kafka topic. In some more complex use cases it is desirable to apply a list of SMTs conditionally, depending on the topic the records are destined for. For example, Debezium has some topics that represent schema changes and others that represent data changes, and users might want to apply transformations selectively based on the topic type. SMTs cannot currently do this, because they are applied to all records produced by a source connector, irrespective of their intended topic.
This KIP proposes to add a new SMT to Apache Kafka which can conditionally apply a list of other transformations based on a few simple predicates on records.
Public Interfaces
A new If SMT will be added.
The existing Transformation interface will gain a default method to support the more flexible configuration that the If SMT requires.
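import java.io.Closeable;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;

public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
    // ...

    /** Configuration specification for this transformation. **/
    ConfigDef config(); /* already exists */

    /**
     * Configuration specification for this transformation, which may depend on
     * the configuration properties the transformation will be configured with.
     **/
    default ConfigDef config(Map<String, String> props) { /* new method */
        return config();
    }

    // ...
}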
Proposed Changes
Add the If SMT
The If SMT will apply an arbitrary transformation (or chain of transformations) to a record only when a predicate on that record is satisfied. Records that do not satisfy the predicate are passed through unchanged.
- The if.type config parameter names the concrete class implementing RecordPredicate (see below).
- Other config parameters starting with the if. prefix define the config parameters for the RecordPredicate.
- The then config parameter is a list of aliases for the transforms to be applied when the predicate is satisfied.
- For each of the named aliases, configs can be supplied starting with the then.${alias}. prefix. This is entirely analogous to the way transforms themselves are configured, just nested under the SMT's own config prefix.
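The per-record behaviour described above can be sketched as follows. This is a minimal, dependency-free sketch, not the proposed implementation. SimpleRecord, SimpleTransform and SimplePredicate are hypothetical stand-ins for ConnectRecord, Transformation and RecordPredicate, and IfSketch is an illustrative name. The sketch follows Connect's convention that a transformation returning null drops the record.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ConnectRecord: only the topic and value are modelled.
final class SimpleRecord {
    final String topic;
    final Object value;

    SimpleRecord(String topic, Object value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical stand-ins for Transformation and RecordPredicate.
interface SimpleTransform {
    SimpleRecord apply(SimpleRecord record);
}

interface SimplePredicate {
    boolean test(SimpleRecord record);
}

// Sketch of the If SMT: run the "then" chain only when the predicate holds.
final class IfSketch implements SimpleTransform {
    private final SimplePredicate predicate;       // would be built from the if.* configs
    private final List<SimpleTransform> thenChain; // would be built from the then.${alias}.* configs

    IfSketch(SimplePredicate predicate, List<SimpleTransform> thenChain) {
        this.predicate = Objects.requireNonNull(predicate);
        this.thenChain = List.copyOf(thenChain);
    }

    @Override
    public SimpleRecord apply(SimpleRecord record) {
        if (!predicate.test(record)) {
            return record; // predicate not satisfied: pass the record through unchanged
        }
        SimpleRecord current = record;
        for (SimpleTransform t : thenChain) {
            current = t.apply(current);
            if (current == null) {
                return null; // a transform in the chain dropped the record
            }
        }
        return current;
    }
}

final class IfSketchDemo {
    public static void main(String[] args) {
        SimplePredicate isPrefixed = r -> r.topic.startsWith("my-prefix-");
        SimpleTransform upper = r -> new SimpleRecord(r.topic, r.value.toString().toUpperCase());
        IfSketch smt = new IfSketch(isPrefixed, List.of(upper));
        System.out.println(smt.apply(new SimpleRecord("my-prefix-a", "x")).value); // X
        System.out.println(smt.apply(new SimpleRecord("other", "x")).value);       // x
    }
}
```

Because the chain is applied in order and stops at the first null, a filtering transform inside the then list behaves as it would at the top level.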
Example configuration
The following shows the configuration for applying the ExtractField$Key SMT to topics whose names match the Java regular expression my-prefix-.*:
transforms: conditionalExtract
transforms.conditionalExtract.type: If
transforms.conditionalExtract.if.type: TopicMatches
transforms.conditionalExtract.if.regex: my-prefix-.*
transforms.conditionalExtract.then: extractInt
transforms.conditionalExtract.then.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.then.extractInt.field: c1
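To show how the prefixes in this example nest, here is a small sketch. It takes the If SMT's own configs, with the outer transforms.conditionalExtract. prefix already stripped as Connect does for each transform. From these it extracts the predicate configs and the per-alias configs for the then chain.

The helper names withPrefix and thenConfigs are hypothetical; in Kafka, AbstractConfig.originalsWithPrefix() performs a similar prefix extraction. The sketch also assumes that then is comma-separated, like the top-level transforms list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IfConfigSketch {
    // Returns the entries whose key starts with prefix, with the prefix removed.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Parses the comma-separated "then" list and returns each alias's configs, in order.
    static Map<String, Map<String, String>> thenConfigs(Map<String, String> props) {
        Map<String, Map<String, String>> perAlias = new LinkedHashMap<>();
        for (String alias : props.getOrDefault("then", "").split(",")) {
            String trimmed = alias.trim();
            if (!trimmed.isEmpty()) {
                // The trailing dot keeps aliases such as "a" and "ab" apart.
                perAlias.put(trimmed, withPrefix(props, "then." + trimmed + "."));
            }
        }
        return perAlias;
    }

    public static void main(String[] args) {
        // The If SMT's configs from the example, minus transforms.conditionalExtract.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", "If");
        props.put("if.type", "TopicMatches");
        props.put("if.regex", "my-prefix-.*");
        props.put("then", "extractInt");
        props.put("then.extractInt.type", "org.apache.kafka.connect.transforms.ExtractField$Key");
        props.put("then.extractInt.field", "c1");

        System.out.println(withPrefix(props, "if.")); // configs for the RecordPredicate
        System.out.println(thenConfigs(props));       // configs for each guarded transform
    }
}
```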
Conditions
The value of the if.type config of the SMT would be the name of a class implementing the new RecordPredicate interface:
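import java.io.Closeable;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.connector.ConnectRecord;

public interface RecordPredicate<R extends ConnectRecord<R>> extends Configurable, Closeable {
    /**
     * Test the given record against this predicate.
     *
     * @return true if the record satisfies the predicate
     */
    boolean test(R record);
}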
As part of this KIP, two implementations of RecordPredicate would be provided:
TopicMatches condition
The TopicMatches predicate would match a record if the record's topic name matched the regular expression configured with the regex config parameter. An example is shown above.
Config name | Type | Validation |
---|---|---|
regex | String | Valid according to java.util.regex.Pattern.compile() |
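A TopicMatches predicate could be sketched roughly as follows. To stay self-contained, the sketch tests a plain topic string rather than a ConnectRecord (a real implementation would use record.topic()). It also throws IllegalArgumentException where Kafka code would typically throw ConfigException.

The class name is hypothetical. The sketch assumes that "matches" means the whole topic name must match, i.e. Matcher.matches() rather than Matcher.find(); the text above does not say which is intended.

```java
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

final class TopicMatchesSketch {
    private Pattern pattern;

    // Mirrors Configurable.configure(): reads the "regex" config and validates it
    // with Pattern.compile(), as the validation column in the table requires.
    void configure(Map<String, ?> configs) {
        Object regex = configs.get("regex");
        if (!(regex instanceof String)) {
            throw new IllegalArgumentException("Missing or non-string config 'regex'");
        }
        try {
            pattern = Pattern.compile((String) regex);
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException("Invalid 'regex': " + e.getMessage(), e);
        }
    }

    // Assumption: the whole topic name must match (Matcher.matches(), not find()).
    boolean test(String topic) {
        return topic != null && pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        TopicMatchesSketch p = new TopicMatchesSketch();
        p.configure(Map.of("regex", "my-prefix-.*"));
        System.out.println(p.test("my-prefix-orders")); // true
        System.out.println(p.test("orders"));           // false
    }
}
```

Compiling the pattern once in configure() means an invalid regex is rejected at configuration time instead of on the first record.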
Not condition
This predicate would negate the result of another predicate, named via an alias. For example, the following will apply the transformation to records whose topic names do not start with "my-prefix-":
transforms: conditionalExtract
transforms.conditionalExtract.type: If
transforms.conditionalExtract.if.type: Not
transforms.conditionalExtract.if.operand: hasPrefix
transforms.conditionalExtract.if.operand.hasPrefix.type: TopicMatches
transforms.conditionalExtract.if.operand.hasPrefix.regex: my-prefix-.*
transforms.conditionalExtract.then: extractInt
transforms.conditionalExtract.then.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.then.extractInt.field: c1
Config name | Type | Validation |
---|---|---|
operand | String | |
Other config parameters beginning with the if.operand.${alias}. prefix would be used to configure the negated predicate.
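The following minimal sketch shows how a Not predicate might wire up its operand. Its input is Not's own configs relative to the if. prefix. It reads the operand alias and strips operand.${alias}. from the matching keys. The result is handed to a factory that builds the wrapped predicate.

For simplicity, predicates are modelled as java.util.function.Predicate over topic names. NotSketch and the predicateFactory parameter are hypothetical stand-ins for instantiating and configuring the class named by the operand's type config.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

final class NotSketch implements Predicate<String> {
    private final Predicate<String> operand;

    // predicateFactory is hypothetical: it turns the operand's own configs
    // (e.g. {type=TopicMatches, regex=...}) into a configured predicate.
    NotSketch(Map<String, String> configs,
              Function<Map<String, String>, Predicate<String>> predicateFactory) {
        String alias = configs.get("operand");
        if (alias == null || alias.isBlank()) {
            throw new IllegalArgumentException("Missing config 'operand'");
        }
        String prefix = "operand." + alias + ".";
        Map<String, String> operandConfigs = new LinkedHashMap<>();
        configs.forEach((k, v) -> {
            if (k.startsWith(prefix)) {
                operandConfigs.put(k.substring(prefix.length()), v);
            }
        });
        this.operand = predicateFactory.apply(operandConfigs);
    }

    @Override
    public boolean test(String topic) {
        return !operand.test(topic); // negate the wrapped predicate
    }

    public static void main(String[] args) {
        // Not's configs from the example, relative to the if. prefix.
        Map<String, String> configs = Map.of(
                "operand", "hasPrefix",
                "operand.hasPrefix.type", "TopicMatches",
                "operand.hasPrefix.regex", "my-prefix-.*");
        // Toy factory: treats every operand as a whole-name regex match.
        NotSketch not = new NotSketch(configs, c -> t -> t.matches(c.get("regex")));
        System.out.println(not.test("my-prefix-orders")); // false
        System.out.println(not.test("orders"));           // true
    }
}
```

Because the operand is configured through the same prefix scheme, Not predicates can in principle be nested.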
Changes to Transformation
Currently Transformation has a public ConfigDef config() method, which means that the returned ConfigDef cannot depend on the configs actually supplied to the transformation. The If SMT's then config lists SMT aliases whose own configs are supplied under the then.${alias}. prefix, so the set of valid configs depends on the supplied configs. Supporting this requires making the interface more flexible. This will be done by adding a default ConfigDef config(Map<String, String> props) method to Transformation. The default implementation will just call the nullary version of the method.
The ConfigDef returned by the If SMT would include the configs for the given transformations, using the existing ConfigDef.embed() mechanism.
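The following dependency-free sketch illustrates why the ConfigDef has to depend on the supplied properties. A "definition" is modelled as a map from config name to documentation, a hypothetical stand-in for ConfigDef. The childDefs lookup is likewise hypothetical.

The valid keys under if. and then.${alias}. are only known once if.type and then have been read. Each child's keys are then merged under its prefix, roughly as ConfigDef.embed() does with its key prefix argument.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class DynamicConfigSketch {
    // Sketch of If's config(Map<String, String> props). childDefs maps a class
    // name to that class's own (unprefixed) definition.
    static Map<String, String> config(Map<String, String> props,
                                      Function<String, Map<String, String>> childDefs) {
        // Keys that exist regardless of the supplied props.
        Map<String, String> def = new LinkedHashMap<>();
        def.put("if.type", "Class implementing RecordPredicate");
        def.put("then", "Aliases of the transformations to apply");

        // The predicate's own keys are only known once if.type has been read.
        String predicateType = props.get("if.type");
        if (predicateType != null) {
            childDefs.apply(predicateType).forEach((k, doc) -> def.put("if." + k, doc));
        }

        // Likewise, each alias in "then" contributes keys under then.${alias}.
        for (String alias : props.getOrDefault("then", "").split(",")) {
            String trimmed = alias.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String prefix = "then." + trimmed + ".";
            def.put(prefix + "type", "Class of transformation '" + trimmed + "'");
            String type = props.get(prefix + "type");
            if (type != null) {
                // Prefix every child key, roughly as ConfigDef.embed() does.
                childDefs.apply(type).forEach((k, doc) -> def.put(prefix + k, doc));
            }
        }
        return def;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "if.type", "TopicMatches",
                "if.regex", "my-prefix-.*",
                "then", "extractInt",
                "then.extractInt.type", "ExtractField$Key",
                "then.extractInt.field", "c1");
        // Toy registry of child definitions, keyed by class name.
        Map<String, Map<String, String>> registry = Map.of(
                "TopicMatches", Map.of("regex", "Regex matched against the topic name"),
                "ExtractField$Key", Map.of("field", "Field name to extract"));
        config(props, t -> registry.getOrDefault(t, Map.of())).keySet()
                .forEach(System.out::println);
    }
}
```

With the nullary config() alone, none of the if.regex or then.extractInt.* keys could be declared, because they are not known until the props have been seen.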
Compatibility, Deprecation, and Migration Plan
The change to Transformation uses a default method, so it is backwards compatible with existing implementations.
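The following small dependency-free sketch shows why a default method keeps existing SMTs working. TransformationSketch, LegacySmt and PropsAwareSmt are hypothetical stand-ins, and a config definition is modelled as a Set of config names rather than a ConfigDef.

An implementation written before the change only provides config(), yet the runtime can still call the new overload on it.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Transformation; a definition is just a set of config names.
interface TransformationSketch {
    Set<String> config(); // existing method

    // New method: by default ignores props and falls back to the nullary version.
    default Set<String> config(Map<String, String> props) {
        return config();
    }
}

// An SMT written before the change: it only implements config().
final class LegacySmt implements TransformationSketch {
    @Override
    public Set<String> config() {
        return Set.of("field");
    }
}

// A newer SMT, such as If, overrides the new method so its definition depends on props.
final class PropsAwareSmt implements TransformationSketch {
    @Override
    public Set<String> config() {
        return Set.of("then");
    }

    @Override
    public Set<String> config(Map<String, String> props) {
        String alias = props.get("then");
        return alias == null ? config() : Set.of("then", "then." + alias + ".type");
    }
}

final class DefaultMethodDemo {
    public static void main(String[] args) {
        Map<String, String> props = Map.of("then", "extractInt");
        // The runtime can always call the new overload; legacy SMTs keep their old behaviour.
        System.out.println(new LegacySmt().config(props));
        System.out.println(new PropsAwareSmt().config(props));
    }
}
```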
Rejected Alternatives
- Adding a new config to all connectors to allow conditionality based on a topic name regex. The new config name could potentially collide with a config used by some existing SMT. It's less flexible (or multiple configs are required to add flexibility). It can be a little awkward to configure conditionality for a whole chain of SMTs.
- Using an expression language/DSL for specifying the condition. There's no reason to treat the condition differently from the conditionally applied transformations. It's likely that if such syntactic sugar were desirable for the condition, it would also be desirable for configuring the guarded transformations. If a DSL were to be implemented, it would be better to allow it to configure the whole transformation chain, rather than just conditions.