Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/kv6pnbk711ofnd6hzzlxjlp1g07n07tk
JIRA: KAFKA-17485
Released: TBD (target 4.0)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams supports the KafkaClientSupplier interface, which allows users to create the consumer, producer, and admin client instances that the Kafka Streams runtime will use. With KIP-1071 (and other ideas to refactor and improve the Kafka Streams runtime internally), this model no longer works, because we want to move the clients and Kafka Streams code bases closer to each other, allowing Kafka Streams to build on internal client APIs (for KIP-1071 in particular, we aim to build on internal KafkaConsumer APIs). Mid to long term, we aim to increasingly co-develop clients and Kafka Streams, and to change Kafka Streams so that it no longer uses clients as a black box, as it does right now, but rather builds on lower-level primitives, effectively transforming Kafka Streams into a "processing client" in its own right. This model requires that the Kafka Streams runtime can rely on client internals, which we cannot guarantee if an externally created client instance is used.
Many developers use the KafkaClientSupplier interface to wrap the standard KafkaAdminClient, KafkaConsumer, and KafkaProducer. To preserve this functionality, we propose to add a new KafkaClientInterceptor interface.
Public Interfaces
We propose to deprecate the KafkaClientSupplier interface and all related public API (e.g., the KafkaStreams constructors that accept it and the corresponding default.client.supplier configuration). Additionally, we propose to add a KafkaClientInterceptor interface, new KafkaStreams constructors, and a corresponding config. To highlight the difference between KafkaConsumer (which assigns partitions) and "task assignment" inside Kafka Streams, we also propose to add a new StreamsConsumer interface. At this point, we don't add any new methods to it, but with KIP-1071, we expect that new methods will be added in the future (e.g., when adding client-side task assignment support). In addition, we propose to add pre-defined interceptor classes.
package org.apache.kafka.streams;

// existing
@Deprecated
interface KafkaClientSupplier { ... }

// newly added
interface StreamsConsumer extends Consumer<byte[], byte[]> {
    // marker interface for future usage; does not add anything new at this point
}

public class KafkaStreamsConsumer extends KafkaConsumer<byte[], byte[]> implements StreamsConsumer {
    // no new methods at this point -- only implements `StreamsConsumer` marker interface
}

interface KafkaClientInterceptor {
    default AdminInterceptor wrapAdminClient(final KafkaAdminClient adminClient, Map<String, Object> adminConfig) {
        return new AdminInterceptor(adminClient);
    }

    default StreamsConsumerInterceptor wrapMainConsumer(final KafkaStreamsConsumer mainConsumer, Map<String, Object> mainConsumerConfig) {
        return new StreamsConsumerInterceptor(mainConsumer);
    }

    default ConsumerInterceptor<byte[], byte[]> wrapRestoreConsumer(final KafkaConsumer<byte[], byte[]> restoreConsumer, Map<String, Object> restoreConsumerConfig) {
        return new ConsumerInterceptor(restoreConsumer);
    }

    default ConsumerInterceptor<byte[], byte[]> wrapGlobalConsumer(final KafkaConsumer<byte[], byte[]> globalConsumer, Map<String, Object> globalConsumerConfig) {
        return new ConsumerInterceptor(globalConsumer);
    }

    default ProducerInterceptor<byte[], byte[]> wrapProducer(final KafkaProducer<byte[], byte[]> producer, Map<String, Object> producerConfig) {
        return new ProducerInterceptor(producer);
    }
}

public class AdminInterceptor implements Admin {
    private final Admin adminClient;

    public AdminInterceptor(final Admin adminClient) {
        this.adminClient = adminClient;
    }

    // some example methods:
    public void close() {
        adminClient.close();
    }

    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics) {
        return adminClient.createTopics(newTopics);
    }

    // same interceptor methods for all other public methods on `Admin` interface; omitted for brevity
}

// same as for AdminInterceptor
public class ConsumerInterceptor implements Consumer { /* constructor plus interceptor methods for all public methods */ }

// same as for AdminInterceptor
public class ProducerInterceptor implements Producer { /* constructor plus interceptor methods for all public methods */ }

public class StreamsConsumerInterceptor extends ConsumerInterceptor {
    public StreamsConsumerInterceptor(final StreamsConsumer mainConsumer) {
        super(mainConsumer);
    }
}

// updating existing
public class KafkaStreams {
    @Deprecated
    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier);
    @Deprecated
    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, Time time);
    @Deprecated
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier);
    // there is no 4th overload taking `Topology, StreamsConfig, KafkaClientSupplier, Time`

    // new constructors
    public KafkaStreams(Topology topology, Properties props, KafkaClientInterceptor clientInterceptor);
    public KafkaStreams(Topology topology, Properties props, KafkaClientInterceptor clientInterceptor, Time time);
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientInterceptor clientInterceptor);
    // add missing 4th overload
    public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientInterceptor clientInterceptor, Time time);
}

public class StreamsConfig {
    @Deprecated
    public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";

    // newly added
    // default value is `null` -- accepts a `KafkaClientInterceptor` implementation either as .class or String (fully qualified name)
    public static final String DEFAULT_CLIENT_INTERCEPTOR_CONFIG = "default.client.interceptor";

    @Deprecated
    public KafkaClientSupplier getKafkaClientSupplier();
}
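To illustrate the intended usage, the following is a minimal, self-contained sketch of the wrapping pattern. It does not use the real Kafka classes: SimpleProducer, RecordingProducer, SimpleProducerInterceptor, SimpleClientInterceptor, CountingClientInterceptor, and CountingInterceptorSketch are hypothetical stand-ins for Producer, KafkaProducer, ProducerInterceptor, and KafkaClientInterceptor, reduced to a single send method so that the example compiles on its own. It assumes that the runtime creates the client, passes it to wrapProducer() together with its config, and afterwards only uses the returned wrapper.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the `Producer` interface, reduced to one method.
interface SimpleProducer {
    void send(String topic, String value);
}

// Hypothetical stand-in for the client that the runtime creates internally.
class RecordingProducer implements SimpleProducer {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(final String topic, final String value) {
        sent.add(topic + ":" + value);
    }
}

// Stand-in for the pre-defined `ProducerInterceptor`: delegates every call.
class SimpleProducerInterceptor implements SimpleProducer {
    private final SimpleProducer producer;

    SimpleProducerInterceptor(final SimpleProducer producer) {
        this.producer = producer;
    }

    @Override
    public void send(final String topic, final String value) {
        producer.send(topic, value);
    }
}

// Stand-in for `KafkaClientInterceptor`: the default wraps without changing behavior.
interface SimpleClientInterceptor {
    default SimpleProducerInterceptor wrapProducer(final SimpleProducer producer,
                                                   final Map<String, Object> producerConfig) {
        return new SimpleProducerInterceptor(producer);
    }
}

// User code: intercepts `send` to count records, then delegates to the wrapped client.
class CountingClientInterceptor implements SimpleClientInterceptor {
    int sendCount = 0;

    @Override
    public SimpleProducerInterceptor wrapProducer(final SimpleProducer producer,
                                                  final Map<String, Object> producerConfig) {
        return new SimpleProducerInterceptor(producer) {
            @Override
            public void send(final String topic, final String value) {
                sendCount++;
                super.send(topic, value);
            }
        };
    }
}

class CountingInterceptorSketch {
    // Mimics the runtime: create the client, let user code wrap it, use the wrapper.
    static int run() {
        final RecordingProducer internalProducer = new RecordingProducer();
        final CountingClientInterceptor interceptor = new CountingClientInterceptor();
        final SimpleProducer wrapped = interceptor.wrapProducer(internalProducer, new HashMap<>());
        wrapped.send("output-topic", "a");
        wrapped.send("output-topic", "b");
        // Both records must have reached the runtime-created client.
        if (internalProducer.sent.size() != 2) {
            throw new IllegalStateException("records were not delegated");
        }
        return interceptor.sendCount;
    }

    public static void main(final String[] args) {
        System.out.println("intercepted sends: " + run());
    }
}
```

The user-defined interceptor never constructs a client itself; it only decorates the instance it was given, which is the property the proposal relies on.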
Proposed Changes
We propose to replace KafkaClientSupplier with the KafkaClientInterceptor interface. Each method of the new interface receives a client instance that the Kafka Streams runtime has created, plus the config used to create that client, and returns an "interceptor client" that wraps it. It would be invalid to create a new client instance. The default implementations of the wrapXxx() methods use the pre-defined [Admin|StreamsConsumer|Consumer|Producer]Interceptor classes to wrap the provided client.
We introduce the [Admin|StreamsConsumer|Consumer|Producer]Interceptor classes to provide better backward compatibility. Assume we add a new method to the StreamsConsumer interface in the future. Without the pre-defined StreamsConsumerInterceptor, we would define the KafkaClientInterceptor methods differently, e.g., StreamsConsumer wrapMainConsumer(...), and user code would need to return new StreamsConsumer() { ... } from wrapMainConsumer(), which implies that their code would no longer compile once we add a new method to the StreamsConsumer interface. In contrast, with StreamsConsumerInterceptor, when we add a new method to StreamsConsumer we can add a matching default implementation to StreamsConsumerInterceptor, so that user code does not break (the same applies to the other clients). In addition, expecting XxxInterceptor as the return type makes the contract clear: implementations may only wrap the provided client for interception, and it will no longer be allowed to return a new Producer() instance.
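The compatibility argument above can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: all types are hypothetical stand-ins (SketchStreamsConsumer for StreamsConsumer, SketchStreamsConsumerInterceptor for StreamsConsumerInterceptor), and assignedTasks() is an invented example of a method that might be added to the interface later. The user class overrides only poll(), as if it had been written before assignedTasks() existed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for `StreamsConsumer` after a later release added `assignedTasks()`.
interface SketchStreamsConsumer {
    List<String> poll();

    List<String> assignedTasks(); // invented example of a method added later
}

// Stand-in for the client created by the runtime.
class SketchRuntimeConsumer implements SketchStreamsConsumer {
    @Override
    public List<String> poll() {
        return Arrays.asList("r1", "r2");
    }

    @Override
    public List<String> assignedTasks() {
        return Arrays.asList("0_0", "0_1");
    }
}

// Stand-in for the pre-defined `StreamsConsumerInterceptor`. When the interface
// grows, the matching delegating method is added here in the same release.
class SketchStreamsConsumerInterceptor implements SketchStreamsConsumer {
    private final SketchStreamsConsumer consumer;

    SketchStreamsConsumerInterceptor(final SketchStreamsConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public List<String> poll() {
        return consumer.poll();
    }

    @Override
    public List<String> assignedTasks() {
        return consumer.assignedTasks();
    }
}

// User code written before `assignedTasks()` existed: it overrides only `poll()`
// and still compiles, because the new method is inherited from the base class.
class PollLoggingConsumer extends SketchStreamsConsumerInterceptor {
    final List<String> log = new ArrayList<>();

    PollLoggingConsumer(final SketchStreamsConsumer consumer) {
        super(consumer);
    }

    @Override
    public List<String> poll() {
        final List<String> records = super.poll();
        log.add("polled " + records.size() + " records");
        return records;
    }
}

class CompatibilitySketch {
    // Calls the newer method through the older user code.
    static List<String> tasksSeenThroughOldUserCode() {
        final SketchStreamsConsumer wrapped = new PollLoggingConsumer(new SketchRuntimeConsumer());
        wrapped.poll();
        return wrapped.assignedTasks();
    }

    public static void main(final String[] args) {
        System.out.println(tasksSeenThroughOldUserCode());
    }
}
```

Had the user instead written an anonymous implementation of the interface itself, the added abstract method would have turned into a compile error in their code.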
With KIP-1071 enabled, it would no longer be allowed to use KafkaClientSupplier, even while the deprecated KafkaClientSupplier has not yet been removed. This implies that existing Kafka Streams applications which use KafkaClientSupplier right now would need to switch to the new KafkaClientInterceptor before they can enable KIP-1071. It is also not allowed to use both the old KafkaClientSupplier and the new KafkaClientInterceptor at the same time; we would raise a config exception in these cases.
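The two rules in this paragraph could be checked roughly as in the sketch below. It makes several assumptions that are not part of this KIP: the helper class ClientConfigCheckSketch and its methods are hypothetical, only the config-based way of setting a supplier or interceptor is modeled (not the constructor overloads), IllegalArgumentException stands in for the config exception, and a plain boolean parameter stands in for however KIP-1071 ends up being enabled. Only the two config names are taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;

class ClientConfigCheckSketch {
    // Config names as listed in this KIP.
    static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
    static final String DEFAULT_CLIENT_INTERCEPTOR_CONFIG = "default.client.interceptor";

    // Hypothetical validation helper; `kip1071Enabled` is an assumed flag.
    static void validate(final Map<String, Object> config, final boolean kip1071Enabled) {
        final boolean hasSupplier = config.get(DEFAULT_CLIENT_SUPPLIER_CONFIG) != null;
        final boolean hasInterceptor = config.get(DEFAULT_CLIENT_INTERCEPTOR_CONFIG) != null;

        // Rule 1: old and new mechanism must not be combined.
        if (hasSupplier && hasInterceptor) {
            throw new IllegalArgumentException(
                "Cannot set both " + DEFAULT_CLIENT_SUPPLIER_CONFIG
                    + " and " + DEFAULT_CLIENT_INTERCEPTOR_CONFIG);
        }
        // Rule 2: the old mechanism is not usable together with KIP-1071.
        if (kip1071Enabled && hasSupplier) {
            throw new IllegalArgumentException(
                DEFAULT_CLIENT_SUPPLIER_CONFIG + " cannot be used with KIP-1071 enabled");
        }
    }

    // Convenience wrapper that reports whether a config would be rejected.
    static boolean isRejected(final Map<String, Object> config, final boolean kip1071Enabled) {
        try {
            validate(config, kip1071Enabled);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> config = new HashMap<>();
        config.put(DEFAULT_CLIENT_SUPPLIER_CONFIG, "com.example.MySupplier");
        System.out.println(isRejected(config, false)); // supplier only, KIP-1071 off
        System.out.println(isRejected(config, true));  // supplier only, KIP-1071 on
    }
}
```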
Compatibility, Deprecation, and Migration Plan
We deprecate existing APIs, which is not a breaking change (the deprecated APIs are subject to removal, most likely in the 5.0 release). The new interface mimics the functionality of the deprecated one, and thus there is no concern about removed functionality. The only theoretically possible feature gap would arise if KafkaClientSupplier were used to instantiate some third-party implementation of the Admin, Consumer, or Producer interfaces. We are not aware of any third-party Java implementations of these interfaces that are used in combination with Kafka Streams, and don't see this as a risk. Of course, we might not be aware of all use cases, and hope that the deprecation period of the KafkaClientSupplier interface will bring any such use case to our attention, so we can work on other solutions if necessary.
Test Plan
Standard unit testing is sufficient for this feature.
Documentation Plan
The documentation will be updated as usual, including examples and the Kafka Streams configuration page.
Rejected Alternatives
N/A