Status
Current state: Under discussion
Discussion thread: here
JIRA: KAFKA-9673
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Single Message Transformations (SMTs) in Kafka Connect provide a convenient, code-free way to modify records from source connectors before they are sent to a Kafka topic. In some more complex use cases, it can be desirable to apply an SMT only when some aspect of the record being processed meets a condition.
For example, with Debezium there are topics which represent a schema change and topics which represent a data change, and users might want to apply transformations selectively, based on the topic type. SMTs cannot currently do this, since they are applied to all records produced by a source connector, irrespective of their intended topic. This problem would be solved if it were possible to apply an SMT according to the name of the topic (see KAFKA-7052 for further details).
This KIP proposes a way to apply a particular transformation only if the record matches some condition. The condition is defined by a new interface, and implementations for common conditions will be provided. Connector authors and users will also be able to provide their own condition implementations for special cases, but this is not expected to be a common need.
A new Filter SMT will also be implemented. It will filter out records which do, or do not, satisfy a given condition. This is advantageous, for example, for users who do not want to incur the storage costs of consuming everything from a source connector: instead, they can choose to ingest only the records of interest. Likewise, for sink connectors it will enable exporting a subset of data without needing to resort to a Kafka Streams application to filter it first.
Public Interfaces
A new Predicate interface will be added in the new org.apache.kafka.connect.transforms.predicates package.
```java
package org.apache.kafka.connect.transforms.predicates;

import java.io.Closeable;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;

/**
 * A condition on ConnectRecords.
 * Implementations of this interface can be used for filtering records and conditionally applying Transformations.
 * Implementations must be public and have a public constructor with no parameters.
 */
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /**
     * Configuration specification for this predicate.
     */
    ConfigDef config();

    /**
     * Validate the predicate configuration values against configuration definitions.
     * @param predicateConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> predicateConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(predicateConfigs);
        return new Config(configValues);
    }

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
}
```
All transformations will gain new implicit configuration parameters which will be consumed by the Connect runtime and not passed to the Transformation.configure() method. These parameters will start with a question mark (?), so it is extremely unlikely that they will collide with configuration parameters used by existing connectors.
A new Filter SMT will be added to enable record filtering.
Proposed Changes
Predicates
The Predicate interface is described above.
In order to negate the result of a predicate, all predicates will implicitly support a boolean negate configuration parameter, which defaults to false.
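To illustrate the intended semantics of negate, the minimal Java sketch below uses a simplified, hypothetical RecordPredicate interface in place of the real Predicate, with configuration omitted. It only shows that negate inverts the predicate's result. This proposal does not specify where the runtime performs that inversion, and the Negation helper is an illustrative name, not part of the KIP.

```java
// Hypothetical stand-in for the KIP's Predicate interface; configuration is omitted.
interface RecordPredicate<R> {
    boolean test(R record);
}

// Hypothetical helper showing one way a runtime could honour the implicit negate parameter.
final class Negation {
    private Negation() {
    }

    static <R> boolean evaluate(RecordPredicate<R> predicate, R record, boolean negate) {
        boolean matched = predicate.test(record);
        // negate defaults to false; when it is true the predicate's result is inverted
        return negate ? !matched : matched;
    }
}
```

Keeping negation outside the predicate means individual implementations such as TopicNameMatches do not each need to handle it.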
In addition to the Predicate interface described above, this KIP will provide the following implementations:
TopicNameMatches
test() will return true when ConnectRecord.topic() (i.e. the record's topic name) matches a given Java regular expression pattern.
Config name | Type | Default | Required |
---|---|---|---|
pattern | String | null | yes |
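A minimal sketch of a TopicNameMatches-style predicate follows. It assumes that the whole topic name must match the pattern (Matcher.matches() rather than find()), which is consistent with the later example where my-prefix-.* means "starts with my-prefix-". TopicRecord and TopicNameMatchesSketch are hypothetical stand-ins, not the Connect API. configure(Map) simply mirrors the shape of Configurable.configure.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical, minimal stand-in for ConnectRecord: only the topic name is modelled.
final class TopicRecord {
    private final String topic;

    TopicRecord(String topic) {
        this.topic = topic;
    }

    String topic() {
        return topic;
    }
}

// Hypothetical sketch of a TopicNameMatches-style predicate.
final class TopicNameMatchesSketch {
    private Pattern pattern;

    void configure(Map<String, ?> configs) {
        Object value = configs.get("pattern");
        if (value == null) {
            // pattern is required and has no default
            throw new IllegalArgumentException("pattern is required");
        }
        pattern = Pattern.compile(value.toString());
    }

    boolean test(TopicRecord record) {
        // matches() requires the entire topic name to match the regular expression
        return record.topic() != null && pattern.matcher(record.topic()).matches();
    }
}
```

Compiling the pattern once in configure(), rather than on every call to test(), avoids repeated regular expression compilation on the hot path.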
HasHeaderKey
test() will return true when ConnectRecord.headers() has one or more headers with a given key.
Config name | Type | Default | Required |
---|---|---|---|
name | String | null | yes |
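The following is a minimal sketch of the HasHeaderKey check. It uses hypothetical stand-ins (SimpleHeader, HeaderedRecord) so that it runs without Kafka on the classpath. A real implementation would presumably query ConnectRecord.headers() instead, for example via Headers.allWithName(name). Because Connect headers can repeat a key, the sketch stops at the first header with the configured key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Connect header: a key with an arbitrary value.
final class SimpleHeader {
    final String key;
    final Object value;

    SimpleHeader(String key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for ConnectRecord; a key may appear in several headers.
final class HeaderedRecord {
    final List<SimpleHeader> headers = new ArrayList<>();

    HeaderedRecord withHeader(String key, Object value) {
        headers.add(new SimpleHeader(key, value));
        return this;
    }
}

// Hypothetical sketch of a HasHeaderKey-style predicate.
final class HasHeaderKeySketch {
    private String name;

    void configure(Map<String, ?> configs) {
        Object value = configs.get("name");
        if (value == null) {
            // name is required and has no default
            throw new IllegalArgumentException("name is required");
        }
        name = value.toString();
    }

    boolean test(HeaderedRecord record) {
        // true if at least one header has the configured key; header values are ignored
        for (SimpleHeader header : record.headers) {
            if (name.equals(header.key)) {
                return true;
            }
        }
        return false;
    }
}
```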
RecordIsTombstone
test() will return true when the ConnectRecord represents a tombstone (i.e. has a null value). This predicate has no configuration parameters.
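As a sketch of how little logic RecordIsTombstone needs, the block below models a record with a hypothetical KeyValueRecord type. In a real implementation, the check would presumably be record.value() == null on the ConnectRecord. The record key is deliberately not inspected, because a tombstone typically still carries the key it deletes.

```java
// Hypothetical stand-in for ConnectRecord, modelling only the key and value.
final class KeyValueRecord {
    final Object key;
    final Object value;

    KeyValueRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical sketch of a RecordIsTombstone-style predicate; it takes no configuration.
final class RecordIsTombstoneSketch {
    boolean test(KeyValueRecord record) {
        // A tombstone is identified purely by its null value; the key is not inspected.
        return record.value == null;
    }
}
```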
Conditionally applying an SMT
When a Transformation is configured with the new ?type parameter, it will be applied conditionally. The value of the ?type parameter will be the name of a concrete class implementing the Predicate interface. Configuration for the predicate will come from all other configuration parameters which start with '?'. These will be supplied to the predicate's configure(Map) method, but with the initial '?' removed from each parameter name.
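The splitting rule described above can be sketched as follows. The sketch assumes that it receives one transformation's parameters with the transforms.&lt;alias&gt;. prefix already removed. It follows the text literally, so ?negate reaches the predicate as negate. SplitConfig and ConditionalConfigSplitter are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical result of splitting one transformation's configuration.
final class SplitConfig {
    String predicateClass; // value of ?type, or null if the SMT is unconditional
    final Map<String, String> predicateConfig = new HashMap<>();
    final Map<String, String> transformConfig = new HashMap<>();
}

final class ConditionalConfigSplitter {
    private ConditionalConfigSplitter() {
    }

    // Keys are relative to the alias, e.g. "?pattern" rather than "transforms.t2.?pattern".
    static SplitConfig split(Map<String, String> aliasConfig) {
        SplitConfig result = new SplitConfig();
        for (Map.Entry<String, String> entry : aliasConfig.entrySet()) {
            String key = entry.getKey();
            if (key.equals("?type")) {
                result.predicateClass = entry.getValue();
            } else if (key.startsWith("?")) {
                // strip the leading '?' before the parameter is handed to the predicate
                result.predicateConfig.put(key.substring(1), entry.getValue());
            } else {
                // everything else, including type (the SMT class), stays with the transformation
                result.transformConfig.put(key, entry.getValue());
            }
        }
        return result;
    }
}
```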
If the predicate throws an exception during processing, the condition will be treated as not having been matched: the guarded SMT will not be applied and the exception will be logged. Note that this behaviour is independent of the negate parameter.
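A minimal sketch of the guarded application, including the exception rule, is shown below. It assumes java.util.function.Predicate and Function as stand-ins for the KIP's Predicate and Transformation, and java.util.logging in place of whatever logger the runtime uses. ConditionalTransform is a hypothetical name. The key point is that a failing predicate short-circuits before negate is consulted, so the SMT is skipped whatever the value of negate.

```java
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: applies 'transform' only when the (optionally negated) predicate matches.
final class ConditionalTransform<R> {
    private static final Logger LOG = Logger.getLogger(ConditionalTransform.class.getName());

    private final Predicate<R> predicate;
    private final boolean negate;
    private final Function<R, R> transform;

    ConditionalTransform(Predicate<R> predicate, boolean negate, Function<R, R> transform) {
        this.predicate = predicate;
        this.negate = negate;
        this.transform = transform;
    }

    R apply(R record) {
        boolean matched;
        try {
            matched = predicate.test(record);
        } catch (RuntimeException e) {
            // Treated as "not matched" regardless of negate: log and pass the record on unchanged.
            LOG.log(Level.WARNING, "Predicate failed; transformation not applied", e);
            return record;
        }
        boolean shouldApply = negate ? !matched : matched;
        return shouldApply ? transform.apply(record) : record;
    }
}
```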
Consider the following example of a transformation chain with a single conditionally applied SMT:
```
transforms: t2
transforms.t2.?type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
transforms.t2.?negate: true
transforms.t2.?pattern: my-prefix-.*
transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field: c1
```
The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches. Because ?negate is true, the ExtractField$Key SMT will be applied only to records whose topic name does not start with my-prefix- (the ?pattern parameter).
The Filter SMT
A new Filter transformation will be added in the existing org.apache.kafka.connect.transforms package. Its apply(ConnectRecord) method will return null when the configured Predicate.test(ConnectRecord) returns true (or false when the negate config is true).
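A minimal sketch of the Filter behaviour follows, assuming java.util.function.Predicate as a stand-in for the KIP's Predicate. In the usage below, records are represented only by their topic names. FilterSketch is a hypothetical name. Returning null is the existing Connect convention for dropping a record from the transformation chain. The sketch leaves out the exception handling described below.

```java
import java.util.function.Predicate;

// Hypothetical sketch of Filter.apply(): returning null tells the runtime to drop the record.
final class FilterSketch<R> {
    private final Predicate<R> condition;
    private final boolean negate;

    FilterSketch(Predicate<R> condition, boolean negate) {
        this.condition = condition;
        this.negate = negate;
    }

    R apply(R record) {
        boolean matched = condition.test(record);
        // With negate=false matching records are dropped; with negate=true non-matching ones are.
        boolean drop = negate ? !matched : matched;
        return drop ? null : record;
    }
}
```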
Consider the following example of a transformation chain with a single Filter SMT:
```
transforms: filter
transforms.filter.type: Filter
transforms.filter.condition: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
transforms.filter.condition.pattern: foo|bar
transforms.filter.negate: false
```
The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches, and it takes a single configuration parameter, pattern. Records with a topic name of "foo" or "bar" match the predicate, so apply(ConnectRecord) will return null and those records are therefore filtered out.
If the predicate throws an exception during processing, the condition will be treated as not having been matched, the record will be filtered and the exception will be logged. Note that this behaviour is independent of the negate parameter.
Changes to Transformation
Currently Transformation has a public ConfigDef config() method. This is not flexible enough for the Filter SMT, which needs to support arbitrary configuration parameters in order to configure the predicate. To support this, we will add a validate() method to Transformation.
```java
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    // ... existing methods ...

    /**
     * Validate the transformation configuration values against configuration definitions.
     * @param transformationConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> transformationConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(transformationConfigs);
        return new Config(configValues);
    }
}
```
Compatibility, Deprecation, and Migration Plan
Users will need to perform a rolling upgrade of a distributed Connect cluster before they can start using the new Filter SMT or conditional SMTs.
The ?-prefixing of configuration parameters does not remove the possibility of a collision with existing connectors, but it's unlikely that any existing connectors are using configuration parameters which start with '?'.
Rejected Alternatives
- Alternative ways to configure conditional SMTs which removed the possibility of collision with existing connectors were considered. They were more verbose and difficult to understand.