Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/7y7svyp3f560fzv1bgcr893vn258cn06
JIRA: KAFKA-20197
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The motivation for KIP-1321 emerged during the implementation of KAFKA-20173, which is part of a larger initiative to provide comprehensive header support in Kafka Streams state stores, as outlined in KIP-1271: Allow to Store Record Headers in State Stores.
The StreamPartitioner interface determines the destination partition(s) for records written to sink topics and internal repartition topics. Currently, its partitions method receives only the topic, key, value, and number of partitions, so a custom partitioner cannot take record headers into account when choosing a partition.
By adding header support to StreamPartitioner, we ensure that headers are consistently available throughout the Kafka Streams processing pipeline: from the source, through transformations and state stores (as per KIP-1271), to final partitioning and serialization.
Public Interfaces
We propose adding a new method to the StreamPartitioner interface. To maintain backward compatibility, this will be a default method that delegates to the existing partitions method.
package org.apache.kafka.streams.processor;

import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.header.Headers;

public interface StreamPartitioner<K, V> {

    // Existing method, unchanged.
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

    /**
     * Determine the number(s) of the partition(s) to which a record with the given key, value, and headers
     * should be sent, for the given topic and current partition count.
     *
     * @param topic         the topic name this record is sent to
     * @param key           the key of the record
     * @param value         the value of the record
     * @param headers       the record headers
     * @param numPartitions the total number of partitions
     * @return an Optional of a Set of integers between 0 and {@code numPartitions-1}:
     *         an empty Optional means the default partitioner is used;
     *         an Optional of an empty Set means the record is not sent to any partition, i.e., it is dropped;
     *         an Optional of a non-empty Set contains the partitions to which the record should be sent.
     */
    // New method: the default implementation ignores the headers and delegates to the existing method.
    default Optional<Set<Integer>> partitions(String topic, K key, V value, Headers headers, int numPartitions) {
        return partitions(topic, key, value, numPartitions);
    }
}
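The default-method delegation can be illustrated with a small self-contained sketch. To avoid a Kafka dependency, Headers and StreamPartitioner below are hypothetical stand-ins that only mirror the shape of the proposed interface (Headers is reduced to a single lookup method). LegacyPartitioner is a hypothetical pre-KIP implementation, and its "drop-" key prefix is an invented convention used only to exercise the three documented return cases.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class DefaultDelegationSketch {

    // Hypothetical stand-in for Kafka's Headers, reduced to a single lookup.
    interface Headers {
        String lastValue(String key); // null if the header is absent
    }

    static Headers headersOf(Map<String, String> values) {
        return values::get;
    }

    // Mirrors the proposed StreamPartitioner shape, without a Kafka dependency.
    interface StreamPartitioner<K, V> {
        // Existing method.
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

        // Proposed method: by default it ignores the headers and delegates.
        default Optional<Set<Integer>> partitions(String topic, K key, V value, Headers headers, int numPartitions) {
            return partitions(topic, key, value, numPartitions);
        }
    }

    // Hypothetical pre-KIP implementation: only the header-less method is implemented.
    static final class LegacyPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            if (key == null) {
                return Optional.empty();        // empty Optional: use the default partitioner
            }
            if (key.startsWith("drop-")) {
                return Optional.of(Set.of());   // Optional of an empty set: drop the record
            }
            return Optional.of(Set.of(Math.floorMod(key.hashCode(), numPartitions)));
        }
    }

    public static void main(String[] args) {
        StreamPartitioner<String, String> partitioner = new LegacyPartitioner();
        Headers headers = headersOf(Map.of("tenant", "acme"));

        // The headers-aware call works without any change to LegacyPartitioner.
        System.out.println(partitioner.partitions("orders", "k1", "v", headers, 6));
        System.out.println(partitioner.partitions("orders", null, "v", headers, 6));      // Optional.empty
        System.out.println(partitioner.partitions("orders", "drop-k2", "v", headers, 6)); // Optional[[]]
    }
}
```

Because the new method has a body, an implementation written against the old interface needs no change; the headers argument is simply ignored until the implementation chooses to override the new method.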
Proposed Changes
- Interface Enhancement: Add the headers-aware partitions method to StreamPartitioner. This is a default method to ensure binary compatibility.
- Public API for Interactive Queries: Add new overloads to KafkaStreams#queryMetadataForKey that accept `Headers`. This allows users to perform metadata lookups for header-dependent partitioning strategies.
- Internal Callers and Propagation:
  - RecordCollectorImpl#send: pass the record headers when calling the partitioner.
  - StreamsMetadataState: support the new header-aware lookups.
  - Existing public methods in KafkaStreams and StreamsMetadataState that do not accept headers will remain. They will propagate empty headers to the underlying header-aware partitioner methods, so that all lookups go through a single header-aware code path.
- Built-in Implementations:
  - WindowedStreamPartitioner: override the new method to propagate headers to the underlying WindowedSerializer#serializeBaseKey.
  - DefaultStreamPartitioner: override the new method to propagate headers to keySerializer#serialize.
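The header propagation in DefaultStreamPartitioner, together with the empty-headers behaviour of the retained header-less entry points, can be sketched as follows. This is a simplified analogue, not the Kafka code: Headers and Serializer are hypothetical stand-ins (the stand-in Serializer mirrors the way Kafka's Serializer defaults its headers-aware serialize overload to the header-less one), Arrays.hashCode replaces the murmur2-based hashing Kafka uses for key partitioning, and RecordingStringSerializer is a hypothetical serializer that records the headers it receives.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class HeaderPropagationSketch {

    // Hypothetical stand-in for Kafka's Headers: a mutable map of header key to value.
    static final class Headers {
        final Map<String, String> values = new HashMap<>();

        static Headers empty() {
            return new Headers();
        }

        Headers add(String key, String value) {
            values.put(key, value);
            return this;
        }
    }

    // Hypothetical stand-in for Kafka's Serializer: the headers-aware overload
    // defaults to the header-less one.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, Headers headers, T data) {
            return serialize(topic, data);
        }
    }

    // Simplified analogue of DefaultStreamPartitioner.
    static final class DefaultPartitionerSketch<K> {
        private final Serializer<K> keySerializer;

        DefaultPartitionerSketch(Serializer<K> keySerializer) {
            this.keySerializer = keySerializer;
        }

        // Headers-aware path: headers are passed through to the key serializer.
        Optional<Set<Integer>> partitions(String topic, K key, Headers headers, int numPartitions) {
            byte[] keyBytes = keySerializer.serialize(topic, headers, key);
            if (keyBytes == null) {
                return Optional.empty(); // no key bytes: leave the choice to default partitioning
            }
            // Kafka hashes with murmur2; Arrays.hashCode is a simplified stand-in.
            return Optional.of(Set.of(Math.floorMod(Arrays.hashCode(keyBytes), numPartitions)));
        }

        // Header-less path kept for existing callers: propagates empty headers.
        Optional<Set<Integer>> partitions(String topic, K key, int numPartitions) {
            return partitions(topic, key, Headers.empty(), numPartitions);
        }
    }

    // Hypothetical key serializer that records the headers it was given.
    static final class RecordingStringSerializer implements Serializer<String> {
        Headers lastSeen;

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serialize(String topic, Headers headers, String data) {
            lastSeen = headers;
            return serialize(topic, data);
        }
    }

    public static void main(String[] args) {
        RecordingStringSerializer serializer = new RecordingStringSerializer();
        DefaultPartitionerSketch<String> partitioner = new DefaultPartitionerSketch<>(serializer);

        Headers headers = Headers.empty().add("tenant", "acme");
        System.out.println(partitioner.partitions("orders", "k1", headers, 6));
        System.out.println("serializer saw: " + serializer.lastSeen.values); // serializer saw: {tenant=acme}

        System.out.println(partitioner.partitions("orders", "k1", 6));
        System.out.println("serializer saw: " + serializer.lastSeen.values); // serializer saw: {}
    }
}
```

With a key serializer that ignores headers, both calls yield the same partition, so existing behaviour is preserved; a headers-aware key serializer would now receive the record headers when the partitioner serializes the key.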
Compatibility, Deprecation, and Migration Plan
- Backward Compatibility: This change is fully backward compatible. Existing custom StreamPartitioner implementations will continue to work unchanged, because they inherit the default implementation of the new method, which delegates to the existing partitions method.
- Deprecation: No methods are deprecated in this KIP.
- Migration: Users who want to use headers in their custom partitioning logic should override the new headers-aware `partitions` method in their StreamPartitioner implementations.
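A migrated custom partitioner overrides the new method and can keep its existing logic as a fallback. In the sketch below, the "tenant" header name and TenantPartitioner are hypothetical, and Headers and StreamPartitioner are minimal stand-ins for the Kafka types so that the example runs without Kafka on the classpath.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class TenantPartitionerSketch {

    // Hypothetical stand-in for Kafka's Headers, reduced to a single lookup.
    interface Headers {
        String lastValue(String key); // null if the header is absent
    }

    // Mirrors the proposed StreamPartitioner shape, without a Kafka dependency.
    interface StreamPartitioner<K, V> {
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

        default Optional<Set<Integer>> partitions(String topic, K key, V value, Headers headers, int numPartitions) {
            return partitions(topic, key, value, numPartitions);
        }
    }

    // Hypothetical migrated partitioner: routes on a "tenant" header and keeps
    // its pre-KIP logic as the fallback when the header is missing.
    static final class TenantPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            return Optional.empty(); // pre-KIP behaviour: defer to the default partitioner
        }

        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value,
                                                 Headers headers, int numPartitions) {
            String tenant = headers == null ? null : headers.lastValue("tenant");
            if (tenant == null) {
                return partitions(topic, key, value, numPartitions);
            }
            // All records of one tenant go to the same partition, regardless of key.
            return Optional.of(Set.of(Math.floorMod(tenant.hashCode(), numPartitions)));
        }
    }

    public static void main(String[] args) {
        TenantPartitioner partitioner = new TenantPartitioner();
        Headers acme = Map.of("tenant", "acme")::get;
        Headers none = Map.<String, String>of()::get;

        System.out.println(partitioner.partitions("orders", "k1", "v", acme, 6));
        System.out.println(partitioner.partitions("orders", "k2", "v", acme, 6)); // same partition as k1
        System.out.println(partitioner.partitions("orders", "k1", "v", none, 6)); // Optional.empty
    }
}
```

Because the partition now depends on a header rather than on the key alone, locating the instance that hosts a given key requires the same headers, which is what the header-aware KafkaStreams#queryMetadataForKey overloads in this KIP address.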
Test Plan
- Unit and integration tests:
- Verify that partitioners correctly propagate headers.
- Verify that propagated headers are used to determine the target partition.
- Verify
Rejected Alternatives
- Changing the existing partitions method to take headers, without a default implementation: rejected because it would break all existing user implementations.