...
```java
/**
 * A condition on ConnectRecords.
 * Implementations of this interface can be used for filtering records and conditionally applying Transformations.
 * Implementations must be public and have a public constructor with no parameters.
 */
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /**
     * Configuration specification for this predicate.
     */
    ConfigDef config();

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
}
```
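To make the contract concrete, here is a minimal sketch of what an implementation might look like. It does not compile against Kafka Connect: `Rec` and `SimplePredicate` are hypothetical stand-ins for `ConnectRecord` and the proposed interface (`config()` is omitted), and the `pattern` key and whole-name matching are assumptions borrowed from the examples on this page.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Pattern;

final class TopicPredicateSketch {

    /** Hypothetical stand-in for ConnectRecord: only the topic name is modelled. */
    static final class Rec {
        final String topic;
        Rec(String topic) { this.topic = topic; }
    }

    /** Hypothetical, simplified mirror of the proposed contract (config() omitted). */
    interface SimplePredicate<R> extends AutoCloseable {
        void configure(Map<String, ?> configs); // plays the role of Configurable
        boolean test(R record);
        @Override void close();                 // narrowed so callers need no try/catch
    }

    /** Matches records whose entire topic name matches the configured regex. */
    static final class TopicRegexPredicate implements SimplePredicate<Rec> {
        private Pattern pattern;

        /** Public no-argument constructor, as the contract requires. */
        public TopicRegexPredicate() { }

        @Override
        public void configure(Map<String, ?> configs) {
            Object raw = configs.get("pattern"); // assumed config key
            if (raw == null) {
                throw new IllegalArgumentException("missing required config: pattern");
            }
            pattern = Pattern.compile(raw.toString());
        }

        @Override
        public boolean test(Rec record) {
            // Assumption: the whole topic name must match, not just a substring.
            return pattern.matcher(record.topic).matches();
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }
    }

    public static void main(String[] args) {
        TopicRegexPredicate p = new TopicRegexPredicate();
        p.configure(Collections.singletonMap("pattern", "my-prefix-.*"));
        System.out.println(p.test(new Rec("my-prefix-orders"))); // true
        System.out.println(p.test(new Rec("orders")));           // false
        p.close();
    }
}
```

The predicate is created through its no-argument constructor, configured once, and then tested per record, which is why it keeps the compiled `Pattern` as a field instead of recompiling it in `test`.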
...
Consider the following example of a transformation chain with a single conditionally applied `ExtractField$Key` SMT:
```
transforms: t2
transforms.t2.?predicate: !has-my-prefix
transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field: c1
?predicates: has-my-prefix
?predicates.has-my-prefix.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatch
?predicates.has-my-prefix.pattern: my-prefix-.*
```

The predicate class is `org.apache.kafka.connect.transforms.predicates.TopicNameMatch`. The `ExtractField$Key` SMT will be applied only to records whose topic name does not (the `!` prefix on the predicate reference) start with `my-prefix-` (the `pattern` parameter).
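The gating described above can be sketched in plain Java. This is an illustration under stated assumptions, not the Connect runtime's code: `Rec`, `conditional` and `extractKeyField` are hypothetical names, a leading `!` in the predicate reference (as in `!has-my-prefix`) is assumed to invert the named predicate, and records that do not satisfy the condition are assumed to pass through unchanged.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

final class ConditionalSmtSketch {

    /** Hypothetical stand-in for a record: topic name plus key only. */
    static final class Rec {
        final String topic;
        final Object key;
        Rec(String topic, Object key) { this.topic = topic; this.key = key; }
    }

    /**
     * Wraps a transformation so that it runs only when the referenced predicate holds.
     * A leading '!' in predicateRef inverts the predicate (assumed syntax).
     */
    static UnaryOperator<Rec> conditional(String predicateRef,
                                          Map<String, Predicate<Rec>> predicates,
                                          UnaryOperator<Rec> smt) {
        boolean negate = predicateRef.startsWith("!");
        String name = negate ? predicateRef.substring(1) : predicateRef;
        Predicate<Rec> named = predicates.get(name);
        if (named == null) {
            throw new IllegalArgumentException("unknown predicate: " + name);
        }
        Predicate<Rec> effective = negate ? named.negate() : named;
        // Records that fail the condition pass through untouched.
        return rec -> effective.test(rec) ? smt.apply(rec) : rec;
    }

    /** Stand-in for ExtractField$Key: replaces a Map-valued key with one of its entries. */
    static UnaryOperator<Rec> extractKeyField(String field) {
        return rec -> new Rec(rec.topic, ((Map<?, ?>) rec.key).get(field));
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("my-prefix-.*");
        Predicate<Rec> hasMyPrefix = rec -> pattern.matcher(rec.topic).matches();
        Map<String, Predicate<Rec>> predicates =
                Collections.singletonMap("has-my-prefix", hasMyPrefix);

        UnaryOperator<Rec> t2 = conditional("!has-my-prefix", predicates, extractKeyField("c1"));

        Map<String, Object> key = Collections.<String, Object>singletonMap("c1", 42);
        System.out.println(t2.apply(new Rec("orders", key)).key);           // 42
        System.out.println(t2.apply(new Rec("my-prefix-orders", key)).key); // {c1=42}
    }
}
```

The record on topic `orders` has its key replaced by the `c1` field, while the record on `my-prefix-orders` keeps its original key because the negated predicate rejects it.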
...
```
transforms: filter
transforms.filter.type: org.apache.kafka.connect.transforms.Filter
transforms.filter.?predicate: foo-or-bar
?predicates: foo-or-bar
?predicates.foo-or-bar.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatch
?predicates.foo-or-bar.pattern: foo|bar
```

The predicate class is `org.apache.kafka.connect.transforms.predicates.TopicNameMatch` and it takes a single configuration parameter, `pattern`. Records with the topic name "foo" or "bar" match the predicate, so the `Filter` SMT is evaluated for them; it returns null, and those records are therefore filtered out.
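The filtering behaviour can be illustrated with a small self-contained sketch. It models a record by its topic name only, and `filter` and `runChain` are hypothetical helpers, not Connect APIs. It also assumes that the pattern must match the whole topic name, which is what the "foo" or "bar" wording suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class FilterSketch {

    /**
     * Stand-in for a conditionally applied filter: returns null (drop the record)
     * when the topic matches the pattern, otherwise returns the topic unchanged.
     */
    static String filter(Pattern pattern, String topic) {
        return pattern.matcher(topic).matches() ? null : topic;
    }

    /** Runs every topic through the filter and keeps only the non-null results. */
    static List<String> runChain(Pattern pattern, List<String> topics) {
        List<String> kept = new ArrayList<>();
        for (String topic : topics) {
            String out = filter(pattern, topic);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Pattern fooOrBar = Pattern.compile("foo|bar");
        List<String> topics = Arrays.asList("foo", "bar", "foobar", "baz");
        System.out.println(runChain(fooOrBar, topics)); // prints [foobar, baz]
    }
}
```

Under the whole-name assumption, a topic such as `foobar` is kept, because `foo|bar` matches only the exact names "foo" and "bar".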
...