Status

Current state: "Draft"

Discussion thread: TBD

JIRA: KAFKA-17178 and KAFKA-17131

Released: targeting 4.1

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We deprecated the "old Processor API" of Kafka Streams and replaced it with the "new Processor API" via KIP-478 to improve type safety. However, we neglected to update KTable.transformValues, which still uses the old Processor API. This prevented us from removing all classes of the old Processor API in the Apache Kafka 4.0 release. To finish removing the old Processor API in Apache Kafka 5.0, we need to deprecate KTable.transformValues and replace it with a new method that uses the new Processor API.

Public Interfaces

package org.apache.kafka.streams.kstream;

public interface KTable<K, V> {
    // to be deprecated
    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
                                       final String... stateStoreNames);

    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
                                       final Named named,
                                       final String... stateStoreNames);

    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
                                       final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                       final String... stateStoreNames);

    @Deprecated
    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
                                       final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                       final Named named,
                                       final String... stateStoreNames);

    // newly added
    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
                                         final String... stateStoreNames);

    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
                                         final Named named,
                                         final String... stateStoreNames);

    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
                                         final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
                                         final String... stateStoreNames);

    <VOut> KTable<K, VOut> processValues(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
                                         final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
                                         final Named named,
                                         final String... stateStoreNames);
}

// interfaces to be deprecated
@Deprecated
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> { ... }

@Deprecated
public interface ValueTransformerWithKey<K, V, VR> { ... }


package org.apache.kafka.streams.processor;

@Deprecated
public interface ProcessorContext { ... }

@Deprecated
public class To { ... }

Proposed Changes

We propose to deprecate the four overloads of KTable.transformValues() and replace them with four equivalent overloads of a new method, KTable.processValues(), which offer the same functionality but use the new, type-safe Processor API. Furthermore, we deprecate the helper interfaces and classes ValueTransformerWithKeySupplier, ValueTransformerWithKey, and (the old) ProcessorContext, as well as the class To. These helpers either need no replacement (i.e., ValueTransformerWithKeySupplier, ValueTransformerWithKey, and To, which is only used by the old ProcessorContext) or already have one (i.e., ProcessorContext is superseded by the existing api.ProcessorContext).
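To illustrate how the programming model shifts between the two methods, here is a minimal, JDK-only sketch. All types in it (OldTransformer, SketchRecord, SketchContext, SketchProcessor) are hypothetical local stand-ins that only mimic the shape of the old and new APIs so that the example runs without Kafka on the classpath. They are not the real Kafka Streams interfaces, and the value-labeling logic is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessValuesMigrationSketch {

    // --- Hypothetical stand-ins, NOT the real Kafka Streams types ---

    /** Old style: the transformer returns the new value directly. */
    interface OldTransformer<K, V, VR> {
        VR transform(K key, V value);
    }

    /** Immutable record: the value can be replaced, the key cannot. */
    static final class SketchRecord<K, V> {
        private final K key;
        private final V value;

        SketchRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K key() { return key; }

        V value() { return value; }

        <NewV> SketchRecord<K, NewV> withValue(NewV newValue) {
            return new SketchRecord<>(key, newValue);
        }
    }

    /** Context through which a new-style processor emits its result. */
    interface SketchContext<K, VOut> {
        void forward(SketchRecord<K, VOut> record);
    }

    /** New style: the output value type is a type parameter; results are forwarded, not returned. */
    interface SketchProcessor<K, VIn, VOut> {
        void init(SketchContext<K, VOut> context);

        void process(SketchRecord<K, VIn> record);
    }

    // --- The same logic written against both styles ---

    static final class OldLabeler implements OldTransformer<String, Long, String> {
        @Override
        public String transform(String key, Long value) {
            return key + "=" + value;
        }
    }

    static final class NewLabeler implements SketchProcessor<String, Long, String> {
        private SketchContext<String, String> context;

        @Override
        public void init(SketchContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(SketchRecord<String, Long> record) {
            // Keep the key, swap in the new value, and forward the record.
            context.forward(record.withValue(record.key() + "=" + record.value()));
        }
    }

    /** Runs the new-style processor over one record and collects the forwarded values. */
    static List<String> runNew(String key, Long value) {
        List<String> forwarded = new ArrayList<>();
        NewLabeler processor = new NewLabeler();
        processor.init(r -> forwarded.add(r.value()));
        processor.process(new SketchRecord<>(key, value));
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(new OldLabeler().transform("a", 1L));
        System.out.println(runNew("a", 1L));
    }
}
```

In real code, the new-style class would presumably implement FixedKeyProcessor from org.apache.kafka.streams.processor.api and be passed to the proposed method, for example as table.processValues(NewLabeler::new). That call is an assumption based on the signatures listed under Public Interfaces, not a tested usage.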

Compatibility, Deprecation, and Migration Plan

Users have until the Apache Kafka 5.0 release to rewrite their code. Given that KTable.processValues() provides the same functionality as the existing KTable.transformValues(), there are no functional changes. As we only deprecate things for now, no breaking changes are introduced until the Apache Kafka 5.0 release, and users can still upgrade without code changes until then.

TODO (after the KIP is accepted): For Kafka Streams deprecation, add a corresponding sub-task to KAFKA-16337 for tracking, and include a link to the created ticket in this section.

Test Plan

Existing tests for KTable#transformValues() will be duplicated using KTable#processValues() to carry forward the existing test coverage. No additional tests should be required.

Documentation Plan

Documentation and JavaDocs will be updated accordingly, including a "migration guide" to help users rewrite their code from the old KTable#transformValues() to the new KTable#processValues(), i.e., we will build on the existing migration guide for ValueTransformerWithKey to FixedKeyProcessor.

Rejected Alternatives

n/a
