Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Update Transformation interface in KIP with version that was committed

...

 

Code Block
languagejava
// Existing base class for SourceRecord and SinkRecord, new self type parameter.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
 
    // ...
 
    // New abstract method:
       
    /** Generate a new record of the same type as itself, with the specified parameter values. **/
    public abstract R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
 
}

public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
    /** Initialize with the provided configuration properties. **/
    void init/ via Configurable base interface:
    // void configure(Map<String, String>?> configconfigs);
 
    /**
     * Apply transformation to the {@code record} and return another record object (which may be {@code record} itself) or {@code null},
     * corresponding to a map or filter operation respectively. The Mustimplementation must be thread-safe.
 *    */
    R apply(R record);
 
    /** SignalConfiguration specification thatfor this transformation instance will no longer will be used. **/
    voidConfigDef closeconfig();
 
    /** ConfigurationSignal specificationthat for this transformation instance will no longer will be used. **/
    @Override
    ConfigDefvoid configclose();
 
}

 

Configuration

A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms config which represents a list of aliases. An alias in transforms implies that some additional keys are configurable:
transforms.$alias.type – fully qualified class name for the transformation
transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix

...