You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 47 Next »

Motivation

The motivation of moving to a new set of consumer client APIs with broker side co-ordination is laid out here.

Consumer API

The proposed consumer APIs are here. Several API usage examples are documented here.

Group management protocol

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group has subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance. The way rebalancing works is as follows. One of the brokers is elected as the coordinator for a subset of the consumer groups. It will be responsible for triggering rebalancing attempts for certain consumer groups on consumer group membership changes or subscribed topic partition changes. It will also be responsible for communicating the resulting partition-consumer ownership configuration to all consumers of the group undergoing a rebalance operation.

Consumer

  1. On startup or on co-ordinator failover, the consumer sends a ConsumerMetadataRequest to any of the brokers in the "bootstrap.brokers" list. In the ConsumerMetadataResponse, it receives the location of the co-ordinator for it's group and the co-ordinator's generation id.
  2. If the returned generation id of the co-ordinator is greater than the last known generation id, it indicates that the co-ordinator has initiated a rebalance. The consumer then stops fetching data, commits offsets and sends a JoinGroupRequest to it's co-ordinator broker. In the JoinGroupResponse, it receives the list of topic partitions that it should own.
  3. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.

Co-ordinator

  1. In steady state, the co-ordinator tracks the health of each consumer in every group through it's failure detection protocol.
  2. Upon election or startup, the co-ordinator reads the list of groups it manages and their membership information from zookeeper. If there is no previous group membership information, it does nothing until the first consumer in some group registers with it.
  3. Upon election, if the co-ordinator finds previous group membership information in zookeeper, it waits for the consumers in each of the existing groups to send a HeartbeatRequest. Once all known and alive consumers in a group heartbeat the co-ordinator, it marks the rebalance completed. 
  4. Upon election or startup, the co-ordinator also starts failure detection for all consumers in a group. Consumers that are marked as dead by the co-ordinator's failure detection protocol are removed from the group, the group's generation id is incremented in zookeeper and the co-ordinator triggers a rebalance operation for the consumer's group.
  5. Rebalance is triggered by killing the socket connection to the consumers in the group. If the rebalance is caused due to a new consumer, the co-ordinator only breaks the socket connection to the rest of the consumers in the group. Consumers notice the broken socket connection and trigger a co-ordinator discovery process (#1, #2 in the Consumer section above). Once all alive consumers re-register with the co-ordinator, it communicates the new partition ownership to each of the consumers in the RegisterConsumerResponse, thereby completing the rebalance operation.
  6. The co-ordinator tracks the changes to topic partition changes for all topics that any consumer group has registered interest for. If it detects a new partition for any topic, it triggers a rebalance operation (as described in #5 above). It is currently not possible to reduce the number of partitions for a topic. The creation of new topics can also trigger a rebalance operation as consumers can register for topics before they are created.

Failure detection protocol

The consumer specifies a session timeout, a heartbeat frequency and failed heartbeat threshold in the RegisterConsumerRequest that it sends to the co-ordinator in order to join a consumer group. The consumer initiates periodic heartbeats (HeartbeatRequest) to the co-ordinator and waits for a response. The heartbeat frequency determines the number of times a consumer sends a heartbeat to the co-ordinator within a session timeout window. The failure threshold determines the number of failed or missed heartbeats allowed to the consumer before it marks the consumer dead. For example, for a session timeout of 1 second and a heartbeat frequency of 3, the consumer is expected to send a HeartbeatRequest to the co-ordinator every (1000/3=333.33 milliseconds). With a failed heartbeat threshold of 3, it means the co-ordinator will mark the consumer dead, if it hasn't received at least one HeartbeatRequest every (1000/3)*3 i.e 1 second.

When the consumer has successfully joins a group, the failure detection process starts on the consumer as well as the co-ordinator. Here is the protocol in more detail -

1) Consumer periodically sends a HeartbeatRequest to the coordinator every session_timeout / heartbeat_frequency milliseconds.

2) Upon receiving the HeartbeatRequest, coordinator checks the generation number, the consumer id and the consumer group. If the consumer specifies an invalid or stale generation id (i.e., consumer id not assigned before, or generation number is smaller than current value), it send an error code in the HeartbeatResponse.

3) If coordinator has not heard from a consumer at least once in failed_heartbeat_threshold*(session_timeout/heartbeat_frequency), mark the consumer as dead, break the socket connection to the consumer and trigger a rebalance process for the group.

4) If consumer has not received a heartbeat from the coordinator after session timeout, it treats the coordinator as failed and triggers the co-ordinator re discovery process.

5) If consumer finds the socket channel to the coordinator to be closed, it treats the coordinator as failed and triggers the co-ordinator re discovery process..

