Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/7y7svyp3f560fzv1bgcr893vn258cn06
Voting thread: https://lists.apache.org/thread/b55fl20b3fhj51nttytsg59vs0hmvfol
JIRA: KAFKA-20197
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The motivation for KIP-1321 emerged during the implementation of KAFKA-20173, which is part of a larger initiative to provide comprehensive header support in Kafka Streams state stores, as outlined in KIP-1271: Allow to Store Record Headers in State Stores.
The StreamPartitioner interface is used to determine the destination partition for records sent to sink topics or internal repartition topics. Currently, the partitions method only receives the topic, key, value, and number of partitions. As a result, a partitioner cannot take record headers into account when choosing a partition.
By adding header support to StreamPartitioner, we ensure that headers are consistently available throughout the Kafka Streams processing pipeline: from the source, through transformations and state stores (as per KIP-1271), to final partitioning and serialization.
Public Interfaces
We propose adding a new method to the StreamPartitioner interface. To maintain backward compatibility, this will be a default method that delegates to the existing partitions method.
package org.apache.kafka.streams.processor;

import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.header.Headers;

public interface StreamPartitioner<K, V> {

    @Deprecated(since = "4.3", forRemoval = true)
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

    /**
     * Determine the number(s) of the partition(s) to which a record with the given key, value, and headers
     * should be sent, for the given topic and current partition count.
     *
     * @param topic         the topic name this record is sent to
     * @param key           the key of the record
     * @param value         the value of the record
     * @param headers       the record headers (may be empty, but never {@code null}; should not be mutated)
     * @param numPartitions the total number of partitions
     * @return an {@code Optional} of a {@code Set} of integers between 0 and {@code numPartitions-1}:
     *         an empty {@code Optional} means the default partitioner is used;
     *         an {@code Optional} of an empty set means the record is not sent to any partition, i.e., it is dropped;
     *         an {@code Optional} of a non-empty set gives the partitions to which the record should be sent.
     */
    default Optional<Set<Integer>> partitions(String topic, K key, V value, Headers headers, int numPartitions) {
        return partitions(topic, key, value, numPartitions);
    }
}
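A small, self-contained sketch can illustrate the backward-compatibility argument. It does not use the real Kafka classes. `SketchPartitioner` is a hypothetical, simplified mirror of the interface above, and headers are modeled as a `Map<String, String>` instead of Kafka's `Headers`. The sketch shows that a partitioner written as a lambda before this change returns the same result when a caller invokes the header-aware overload, because the default method ignores the headers and delegates.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class DefaultDelegationSketch {

    // Hypothetical, simplified mirror of the proposed StreamPartitioner;
    // headers are modeled as a Map<String, String> instead of Kafka's Headers.
    @FunctionalInterface
    interface SketchPartitioner<K, V> {
        // Existing headerless method: still the single abstract method, so lambdas target it.
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

        // New header-aware method: the default ignores the headers and delegates.
        default Optional<Set<Integer>> partitions(String topic, K key, V value,
                                                  Map<String, String> headers, int numPartitions) {
            return partitions(topic, key, value, numPartitions);
        }
    }

    // A partitioner written as a lambda before the header-aware method existed.
    static final SketchPartitioner<String, String> LEGACY = (topic, key, value, numPartitions) ->
        Optional.of(Collections.singleton(Math.floorMod(key.hashCode(), numPartitions)));

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("tenant", "acme");
        // A caller that has headers invokes the new overload; the legacy result is unchanged.
        System.out.println(LEGACY.partitions("out", "user-42", "v", headers, 6));
        System.out.println(LEGACY.partitions("out", "user-42", "v", 6));
    }
}
```

The headerless method remains the single abstract method, so existing lambdas keep compiling. The header-aware method only changes behavior once an implementation overrides it.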
In addition, the existing `KafkaStreams#queryMetadataForKey` overloads are deprecated, and header-aware overloads are added (excerpt from org.apache.kafka.streams.KafkaStreams; Headers and RecordHeaders come from org.apache.kafka.common.header and org.apache.kafka.common.header.internals):

// Existing method, now deprecated: delegates with empty headers.
@Deprecated(since = "4.3", forRemoval = true)
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer) {
    return queryMetadataForKey(storeName, key, new RecordHeaders(), keySerializer);
}

// Existing method, now deprecated: delegates with empty headers.
@Deprecated(since = "4.3", forRemoval = true)
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<? super K, ?> partitioner) {
    return queryMetadataForKey(storeName, key, new RecordHeaders(), partitioner);
}

/**
 * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
 *
 * @param storeName     the {@code storeName} to find metadata for
 * @param key           the key to find metadata for
 * @param headers       the record headers (may be empty, but never {@code null})
 * @param keySerializer serializer for the key
 * @param <K>           key type
 * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store,
 *         or {@code null} if no matching metadata could be found
 */
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final Headers headers,
                                                final Serializer<K> keySerializer) {
    validateIsRunningOrRebalancing();
    return streamsMetadataState.keyQueryMetadataForKey(storeName, key, headers, keySerializer);
}

/**
 * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
 *
 * @param storeName   the {@code storeName} to find metadata for
 * @param key         the key to find metadata for
 * @param headers     the record headers (may be empty, but never {@code null})
 * @param partitioner the partitioner to be used to locate the host for the key
 * @param <K>         key type
 * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store,
 *         using the supplied partitioner, or {@code null} if no matching metadata could be found
 */
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final Headers headers,
                                                final StreamPartitioner<? super K, ?> partitioner) {
    validateIsRunningOrRebalancing();
    return streamsMetadataState.keyQueryMetadataForKey(storeName, key, headers, partitioner);
}
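The following sketch shows why the new `queryMetadataForKey` overloads take `Headers`. It rests on simplifying assumptions. `Partitioner`, `REGION_AWARE`, and `lookupPartition` are hypothetical names, and headers are again modeled as a `Map<String, String>`. `lookupPartition` models only the partition computation behind a metadata lookup, not the real `StreamsMetadataState` logic. With a header-dependent partitioner, a lookup reproduces the write-time partition only if it receives the same headers. The headerless overload, like the deprecated methods above, passes empty headers instead.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class HeaderAwareLookupSketch {

    // Hypothetical partitioner shape; headers modeled as a Map (stand-in for Kafka's Headers).
    interface Partitioner<K> {
        Optional<Set<Integer>> partitions(String topic, K key, Map<String, String> headers, int numPartitions);
    }

    // Hypothetical strategy: records with a "region" header are routed by region, others by key.
    static final Partitioner<String> REGION_AWARE = (topic, key, headers, numPartitions) -> {
        String region = headers.get("region");
        String routingKey = region != null ? region : key;
        return Optional.of(Collections.singleton(Math.floorMod(routingKey.hashCode(), numPartitions)));
    };

    // Stand-in for a header-aware lookup: same inputs as the write path, so same partition.
    static int lookupPartition(String key, Map<String, String> headers, Partitioner<String> p, int numPartitions) {
        return p.partitions("store-changelog", key, headers, numPartitions)
                .orElseThrow() // this sketch does not model the "use default partitioner" case
                .iterator().next();
    }

    // Mirrors the deprecated overloads, which delegate with empty (never null) headers.
    static int lookupPartition(String key, Partitioner<String> p, int numPartitions) {
        return lookupPartition(key, Collections.emptyMap(), p, numPartitions);
    }

    public static void main(String[] args) {
        Map<String, String> writeHeaders = Map.of("region", "eu-west");
        int written = REGION_AWARE.partitions("store-changelog", "user-42", writeHeaders, 12).get().iterator().next();
        System.out.println("written to             " + written);
        System.out.println("lookup with headers:   " + lookupPartition("user-42", writeHeaders, REGION_AWARE, 12));
        System.out.println("lookup without headers: " + lookupPartition("user-42", REGION_AWARE, 12));
    }
}
```

Depending on the hash values, the lookup without headers can resolve to a different partition, and therefore potentially to a host that does not own the key.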
Proposed Changes
- Interface Enhancement: Add the headers-aware `partitions` method to `StreamPartitioner`. This is a `default` method to ensure binary compatibility.
- Public API for Interactive Queries: Add new overloads to `KafkaStreams#queryMetadataForKey` that accept `Headers`. This allows users to perform metadata lookups for header-dependent partitioning strategies.
- Headers Mutability and Nullability: While headers are mutable by nature, a `StreamPartitioner` is not expected to mutate them. The documentation will explicitly state that mutating headers can lead to unexpected results. Additionally, when using `StreamPartitioner` or Interactive Queries (IQ), headers can be empty, but they cannot be null. This behavior will also be explicitly documented.
- No changes are planned for `KafkaStreamsNamedTopologyWrapper`: this class and the entire named-topology feature are deprecated.
- Internal Callers and Propagation:
  - Update `RecordCollectorImpl#send` to pass the record headers when calling the partitioner.
  - Update `StreamsMetadataState` to support the new header-aware lookups.
- Built-in Implementations:
  - `WindowedStreamPartitioner`: Override the new method to propagate headers to the underlying `WindowedSerializer#serializeBaseKey`.
  - `DefaultStreamPartitioner`: Override the new method to propagate headers to `keySerializer#serialize`.
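The following sketch illustrates why `DefaultStreamPartitioner` should propagate headers. It is a simplified model under stated assumptions. `KeySerializer` and `FORMAT_AWARE` are hypothetical, headers are modeled as a `Map<String, String>`, and `Arrays.hashCode` stands in for the murmur2 hash that Kafka uses for key-based partitioning. If a key serializer's output depends on headers, the partitioner should serialize the key with the same headers as the write path, so that the hashed bytes match the bytes actually written.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class HeaderPropagationSketch {

    // Hypothetical header-aware key serializer; headers modeled as a Map, not Kafka's Headers.
    interface KeySerializer<K> {
        byte[] serialize(String topic, Map<String, String> headers, K key);
    }

    // Hypothetical serializer whose output depends on a "key-format" header.
    static final KeySerializer<String> FORMAT_AWARE = (topic, headers, key) ->
        "upper".equals(headers.get("key-format"))
            ? key.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8)
            : key.getBytes(StandardCharsets.UTF_8);

    // Default-partitioner-style logic: hash the serialized key bytes.
    // Arrays.hashCode is used for brevity; Kafka itself uses a murmur2 hash.
    static Optional<Set<Integer>> partitions(String topic, String key, Map<String, String> headers,
                                             int numPartitions, KeySerializer<String> serializer) {
        byte[] keyBytes = serializer.serialize(topic, headers, key); // headers propagated to the serializer
        int partition = Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
        return Optional.of(Collections.singleton(partition));
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("key-format", "upper");
        // Bytes that the write path would produce for this record's key.
        byte[] written = FORMAT_AWARE.serialize("out", headers, "user-42");
        int expected = Math.floorMod(Arrays.hashCode(written), 8);
        System.out.println(partitions("out", "user-42", headers, 8, FORMAT_AWARE) + " expected " + expected);
    }
}
```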
Compatibility, Deprecation, and Migration Plan
- Backward Compatibility: The change is fully backward compatible. Existing custom `StreamPartitioner` implementations will continue to work. They inherit the default implementation of the new method, which delegates to the existing `partitions` method.
- Deprecation: Deprecate the public methods that do not take headers, so that they can be removed in Kafka 5.0.
- Migration: Users who want to use headers in their custom partitioning logic should migrate their `StreamPartitioner` implementations to override the new `partitions` method.
- Functional interface migration: `StreamPartitioner` is a functional interface that currently exposes headerless methods, which are going to be deprecated. Users will be notified that they are using the deprecated API, and they will need to implement the interface directly instead of using lambda syntax. They can return to a functional-style partitioner once the deprecated methods are removed, which is planned for Kafka 5.0.
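A migrated partitioner could look roughly like the following sketch. `SketchPartitioner` is again a simplified, hypothetical mirror of the proposed interface, with headers modeled as a `Map<String, String>`. `TenantPartitioner` and the `tenant` header are illustrative names. The headerless method stays abstract until it is removed, so an implementation written as a class must still implement it. Here it delegates to the header-aware method with empty headers. That is one possible choice, assumed here rather than prescribed by this KIP.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class MigrationSketch {

    // Hypothetical, simplified mirror of the proposed interface; headers modeled as Map<String, String>.
    interface SketchPartitioner<K, V> {
        @Deprecated
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

        default Optional<Set<Integer>> partitions(String topic, K key, V value,
                                                  Map<String, String> headers, int numPartitions) {
            return partitions(topic, key, value, numPartitions);
        }
    }

    // Migrated partitioner: a named class instead of a lambda, because the lambda target
    // is still the headerless abstract method.
    static final class TenantPartitioner implements SketchPartitioner<String, String> {

        // Still abstract until removal, so it must be implemented; delegating with
        // empty headers is an assumption of this sketch, not mandated by the KIP.
        @Deprecated
        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            return partitions(topic, key, value, Collections.emptyMap(), numPartitions);
        }

        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value,
                                                 Map<String, String> headers, int numPartitions) {
            String tenant = headers.get("tenant");
            if (tenant == null) {
                return Optional.empty(); // fall back to the default partitioner
            }
            return Optional.of(Collections.singleton(Math.floorMod(tenant.hashCode(), numPartitions)));
        }
    }

    public static void main(String[] args) {
        TenantPartitioner partitioner = new TenantPartitioner();
        System.out.println(partitioner.partitions("out", "order-1", "v", Map.of("tenant", "acme"), 4));
        System.out.println(partitioner.partitions("out", "order-1", "v", Map.of(), 4)); // empty: default partitioner
    }
}
```

Returning an empty `Optional` when the header is missing falls back to the default partitioner, as described in the Javadoc of the new method.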
Test Plan
- Unit tests and integration tests:
  - Verify that partitioners correctly propagate headers.
  - Verify that propagated headers are used to determine the target partition.
  - Verify
Rejected Alternatives
- Add headers to the existing method's signature instead of adding a new default method: Rejected, because it would break all existing user implementations.