...
Kafka is designed so that messages with the same key from the same producer are consumed in the same order in which they are produced. This property is useful for applications that maintain local state per key. However, under the current design of Kafka, this in-order delivery is not guaranteed if we expand the partitions of a topic. Furthermore, for topics whose traffic fluctuates significantly over time, it would be useful to be able to expand partitions when the expected byte-in rate is high and delete partitions when the expected byte-in rate is low. This KIP proposes a design that allows partition expansion and deletion while still ensuring in-order delivery of keyed messages.
Goals
This KIP allows an arbitrary sequence of partition expansions and deletions of an existing topic while still ensuring in-order delivery of keyed messages, with one restriction: a user cannot expand the partitions of a topic while any of its partitions is still marked for deletion. Once the size of such a partition reaches zero, due to either retention or AdminClient.deleteRecords(...), the partition is removed from the topic and the user can expand the partitions of the topic again.
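For illustration, the existing AdminClient.deleteRecords(...) API can be used to empty such a partition so that it can be removed without waiting for retention. The sketch below is illustrative only: the topic name, partition number and offset are made up.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeMarkedPartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Assume partition 3 of "my-topic" has been marked for deletion.
            TopicPartition tp = new TopicPartition("my-topic", 3);
            // Delete all records before the chosen offset (an illustrative log end offset),
            // so that the partition size reaches zero without waiting for retention.
            Map<TopicPartition, RecordsToDelete> toDelete =
                    Collections.singletonMap(tp, RecordsToDelete.beforeOffset(42L));
            DeleteRecordsResult result = admin.deleteRecords(toDelete);
            // Block until the delete completes; the controller can then remove the partition
            // and partition expansion of the topic becomes possible again.
            result.lowWatermarks().get(tp).get();
        }
    }
}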
In the future, the work in this KIP can be extended to support partition expansion even when some partitions of the topic are still marked for deletion.
Public Interfaces
Zookeeper
1) Update the znode /brokers/topics/[topic] to use the following json format (an illustrative example follows the schema below).
{
  "version" : int32,
  "partitions" : { partition -> replicaList ... },
  "initial_partition_count" : int32     <-- NEW. This is the number of partitions when the topic is created.
  "undeleted_partition_count" : int32   <-- NEW. This is the partition_count used by producer for choosing partitions.
}
2) Update the znode /brokers/topics/[topic]/partitions/[partition]/state to use the following json format (an illustrative example follows the schema below).
{
  "version" : int32,
  "partition_epoch" : int32,
  "leaderEpochAfterCreation" : {    <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  },
  "leaderEpochBeforeDeletion" : {   <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  }
}
Protocol
1) Update LeaderAndIsrRequest to re-use the topic field for all partitions of a topic and add the field undeleted_partition_count for each topic.
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]   <-- NEW. This field includes LeaderAndIsrRequestPartitionState.
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic undeleted_partition_count partition_states
  topic => str                         <-- This is moved from LeaderAndIsrRequestPartitionState.
  undeleted_partition_count => int32   <-- NEW. This is the total number of partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]

LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean
2) Update ProduceRequest to include undeleted_partition_count for each topic (a sketch of producer-side partition selection using this field follows the schema below).
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicProduceData]

TopicProduceData => topic undeleted_partition_count data
  topic => str
  undeleted_partition_count => int32   <-- NEW. This is the number of undeleted partitions of this topic expected by the producer.
  data => [PartitionData]

PartitionData => partition record_set
  partition => int32
  record_set => Records
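To make the intended use of this field concrete, below is a minimal sketch of keyed partition selection, assuming the producer keeps its existing murmur2-based hashing and simply hashes over undeleted_partition_count rather than the total partition count; the class and method names are illustrative and not part of the proposal.

import org.apache.kafka.common.utils.Utils;

// Illustrative sketch: choose the partition for a keyed record using the number of
// undeleted partitions that the producer will also send in its ProduceRequest.
public class UndeletedCountPartitioner {
    public static int partitionForKey(byte[] serializedKey, int undeletedPartitionCount) {
        // Same murmur2 hashing used by Kafka's default partitioner, but taken modulo the
        // undeleted partition count instead of the total partition count of the topic.
        return Utils.toPositive(Utils.murmur2(serializedKey)) % undeletedPartitionCount;
    }
}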
3) Add ConsumerGroupPositionRequest, which allows a consumer to report its own positions for partitions and to query the positions of partitions owned by other consumers in its consumer group.
ConsumerGroupPositionRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [ConsumerGroupPositionRequestTopic]

ConsumerGroupPositionRequestTopic => topic partitions
  topic => str
  partitions => [ConsumerGroupPositionRequestPartition]

ConsumerGroupPositionRequestPartition => partition need_position position
  partition => int32
  need_position => boolean   // If true, ConsumerGroupPositionResponse should include the position of this partition of the group.
  position => int64          // Position of this partition of this consumer.
4) Add the corresponding ConsumerGroupPositionResponse, which includes the fields need_position and position for the relevant partitions (a sketch of how a member can gather the positions it reports follows the schema below).
ConsumerGroupPositionResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [ConsumerGroupPositionResponseTopic]
  error_code => int16

ConsumerGroupPositionResponseTopic => topic partitions
  topic => str
  partitions => [ConsumerGroupPositionResponsePartition]

ConsumerGroupPositionResponsePartition => partition need_position position
  partition => int32
  need_position => boolean   // If true, ConsumerGroupPositionRequest should include the position of this partition of this consumer.
  position => int64          // Position of this partition of the group.
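As a minimal sketch of the reporting side (the helper class and method names are hypothetical; the request itself would be assembled and sent by the consumer client internally), a member can gather its own positions with the existing KafkaConsumer APIs:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: collect this member's own positions for the partitions it owns,
// i.e. the values it would report in a ConsumerGroupPositionRequest. Partitions whose
// group-wide position this member needs would instead be sent with need_position = true.
public class PositionReportSketch {
    public static Map<TopicPartition, Long> collectOwnPositions(KafkaConsumer<?, ?> consumer) {
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
            // position() is the offset of the next record to be fetched for this partition.
            positions.put(tp, consumer.position(tp));
        }
        return positions;
    }
}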
5) Add PartitionLeaderEpochsForPartitionsRequest and PartitionLeaderEpochsForPartitionsResponse. The response essentially encodes the leaderEpochAfterCreation and leaderEpochBeforeDeletion maps for the partitions specified in the request.
PartitionLeaderEpochsForPartitionsRequest => topics
  topics => [PartitionLeaderEpochsForPartitionsRequestTopic]

PartitionLeaderEpochsForPartitionsRequestTopic => topic partitions
  topic => str
  partitions => [int32]
PartitionLeaderEpochsForPartitionsResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [PartitionLeaderEpochsForPartitionsResponseTopic]

PartitionLeaderEpochsForPartitionsResponseTopic => topic partitions
  topic => str
  partitions => [PartitionLeaderEpochsForPartitionsResponsePartition]

PartitionLeaderEpochsForPartitionsResponsePartition => partition leader_epoch_after_creation leader_epoch_before_deletion
  partition => int32
  leader_epoch_after_creation => int32    // -1 if the given partition is not in leaderEpochAfterCreation of the partition znode.
  leader_epoch_before_deletion => int32   // -1 if the given partition is not in leaderEpochBeforeDeletion of the partition znode.
6) Update UpdateMetadataRequest to re-use the topic field for all partitions of a topic and add the fields undeleted_partition_count and initial_partition_count for each topic.
UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch topic_states live_brokers
  controller_id => int32
  controller_epoch => int32
  max_partition_epoch => int32
  topic_states => [UpdateMetadataRequestTopicState]
  live_brokers => [UpdateMetadataRequestBroker]

UpdateMetadataRequestTopicState => topic initial_partition_count undeleted_partition_count partition_states
  topic => str
  initial_partition_count => int32     <-- NEW. This is the number of partitions when the topic is created.
  undeleted_partition_count => int32   <-- NEW. This is the number of undeleted partitions of this topic.
  partition_states => [UpdateMetadataRequestPartitionState]

UpdateMetadataRequestPartitionState => partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]
7) Add fields undeleted_partition_count and initial_partition_count for each topic in MetadataResponse.
MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  max_partition_epoch => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic initial_partition_count undeleted_partition_count is_internal partition_metadata
  topic_error_code => int16
  topic => str
  initial_partition_count => int32     <-- NEW. This is the number of partitions when the topic is created.
  undeleted_partition_count => int32   <-- NEW. This is the number of undeleted partitions of this topic.
  is_internal => boolean
  partition_metadata => [PartitionMetadata]

PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  offline_replicas => [int32]
8) Add the field undeleted_partition_count for each topic in FetchRequest.
FetchRequest => replica_id max_wait_time min_bytes max_bytes isolation_level session_id epoch topics forgotten_topics_data
  replica_id => int32
  max_wait_time => int32
  min_bytes => int32
  max_bytes => int32
  isolation_level => int8
  session_id => int32
  epoch => int32
  topics => [FetchRequestTopic]
  forgotten_topics_data => [ForgottenTopicData]

FetchRequestTopic => topic undeleted_partition_count partitions
  topic => str
  undeleted_partition_count => int32   <-- NEW. This is the number of undeleted partitions of this topic expected by the fetcher.
  partitions => [FetchRequestPartition]
Consumer API
1) Add the following method to the interface org.apache.kafka.clients.consumer.Consumer (a usage sketch follows the definition below).
public void subscribe(Collection<String> topics,
                      ConsumerRebalanceListener consumerRebalanceListener,
                      PartitionKeyRebalanceListener partitionKeyRebalanceListener);

public interface PartitionKeyRebalanceListener {

    /*
     * This callback allows the user to flush state related to the keys previously received in the
     * given set of partitions before another consumer loads state for such keys and starts to
     * consume messages with these keys. This can happen after partition creation or deletion for
     * an existing topic.
     */
    void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions);

    /*
     * This callback allows the user to load state for new keys that may be received in the given
     * set of partitions. This can happen after partition creation or deletion for an existing
     * topic.
     */
    void onPartitionKeyAssigned(Collection<TopicPartition> partitions);
}
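Below is a minimal usage sketch of the proposed callback; the topic name, the per-key state store and the flush/load helpers are illustrative only.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: a consumer that keeps local per-key state and uses the proposed
// PartitionKeyRebalanceListener to flush/load that state around partition expansion or deletion.
public class PerKeyStateConsumer {
    // Hypothetical local state keyed by message key.
    private final Map<String, Long> localStateByKey = new HashMap<>();

    public void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(
            Collections.singleton("my-topic"),
            new ConsumerRebalanceListener() {
                @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
                @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
            },
            new PartitionKeyRebalanceListener() {
                @Override
                public void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions) {
                    // Flush state for keys previously received in these partitions, so that
                    // another consumer can load it before consuming messages with those keys.
                    flushStateFor(partitions);
                }

                @Override
                public void onPartitionKeyAssigned(Collection<TopicPartition> partitions) {
                    // Load state for keys that may now be received in these partitions.
                    loadStateFor(partitions);
                }
            });
    }

    private void flushStateFor(Collection<TopicPartition> partitions) { /* write localStateByKey to an external store */ }
    private void loadStateFor(Collection<TopicPartition> partitions) { /* read state from an external store */ }
}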
Topic config
...