Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/7y7svyp3f560fzv1bgcr893vn258cn06

Voting thread: https://lists.apache.org/thread/b55fl20b3fhj51nttytsg59vs0hmvfol

JIRA: KAFKA-20197

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation for KIP-1321 emerged during the implementation of KAFKA-20173, which is part of a larger initiative to provide comprehensive header support in Kafka Streams state stores, as outlined in KIP-1271: Allow to Store Record Headers in State Stores.

The StreamPartitioner interface determines the destination partition(s) for records sent to sink topics or internal repartition topics. Currently, its partitions method receives only the topic, key, value, and number of partitions, so a custom partitioner cannot base its decision on record headers.

By adding header support to StreamPartitioner, we ensure that headers are consistently available throughout the Kafka Streams processing pipeline: from the source, through transformations and state stores (as per KIP-1271), to final partitioning and serialization.

Public Interfaces

We propose adding a new method to the StreamPartitioner interface. To maintain backward compatibility, this will be a default method that delegates to the existing partitions method.


StreamPartitioner
public interface StreamPartitioner<K, V> {

    @Deprecated(since = "4.3", forRemoval = true)
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

    /**
     * Determine the number(s) of the partition(s) to which a record with the given key, value, and headers
     * should be sent, for the given topic and current partition count.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param headers the record headers; may be empty but never {@code null}, and must not be mutated
     * @param numPartitions the total number of partitions
     * @return an Optional of a Set of integers between 0 and {@code numPartitions-1}.
     * An empty Optional means the default partitioner is used.
     * An Optional of an empty set means the record is not sent to any partition, i.e., it is dropped.
     * An Optional of a non-empty set contains the partitions to which the record should be sent.
     */
    default Optional<Set<Integer>> partitions(String topic, K key, V value, Headers headers, int numPartitions) {
        return partitions(topic, key, value, numPartitions);
    }
}
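
As a hedged illustration of how a custom partitioner might use the headers-aware method, the sketch below routes records by a `tenant-id` header. The header name, the class names, and the `SketchPartitioner` interface are assumptions made for this example only: `Map<String, byte[]>` stands in for `org.apache.kafka.common.header.Headers` so that the code compiles without Kafka on the classpath. A real implementation would implement `StreamPartitioner` and read the header through the `Headers` API (for example `lastHeader`).

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the proposed headers-aware method of StreamPartitioner.
// Map<String, byte[]> replaces Kafka's Headers type purely to keep the sketch self-contained.
interface SketchPartitioner<K, V> {
    Optional<Set<Integer>> partitions(String topic, K key, V value, Map<String, byte[]> headers, int numPartitions);
}

// Sends all records of one tenant to the same partition, based on an assumed "tenant-id" header.
final class TenantHeaderPartitioner<K, V> implements SketchPartitioner<K, V> {
    static final String TENANT_HEADER = "tenant-id";

    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, Map<String, byte[]> headers, int numPartitions) {
        // Headers are only read here, never mutated.
        byte[] tenant = headers.get(TENANT_HEADER);
        if (tenant == null) {
            // Empty Optional: fall back to the default partitioner.
            return Optional.empty();
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        int partition = Math.floorMod(Arrays.hashCode(tenant), numPartitions);
        return Optional.of(Set.of(partition));
    }
}
```

Two records with different keys but the same `tenant-id` header land on the same partition, which is not expressible with the headerless method.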


KafkaStreams
    // already existing method
    @Deprecated(since = "4.3", forRemoval = true)
    public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                    final K key,
                                                    final Serializer<K> keySerializer) {
        return queryMetadataForKey(storeName, key, new RecordHeaders(), keySerializer);
    }

    // already existing method
    @Deprecated(since = "4.3", forRemoval = true)
    public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                    final K key,
                                                    final StreamPartitioner<? super K, ?> partitioner) {
        return queryMetadataForKey(storeName, key, new RecordHeaders(), partitioner);
    }

    /**
     * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
     *
     * @param storeName     the {@code storeName} to find metadata for
     * @param key           the key to find metadata for
     * @param headers       the record headers
     * @param keySerializer serializer for the key
     * @param <K>           key type
     * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store,
     * or {@code null} if no matching metadata could be found.
     */
    public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                    final K key,
                                                    final Headers headers,
                                                    final Serializer<K> keySerializer) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.keyQueryMetadataForKey(storeName, key, headers, keySerializer);
    }

    /**
     * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
     *
     * @param storeName     the {@code storeName} to find metadata for
     * @param key           the key to find metadata for
     * @param headers       the record headers
     * @param partitioner   the partitioner to be used to locate the host for the key
     * @param <K>           key type
     * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
     * the supplied partitioner, or {@code null} if no matching metadata could be found.
     */
    public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                    final K key,
                                                    final Headers headers,
                                                    final StreamPartitioner<? super K, ?> partitioner) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.keyQueryMetadataForKey(storeName, key, headers, partitioner);
    }



Proposed Changes

  1. Interface Enhancement: Add the headers-aware partitions method to StreamPartitioner. This is a default method to ensure binary compatibility.
  2. Public API for Interactive Queries: Add new overloads to KafkaStreams#queryMetadataForKey that accept `Headers`. This allows users to perform metadata lookups for header-dependent partitioning strategies.
  3. Headers Mutability and Nullability: While headers are mutable by nature, StreamPartitioner is not expected to mutate them. The documentation will explicitly mention that mutating headers can lead to unexpected results. Additionally, when using StreamPartitioner or Interactive Queries (IQ), headers can be empty, but they cannot be null. This behavior will also be explicitly documented.
  4. No changes are planned for KafkaStreamsNamedTopologyWrapper, because this class and the entire feature it belongs to are deprecated.
  5. Internal Callers and Propagation:
    1. Update RecordCollectorImpl#send to pass record headers when calling the partitioner.
    2. Update StreamsMetadataState to support the new header-aware lookups.
  6. Built-in Implementations:
    1. WindowedStreamPartitioner: Override the new method to propagate headers to the underlying WindowedSerializer#serializeBaseKey.
    2. DefaultStreamPartitioner: Override the new method to propagate headers to keySerializer#serialize.

Compatibility, Deprecation, and Migration Plan

  • Backward Compatibility: The change is fully backward compatible. Existing custom StreamPartitioner implementations continue to work because the default implementation of the new method delegates to the existing one.
  • Deprecation: The headerless public methods are deprecated so that they can be removed in Kafka 5.0.
  • Migration: Users who want to use headers in their custom partitioning logic should migrate their StreamPartitioner implementations to override the new `partitions` method.
  • Functional interface migration: StreamPartitioner is a functional interface that currently exposes headerless methods, which are going to be deprecated. Users will be warned that they are using a deprecated API, and to move off it they will need to implement the interface directly instead of using lambda syntax. They can return to the functional-style partitioner once the deprecated methods are completely removed, which is planned for Kafka 5.0.
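
The compatibility and migration points above can be sketched as follows. This is a minimal illustration, not the actual Kafka code: `SketchStreamPartitioner` is a hypothetical mirror of the proposed interface shape, `Map<String, byte[]>` stands in for Kafka's `Headers` type, and the `priority` header name is an assumption.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical mirror of the proposed interface: one deprecated abstract method
// plus a headers-aware default method that delegates to it.
interface SketchStreamPartitioner<K, V> {
    @Deprecated
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

    default Optional<Set<Integer>> partitions(String topic, K key, V value,
                                              Map<String, byte[]> headers, int numPartitions) {
        return partitions(topic, key, value, numPartitions);
    }
}

final class MigrationSketch {
    // Existing lambda-style partitioner: it binds to the deprecated headerless method and
    // keeps working, because the headers-aware default method delegates to it.
    static final SketchStreamPartitioner<String, String> LEGACY =
        (topic, key, value, numPartitions) ->
            Optional.of(Set.of(Math.floorMod(key.hashCode(), numPartitions)));

    // Migrated partitioner: a lambda cannot target a default method, so it is written as a class.
    static final class HeaderAware implements SketchStreamPartitioner<String, String> {
        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value,
                                                 Map<String, byte[]> headers, int numPartitions) {
            // Records carrying the assumed "priority" header go to partition 0;
            // everything else is left to the default partitioner.
            return headers.containsKey("priority") ? Optional.of(Set.of(0)) : Optional.empty();
        }

        @Override
        @SuppressWarnings("deprecation")
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            // Still abstract until it is removed, so it must be implemented; treated as "no headers".
            return partitions(topic, key, value, Map.of(), numPartitions);
        }
    }
}
```

Once the deprecated method is removed, the headers-aware method would be the only abstract one, and a five-argument lambda could replace the `HeaderAware` class.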

Test Plan

  • Unit tests and integration tests:
    • Verify that partitioners correctly propagate headers.
    • Verify that propagated headers are used to determine the target partition.

Rejected Alternatives

  • Update the existing method signature without adding a default implementation: Rejected because it would break all existing user implementations.