Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-18026

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes users may have a common code module that they wish to apply across all processors of their application, such as a custom logger or debugging element. Right now users have to manually apply this module to every processor in their application, and can only do so for PAPI applications. DSL users are simply out of luck. To make things easier for PAPI users and possible for DSL users, we would like to introduce a mechanism for wrapping processors throughout a Kafka Streams topology.

The primary change here is the introduction of the ProcessorWrapper, but we will also use this as an opportunity to clean up the related TopologyConfig.

Public Interfaces

The following public config definition would be added to the StreamsConfig class. Note that the config itself will be defined in both the StreamsConfig and the TopologyConfig.

StreamsConfig
public class StreamsConfig {

    public static final String PROCESSOR_WRAPPER_CLASS_CONFIG = "processor.wrapper.class";
    private static final String PROCESSOR_WRAPPER_CLASS_DOC = "A processor wrapper class or class name that implements the <code>org.apache.kafka.streams.processor.api.ProcessorWrapper</code> interface. Must be passed in to the StreamsBuilder or Topology constructor in order to take effect";
}

The ProcessorWrapper itself is defined as follows:

ProcessorWrapper
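package org.apache.kafka.streams.processor.api;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;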

/**
 * Wrapper class that can be used to inject custom wrappers around the processors of an application topology.
 * The returned instance should wrap the supplied {@code ProcessorSupplier} and the {@code Processor} it supplies
 * to avoid disrupting the regular processing of the application, although this is not required and any processor
 * implementation can be substituted in to replace the original processor entirely (which may be useful for example
 * while testing or debugging an application topology).
 * <p>
 * NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it
 * in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the
 * appropriate constructor (i.e. {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)}).
 * <p>
 * Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when
 * the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in 
 * to the {@link StreamsBuilder} or {@link Topology} constructor.
 */
public interface ProcessorWrapper extends Configurable {

    @Override
    default void configure(final Map<String, ?> configs) {
        // do nothing
    }

    /**
     * Wrap or replace the provided {@link ProcessorSupplier} and return a {@link WrappedProcessorSupplier}
     * To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
     * use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
     */  
    <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
                                                                                                        final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier);

    /**
     * Wrap or replace the provided {@link FixedKeyProcessorSupplier} and return a {@link WrappedFixedKeyProcessorSupplier}
     * To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
     * use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
     */
    <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
                                                                                                    final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier);

    /**
     * Use to convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}
     */
    static <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> asWrapped(
        final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
    ) {
        return new WrappedProcessorSupplierImpl<>(processorSupplier);
    }

    /**
     * Use to convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}
     */
    static <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> asWrappedFixedKey(
        final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
    ) {
        return new WrappedFixedKeyProcessorSupplierImpl<>(processorSupplier);
    }
}

The return types are new interfaces introduced in this KIP for future compatibility (in case we want to add methods to the wrapped processor suppliers) and type checking (since we may want the ability to distinguish between an unwrapped processor supplier and one that has already been wrapped).

For now, these will simply be marker interfaces that extend the corresponding processor supplier interface with no additional methods. They are defined below:

WrappedProcessorSupplier
package org.apache.kafka.streams.processor.api;  

/**
 * Marker interface for classes implementing {@link ProcessorSupplier}
 * that have been wrapped via a {@link ProcessorWrapper}.
 * <p>
 * To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
 * use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
 */
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> {

}

WrappedFixedKeyProcessorSupplier
package org.apache.kafka.streams.processor.api; 

/**
 * Marker interface for classes implementing {@link FixedKeyProcessorSupplier}
 * that have been wrapped via a {@link ProcessorWrapper}.
 * <p>
 * To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
 * use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
 */
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> {

}

Proposed Changes

As noted above, we will introduce a new ProcessorWrapper interface and its associated config. Unfortunately, we have to add this config to the TopologyConfig rather than the StreamsConfig, because we need access to the ProcessorWrapper during topology construction: by the time the Topology is built and handed in to the KafkaStreams app, it is too late. We would also like to clean up the config handling at some point, but will save that for a followup KIP (e.g. StreamsConfig vs TopologyConfig with their overlapping configs, and KafkaStreams::new vs StreamsBuilder::new vs StreamsBuilder::build vs Topology::new, all of which take in various forms of configs).
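To illustrate why the wrapper has to be available when the topology is constructed, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: ProcessorSupplier, ProcessorWrapper, ToyTopology and TaggingWrapper below are simplified, hypothetical stand-ins, and the "tag" config key is made up for this example. The only point is the ordering: the wrapper is instantiated and configured from the configs in the constructor, so it can be applied as each processor is added.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-ins; NOT the real Kafka Streams types.
interface ProcessorSupplier {
    String describe();
}

interface ProcessorWrapper {
    default void configure(final Map<String, ?> configs) { }

    ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier);
}

// Example wrapper that reads a made-up "tag" config and prefixes each processor's description.
class TaggingWrapper implements ProcessorWrapper {
    private String tag = "";

    @Override
    public void configure(final Map<String, ?> configs) {
        tag = String.valueOf(configs.get("tag"));
    }

    @Override
    public ProcessorSupplier wrap(final String processorName, final ProcessorSupplier supplier) {
        return () -> tag + "(" + supplier.describe() + ")";
    }
}

// Toy topology: the wrapper is resolved from the configs in the constructor,
// so it is already available when the first processor is added.
class ToyTopology {
    private final ProcessorWrapper wrapper;
    private final Map<String, ProcessorSupplier> processors = new LinkedHashMap<>();

    ToyTopology(final Map<String, ?> configs) {
        this.wrapper = loadWrapper(configs);
    }

    private static ProcessorWrapper loadWrapper(final Map<String, ?> configs) {
        final Object className = configs.get("processor.wrapper.class");
        if (className == null) {
            return (name, supplier) -> supplier; // no wrapper configured: pass through
        }
        try {
            final ProcessorWrapper wrapper = (ProcessorWrapper) Class.forName(className.toString())
                .getDeclaredConstructor()
                .newInstance();
            wrapper.configure(configs); // wrapper sees the same configs as the topology
            return wrapper;
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    ToyTopology addProcessor(final String name, final ProcessorSupplier supplier) {
        processors.put(name, wrapper.wrap(name, supplier)); // wrapped immediately on add
        return this;
    }

    String describe(final String name) {
        return processors.get(name).describe();
    }
}

class ToyTopologyDemo {
    static String build() {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("processor.wrapper.class", TaggingWrapper.class.getName());
        configs.put("tag", "traced");
        return new ToyTopology(configs).addProcessor("filter", () -> "filter").describe("filter");
    }

    public static void main(final String[] args) {
        System.out.println(build()); // prints "traced(filter)"
    }
}
```

If the wrapper were only known once the finished topology is handed over, addProcessor would already have stored the unwrapped suppliers, which is the situation the TopologyConfig requirement avoids.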

If a user supplies a ProcessorWrapper, it will be applied immediately as processors are added to the topology, whether via the PAPI or the DSL. A user can decide whether or not to add a layer wrapping the passed-in processor supplier or even opt to replace the provided processor supplier entirely without wrapping it.
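The two options (wrapping versus replacing) can be sketched as follows. This is an illustration only, under simplifying assumptions: the interfaces below are reduced stand-ins that mirror the names in this KIP but not the real signatures (the real Processor takes a Record and is initialized with a ProcessorContext), and LoggingWrapper, NoOpWrapper and WrapperDemo are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins that mirror the names in this KIP; NOT the real Kafka Streams API.
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

// Marker interface with no additional methods, as proposed above.
interface WrappedProcessorSupplier<K, V> extends ProcessorSupplier<K, V> { }

interface ProcessorWrapper {
    <K, V> WrappedProcessorSupplier<K, V> wrapProcessorSupplier(String processorName,
                                                                ProcessorSupplier<K, V> processorSupplier);
}

// Option 1: add a layer that records each input, then delegates to the original processor.
class LoggingWrapper implements ProcessorWrapper {
    final List<String> log = new ArrayList<>();

    @Override
    public <K, V> WrappedProcessorSupplier<K, V> wrapProcessorSupplier(final String processorName,
                                                                       final ProcessorSupplier<K, V> processorSupplier) {
        return () -> {
            final Processor<K, V> delegate = processorSupplier.get();
            return (key, value) -> {
                log.add(processorName + " <- " + key + "=" + value);
                delegate.process(key, value);
            };
        };
    }
}

// Option 2: ignore the original supplier and substitute a processor that does nothing.
class NoOpWrapper implements ProcessorWrapper {
    @Override
    public <K, V> WrappedProcessorSupplier<K, V> wrapProcessorSupplier(final String processorName,
                                                                       final ProcessorSupplier<K, V> processorSupplier) {
        return () -> (key, value) -> { };
    }
}

class WrapperDemo {
    // Sends one record through the wrapped supplier and returns what the ORIGINAL processor saw.
    static List<String> run(final ProcessorWrapper wrapper) {
        final List<String> seen = new ArrayList<>();
        final ProcessorSupplier<String, String> original = () -> (k, v) -> seen.add(k + "=" + v.toUpperCase());
        wrapper.wrapProcessorSupplier("upper-case", original).get().process("a", "x");
        return seen;
    }

    public static void main(final String[] args) {
        final LoggingWrapper logging = new LoggingWrapper();
        System.out.println(run(logging) + " " + logging.log); // [a=X] [upper-case <- a=x]
        System.out.println(run(new NoOpWrapper()));           // []
    }
}
```

With the logging wrapper the original processor still runs and the extra layer only observes; with the replacing wrapper the original processor is never invoked, which is why replacement is best kept to testing or debugging scenarios.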

Compatibility, Deprecation, and Migration Plan

  • N/A

Test Plan

We will write a full integration test using both the PAPI and the DSL (with a processor/transformer) to ensure that the wrapping layer does not interfere with the underlying application.

Rejected Alternatives

N/A
