Current state: Under Discussion. This KIP currently has mostly architectural details. Finer details will be added once we agree on the core design.
Discussion thread:
JIRA:
Currently, if a Kafka cluster loses a broker, there is no mechanism to transfer replicas from the failed node to other brokers in the cluster other than manually running ReassignPartitionsCommand. This is especially a problem when operating clusters with a replication factor of 2: the loss of a single broker means there is effectively no more redundancy, which is not desirable. Automatic rebalancing upon failure is important to preserve redundancy within the cluster. This KIP effectively automates the commands that operators typically run upon failure.
This problem breaks down into two parts: discovery of failed brokers, and mitigation. The design for each is documented separately.
How do we discover that a broker has failed? It is particularly important to avoid false alarms: if a broker is being bounced, it should not count as a failure and prematurely trigger a rebalance.
We feel that the following approach is better than a periodic scan (documented in the rejected alternatives section).
The KafkaController receives an onBrokerFailure callback every time a broker dies. This is an explicit signal that we should leverage, since it is also the signal used to effect leadership changes. The proposal is to introduce a new path in ZooKeeper that is managed only by the controller. The onBrokerFailure callback can create a znode under /admin/failed_brokers carrying a timestamp corresponding to when the callback was first received by a controller.
/admin/failed_brokers/<broker_id> { "failure_timestamp" : xxx }
The current controller can schedule a task which executes at "failure_timestamp + failure_interval". At that time, if the znode still exists, the node is considered dead and we proceed to the mitigation phase described below. If the failed broker re-enters the cluster before the failure_interval elapses (30 minutes? 1 hour?), the corresponding znode is deleted and the scheduled rebalance task is cancelled.
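The bookkeeping described above can be sketched as follows, with the /admin/failed_brokers path simulated by an in-memory map. The class and method names here are illustrative, not part of the Kafka codebase, and real code would read and write znodes instead of a map.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the failure-detection bookkeeping; /admin/failed_brokers is
// simulated by a map from broker id to failure_timestamp.
public class FailedBrokerTracker {
    private final Map<Integer, Long> failedBrokers = new HashMap<>();
    private final long failureIntervalMs;

    public FailedBrokerTracker(long failureIntervalMs) {
        this.failureIntervalMs = failureIntervalMs;
    }

    // onBrokerFailure: record the first failure time; repeated callbacks
    // for the same broker keep the original timestamp.
    public void onBrokerFailure(int brokerId, long nowMs) {
        failedBrokers.putIfAbsent(brokerId, nowMs);
    }

    // Broker re-registered before the interval elapsed: delete the entry,
    // which cancels the pending rebalance.
    public void onBrokerStartup(int brokerId) {
        failedBrokers.remove(brokerId);
    }

    // The scheduled task fires at failure_timestamp + failure_interval and
    // rebalances only if the entry still exists, i.e. the broker never came back.
    public boolean shouldRebalance(int brokerId, long nowMs) {
        Long failedAt = failedBrokers.get(brokerId);
        return failedAt != null && nowMs >= failedAt + failureIntervalMs;
    }
}
```

Because the map is keyed on the first failure timestamp, a controller that restarts mid-interval can rebuild the same state from the persisted znodes without resetting the clock.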
The list of dead nodes needs to be persisted to survive controller handoffs; a newly elected controller can simply pick up the persisted list in ZooKeeper and reuse the initial failure time.
Once the failure detection piece is solved, we need to decide how best to move replicas onto other hosts. If a broker has been deemed failed, the controller should generate the list of topic-partitions it owned. On large brokers such as the ones we run at LinkedIn, each failed broker likely hosts several hundred partitions. Bootstrapping all of these at the same time would saturate the network, so some throttling is needed.
There are 2 approaches here:
We recommend the second approach.
Reassign partitions is an existing Kafka feature used to move partition replicas between hosts. Unless there is a strong reason not to, we should leverage it rather than reinvent it.
This command creates znodes under /admin/reassign_partitions for the topic-partitions that need to be moved. The controller parses each of these partitions and makes sure that the partition replica changes are propagated within the cluster. More documentation is available here. The controller can use its own internal APIs to trigger this.
Since the volume of partitions that can be moved is quite large, we should consider changing reassign partitions to break a large reassignment task down into smaller chunks, to avoid creating very large znodes. Once the assignments are written to ZooKeeper, the controller can process each one sequentially as an independent partition reassignment task.
This has a nice side effect of improving the reassign partitions tool. It currently breaks when it generates znodes larger than 1 MB on large brokers, which forces operators to manually break the task down into separate, long-running iterations.
Here's how the ZK path can look:
/admin/reassign_partitions/seq_1 { topic-partitions to move, isAutoGenerated : true/false }
/admin/reassign_partitions/seq_2 { topic-partitions to move, isAutoGenerated : true/false }
/admin/reassign_partitions/seq_3 { topic-partitions to move, isAutoGenerated : true/false }
The controller parses each of the chunks sequentially and deletes them when the chunk is processed.
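The chunking and sequential processing can be sketched as follows, with the seq_N znodes simulated by an ordered map. The chunk size and the ChunkedReassignment name are illustrative assumptions, not part of this proposal's final design.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of chunked reassignment: a large task is split into small sequential
// chunks (one per /admin/reassign_partitions/seq_N znode, simulated by a map)
// and the controller processes and deletes them one at a time.
public class ChunkedReassignment {
    // Split the full topic-partition list into seq_1, seq_2, ... chunks so
    // that no single znode grows too large.
    public static Map<String, List<String>> writeChunks(List<String> topicPartitions,
                                                        int chunkSize) {
        Map<String, List<String>> znodes = new LinkedHashMap<>();
        int seq = 1;
        for (int i = 0; i < topicPartitions.size(); i += chunkSize) {
            znodes.put("seq_" + seq++, new ArrayList<>(
                topicPartitions.subList(i, Math.min(i + chunkSize, topicPartitions.size()))));
        }
        return znodes;
    }

    // Process chunks in order, deleting each znode once its reassignment is done.
    public static List<String> processSequentially(Map<String, List<String>> znodes) {
        List<String> processed = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : new LinkedHashMap<>(znodes).entrySet()) {
            processed.addAll(e.getValue()); // stand-in for running the reassignment
            znodes.remove(e.getKey());      // delete the chunk when it completes
        }
        return processed;
    }
}
```

Processing chunks one at a time is what bounds the bootstrap traffic: at any moment only one small batch of partitions is moving.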
Let's walk through a sample sequence of events.
Completeness definition for a task
After the task is started, we need to wait for it to complete. Complete means that each target replica has caught up and entered the ISR (at least once).
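This completeness rule can be sketched as a small tracker fed by ISR change notifications; the CompletenessTracker name is illustrative. Note the "at least once" semantics: a replica that catches up and later falls out of the ISR still counts.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a reassignment task is complete once every target replica has been
// observed in the partition's ISR at least once.
public class CompletenessTracker {
    private final Set<Integer> targetReplicas;
    private final Set<Integer> seenInIsr = new HashSet<>();

    public CompletenessTracker(Set<Integer> targetReplicas) {
        this.targetReplicas = new HashSet<>(targetReplicas);
    }

    // Call on each ISR change notification; a replica only needs to catch up once.
    public void observeIsr(Set<Integer> isr) {
        for (int replica : isr) {
            if (targetReplicas.contains(replica)) {
                seenInIsr.add(replica);
            }
        }
    }

    public boolean isComplete() {
        return seenInIsr.containsAll(targetReplicas);
    }
}
```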
Broker restart after some assignments have been processed
To address this case, we should check whether the partitions in a task already have the desired number of replicas before starting it, i.e. if broker X is back online and catching up, there is no point in executing the subsequent jobs that were queued up. If a task is already running, however, it is simpler to let it complete; since tasks are not very large, they should not take long. The controller should only cancel tasks with "isAutoGenerated" : true.
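The pre-flight check can be sketched as a simple predicate over the partitions a queued task covers. The TaskPreflight name and the shape of the replica-count input are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;

// Sketch of the pre-flight check: skip a queued auto-generated task when every
// partition it covers already has the desired number of live replicas (e.g.
// because the failed broker came back and caught up).
public class TaskPreflight {
    public static boolean shouldSkip(List<String> taskPartitions,
                                     Map<String, Integer> liveReplicaCounts,
                                     int desiredReplicationFactor) {
        for (String tp : taskPartitions) {
            if (liveReplicaCounts.getOrDefault(tp, 0) < desiredReplicationFactor) {
                return false; // at least one partition is still under-replicated
            }
        }
        return true; // nothing to do; cancel the task if isAutoGenerated is true
    }
}
```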
Deletion of failed_brokers znodes
How do we GC the failed_brokers entries? They are deleted in two cases:
If a controller fails, the newly elected controller should rebuild its internal state from the nodes in /admin/failed_brokers and the auto-scheduled reassignment tasks.
There is a case where the controller fails after generating the reassignment tasks but before deleting the corresponding entry in /admin/failed_brokers. There are a couple of ways to solve this:
Option 2 is simpler.
Should we throttle replica bootstrap traffic? This is tangentially related, but we could throttle inter-broker replication traffic to, say, 50% of the available network. Regardless of the number of failed replicas, we can impose a hard limit on the total network traffic caused by auto-rebalancing. Given that we plan to move only a small set of partitions at a time, this shouldn't really be a problem; it is more of a nice-to-have.
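One way to picture such a cap is a per-second byte budget set to a fraction of the NIC capacity. This is purely an illustrative sketch under that assumption; Kafka's real quota machinery works differently, and the RebalanceThrottle name is hypothetical.

```java
// Sketch of a byte-rate cap for rebalance traffic: allow replication bytes
// only while the current one-second window stays under a fixed fraction of
// the NIC capacity.
public class RebalanceThrottle {
    private final long maxBytesPerSec;
    private long windowStartMs;
    private long bytesInWindow;

    public RebalanceThrottle(long nicBytesPerSec, double fraction) {
        this.maxBytesPerSec = (long) (nicBytesPerSec * fraction);
    }

    // Returns true if sending `bytes` now keeps us within the cap for the
    // current 1-second window; otherwise the caller should back off.
    public boolean tryAcquire(long bytes, long nowMs) {
        if (nowMs - windowStartMs >= 1000) { // start a fresh window
            windowStartMs = nowMs;
            bytesInWindow = 0;
        }
        if (bytesInWindow + bytes > maxBytesPerSec) {
            return false;
        }
        bytesInWindow += bytes;
        return true;
    }
}
```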
This doesn't really solve the problem of load-balancing existing assignments within a cluster. It should be possible to ensure even partition distribution whenever new nodes are added, i.e. if I expand a cluster it would be nice not to have to move partitions onto the new brokers manually. This KIP does not attempt to solve that problem; rather, it makes it possible to tackle auto-expansion in the future, since the same pattern could be reused.
The following configs can be added.
In addition, we can also add a quota config for replica fetchers if we choose to tackle that in this KIP.
This should not impact existing users at all. Old behavior can be preserved by simply setting the failure interval to -1.
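As a sketch, the new settings might look like the following broker properties. The property names below are hypothetical placeholders, not finalized config keys; only the -1-disables-the-feature semantics comes from this KIP.

```properties
# Hypothetical names -- the final config keys are TBD in this KIP.

# How long a broker may stay down before an automatic rebalance is triggered.
# -1 preserves the old behavior (no automatic rebalancing).
broker.failure.rebalance.interval.ms=1800000

# Optional cap on replica-fetcher traffic during auto-rebalancing, if replica
# fetcher quotas are tackled in this KIP (left unset here).
# replica.rebalance.throttle.bytes.per.sec=
```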
Behavior of reassign partitions:
In this approach, the controller can identify dead replicas by periodically iterating through all topics and identifying failed replicas based on the information under /topics/<topic_name>. If any replica is not online, it writes that replica to /admin/failed_brokers. The rest is similar to the approach described above. This does not rely on receiving an onBrokerFailure callback; however, since a significant amount of controller behavior depends on getting these callbacks right, it is perhaps more apt to use that as a direct signal. There are a few other challenges to this approach:
After some discussion we decided against this proposal because of the added complexity. This is still worth discussing though.
Each of the proposals relies on a replica being down for a configurable period of time. Is this the right approach in all circumstances? In this section, we discuss some ideas to speed up failure detection while still not triggering rebalancing when it is not needed.
The following are perhaps the most common causes of hard broker failures:
What if we modify the above to report clean shutdowns instead of failures? In this case, each broker writes an entry to (say) /admin/rolling_bounce prior to shutting down cleanly; most clean shutdowns happen during rolling bounces. Upon receiving an onBrokerFailure() notification, the controller can check whether the corresponding rolling-bounce znode exists. If not, the node did not exit cleanly and the controller can begin the rebalance immediately. If it does exist, the controller can wait for a period of time for the node to return. If the node does not come back within 30 minutes (configurable), the partitions can be reassigned and the corresponding entry removed from /admin/rolling_bounce.
The upside of this solution is that we detect real failures almost immediately while also handling clean shutdowns in which the node does not rejoin the cluster.
/admin/rolling_bounce/<broker_id> { "timestamp" : xxx }
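The decision the controller makes on each failure notification can be sketched as follows, with /admin/rolling_bounce simulated by an in-memory set of broker ids; the CleanShutdownCheck name is illustrative.

```java
import java.util.Set;

// Sketch of the clean-shutdown check: a missing /admin/rolling_bounce entry
// means the broker died without a clean shutdown, so the rebalance can start
// immediately; an existing entry means we wait out the configured grace period.
public class CleanShutdownCheck {
    public enum Action { REBALANCE_NOW, WAIT_FOR_RETURN }

    public static Action onBrokerFailure(int brokerId, Set<Integer> rollingBounceZnodes) {
        return rollingBounceZnodes.contains(brokerId)
                ? Action.WAIT_FOR_RETURN  // clean shutdown recorded; likely a rolling bounce
                : Action.REBALANCE_NOW;   // no marker: hard failure, act immediately
    }
}
```

This is what gives the fast path described above: hard failures are acted on as soon as the onBrokerFailure() notification arrives, while bounced brokers get the grace period.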