Status
Current state: WIP
Discussion thread: here
JIRA: KAFKA-3209
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal is for adding a record transformation API to Kafka Connect as well as certain bundled transformations. At the same time, we should not extend Connect's area of focus too much beyond the Extract and Load aspects of ETL. We will only support 1:{0,1} transformations – i.e. map and filter operations.
The objective is to:
- Allow for lightweight updates to records.
- Some transformations must be performed before the data hits Kafka (source) or another system (sink) e.g. filtering certain types of events or sensitive information.
- It's also useful for very light, single-message modifications that are easier to perform inline with the data import/export. It may be inconvenient to add stream processing into the mix for simple data massaging or control over routing.
- Benefit the growing connector ecosystem since some common options that are widely applicable can now be implemented once and reused. For example:
  - It is common for source connectors to allow configuring what format the topic name should follow based on some aspect of the source data, or in the case of sink connectors what 'bucket' (table, index etc.) a record should end up in based on the topic. This is configured in many different ways currently.
  - Some sink connectors allow inserting record metadata like the Kafka topic/partition/offset into the record key or value, while others do not. This information can get lost in translation if the functionality is absent and makes a connector less useful.
  - See the 'bundled transformations' section below for more examples.
Public Interfaces and Proposed Changes
Java API
    // Existing base class for SourceRecord and SinkRecord, new self type parameter.
    public abstract class ConnectRecord<R extends ConnectRecord<R>> {

        // ...

        // New abstract method:

        /** Generate a new record of the same type as itself, with the specified parameter values. **/
        public abstract R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);

    }

    public interface Transformation<R extends ConnectRecord<R>> {

        /** Initialize with the provided configuration properties. **/
        void init(Map<String, String> config);

        /** Apply transformation to the {@code record} and return another record object (which may be {@code record} itself). Must be thread-safe. **/
        R apply(R record);

        /** Signal that this transformation instance will no longer be used. **/
        void close();

        /** Configuration specification for this transformation. **/
        ConfigDef config();

    }
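To illustrate why the self type parameter matters, here is a minimal sketch of a transformation that works for any record type without knowing whether it is a source or sink record. The types SketchRecord, SketchSourceRecord and SketchTransformation are hypothetical, simplified stand-ins for the proposed API (Schema and ConfigDef are left out so the example compiles on its own), and the PrefixTopic transformation with its "prefix" key is invented purely for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for ConnectRecord; schemas omitted for brevity.
abstract class SketchRecord<R extends SketchRecord<R>> {
    final String topic;
    final Object key;
    final Object value;
    final Long timestamp;

    SketchRecord(String topic, Object key, Object value, Long timestamp) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    /** Generate a new record of the same concrete type with the given values. */
    abstract R newRecord(String topic, Object key, Object value, Long timestamp);
}

// Hypothetical stand-in for SourceRecord.
final class SketchSourceRecord extends SketchRecord<SketchSourceRecord> {
    SketchSourceRecord(String topic, Object key, Object value, Long timestamp) {
        super(topic, key, value, timestamp);
    }

    @Override
    SketchSourceRecord newRecord(String topic, Object key, Object value, Long timestamp) {
        return new SketchSourceRecord(topic, key, value, timestamp);
    }
}

// Hypothetical stand-in for Transformation; config() omitted.
interface SketchTransformation<R extends SketchRecord<R>> {
    void init(Map<String, String> config);
    R apply(R record);
    void close();
}

// Illustrative transformation: drops records with a null value (filter)
// and prefixes the topic of everything else (map).
final class PrefixTopic<R extends SketchRecord<R>> implements SketchTransformation<R> {
    // volatile so that a value set in init() is visible to threads calling apply()
    private volatile String prefix = "";

    @Override
    public void init(Map<String, String> config) {
        prefix = config.getOrDefault("prefix", "");
    }

    @Override
    public R apply(R record) {
        if (record.value == null) {
            return null; // 1:0 case, the record is filtered out
        }
        // 1:1 case, newRecord() preserves the concrete record type R
        return record.newRecord(prefix + record.topic, record.key, record.value, record.timestamp);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}
```

Because newRecord() returns R, the same PrefixTopic class could in principle be used in both source and sink pipelines without casts.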
Configuration
A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms config which represents a list of aliases. An alias in transforms implies that some additional keys are configurable:
- transforms.$alias.type – fully qualified class name for the transformation
- transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix
Example:
    transforms=tsRouter,insertKafkaCoordinates
    transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
    transforms.tsRouter.topic.format=${topic}-${timestamp}
    transforms.tsRouter.timestamp.format=yyyyMMdd
    transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
    transforms.insertKafkaCoordinates.topic=kafka_topic
    transforms.insertKafkaCoordinates.partition=kafka_partition
    transforms.insertKafkaCoordinates.offset=kafka_offset
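As a rough sketch of how the framework might resolve such a configuration, the snippet below splits the transforms list into aliases and collects each alias's keys with the transforms.$alias. prefix stripped. The class and method names (TransformChainConfig, parse) are hypothetical and not part of the proposal; the error handling for a missing type key is likewise an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TransformChainConfig {

    /**
     * Returns alias -> scoped config (prefix removed), in the order the
     * aliases appear in the "transforms" list.
     */
    static Map<String, Map<String, String>> parse(Map<String, String> connectorConfig) {
        Map<String, Map<String, String>> chain = new LinkedHashMap<>();
        String aliases = connectorConfig.getOrDefault("transforms", "");
        for (String raw : aliases.split(",")) {
            String alias = raw.trim();
            if (alias.isEmpty()) {
                continue; // no transformations configured, or a stray comma
            }
            String prefix = "transforms." + alias + ".";
            Map<String, String> scoped = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    scoped.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            // Assumption: every alias must name its implementation class.
            if (!scoped.containsKey("type")) {
                throw new IllegalArgumentException("Missing required config: " + prefix + "type");
            }
            chain.put(alias, scoped);
        }
        return chain;
    }
}
```

For the example above, the tsRouter entry would contain type, topic.format and timestamp.format; everything except type would then be handed to that transformation's init().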
Runtime changes
For source connectors, transformations are applied on the collection of SourceRecord retrieved from SourceTask.poll().
For sink connectors, transformations are applied on the collection of SinkRecord before being provided to SinkTask.put().
If the result of any Transformation.apply() in a chain is null, that record is discarded (not written to Kafka in the case of a source connector, or not provided to the sink connector).
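The behaviour described above can be sketched roughly as follows. This is not the proposed runtime code: ChainRunner and applyChain are hypothetical names, UnaryOperator stands in for Transformation.apply(), and skipping the remaining transformations once a record becomes null is an assumption about how discarding would be implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class ChainRunner {

    /**
     * Applies each transformation in order to every record and drops any
     * record for which some transformation returns null.
     */
    static <R> List<R> applyChain(List<R> records, List<UnaryOperator<R>> chain) {
        List<R> out = new ArrayList<>();
        for (R record : records) {
            R current = record;
            for (UnaryOperator<R> transformation : chain) {
                current = transformation.apply(current);
                if (current == null) {
                    break; // record discarded; later transformations never see it
                }
            }
            if (current != null) {
                out.add(current);
            }
        }
        return out;
    }
}
```

For a source connector, the returned list is what would be written to Kafka; for a sink connector, it is what would be handed to SinkTask.put().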
Bundled transformations
WIP. Subject to discussion – not a final list.
Name | Functionality | Rationale | Configuration |
---|---|---|---|
Mask{Key,Value} | Mask or replace the specified primitive fields, assuming there is a top-level Struct. | Obscure sensitive info like credit card numbers. | |
InsertIn{Key,Value} | Insert specified fields with given name, assuming there is a top-level Struct. | Widely applicable to insert certain record metadata. | |
TimestampRouter | Timestamp-based routing. | Useful for temporal data, e.g. application log data being indexed to a search system with a sink connector can be routed to a daily index. | |
RegexRouter | Regex-based routing. | There are too many inconsistent configs to route in different connectors. | See http://docs.oracle.com/javase/7/docs/api/java/util/regex/Matcher.html#replaceFirst(java.lang.String) |
ValueToKey | Create or replace record key with data from record value. | Useful when a source connector does not populate the record key but only the value with a Struct. | |
Flatten | Flatten nested Structs. | Useful for sink connectors that can only deal with flat Structs. | TODO: specify escaping |
Replace | Filter and rename fields. | Useful for lightweight data munging. | |
NumericCasts | Casting of numeric field to some specified numeric type. | Useful in conjunction with source connectors that don't have enough information and utilize an unnecessarily wide data type. | |
TimestampConverter | Convert datatype of a timestamp field. | Timestamps are represented in many different ways; this provides a transformation for converting between strings, epoch times as longs, and Connect date/time types. | |
Hoist{Key,Value}ToStruct | Wrap data in a Struct. | | |
Extract{Key,Value}FromStruct | Extract a specific field from a Struct. | | |
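To make the two routing transformations more concrete, here is a small sketch of the topic-name computation each might perform. It only covers the string manipulation, not the Transformation plumbing. RoutingSketch and its method names are hypothetical; the ${topic} and ${timestamp} placeholders and the yyyyMMdd pattern come from the configuration example earlier in this document, while formatting in UTC and passing the regex and replacement as plain parameters are assumptions.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.regex.Pattern;

final class RoutingSketch {

    /** TimestampRouter-style routing: fill in ${topic} and ${timestamp}. */
    static String timestampRoute(String topic, long timestampMs,
                                 String topicFormat, String timestampFormat) {
        // A new formatter per call keeps this thread-safe (SimpleDateFormat is not).
        SimpleDateFormat formatter = new SimpleDateFormat(timestampFormat);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC
        String formatted = formatter.format(new Date(timestampMs));
        // String.replace does literal (non-regex) substitution.
        return topicFormat.replace("${topic}", topic).replace("${timestamp}", formatted);
    }

    /** RegexRouter-style routing, based on Matcher.replaceFirst. */
    static String regexRoute(String topic, String regex, String replacement) {
        // If the regex does not match, the topic is returned unchanged.
        return Pattern.compile(regex).matcher(topic).replaceFirst(replacement);
    }
}
```

A real implementation would likely compile the pattern once in init() rather than on every record.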
Patterns for implementing data transformations
Data transformations could be applicable to the key or the value of the record. We will have *Key and *Value variants for these transformations that reuse the common functionality from a shared base class.
Some common utilities for data transformations will shape up:
- Cache the changes they make to Schema objects, possibly only preserving the last-seen one, as the likelihood of the source data Schema changing is low.
- Copying of Schema objects with the possible exclusion of some fields, which they are modifying. Likewise, copying of a Struct object to another Struct having a different Schema with the exception of some fields, which they are modifying.
- Where fields are being added and a field name is specified in configuration, we will want a consistent way to convey if it should be created as an optional field. We can use a leading '?' character. TODO: specify escaping
- ConfigDef does not provide a Type.MAP, but for the time being we can piggyback on top of Type.LIST and represent maps as a list of key-value pairs separated by ':'. TODO: specify escaping
- Where field names are expected, in some cases we should allow for getting at nested fields by allowing a dotted syntax which is common in such usage (and accordingly, will need some utilities around accessing a field that may be nested). TODO: specify escaping
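The three configuration conventions listed above (the leading '?' for optional fields, maps encoded as a list of key:value pairs, and dotted access to nested fields) could be supported by helpers along these lines. This is only a sketch: TransformConfigUtils and its methods are hypothetical names, nested java.util.Map values stand in for nested Struct objects, and escaping is deliberately not handled because the proposal leaves it unspecified.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TransformConfigUtils {

    /** Parses Type.LIST-style entries such as ["a:x", "b:y"] into an ordered map. */
    static Map<String, String> parseMap(List<String> pairs) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String pair : pairs) {
            int sep = pair.indexOf(':'); // first ':' splits key from value; no escaping
            if (sep < 0) {
                throw new IllegalArgumentException("Expected key:value but got: " + pair);
            }
            out.put(pair.substring(0, sep).trim(), pair.substring(sep + 1).trim());
        }
        return out;
    }

    /** True if the field spec carries the leading '?' optional marker. */
    static boolean isOptional(String fieldSpec) {
        return fieldSpec.startsWith("?");
    }

    /** Field name with the optional marker, if any, removed. */
    static String fieldName(String fieldSpec) {
        return isOptional(fieldSpec) ? fieldSpec.substring(1) : fieldSpec;
    }

    /** Resolves a dotted path such as "a.b.c"; returns null if any segment is missing. */
    static Object getNested(Map<String, Object> root, String dottedPath) {
        Object current = root;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // tried to descend into a non-container value
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```

One consequence of skipping escaping is that a field name containing '.', ':' or a leading '?' cannot be expressed, which is presumably what the TODO items are meant to resolve.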
Compatibility, Deprecation, and Migration Plan
There are no backwards compatibility concerns. Transformation is an additional layer at the edge of record exchange between the framework and connectors.
Test Plan
Unit tests for runtime changes and each bundled transformation, as well as a system test exercising a few different transformation chains.
Rejected Alternatives
Transformation chains as top-level construct
The current proposal is to have transformation chains be configured in the connector config under the prefix transforms. An alternative would be to reference a transformation chain by a name in the connector configuration, with the transformation chain specification managed separately by Connect.
However, the surface area for such a change is much larger - we would need additional REST APIs for creating, updating and validating transformation chain configs. The current proposal does not prevent taking this direction down the line.