...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * A condition on ConnectRecords.
 * Implementations of this interface can be used for filtering records and conditionally applying Transformations.
 * Implementations must be public and have a public constructor with no parameters.
 */
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /**
     * Configuration specification for this predicate.
     */
    ConfigDef config();

    /**
     * Validate the predicate configuration values against configuration definitions.
     * @param predicateConfigs the provided configuration values
     * @return a Config containing the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> predicateConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(predicateConfigs);
        return new Config(configValues);
    }

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
} |
...
A new Filter transformation will be added in the existing org.apache.kafka.connect.transforms package. This will return null from apply(ConnectRecord) when a configured Predicate.test(ConnectRecord) returns true (or false when the negate config is true). This is not of much use on its own, but is intended to be applied conditionally as described above. This will allow messages to be filtered according to the predicate.
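The rule for when a record is dropped can be sketched in a few lines. This is a minimal illustration and not the proposed implementation: SketchRecord and FilterSketch are hypothetical names, java.util.function.Predicate stands in for the proposed Predicate interface, and only the topic name of a record is modelled.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for ConnectRecord; only the topic name is modelled here.
final class SketchRecord {
    final String topic;

    SketchRecord(String topic) {
        this.topic = topic;
    }
}

// Sketch of the filtering rule only, not the actual Filter SMT.
final class FilterSketch {
    private final Predicate<SketchRecord> predicate; // stand-in for the proposed Predicate
    private final boolean negate;                    // mirrors the negate config

    FilterSketch(Predicate<SketchRecord> predicate, boolean negate) {
        this.predicate = predicate;
        this.negate = negate;
    }

    // Returns null (record dropped) when the predicate result differs from negate;
    // otherwise the record is passed through unchanged.
    SketchRecord apply(SketchRecord record) {
        boolean drop = predicate.test(record) != negate;
        return drop ? null : record;
    }
}
```

With negate set to false, a record for which the predicate returns true is dropped. With negate set to true, the records for which it returns false are dropped instead.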
Consider the following example of a transformation chain with a single Filter SMT:
Code Block |
---|
transforms: filter
transforms.filter.type: Filter
transforms.filter.condition?type: org.apache.kafka.connect.transforms.predicates.TopicNameMatch
transforms.filter.condition.?pattern: foo|bar
transforms.filter.?negate: false |
The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatch and it takes a single configuration parameter, pattern. Records with a topic name of "foo" or "bar" match the predicate, so the Filter SMT is evaluated, apply(ConnectRecord) returns null, and those records are filtered out.
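To make the matching behaviour concrete, here is a rough sketch of a topic-name predicate following the configure, test, close lifecycle. It is illustrative only: TopicNameMatchSketch is a hypothetical class, it tests a plain topic String rather than a ConnectRecord, and it assumes the pattern is a Java regular expression that must match the whole topic name.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of a topic-name predicate. The class name and the use of a
// plain String topic (instead of a ConnectRecord) are simplifications.
final class TopicNameMatchSketch implements AutoCloseable {
    private Pattern pattern;

    // Reads the single "pattern" parameter, as in the example configuration.
    void configure(Map<String, ?> configs) {
        Object value = configs.get("pattern");
        if (value == null) {
            throw new IllegalArgumentException("pattern must be configured");
        }
        this.pattern = Pattern.compile(value.toString());
    }

    // Assumption: the whole topic name must match the regular expression.
    boolean test(String topic) {
        return pattern.matcher(topic).matches();
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Under these assumptions, a predicate configured with the pattern foo|bar accepts the topics "foo" and "bar" and rejects others such as "baz" or "foobar".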
If the predicate throws an exception during processing, this will be handled in the same way as errors in transformations.
Changes to Transformation
Currently Transformation has a public ConfigDef config() method. This is not flexible enough for the Filter SMT, which needs to support arbitrary configuration parameters in order to configure the predicate. To support this, we will add a validate() method to Transformation.
...
Compatibility, Deprecation, and Migration Plan
...