Note that on coordinator failover, the consumers may discover the new coordinator before or after the new coordinator has finished the failover process including loading the consumer group metadata from ZK, etc. In the latter case, the new coordinator will just accept its ping request as normal; in the former case, the new coordinator may reject its request, causing it to re-dicover coordinators and re-connect again, which is fine. Also, if the consumer connects to the new coordinator too late, the co-ordinator may have marked the consumer has dead and will be treat the consumer as a new consumer, which is also fine.

Request formats

For each consumer group, the coordinator stores the following information:

1) For each consumer group, the group metadata containing:

  • List of topics the group subscribes to
  • Group configs, including session timeout, etc.
  • Consumer metadata for each consumer in the group.
  • Current offsets for each consumed topic/partition.
  • Partition ownership metadata, including the consumer-assigned-partitions map, current group generation number, etc.

2) For each existing topic, a list of consumer groups that are currently subscribing to it.

ConsumerMetadataRequest

{
  Version                => int16
  CorrelationId          => int32
  ClientId               => String
  GroupId                => String
}

ConsumerMetadataResponse

{
  CorrelationId          => int32
  ErrorCode              => int16
  CoordinatorId          => Broker
  CoordinatorEpoch       => int32
  GroupGenerationId      => int32
}

JoinGroupRequest

 

{
  Version                => int16
  CorrelationId          => int32
  GroupId                => String
  ConsumerHost           => String
  SessionTimeout         => int32
  Topics                 => [String]
}

JoinGroupResponse

{
  Version                => int16
  CorrelationId          => int32
  ErrorCode              => int16
  GroupGenerationId      => int32
  GroupId                => String 
  ConsumerId             => String
  PartitionsToOwn        => [TopicAndPartition]
}

 

HeartbeatRequest

{
  Version                => int16
  CorrelationId          => int32
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse

{
  CorrelationId          => int32
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
  ErrorCode              => int16
}
 

Wildcard Subscription

With wildcard subscription (for example, whitelist and blacklist), the consumers are responsible to discover matching topics through topic metadata request. That is, its topic metadata request will contain an empty topic list, whose response then will return the partition info of all topics, it will then filter the topics that match its wildcard expression, and then update the subscription list. Again, if the subscription list has changed from the previous values, it will start the consumer re-join group procedure.

Co-ordinator failover or connection loss to the co-ordinator

  1. On co-ordinator failover, the controller elects a new leader for a subset of the consumer groups affected due to the co-ordinator failure. As part of becoming the leader for a subset of the offset topic partitions, the co-ordinator reads metadata for each group that it is responsible for, from zookeeper. The metadata includes the group's consumer ids, the generation id and the subscribed list of topics. Until the co-ordinator has read all metadata from zookeeper, it returns the CoordinatorNotReady error code in HeartbeatResponse, ConsumerMetadataResponse and JoinGroupResponse.
  2. As part of electing a new consumer co-ordinator, the controller also sends the UpdateConsumerMetadataRequest to all brokers. If a consumer sends a ConsumerMetadataRequest to a broker before the broker has received the updated group metadata through the UpdateConsumerMetadataRequest from the controlller, the ConsumerMetadataResponse will return stale information about the co-ordinator. The consumer will receive NotCoordinatorForGroup error code on the heartbeat/commit offset responses. On receiving the NotCoordinatorForGroup error code, the consumer backs off and resends the ConsumerMetadataRequest.
  3. The consumer does not stop fetching data during the co-ordinator failover and re-discovery process.

Partition changes for subscribed topics

  1. The co-ordinator for a group detects changes to the number of partitions for the subscribed list of topics. The co-ordinator then increments the group's generation id in zookeeper.
  2. The controller watches the generation id changes for every consumer group and sends a UpdateConsumerMetadataRequest to all brokers with the new generation id for the affected group.
  3. The co-ordinator also closes the socket connection to all consumers in the group to trigger a rebalance operation.
  4. On losing connection to the co-ordinator, the consumers go through the co-ordinator re-discovery process. If the consumer receives a higher generation id in either the ConsumerMetadataResponse or the HeartbeatResponse, it stops fetching data, commits offsets and sends a JoinGroupRequest to the co-ordinator. 
  5. The co-ordinator waits for all consumers to send it the JoinGroupRequest for the group. Once it receives all expected JoinGroupRequests, it computes the new partition assignment and returns the updated assignment and the new generation id in the JoinGroupResponse
  6. On receiving the JoinGroupResponse, the consumer stores the new generation id and the consumer id locally and starts fetching data for the returned set of partitions.

Offset commits during rebalance




 

 

  • No labels