Status
Current state: "Draft"
Discussion thread: TBD
JIRA: KAFKA-17178, KAFKA-17131, and KAFKA-19688
Released: targeting 4.3
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
We deprecated the "old Processor API" of Kafka Streams and replaced it with the "new Processor API" via KIP-478 to improve type safety. However, we missed updating KTable.transformValues(), which still uses the old Processor API. This prevented us from removing all classes of the old Processor API in the Apache Kafka 4.0 release. To complete the removal of the old Processor API in Apache Kafka 5.0, we need to deprecate KTable.transformValues() and replace it with a new method that uses the new Processor API.
Unfortunately, we also introduced a backward-incompatibility bug in the newly added KStream.processValues() implementation (cf. KAFKA-19668). To address this backward incompatibility, we propose to also deprecate the KStream.processValues() methods and replace them with a backward-compatible alternative.
Public Interfaces
package org.apache.kafka.streams.kstream;
public interface KTable<K, V> {
    // to be deprecated
    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
            final String... stateStoreNames);
    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
            final Named named,
            final String... stateStoreNames);
    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
            final String... stateStoreNames);
    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
            final Named named,
            final String... stateStoreNames);
    // newly added
    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
            final String... stateStoreNames);
    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
            final Named named,
            final String... stateStoreNames);
    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
            final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
            final String... stateStoreNames);
    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
            final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
            final Named named,
            final String... stateStoreNames);
}
// interfaces to be deprecated
@Deprecated
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> { ... }
@Deprecated
public interface ValueTransformerWithKey<K, V, VR> { ... }
package org.apache.kafka.streams.processor;
@Deprecated
public interface ProcessorContext { ... }
@Deprecated
public class To { ... }
package org.apache.kafka.streams.kstream;
public interface KStream<K, V> {
    // to be deprecated
    @Deprecated
    <VOut> KStream<K, VOut> processValues(
        final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
        final String... stateStoreNames
    );
    @Deprecated
    <VOut> KStream<K, VOut> processValues(
        final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
        final Named named,
        final String... stateStoreNames
    );
    // newly added
    <VOut> KStream<K, VOut> process(
        final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
        final String... stateStoreNames
    );
    <VOut> KStream<K, VOut> process(
        final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
        final Named named,
        final String... stateStoreNames
    );
}
Proposed Changes
We propose to deprecate the four overloads of KTable.transformValues() and replace them with equivalent new KTable.processValues() methods, which offer the same functionality but use the new type-safe Processor API. Furthermore, we deprecate the helper interfaces and classes ValueTransformerWithKeySupplier, ValueTransformerWithKey, the (old) ProcessorContext, and To. These helpers either need no replacement (ValueTransformerWithKeySupplier, ValueTransformerWithKey, and To, which is only used by the old ProcessorContext), or already have one (the old ProcessorContext is superseded by the existing api.ProcessorContext).
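To illustrate the kind of rewrite KTable#transformValues() users would make, here is a minimal, stdlib-only sketch assuming a simple key-prefixing transformer: the old ValueTransformerWithKey style returns the new value, while the FixedKeyProcessor style forwards record.withValue(...) through the context it received in init(), so the key cannot be changed. All types in the sketch (SketchRecord, SketchContext, SketchValueTransformerWithKey, SketchFixedKeyProcessor, PrefixTransformer, PrefixProcessor) are hypothetical, simplified stand-ins so the code compiles without Kafka on the classpath; the real interfaces have additional members (e.g., close(), record timestamps and headers), and this is not the KIP's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of the transformValues() -> processValues() rewrite pattern.
// All types below are HYPOTHETICAL simplified stand-ins, NOT the real Kafka Streams interfaces.
class TransformValuesMigrationSketch {

    // Stand-in for FixedKeyRecord: the key is final; only the value can be replaced.
    static final class SketchRecord<K, V> {
        final K key;
        final V value;

        SketchRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        <NV> SketchRecord<K, NV> withValue(NV newValue) {
            return new SketchRecord<>(key, newValue); // key is carried over unchanged
        }
    }

    // Stand-in for FixedKeyProcessorContext: forward() only accepts key-preserving records.
    interface SketchContext<K, V> {
        void forward(SketchRecord<K, V> record);
    }

    // Old style (stand-in for ValueTransformerWithKey): the new value is returned.
    interface SketchValueTransformerWithKey<K, V, VR> {
        VR transform(K readOnlyKey, V value);
    }

    // New style (stand-in for FixedKeyProcessor): the new value is forwarded via the context.
    interface SketchFixedKeyProcessor<K, V, VOut> {
        void init(SketchContext<K, VOut> context);
        void process(SketchRecord<K, V> record);
    }

    // Before migration: prefix each value with its (read-only) key.
    static final class PrefixTransformer implements SketchValueTransformerWithKey<String, Integer, String> {
        public String transform(String readOnlyKey, Integer value) {
            return readOnlyKey + ":" + value;
        }
    }

    // After migration: same logic, but the result is forwarded as record.withValue(...).
    static final class PrefixProcessor implements SketchFixedKeyProcessor<String, Integer, String> {
        private SketchContext<String, String> context;

        public void init(SketchContext<String, String> context) {
            this.context = context; // keep the context for use in process()
        }

        public void process(SketchRecord<String, Integer> record) {
            context.forward(record.withValue(record.key + ":" + record.value));
        }
    }

    // Drives the old transformer and collects the returned values.
    static List<String> runOld(List<SketchRecord<String, Integer>> input) {
        PrefixTransformer transformer = new PrefixTransformer();
        List<String> out = new ArrayList<>();
        for (SketchRecord<String, Integer> r : input) {
            out.add(transformer.transform(r.key, r.value));
        }
        return out;
    }

    // Drives the new processor and collects the forwarded values.
    static List<String> runNew(List<SketchRecord<String, Integer>> input) {
        List<String> out = new ArrayList<>();
        PrefixProcessor processor = new PrefixProcessor();
        processor.init(record -> out.add(record.value)); // context that just records forwarded values
        for (SketchRecord<String, Integer> r : input) {
            processor.process(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<SketchRecord<String, Integer>> input =
            List.of(new SketchRecord<>("a", 1), new SketchRecord<>("b", 2));
        System.out.println(runOld(input)); // [a:1, b:2]
        System.out.println(runNew(input)); // [a:1, b:2]
    }
}
```

In this sketch both drivers produce the same list; only the shape of the user code changes (return a value vs. forward a key-preserving record).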
We further propose to deprecate the two overloads for KStream#processValues() and replace them with equivalent new methods KStream#process(), which offer the same functionality, but provide a correct implementation.
Compatibility, Deprecation, and Migration Plan
Users have until the Apache Kafka 5.0 release to rewrite their code. Given that KTable.processValues() provides the same functionality as the existing KTable.transformValues(), there are no functional changes. As we only deprecate APIs for now, no breaking changes are introduced until the Apache Kafka 5.0 release, and users can keep upgrading without code changes until then.
For Apache Kafka 3.x users, we provide a safe and fully backward-compatible upgrade path from KStream#transformValues() and KStream#flatTransformValues() to the newly added KStream#process() methods.
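For the upgrade path from KStream#flatTransformValues(), the main change is that the old transformer returns an Iterable of values, whereas a fixed-key processor (as would be passed to the proposed KStream#process() overload) forwards one record per element. The following stdlib-only sketch, assuming a hypothetical word-splitting transformer, reuses the old logic inside the processor. SketchRecord, SketchContext, SketchFixedKeyProcessor, splitWords and SplitProcessor are hypothetical stand-ins, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of moving flatTransformValues()-style logic into a fixed-key processor.
// All types below are HYPOTHETICAL simplified stand-ins, NOT the real Kafka Streams interfaces.
class FlatTransformValuesMigrationSketch {

    // Stand-in for FixedKeyRecord: the key is final; only the value can be replaced.
    static final class SketchRecord<K, V> {
        final K key;
        final V value;

        SketchRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        <NV> SketchRecord<K, NV> withValue(NV newValue) {
            return new SketchRecord<>(key, newValue);
        }
    }

    // Stand-in for FixedKeyProcessorContext.
    interface SketchContext<K, V> {
        void forward(SketchRecord<K, V> record);
    }

    // Stand-in for FixedKeyProcessor.
    interface SketchFixedKeyProcessor<K, V, VOut> {
        void init(SketchContext<K, VOut> context);
        void process(SketchRecord<K, V> record);
    }

    // Old flatTransformValues()-style logic: returns zero or more values per input value.
    static Iterable<String> splitWords(String readOnlyKey, String value) {
        List<String> words = new ArrayList<>();
        for (String w : value.split(" ")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    // New style: forward one key-preserving record per element; forwarding nothing drops the input.
    static final class SplitProcessor implements SketchFixedKeyProcessor<String, String, String> {
        private SketchContext<String, String> context;

        public void init(SketchContext<String, String> context) {
            this.context = context;
        }

        public void process(SketchRecord<String, String> record) {
            for (String word : splitWords(record.key, record.value)) {
                context.forward(record.withValue(word));
            }
        }
    }

    // Feeds the input through the processor and collects "key=value" for each forwarded record.
    static List<String> run(List<SketchRecord<String, String>> input) {
        List<String> out = new ArrayList<>();
        SplitProcessor processor = new SplitProcessor();
        processor.init(record -> out.add(record.key + "=" + record.value));
        for (SketchRecord<String, String> r : input) {
            processor.process(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<SketchRecord<String, String>> input =
            List.of(new SketchRecord<>("k1", "hello  world"), new SketchRecord<>("k2", ""));
        System.out.println(run(input)); // [k1=hello, k1=world]
    }
}
```

Wrapping the existing Iterable-returning logic, rather than rewriting it, keeps the per-record behavior identical in this sketch; the input with an empty value simply produces no output.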
The only backward incompatibility we cannot address is a potential issue for Kafka Streams 4.0.0 and 4.1.0 users (and potentially 4.0.1, 4.1.1, and 4.2.0 users, if they have an affected Topology and do not apply the fix of KAFKA-19668), if these users need to migrate off the deprecated KStream#processValues() methods. We will update the docs with all necessary information, including the documentation for the 4.0, 4.1, and 4.2 releases, to guide users not to use KStream#processValues() (or to use it correctly) and so avoid future issues.
Given that the bug of KAFKA-19668 only applies to very specific scenarios, we expect only a very small number of users to be affected by any backward-incompatibility issues, and these users still have time to address them manually before the next major release. Given that KStream#processValues() won't be removed any time soon, users have plenty of time and can upgrade to newer versions of Kafka Streams while staying on the deprecated methods for the time being.
TODO (after the KIP is accepted): For Kafka Streams deprecations, please add a corresponding sub-task to KAFKA-16337 for tracking after the KIP is approved, and include a link to the created ticket in this section.
Test Plan
Existing tests for KTable.transformValues() will be duplicated using KTable.processValues(), and tests for KStream.processValues() will be duplicated using KStream.process(), to carry forward the existing test coverage. Additionally, we will provide upgrade tests from older versions to newer versions to verify that the upgrade path works as expected.
Documentation Plan
Documentation and JavaDocs will be updated accordingly, including a migration guide to help users rewrite their code from the old KTable#transformValues() to the new KTable#processValues() (i.e., building on the existing migration guide from ValueTransformerWithKey to FixedKeyProcessor), plus a guide for migrating from KStream#processValues() to KStream#process().
Rejected Alternatives
n/a