Current state: Under Discussion
Discussion thread: here
JIRA: here
When Kafka is used as the messaging infrastructure for consumer-facing businesses, we often need to adjust the capacity of certain topics to accommodate bursty traffic. This involves both adding partitions to increase throughput and, ideally, removing partitions so that their resources can be reclaimed. Operating clusters with an ever-growing number of partitions adds operational overhead. First, performance degrades when a single disk has to serve a large number of partitions; second, a larger cluster footprint makes the cluster more vulnerable to infrastructure-level disruptions such as machine or rack decommissioning, which are not uncommon in large enterprises. This motivates us to add an easy and transparent way to reduce the number of partitions of a topic, which is particularly convenient in the following situations:
- The cluster has a large number of topics, and the total number of partitions is close to the limit.
- Most of the dynamic topics do not contain keyed messages (see the limitation section).
- The data retention period is relatively short.
This proposal builds on the KIP-500 group of changes, mainly the removal of ZooKeeper and the migration of metadata to an internal Raft quorum.
We have implemented the described changes and deployed them in various internal production environments.
Two configuration properties are added:
- delete.topic.partition.enable is a boolean indicating whether this functionality is enabled.
- delete.topic.partition.interval.delay.ms is the interval at which the controller checks for partitions whose deletion delay has elapsed. The default value is 300000 (5 minutes).
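A minimal Java sketch of how a broker might read the two settings. The class and method names are hypothetical, the default of false for delete.topic.partition.enable is an assumption (the KIP does not state one), and rejecting a non-positive interval is an assumed validation rule.

```java
import java.util.Properties;

// Hypothetical holder for the two settings proposed in this KIP.
final class PartitionDeletionConfig {
    static final String ENABLE = "delete.topic.partition.enable";
    static final String INTERVAL_MS = "delete.topic.partition.interval.delay.ms";
    static final long DEFAULT_INTERVAL_MS = 300_000L; // 5 minutes, the default stated in the KIP

    final boolean enabled;
    final long intervalMs;

    private PartitionDeletionConfig(boolean enabled, long intervalMs) {
        this.enabled = enabled;
        this.intervalMs = intervalMs;
    }

    static PartitionDeletionConfig from(Properties props) {
        // Assumption: the feature stays disabled unless explicitly turned on.
        boolean enabled = Boolean.parseBoolean(props.getProperty(ENABLE, "false"));
        long intervalMs = Long.parseLong(
                props.getProperty(INTERVAL_MS, Long.toString(DEFAULT_INTERVAL_MS)));
        // Assumption: a non-positive check interval is rejected.
        if (intervalMs <= 0) {
            throw new IllegalArgumentException(INTERVAL_MS + " must be positive, got " + intervalMs);
        }
        return new PartitionDeletionConfig(enabled, intervalMs);
    }
}
```

For example, a Properties object that only sets delete.topic.partition.enable=true yields enabled == true and intervalMs == 300000.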
We propose to modify PartitionRecord and to add a new DeleteTopicPartitionRecord to the __cluster_metadata topic.
We propose adding a new field, 'mode', to each partition to indicate whether the partition is being removed. At any time, a partition is in one of the following modes:
- ReadWrite (code=0): the partition can be read from and written to.
- ReadOnly (code=1): the partition can be read from but not written to.
- None (code=-1): the partition is filtered out by clients and is neither written to nor read from; consumption from the remaining partitions is not impacted.
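The three modes and their codes can be sketched as a Java enum. This is illustrative only: the helper names are hypothetical, the KIP does not specify the wire type of the code (int is assumed here), and isWritable/isReadable follow the client filtering rules listed under Client API changes.

```java
// Sketch of the proposed partition mode with its codes; helper names are hypothetical.
enum PartitionMode {
    READ_WRITE(0),
    READ_ONLY(1),
    NONE(-1);

    private final int code;

    PartitionMode(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Maps a code read from metadata back to a mode; unknown codes are rejected.
    static PartitionMode fromCode(int code) {
        for (PartitionMode mode : values()) {
            if (mode.code == code) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown partition mode code: " + code);
    }

    // Only ReadWrite partitions accept new records.
    boolean isWritable() {
        return this == READ_WRITE;
    }

    // ReadWrite and ReadOnly partitions can be consumed; None partitions are filtered out.
    boolean isReadable() {
        return this != NONE;
    }
}
```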
AdminClient API changes
New methods will be added to the AdminClient API.
Protocol RPC changes
A new DeletePartitions API will be added, with a corresponding DeletePartitionsRequest and DeletePartitionsResponse.
The UpdateMetadata API gains a 'mode' field that carries the read/write mode of each partition of the topic.
The Metadata API gains a 'mode' field that represents the read/write mode of each partition of the topic, and its API version is incremented.
Client API changes
A new field, 'mode', is added to org.apache.kafka.common.PartitionInfo.
- Both KafkaProducer and KafkaConsumer are aware of each partition's mode and filter partitions accordingly.
- Partitions in ReadOnly or None mode are filtered out for writes.
- Partitions in None mode are filtered out for reads.
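The filtering rules above can be sketched as follows. PartitionView is a hypothetical stand-in for PartitionInfo with the proposed field, and the round-robin choice for keyless records is an illustrative assumption, not the producer's actual partitioner.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

enum Mode { READ_WRITE, READ_ONLY, NONE }

// Hypothetical stand-in for PartitionInfo carrying the proposed 'mode' field.
final class PartitionView {
    final String topic;
    final int partition;
    final Mode mode;

    PartitionView(String topic, int partition, Mode mode) {
        this.topic = topic;
        this.partition = partition;
        this.mode = mode;
    }
}

final class ModeFilter {
    private final AtomicInteger counter = new AtomicInteger();

    // Producers skip ReadOnly and None partitions.
    static List<PartitionView> writable(List<PartitionView> all) {
        return all.stream().filter(p -> p.mode == Mode.READ_WRITE).collect(Collectors.toList());
    }

    // Consumers skip only None partitions, so ReadOnly partitions can still be drained.
    static List<PartitionView> readable(List<PartitionView> all) {
        return all.stream().filter(p -> p.mode != Mode.NONE).collect(Collectors.toList());
    }

    // Illustrative round-robin choice among writable partitions for records without a key.
    int nextPartitionForKeylessRecord(List<PartitionView> all) {
        List<PartitionView> candidates = writable(all);
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No writable partitions available");
        }
        int idx = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(idx).partition;
    }
}
```

Keeping ReadOnly partitions readable is what lets consumers drain the remaining data during the deletion delay.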
Kafka Topic Command changes
- The --partitions option accepts a number smaller than the current number of partitions.
- A new --delete-partitions-delay option (Long) specifies how long to wait before the data is deleted. The default value is 0, meaning the partitions are deleted immediately.
- The command may print warnings in certain situations, for example:
  - "WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected"
  - "WARNING: This feature is only enabled with Metadata version above 10 and delete.topic.partition.enable turned on"
  - "WARNING: The topic is currently under changes"
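A sketch of the validation TopicCommand might perform before sending a DeletePartitions request. The KIP does not list the request fields, so the plan's fields are assumptions derived from the command options; the sketch also assumes that the highest-numbered partitions are the ones removed, and it does not fix a unit for the delay, which the KIP leaves unspecified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical result of validating a partition-reduction request in TopicCommand.
final class DeletePartitionsPlan {
    final String topic;
    final int targetPartitionCount;
    final long delay; // value of --delete-partitions-delay; unit not specified by the KIP
    final List<Integer> partitionsToDelete;

    private DeletePartitionsPlan(String topic, int targetPartitionCount, long delay,
                                 List<Integer> partitionsToDelete) {
        this.topic = topic;
        this.targetPartitionCount = targetPartitionCount;
        this.delay = delay;
        this.partitionsToDelete = partitionsToDelete;
    }

    static DeletePartitionsPlan of(String topic, int currentCount, int requestedCount, long delay) {
        if (requestedCount < 1) {
            throw new IllegalArgumentException("A topic must keep at least one partition");
        }
        if (requestedCount >= currentCount) {
            // Increases still go through the existing createPartitions path.
            throw new IllegalArgumentException(
                    "Requested " + requestedCount + " partitions, but topic has " + currentCount);
        }
        if (delay < 0) {
            throw new IllegalArgumentException("--delete-partitions-delay must not be negative");
        }
        // Assumption: the highest-numbered partitions are removed, so the remaining ids stay 0..n-1.
        List<Integer> toDelete = new ArrayList<>();
        for (int p = requestedCount; p < currentCount; p++) {
            toDelete.add(p);
        }
        return new DeletePartitionsPlan(topic, requestedCount, delay,
                Collections.unmodifiableList(toDelete));
    }
}
```

Reducing a 6-partition topic to 4 with a delay of 0 yields partitionsToDelete = [4, 5].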
The following changes will be made in the controller:
- Add a controller event, TopicPartitionDeletion.
- Add a class, TopicPartitionDeletionManager, to handle the TopicPartitionDeletion event.
- When KafkaController starts, it schedules a periodic task, scheduleDelayDeletePartitionTask, that checks for partitions whose deletion delay has elapsed.
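A minimal sketch of the periodic check, assuming the controller keeps an in-memory list of pending deletions, each with the time at which its delay elapses. PendingDeletion and DelayedDeletionChecker are hypothetical names; in the proposal, the due callback would drive the mode and state-machine transitions described in the workflow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical record of a partition waiting for its deletion delay to elapse.
final class PendingDeletion {
    final String topic;
    final int partition;
    final long deleteAtMs; // wall-clock time at which the delay elapses

    PendingDeletion(String topic, int partition, long deleteAtMs) {
        this.topic = topic;
        this.partition = partition;
        this.deleteAtMs = deleteAtMs;
    }
}

final class DelayedDeletionChecker {
    private final List<PendingDeletion> pending = new ArrayList<>();
    private final Consumer<PendingDeletion> onDue; // e.g. set mode to None, start replica deletion

    DelayedDeletionChecker(Consumer<PendingDeletion> onDue) {
        this.onDue = onDue;
    }

    synchronized void add(PendingDeletion deletion) {
        pending.add(deletion);
    }

    // Hands every deletion whose delay has elapsed to onDue and removes it from the pending list.
    synchronized int checkDue(long nowMs) {
        int fired = 0;
        Iterator<PendingDeletion> it = pending.iterator();
        while (it.hasNext()) {
            PendingDeletion d = it.next();
            if (d.deleteAtMs <= nowMs) {
                it.remove();
                onDue.accept(d);
                fired++;
            }
        }
        return fired;
    }

    // Runs checkDue every intervalMs (delete.topic.partition.interval.delay.ms).
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long intervalMs) {
        return scheduler.scheduleAtFixedRate(
                () -> checkDue(System.currentTimeMillis()), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }
}
```

Passing the current time into checkDue keeps the check deterministic and easy to test, while start wires it to a real scheduler.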
The workflow involving the TopicPartitionDeletionManager class is summarized below:
- TopicCommand sends a DeletePartitions request to the KafkaController, which saves a DeleteTopicPartitionRecord in the controller metadata.
- TopicPartitionDeletionManager executes its onPartitionDeletion method, which updates the partition's mode to ReadOnly. The partition remains in the OnlinePartition state, and all brokers are notified through the PartitionStateMachine.
- Once the specified delay has elapsed, scheduleDelayDeletePartitionTask updates the partition's mode to None. The partition transitions to OfflinePartition and then to NonExistentPartition, and the brokers are notified through the PartitionStateMachine. Through the ReplicaStateMachine, the partition's replicas transition to OfflineReplica and then to ReplicaDeletionStarted; all brokers stop replicating the partition and delete its data.
- When the controller receives a successful StopReplica response from a broker, the replica state changes to ReplicaDeletionSuccessful, and the controller then cleans up the partition's metadata. Otherwise, the replica state changes to ReplicaDeletionIneligible, and the replica waits for the KafkaController to retry the deletion.
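The replica-state transitions in this workflow can be modeled as a small transition table. This is a simplified sketch: the retry edge from ReplicaDeletionIneligible back to OfflineReplica and the final NonExistentReplica step are assumptions modeled on Kafka's existing ReplicaStateMachine, not details stated in this KIP.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum ReplicaState {
    ONLINE_REPLICA, OFFLINE_REPLICA, REPLICA_DELETION_STARTED,
    REPLICA_DELETION_SUCCESSFUL, REPLICA_DELETION_INELIGIBLE, NON_EXISTENT_REPLICA
}

// Simplified model of the replica transitions used when deleting a partition.
final class ReplicaDeletionFlow {
    private static final Map<ReplicaState, Set<ReplicaState>> VALID_NEXT = new EnumMap<>(ReplicaState.class);

    static {
        VALID_NEXT.put(ReplicaState.ONLINE_REPLICA, EnumSet.of(ReplicaState.OFFLINE_REPLICA));
        VALID_NEXT.put(ReplicaState.OFFLINE_REPLICA, EnumSet.of(ReplicaState.REPLICA_DELETION_STARTED));
        VALID_NEXT.put(ReplicaState.REPLICA_DELETION_STARTED,
                EnumSet.of(ReplicaState.REPLICA_DELETION_SUCCESSFUL, ReplicaState.REPLICA_DELETION_INELIGIBLE));
        // Assumption: an ineligible replica is retried by moving it back to OfflineReplica.
        VALID_NEXT.put(ReplicaState.REPLICA_DELETION_INELIGIBLE, EnumSet.of(ReplicaState.OFFLINE_REPLICA));
        // Assumption: metadata cleanup ends with the replica no longer existing.
        VALID_NEXT.put(ReplicaState.REPLICA_DELETION_SUCCESSFUL, EnumSet.of(ReplicaState.NON_EXISTENT_REPLICA));
        VALID_NEXT.put(ReplicaState.NON_EXISTENT_REPLICA, EnumSet.noneOf(ReplicaState.class));
    }

    static boolean isValid(ReplicaState from, ReplicaState to) {
        return VALID_NEXT.get(from).contains(to);
    }

    // Controller reaction to a StopReplica response for a replica being deleted.
    static ReplicaState onStopReplicaResponse(ReplicaState current, boolean success) {
        if (current != ReplicaState.REPLICA_DELETION_STARTED) {
            throw new IllegalStateException("Unexpected StopReplica response in state " + current);
        }
        return success ? ReplicaState.REPLICA_DELETION_SUCCESSFUL : ReplicaState.REPLICA_DELETION_INELIGIBLE;
    }
}
```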
Compatibility, Deprecation, and Migration Plan
The proposed change is backward and forward compatible with Kafka clients, subject to the following constraints:
- Older clients talking to a new broker cannot interpret 'mode'. We suggest that the administrator manually set delete.topic.partition.enable to true; this requires Metadata requests to use at least the version that introduces 'mode', and requests with an older version receive LEADER_NOT_AVAILABLE.
- When newer clients (with the updated org.apache.kafka.common.PartitionInfo class) talk to an older broker, the 'mode' field defaults to ReadWrite, so there is no impact.
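The two compatibility rules can be sketched as follows. ResponseError and PartitionMode are hypothetical stand-ins for the real error codes and the proposed mode type, and the minimum Metadata version carrying 'mode' is passed in as a parameter because the KIP does not state it.

```java
// Hypothetical stand-ins for the relevant types.
enum PartitionMode { READ_WRITE, READ_ONLY, NONE }

enum ResponseError { NONE, LEADER_NOT_AVAILABLE }

final class MetadataCompat {
    // Broker side: with the feature enabled, Metadata requests older than the version
    // that introduces 'mode' are answered with LEADER_NOT_AVAILABLE.
    static ResponseError errorFor(boolean deletionEnabled, short requestVersion, short minVersionWithMode) {
        if (deletionEnabled && requestVersion < minVersionWithMode) {
            return ResponseError.LEADER_NOT_AVAILABLE;
        }
        return ResponseError.NONE;
    }

    // Client side: an older broker sends no mode, so the client falls back to ReadWrite.
    static PartitionMode modeOrDefault(PartitionMode fromResponse) {
        return fromResponse == null ? PartitionMode.READ_WRITE : fromResponse;
    }
}
```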
Upgrading the cluster from any older version is possible once the situations above are handled.