Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/kv6pnbk711ofnd6hzzlxjlp1g07n07tk

JIRA: KAFKA-17485

Released: TBD (target 4.0)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams supports the KafkaClientSupplier interface, which allows users to create the consumer, producer, and admin client instances that the Kafka Streams runtime uses. With KIP-1071 (and other ideas to refactor and improve the Kafka Streams runtime internally), this model no longer works. We want to move the clients and Kafka Streams code bases closer to each other, allowing Kafka Streams to build on internal client APIs (for KIP-1071 in particular, we aim to build on internal KafkaConsumer APIs). Mid to long term, we aim to co-develop clients and Kafka Streams more and more, and to change Kafka Streams from using the clients as a black box, as it does right now, to building on lower-level primitives, effectively turning Kafka Streams into a "processing client" by itself. This model requires that the Kafka Streams runtime can rely on client internals, which we cannot guarantee if an externally created client instance is used.

Many developers use the KafkaClientSupplier interface to wrap the standard KafkaAdminClient, KafkaConsumer, and KafkaProducer. To preserve this functionality, we propose to add a new KafkaClientInterceptor interface.

Public Interfaces

We propose to deprecate the KafkaClientSupplier interface and all related public APIs (e.g., the KafkaStreams constructors that accept it and the corresponding default.client.supplier configuration). Additionally, we propose to add a KafkaClientInterceptor interface, new KafkaStreams constructors, and a corresponding config. To highlight the difference between KafkaConsumer (which assigns partitions) and "task assignment" inside Kafka Streams, we also propose to add a new StreamsConsumer interface. At this point, we don't add any new methods to it, but with KIP-1071 we expect new methods to be added in the future (e.g., when adding client-side task assignment support). Finally, we propose to add pre-defined interceptor classes.

package org.apache.kafka.streams;

// existing

@Deprecated
interface KafkaClientSupplier { ... }


// newly added

public interface StreamsConsumer extends Consumer<byte[], byte[]> {
  // marker interface for future usage; does not add anything new at this point
}

public class KafkaStreamsConsumer extends KafkaConsumer<byte[], byte[]> implements StreamsConsumer {
  // no new methods at this point -- only implements `StreamsConsumer` marker interface
}

public interface KafkaClientInterceptor {

    default AdminInterceptor wrapAdminClient(final KafkaAdminClient adminClient, Map<String, Object> adminConfig) {
        return new AdminInterceptor(adminClient);
    }

    default StreamsConsumerInterceptor wrapMainConsumer(final KafkaStreamsConsumer mainConsumer, Map<String, Object> mainConsumerConfig) {
        return new StreamsConsumerInterceptor(mainConsumer);
    }

    default ConsumerInterceptor<byte[], byte[]> wrapRestoreConsumer(final KafkaConsumer<byte[], byte[]> restoreConsumer, Map<String, Object> restoreConsumerConfig) {
        return new ConsumerInterceptor<>(restoreConsumer);
    }

    default ConsumerInterceptor<byte[], byte[]> wrapGlobalConsumer(final KafkaConsumer<byte[], byte[]> globalConsumer, Map<String, Object> globalConsumerConfig) {
        return new ConsumerInterceptor<>(globalConsumer);
    }

    default ProducerInterceptor<byte[], byte[]> wrapProducer(final KafkaProducer<byte[], byte[]> producer, Map<String, Object> producerConfig) {
        return new ProducerInterceptor<>(producer);
    }
}

public class AdminInterceptor implements Admin {
    private final Admin adminClient;

    public AdminInterceptor(final Admin adminClient) {
        this.adminClient = adminClient;
    }

    // some example methods:

    @Override
    public void close() {
        adminClient.close();
    }

    @Override
    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics) {
        return adminClient.createTopics(newTopics);
    }
    }

    // same interceptor methods for all other public methods on `Admin` interface; omitted for brevity 
}

public class ConsumerInterceptor<K, V> implements Consumer<K, V> { /* constructor plus interceptor methods for all public methods */ } // same as for AdminInterceptor

public class ProducerInterceptor<K, V> implements Producer<K, V> { /* constructor plus interceptor methods for all public methods */ } // same as for AdminInterceptor

public class StreamsConsumerInterceptor extends ConsumerInterceptor<byte[], byte[]> {
    public StreamsConsumerInterceptor(final StreamsConsumer mainConsumer) {
        super(mainConsumer);
    }
}



// updating existing

public class KafkaStreams {

    @Deprecated
    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier);
    @Deprecated
    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, Time time);
    @Deprecated
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier);
    // there is no 4th overload taking `Topology, StreamsConfig, KafkaClientSupplier, Time`

    // new constructors
    public KafkaStreams(Topology topology, Properties props, KafkaClientInterceptor clientInterceptor);
    public KafkaStreams(Topology topology, Properties props, KafkaClientInterceptor clientInterceptor, Time time);
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientInterceptor clientInterceptor);
    // add missing 4th overload
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientInterceptor clientInterceptor, Time time);
}

public class StreamsConfig {
    @Deprecated
    public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";

    // newly added
    // default value is `null` -- accepts a `KafkaClientInterceptor` implementation either as .class or String (fully qualified name)
    public static final String DEFAULT_CLIENT_INTERCEPTOR_CONFIG = "default.client.interceptor";


    @Deprecated
    public KafkaClientSupplier getKafkaClientSupplier();
}
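The intended contract (the runtime creates the client; the user's interceptor only wraps it) can be sketched as follows. This is a hedged, self-contained illustration, not the proposed implementation: SimpleProducer, RuntimeProducer, SimpleProducerInterceptor, SimpleClientInterceptor, and CountingClientInterceptor are hypothetical stand-ins, reduced to a single send() method so the code compiles without Kafka on the classpath. It assumes the pre-defined interceptor classes delegate every call to the wrapped client, as the AdminInterceptor example does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified stand-in for the Producer interface.
interface SimpleProducer {
    void send(String topic, byte[] value);
    void close();
}

// Stand-in for the client instance that the Kafka Streams runtime creates itself.
class RuntimeProducer implements SimpleProducer {
    final List<String> sentTopics = new ArrayList<>();
    boolean closed = false;

    @Override
    public void send(String topic, byte[] value) { sentTopics.add(topic); }

    @Override
    public void close() { closed = true; }
}

// Stand-in for the pre-defined ProducerInterceptor: delegates every call to the wrapped client.
class SimpleProducerInterceptor implements SimpleProducer {
    private final SimpleProducer delegate;

    SimpleProducerInterceptor(SimpleProducer delegate) { this.delegate = delegate; }

    @Override
    public void send(String topic, byte[] value) { delegate.send(topic, value); }

    @Override
    public void close() { delegate.close(); }
}

// Stand-in for KafkaClientInterceptor, reduced to the producer hook.
interface SimpleClientInterceptor {
    default SimpleProducerInterceptor wrapProducer(SimpleProducer producer, Map<String, Object> producerConfig) {
        return new SimpleProducerInterceptor(producer);
    }
}

// Hypothetical user code: overrides only the hook and the one method it cares about.
class CountingClientInterceptor implements SimpleClientInterceptor {
    final AtomicLong recordsSent = new AtomicLong();

    @Override
    public SimpleProducerInterceptor wrapProducer(SimpleProducer producer, Map<String, Object> producerConfig) {
        return new SimpleProducerInterceptor(producer) {
            @Override
            public void send(String topic, byte[] value) {
                recordsSent.incrementAndGet(); // intercept: count the record
                super.send(topic, value);      // then delegate to the runtime-created client
            }
        };
    }
}
```

The user never constructs a producer; close() and any other un-overridden method fall through to the delegating base class and reach the runtime-created client.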

Proposed Changes

We propose to replace KafkaClientSupplier with the KafkaClientInterceptor interface. The new interface receives, as parameters, a client instance created by the Kafka Streams runtime plus the config used to create that client, and returns an "interceptor client" that wraps it. Creating a new client instance would be invalid. The default implementations of the wrapXxx() methods use the pre-defined [Admin|StreamsConsumer|Consumer|Producer]Interceptor classes to wrap the provided client.

We introduce the [Admin|StreamsConsumer|Consumer|Producer]Interceptor classes to provide better backward compatibility. Assume we add a new method to the StreamsConsumer interface in the future. Without the pre-defined StreamsConsumerInterceptor, we would define the KafkaClientInterceptor methods differently, e.g., StreamsConsumer wrapMainConsumer(...), and user code would need to return new StreamsConsumer() { ... } from wrapMainConsumer(), which implies that such code would no longer compile once we add a new method to the StreamsConsumer interface. In contrast, with StreamsConsumerInterceptor, when adding a new method to StreamsConsumer we can add a corresponding default implementation to StreamsConsumerInterceptor, so user code does not break (the same applies to the other clients).

In addition, expecting XxxInterceptor as the return type makes it clear that the contract is to only wrap the provided client for interception: it will no longer be allowed to return a newly created Producer instance.
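The backward-compatibility argument can be sketched with a hypothetical evolution of the main consumer interface. StreamsConsumerV2, its assignedTasks() method, RuntimeStreamsConsumer, StreamsConsumerInterceptorV2, and LoggingMainConsumer are illustrative names only, not proposed APIs; the types are reduced to two methods so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical "version 2" of the main consumer interface; assignedTasks() stands in
// for a method that a future change might add. It is not part of this proposal.
interface StreamsConsumerV2 {
    List<String> poll();
    Set<String> assignedTasks(); // newly added method
}

// Stand-in for the runtime-created main consumer.
class RuntimeStreamsConsumer implements StreamsConsumerV2 {
    @Override
    public List<String> poll() { return List.of("record-1", "record-2"); }

    @Override
    public Set<String> assignedTasks() { return Set.of("0_0", "0_1"); }
}

// Stand-in for the pre-defined StreamsConsumerInterceptor: whenever a method is added
// to the interface, a delegating implementation is added here at the same time.
class StreamsConsumerInterceptorV2 implements StreamsConsumerV2 {
    private final StreamsConsumerV2 delegate;

    StreamsConsumerInterceptorV2(StreamsConsumerV2 delegate) { this.delegate = delegate; }

    @Override
    public List<String> poll() { return delegate.poll(); }

    @Override
    public Set<String> assignedTasks() { return delegate.assignedTasks(); }
}

// Hypothetical user code written when only poll() existed. Because it extends the
// base class rather than implementing the interface, it still compiles against
// version 2, and assignedTasks() is delegated without any user change.
class LoggingMainConsumer extends StreamsConsumerInterceptorV2 {
    final List<String> log = new ArrayList<>();

    LoggingMainConsumer(StreamsConsumerV2 delegate) { super(delegate); }

    @Override
    public List<String> poll() {
        List<String> records = super.poll();
        log.add("polled " + records.size());
        return records;
    }
}
```

Had LoggingMainConsumer implemented StreamsConsumerV2 directly, adding assignedTasks() would have made it fail to compile until the user added that method.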

As long as KafkaClientSupplier has not been removed, it cannot be used with KIP-1071 enabled. This implies that existing Kafka Streams applications that currently use a KafkaClientSupplier would need to switch to the new KafkaClientInterceptor before they can enable KIP-1071. It is also not allowed to use both the old KafkaClientSupplier and the new KafkaClientInterceptor at the same time; we would raise a config exception in these cases.
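A minimal sketch of how these configuration rules might be enforced, assuming both invalid combinations are rejected at startup and that default.client.interceptor holds either a Class or a fully qualified class name, as the StreamsConfig comment states. The kip1071Enabled flag, ClientInterceptorStandIn, NoOpClientInterceptor, and ClientConfigValidation are hypothetical; IllegalArgumentException stands in for Kafka's ConfigException so the code runs without Kafka.

```java
import java.util.Map;

// Hypothetical stand-in for the proposed KafkaClientInterceptor interface.
interface ClientInterceptorStandIn { }

// Hypothetical user implementation that keeps all default (delegating) behavior.
class NoOpClientInterceptor implements ClientInterceptorStandIn {
    public NoOpClientInterceptor() { }
}

final class ClientConfigValidation {
    // Config names as proposed in StreamsConfig.
    static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
    static final String DEFAULT_CLIENT_INTERCEPTOR_CONFIG = "default.client.interceptor";

    private ClientConfigValidation() { }

    // Returns the configured interceptor, or null if none is set (the proposed default).
    // kip1071Enabled is a hypothetical flag; IllegalArgumentException stands in for ConfigException.
    static ClientInterceptorStandIn resolve(final Map<String, Object> props, final boolean kip1071Enabled) {
        final Object supplier = props.get(DEFAULT_CLIENT_SUPPLIER_CONFIG);
        final Object interceptor = props.get(DEFAULT_CLIENT_INTERCEPTOR_CONFIG);

        if (supplier != null && interceptor != null) {
            throw new IllegalArgumentException(
                DEFAULT_CLIENT_SUPPLIER_CONFIG + " and " + DEFAULT_CLIENT_INTERCEPTOR_CONFIG + " cannot both be set");
        }
        if (supplier != null && kip1071Enabled) {
            throw new IllegalArgumentException(
                DEFAULT_CLIENT_SUPPLIER_CONFIG + " cannot be used with KIP-1071; use " + DEFAULT_CLIENT_INTERCEPTOR_CONFIG);
        }
        if (interceptor == null) {
            return null; // no interceptor configured
        }

        try {
            // Accept either a Class object or a fully qualified class name.
            final Class<?> clazz;
            if (interceptor instanceof Class) {
                clazz = (Class<?>) interceptor;
            } else if (interceptor instanceof String) {
                clazz = Class.forName((String) interceptor);
            } else {
                throw new IllegalArgumentException(DEFAULT_CLIENT_INTERCEPTOR_CONFIG + " must be a Class or a String");
            }
            if (!ClientInterceptorStandIn.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(clazz.getName() + " does not implement the interceptor interface");
            }
            return (ClientInterceptorStandIn) clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Could not instantiate " + interceptor, e);
        }
    }
}
```

Setting only default.client.supplier without KIP-1071 returns null here, leaving the deprecated supplier path in charge.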

Compatibility, Deprecation, and Migration Plan

We deprecate existing APIs, which is not a breaking change (they are subject to removal, most likely in the 5.0 release). The new interface mimics the functionality of the deprecated one, so there is no concern about removed functionality. The only theoretically possible feature gap would be if a KafkaClientSupplier were used to instantiate some third-party implementation of the Admin, Consumer, or Producer interfaces. We are not aware of any third-party Java implementations of these interfaces that are used in combination with Kafka Streams, and don't see this as a risk. Of course, we might not be aware of all use cases, and we hope that the deprecation period of the KafkaClientSupplier interface will bring any such use case to our attention, so we can work on other solutions if necessary.

Test Plan

Standard unit testing is sufficient for this feature.

Documentation Plan

The documentation will be updated as always, including examples and the Kafka Streams configuration page.

Rejected Alternatives

N/A
