The primary motivation of this project is to eliminate the following problems in the current consumer rebalancing design -
- Herd Effect
The current consumer rebalancing design suffers from the herd effect: all participating consumers in a group listen on 1) all partitions available for the topics they are consuming, and 2) all consumers in the group. Hence, any change in the above fires Zookeeper listeners in all the consumers. Every consumer responds to each Zookeeper notification and starts a rebalancing operation. This causes unnecessary processing in each participating consumer process and churn in the message consumption process. In addition, each consumer in a group triggers a rebalancing operation on every single Zookeeper notification; when there are, say, 100 topics and one broker goes down, that fires 100 Zookeeper notifications and hence 100 rebalancing operations, when a single rebalancing operation would have sufficed.
- Split Brain Problem
- Each consumer decides which partitions it should own based on its own view of the Zookeeper state. When a Zookeeper listener fires, every consumer reads the partition and consumer group information from Zookeeper and independently makes a localized decision about partition ownership. Although the decision algorithm is deterministic, if the consumer group info or the partition info that different consumers read from Zookeeper differs, they reach conflicting decisions about partition ownership. This leads to unnecessary failed rebalancing attempts.
- Consumers can also "think" a rebalance succeeded when it did not. Each consumer considers the rebalance successful if its attempt to write its own ownership of the partitions does not encounter conflicts, yet some partitions might still be claimed by nobody (due to inconsistent views, or a consumer failure, for example). When this happens, the consumer group is not actually pulling from all available partitions for the topics, which is a major problem. In addition, if one consumer exhausts the allowed number of rebalancing attempts, it simply throws a ConsumerRebalanceFailedException, which is not seen by its peers in the group, and the consumer group might proceed without consuming all partitions until the next rebalancing attempt is triggered. In general, there is no way for a consumer to know whether a particular rebalancing attempt completed successfully. Ideally, there should be a reliable way to
- detect FATAL errors in participating consumer processes and retire those consumers from the group.
- verify a consumer rebalancing operation by checking that each partition is being consumed by exactly one consumer process, and trigger another rebalancing operation if this verification step fails.
- Complex Code
One of the objectives of this project is also to clean up the code in ZookeeperConsumerConnector and make it simpler. This also includes writing true unit tests for the consumer code, possibly using EasyMock. A thin consumer client allows 1) easy re-implementation of the consumer logic in different languages, and 2) easy rollout by keeping minimal dependencies on the consumer side (e.g. getting rid of the dependency on the Zookeeper client).
- A centralized consumer coordinator will be elected from the available brokers. By using Zookeeper to track the coordinator election, we can guarantee that if the coordinator dies, another election is triggered that appoints a new coordinator (see the election sketch after this list).
- On startup (whenever it finds that it is elected as a coordinator):
- Register child change watch on /consumers/[group]/ids for each group.
- Register child change watch for /brokers/topics/[topic] for each topic.
- Register a listener for session expiration events.
- For each group, keep an atomic boolean specifying if the group is under rebalancing.
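Concretely, the election can lean on Zookeeper's ephemeral nodes: the first broker to create the node wins, and its death deletes the node and wakes the watchers. Here is a minimal sketch in Java using the standard Apache ZooKeeper client; the /consumers/coordinator path and the onElected() hook are assumptions for illustration, not part of this proposal.

```java
// A minimal sketch of coordinator election via a Zookeeper ephemeral node.
import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;

public class CoordinatorElector implements Watcher {
    private static final String COORDINATOR_PATH = "/consumers/coordinator"; // assumed path
    private final ZooKeeper zk;
    private final String brokerId;

    public CoordinatorElector(ZooKeeper zk, String brokerId) {
        this.zk = zk;
        this.brokerId = brokerId;
    }

    // Try to become the coordinator by creating an ephemeral node; if another
    // broker holds it, watch the node so a new election runs when it dies.
    public synchronized void elect() throws KeeperException, InterruptedException {
        try {
            zk.create(COORDINATOR_PATH, brokerId.getBytes(),
                      Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            onElected();
        } catch (KeeperException.NodeExistsException e) {
            if (zk.exists(COORDINATOR_PATH, this) == null) {
                elect(); // the holder vanished in the meantime; race again
            }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // The ephemeral node was deleted: the coordinator died, so re-elect.
        if (event.getType() == Event.EventType.NodeDeleted) {
            try {
                elect();
            } catch (Exception e) {
                // log and retry in a real implementation
            }
        }
    }

    private void onElected() {
        // Register the group and topic watches listed above and initialize
        // the per-group rebalancing booleans.
    }
}
```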
- On startup, each consumer:
- Registers itself in Zookeeper, under /consumers/[group]/ids/[consumer_id]
- Registers a listener for session expiration events
- Creates a queue in Zookeeper which will be used by the coordinator to issue rebalancing state change requests to the consumer. Then listens for requests to be queued by registering a child change watch under /consumers/[group]/queue/[consumer_id]
- When a new rebalancing state change (either stop_fetch or start_fetch) request arrives, it dequeues the request from its queue and acts on it:
- If it is a stop_fetch, stop the fetchers, clear the fetcher queues, commit offsets, and release all partition ownership in Zookeeper. Once the consumer enters this state, the auto-commit thread will do nothing.
- If it is a start_fetch [with ownership info as (topic: partition)*], start the fetchers; once the fetchers have started successfully, register the partition ownership information in Zookeeper.
* Actually, it might be better for the coordinator to write all the ownership info to Zookeeper itself before sending out a simple start_fetch request, but then it cannot track whether all the consumers have started their fetchers.
- As we can see, this results in very light consumer client code; a sketch of the consumer-side request dispatch follows.
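As an illustration, a minimal Java sketch of the consumer-side dispatch of these queued requests might look as follows; all type and method names are assumptions, not the actual implementation.

```java
// A minimal sketch of the consumer-side dispatch for coordinator requests
// dequeued from /consumers/[group]/queue/[consumer_id].
import java.util.List;

public class RebalanceRequestHandler {
    enum RequestType { STOP_FETCH, START_FETCH }

    record RebalanceRequest(RequestType type, List<String> assignedPartitions) {}

    public void handle(RebalanceRequest request) {
        switch (request.type()) {
            case STOP_FETCH -> {
                stopFetchers();       // halt fetcher threads
                clearFetcherQueues(); // drop buffered, undelivered chunks
                commitOffsets();      // checkpoint consumed offsets
                releaseOwnership();   // delete ownership nodes in Zookeeper;
                                      // auto-commit is a no-op from here on
            }
            case START_FETCH -> {
                startFetchers(request.assignedPartitions());     // (topic: partition)*
                registerOwnership(request.assignedPartitions()); // claim in Zookeeper
            }
        }
    }

    private void stopFetchers() { /* ... */ }
    private void clearFetcherQueues() { /* ... */ }
    private void commitOffsets() { /* ... */ }
    private void releaseOwnership() { /* ... */ }
    private void startFetchers(List<String> partitions) { /* ... */ }
    private void registerOwnership(List<String> partitions) { /* ... */ }
}
```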
Here is how a rebalancing operation would work (a code sketch of these steps follows the list) -
- Whenever the watchers on group/topic changes fire, compute the affected consumer groups. For each group, check whether it is already under rebalancing; if not, start a rebalance operation for the group and set the boolean, otherwise do nothing.
- For each rebalance operation of a specific group:
a. First read the available partitions for all topics and the available consumers in the group.
b. Compute the affected consumers within the group that need to rebalance:
- Given a consumer change (for example, one consumer dies or a new consumer comes up), every other consumer in the group is affected.
- Given a topic partition change (for example, one broker dies), probably only a subset of the consumers in the group is affected.
c. For each affected consumer, send a close_fetcher request to its notification queue.
d. Wait until the consumers' ownership has been released in Zookeeper, or a timeout occurs. If it times out, go back to step c.
e. Compute the new assignment for all the affected consumers, and queue up a request to own the computed subset of partitions in each consumer's queue.
f. Wait until each partition is owned by its assigned consumer id, or a timeout occurs. If it times out, the coordinator assumes one or more consumers have failed to start their fetchers successfully; it marks this rebalancing attempt as failed and goes back to step a for another attempt.
g. Reset the boolean for the consumer group.
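A minimal Java sketch of this per-group loop (one handler instance per group), with steps a-g marked; all helper names and the timeout value are assumptions, and Zookeeper access is stubbed out.

```java
// A minimal sketch of the coordinator's per-group rebalance loop (steps a-g).
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class GroupRebalancer {
    private static final long TIMEOUT_MS = 30_000;          // assumed value
    private final AtomicBoolean rebalancing = new AtomicBoolean(false);

    public void maybeRebalance(String group) {
        // Only start a rebalance if the group is not already under one.
        if (!rebalancing.compareAndSet(false, true)) return;
        try {
            while (true) {                                   // one iteration per attempt
                List<Integer> partitions = readAvailablePartitions(group);     // step a
                List<String> consumers = readLiveConsumers(group);             // step a
                Set<String> affected = computeAffected(partitions, consumers); // step b
                do {
                    sendCloseFetcher(affected);                                // step c
                } while (!awaitOwnershipReleased(affected, TIMEOUT_MS));       // step d: timeout -> back to c
                Map<String, List<Integer>> assignment =
                        computeAssignment(affected, partitions);               // step e
                queueStartFetch(assignment);                                   // step e
                if (awaitOwnershipClaimed(assignment, TIMEOUT_MS)) break;      // step f: timeout -> back to a
            }
        } finally {
            rebalancing.set(false);                                            // step g
        }
    }

    private List<Integer> readAvailablePartitions(String group) { return List.of(); }
    private List<String> readLiveConsumers(String group) { return List.of(); }
    private Set<String> computeAffected(List<Integer> ps, List<String> cs) { return Set.of(); }
    private void sendCloseFetcher(Set<String> consumers) { }
    private boolean awaitOwnershipReleased(Set<String> cs, long timeoutMs) { return true; }
    private Map<String, List<Integer>> computeAssignment(Set<String> cs, List<Integer> ps) { return Map.of(); }
    private void queueStartFetch(Map<String, List<Integer>> assignment) { }
    private boolean awaitOwnershipClaimed(Map<String, List<Integer>> a, long timeoutMs) { return true; }
}
```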
- For m topics, each with n partitions, and k clients, there will be m * n + k watchers for the coordinator. So even if any single change is rare, in total the coordinator's watchers might fire continuously. Therefore, the logic that computes the affected consumer groups should be very lightweight, otherwise the processing cost might be too high.
- One rebalancing operation might take a long time to synchronize (the coordinator first waits for everyone to finish stopping their fetchers, then waits again for everyone to finish starting their fetchers, so there are two sync barriers per operation). So if rebalance requests are queued for multiple consumer groups, rebalancing might be delayed for quite some time. What makes it worse is that if one consumer fails during the rebalance, the coordinator has to exhaust its number of retries to confirm a failed rebalance operation, further delaying the waiting rebalance requests.
This proposal suggests removing the Zookeeper client from the Kafka consumer. Note that in the previous proposal consumers use Zookeeper to do the following tasks:
- Notify that they have stopped their fetchers (by releasing ownership in Zookeeper, which the coordinator has to wait for every consumer to do)
- Notify that they have started their fetchers (by writing their new ownership to Zookeeper, which the coordinator has to wait for every consumer to do)
In addition to rebalancing, the current consumers also use Zookeeper to do the following tasks:
- Announce their existence/failure to others (in the new design, to the coordinator) by registering themselves in Zookeeper
- Automatically commit the consumed offsets of the partitions they are consuming to Zookeeper
So, we need to solve the above problems with a TCP request/response protocol between the co-ordinator and the consumer; one plausible shape for the message types is sketched below.
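The proposal does not fix a wire format, but the message set falls out of the tasks above. Here is one plausible shape for the protocol types, sketched in Java; all names and fields are assumptions.

```java
// A minimal sketch of plausible coordinator <-> consumer protocol messages.
import java.util.List;
import java.util.Map;

public interface CoordinatorProtocol {
    // consumer -> coordinator: announce existence (replaces the Zookeeper registration)
    record RegisterConsumer(String group, String consumerId,
                            List<String> topics, int numStreams) {}

    // coordinator -> consumer: rebalancing state changes
    record StopFetcher() {}
    record StartFetcher(Map<String, List<Integer>> topicPartitions,
                        Map<String, Long> startOffsets) {}

    // consumer -> coordinator: ACKs replace the "release/claim ownership in
    // Zookeeper" notifications of the previous proposal
    record StopFetcherAck(String consumerId) {}
    record StartFetcherAck(String consumerId) {}

    // coordinator -> consumer heartbeat; the response carries consumed offsets,
    // which the coordinator checkpoints in Zookeeper on the consumer's behalf
    record Ping() {}
    record PingResponse(String consumerId, Map<String, Long> consumedOffsets) {}
}
```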
Coordinator Election & Failover
- As in Proposal I, a single coordinator will be elected from the broker servers. For consumers to get in touch with the coordinator without talking to Zookeeper, every broker needs to know which broker is the currently elected coordinator. The coordinator still registers itself in Zookeeper on startup.
- If the coordinator dies, its Zookeeper ephemeral node is deleted. This triggers a co-ordinator leader election process on the Kafka brokers.
- On startup, the coordinator registers itself in Zookeeper (see the recovery sketch after these steps).
- It reads the current set of consumer groups and the list of consumers in each group, and then pings each consumer.
- Those consumers that do not respond are treated as failed and removed from Zookeeper.
- For those consumers that do respond with the partitions they are currently consuming, it compares the reported partitions with the ownership map in Zookeeper.
- It sets the watchers on broker-partition paths in Zookeeper.
- After every consumer has either responded with its consuming information or been treated as failed, the coordinator decides whether a rebalance is needed and computes the affected consumers.
- It heartbeats the consumers periodically; the responses carry the consuming partition information along with offsets, and the coordinator registers the received offsets in Zookeeper.
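A minimal Java sketch of this recovery sequence; every helper here is an assumption standing in for real Zookeeper and socket I/O.

```java
// A minimal sketch of the failover recovery run by a newly elected coordinator.
import java.util.*;

public class CoordinatorFailover {
    public void onElected() {
        registerSelfInZookeeper();
        for (String group : readConsumerGroups()) {
            Map<String, List<String>> reported = new HashMap<>(); // consumer -> partitions
            boolean sawFailure = false;
            for (String consumer : readConsumers(group)) {
                // Ping each consumer, asking for its currently consumed partitions.
                Optional<List<String>> partitions = ping(consumer);
                if (partitions.isEmpty()) {
                    removeFromZookeeper(group, consumer); // no response: treat as failed
                    sawFailure = true;
                } else {
                    reported.put(consumer, partitions.get());
                }
            }
            // Rebalance if any consumer died, or if what the live consumers
            // report disagrees with the ownership map stored in Zookeeper.
            if (sawFailure || !reported.equals(readOwnershipMap(group))) {
                scheduleRebalance(group);
            }
        }
        watchBrokerPartitions(); // re-register the broker-partition watchers
    }

    private void registerSelfInZookeeper() { }
    private List<String> readConsumerGroups() { return List.of(); }
    private List<String> readConsumers(String group) { return List.of(); }
    private Optional<List<String>> ping(String consumer) { return Optional.empty(); }
    private void removeFromZookeeper(String group, String consumer) { }
    private Map<String, List<String>> readOwnershipMap(String group) { return Map.of(); }
    private void scheduleRebalance(String group) { }
    private void watchBrokerPartitions() { }
}
```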
- For the affected consumers that need rebalancing (the assignment computation in step c is sketched after this list):
a. Send the stop_fetcher command.
b. Wait until ACKed by everyone.
c. Compute the new assignment.
d. Write the new ownership to Zookeeper.
e. Send the start_fetcher command.
f. Wait until ACKed by everyone.
- If someone fails to ACK within the timeout period, treat it as failed, remove it, and restart the rebalancing.
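The assignment computation in step c is left unspecified here. One minimal option is a range-style assignment in the spirit of the existing consumer: sort partitions and consumers deterministically so every run agrees, then hand each consumer a contiguous block. A sketch:

```java
// A minimal range-style assignment sketch; names are illustrative assumptions.
import java.util.*;

public class RangeAssignor {
    public static Map<String, List<Integer>> assign(Collection<Integer> partitions,
                                                    Collection<String> consumers) {
        List<Integer> parts = new ArrayList<>(partitions);
        List<String> members = new ArrayList<>(consumers);
        if (members.isEmpty()) return Map.of();
        Collections.sort(parts);   // deterministic order so every run agrees
        Collections.sort(members);
        Map<String, List<Integer>> assignment = new HashMap<>();
        int base = parts.size() / members.size();  // partitions per consumer
        int extra = parts.size() % members.size(); // remainder goes to the first consumers
        int cursor = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            assignment.put(members.get(i),
                           new ArrayList<>(parts.subList(cursor, cursor + count)));
            cursor += count;
        }
        return assignment;
    }
}
```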
- A rebalancing operation is necessary if (a classification sketch follows this list) -
- Some consumers in a group have failed (they will fail to respond to the ping); all other consumers in the group are affected by the rebalance.
- Some consumers have joined a group (they will try to connect to the coordinator); all other consumers are affected by the rebalance.
- Some brokers have failed (watchers in Zookeeper will fire); only a subset of the consumers is affected by the rebalance.
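For illustration, classifying these triggers into affected consumers might look like the following sketch; the helper maps are assumptions about what the coordinator caches from Zookeeper.

```java
// A minimal sketch of mapping rebalance triggers to affected consumers.
import java.util.*;

public class AffectedConsumers {
    // A consumer joined or died: everyone in its group must rebalance.
    public static Set<String> onConsumerChange(List<String> groupMembers) {
        return new HashSet<>(groupMembers);
    }

    // A broker failed: only the consumers currently owning one of its
    // partitions are affected.
    public static Set<String> onBrokerFailure(String deadBroker,
                                              Map<String, String> partitionToBroker,  // partition -> broker
                                              Map<String, String> partitionToOwner) { // partition -> consumer
        Set<String> affected = new HashSet<>();
        for (Map.Entry<String, String> e : partitionToBroker.entrySet()) {
            if (e.getValue().equals(deadBroker)) {
                String owner = partitionToOwner.get(e.getKey());
                if (owner != null) affected.add(owner);
            }
        }
        return affected;
    }
}
```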
- On startup, the consumer is given a list of the brokers. It can talk to any one of the Kafka brokers in the cluster to find the coordinator.
- It connects to the coordinator and sends its consumer group and topics it would like to consume using a certain number of consumer streams.
- It waits for a response that contains the start-fetchers state change command with the list of brokers and topic partitions it should consume, and the offsets to begin consumption from.
- Whenever another broker connects to it as the new coordinator, it drops the connection to the old coordinator and starts communicating with the new one.
- Whenever the consumer discovers a broken socket while writing offsets, or has not been pinged by the coordinator within a timeout, it stops its fetchers and asks the other brokers who the new coordinator is (see the discovery sketch below).
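A minimal sketch of this discovery-and-failover behavior on the consumer side; the query to the broker is hypothetical, since the proposal does not specify its format.

```java
// A minimal sketch of consumer-side coordinator discovery and failover.
import java.io.IOException;
import java.util.List;

public class CoordinatorLocator {
    private final List<String> bootstrapBrokers; // host:port list given on startup

    public CoordinatorLocator(List<String> bootstrapBrokers) {
        this.bootstrapBrokers = bootstrapBrokers;
    }

    // Every broker remembers the elected coordinator, so any live broker can
    // answer the "who is the coordinator?" question.
    public String findCoordinator() {
        for (String broker : bootstrapBrokers) {
            try {
                return queryCoordinator(broker);
            } catch (IOException e) {
                // broker unreachable; try the next one
            }
        }
        throw new IllegalStateException("no reachable broker could name a coordinator");
    }

    // Called on a broken socket or when coordinator pings stop arriving:
    // stop the fetchers first, then rediscover the coordinator.
    public String onCoordinatorLost(Runnable stopFetchers) {
        stopFetchers.run();
        return findCoordinator();
    }

    private String queryCoordinator(String broker) throws IOException {
        // Hypothetical TCP request to the broker; not specified by this proposal.
        throw new IOException("sketch only: wire protocol not implemented");
    }
}
```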
Consumer offset management
- A number of users have expressed interest in manual offset management. For example, search indexers prefer to get the offsets per partition from the consumer, so that they can checkpoint offsets as part of the indexer or in some database.
- Typically, such clients would want to commit offsets manually at a particular frequency. Since this design lets the co-ordinator pick that frequency, it might not be suitable for manual checkpointing of offsets. In this case, we might be better off exposing the offsets to the client and letting the client take care of committing them either in ZK or someplace else.
- If the auto offset commit option is turned off, then the co-ordinator doesn’t checkpoint the offsets in ZK.
- If we want to support manual offset management, we will have to provide an API to let the consumer specify bootstrap offsets for each partition on startup. These offsets will need to be communicated to the co-ordinator in the connection request. If there is a discrepancy in the bootstrap offsets provided by consumers in the same group, the co-ordinator raises an alert and sends an error code as the response to the consumer connection request (a sketch of such an API follows).
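For illustration, such an API might look like the following sketch; every name here is hypothetical, since the proposal only requires that offsets be exposed and that bootstrap offsets reach the co-ordinator.

```java
// A minimal sketch of a manual offset management surface for the consumer.
import java.util.Map;

public interface OffsetManagement {
    record TopicPartition(String topic, int partition) {}

    // Expose current per-partition offsets so the client can checkpoint them
    // itself (e.g. atomically with an index update or in a database).
    Map<TopicPartition, Long> currentOffsets();

    // Commit offsets manually; only meaningful when auto-commit is turned off.
    void commitOffsets(Map<TopicPartition, Long> offsets);

    // Bootstrap offsets supplied on startup; they are forwarded to the
    // co-ordinator with the connection request, which responds with an error
    // code if consumers in the same group disagree.
    void setBootstrapOffsets(Map<TopicPartition, Long> offsets);
}
```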
- Coordinator failover during rebalancing (steps refer to the lettered list above):
- If it dies after step a and before step d, the affected consumers will stop fetching until the new coordinator is elected. This latency could be long, and the number of affected consumers might be large.
- If it dies after step d but before step e, the newly elected coordinator can use the already-written ownership info to shorten the rebalancing period, but the number of affected consumers might still be large.
- It is not yet decided whether consumers should proactively treat a coordinator as failed, or only wait for a new coordinator to connect to them. The first option might detect failures faster, but may increase the stop-fetcher latency.