Kafka replication detailed design V2

Differences between V1 and V2

This detailed design differs from the original detailed design in the following areas -

  1. This design aims to remove split-brain and herd effect issues in the V1 design. A partition has only one brain (on the leader) and all brokers only respond to state changes that are meant for them (as decided by the leader).
  2. The state machine in this design is controlled entirely by the leader of each partition. Each follower changes its state only on such a request from the leader for a particular partition. A leader-coordinated state machine allows central verification of state changes and allows them to fail fast.
  3. This design introduces an epoch or generation id per partition, which is a non-decreasing value for a partition. The epoch increments when the leader for a partition changes.
  4. This design handles delete partition or delete topic state changes for dead brokers by queuing up state change requests for a broker in Zookeeper.
  5. This design scales better with respect to the number of ZK watches, since it registers fewer watches than V1. The motivation is to reduce the load on ZK when the Kafka cluster grows to thousands of partitions. For example, for a cluster of 3 brokers hosting 1000 topics with 3 partitions each, the V1 design requires registering 15000 watches, while the V2 design requires only 3000.
  6. This design ensures that leader change ZK notifications are not queued up behind any other notifications and can happen instantaneously.
  7. This design allows explicit monitoring of
    1. the entire lifecycle of a state change -
      1. leader, broker id 0, requested start-replica for topic foo partition 0, to broker id 1, at epoch 10
      2. leader, broker id 0, requested start-replica for topic foo partition 0, to broker id 2, at epoch 10
      3. follower, broker id 1, received start-replica for topic foo partition 0, from leader 0, at epoch 10
      4. follower, broker id 2, received start-replica for topic foo partition 0, from leader 0, at epoch 10
      5. follower, broker id 1, completed start-replica for topic foo partition 0, request from leader 0, at epoch 10
      6. follower, broker id 2, completed start-replica for topic foo partition 0, request from leader 0, at epoch 10
    2. the backlog of state change requests on slow followers

Paths stored in Zookeeper

Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it’s marked as ephemeral.

We store the following paths in Zookeeper:

  1. Stores the information of all live brokers.
  2. Stores, for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker; therefore, the broker id can be used as the replica id.
  3. Stores the id of the replica that's the current leader of this partition.
  4. Stores the ids of the replicas that are in-sync with the leader.
  5. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
  6. This path is used by the leader of a partition to enqueue state change requests to the follower replicas. The state change requests include start replica and close replica. This path is created by the add brokers admin command and is only deleted by the remove brokers admin command. The purpose of making this path persistent is to cleanly handle state changes like delete topic and reassign partitions even when a broker is temporarily unavailable (for example, being bounced).
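
Using the notation above, one possible znode layout for these six items is sketched below. Only the leader, state and partitions_reassigned paths are referenced elsewhere on this page; the broker registry, replica assignment and ISR paths shown here are illustrative assumptions.

  /brokers/ids/[broker_id] -> host:port (item 1; ephemeral; path assumed)
  /brokers/topics/[topic]/[partition_id]/replicas -> {broker_id, ...} (item 2; path assumed)
  /brokers/topics/[topic]/[partition_id]/leader -> broker_id (item 3; referenced by the leader-change listener below)
  /brokers/topics/[topic]/[partition_id]/ISR -> {broker_id, ...} (item 4; path assumed)
  /brokers/partitions_reassigned/[topic]/[partition_id] -> {broker_id, ...} (item 5; the parent paths appear under the partition-reassigned listener below)
  /brokers/state/[broker_id] -> queue of state change requests (item 6; referenced by the state-change listener below)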

Key data structures

Every broker stores a list of the partitions and replicas assigned to it. The current leader of a partition further maintains 4 sets: AR, ISR, CUR and RAR, which correspond to the set of replicas that are assigned to the partition, in-sync with the leader, catching up with the leader, and being reassigned to other brokers, respectively. Normally, ISR is a subset of AR and AR = ISR + CUR. The leader of a partition maintains a commitQ and uses it to buffer all produce requests to be committed. For each replica assigned to a broker, the broker periodically stores its HW (high watermark) in a checkpoint file.
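
A minimal sketch of these per-partition structures, in Python for illustration only (the class and field names are not taken from the Kafka code base):

  from collections import deque
  from dataclasses import dataclass, field

  @dataclass
  class PartitionState:
      # Per-partition bookkeeping kept by the current leader; names are illustrative.
      topic: str
      partition_id: int
      epoch: int = 0                                  # increments each time the leader changes
      AR: set = field(default_factory=set)            # replicas assigned to the partition
      ISR: set = field(default_factory=set)           # replicas in-sync with the leader
      CUR: set = field(default_factory=set)           # replicas catching up with the leader
      RAR: set = field(default_factory=set)           # replicas being reassigned to other brokers
      commitQ: deque = field(default_factory=deque)   # produce requests waiting to be committed
      HW: int = 0                                     # high watermark, checkpointed periodically

      def invariant_holds(self) -> bool:
          # Normally ISR is a subset of AR and AR = ISR + CUR.
          return self.ISR <= self.AR and self.AR == self.ISR | self.CUR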

Key algorithms

Zookeeper listeners ONLY on the leader

  1. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

  1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
  2. State-change listener: child change on /brokers/state/[broker_id]

Configuration parameters

  1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
  2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.

Broker startup

Each time a broker starts up, it calls brokerStartup() and the algorithms are described below
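
The brokerStartup() pseudocode is not reproduced here. The following Python sketch shows one plausible flow, pieced together from the listeners and paths described on this page; the ZookeeperClient stub and all helper names are hypothetical, not the actual Kafka code.

  class ZookeeperClient:
      # Hypothetical stand-in for a real Zookeeper client; not an actual API.
      def create_ephemeral(self, path, value=None): ...
      def read(self, path): ...
      def watch_value(self, path, callback): ...
      def watch_children(self, path, callback): ...
      def assigned_replicas(self, broker_id): return []   # [(topic, partition_id), ...]

  def on_leader_change(path): ...                 # see "Leader change" below
  def on_state_change(path): ...                  # see "On State change" below
  def start_replica(topic, partition): ...        # see "Start replica" below
  def leader_election(topic, partition): ...      # see "Leader election"

  def broker_startup(broker_id: int, zk: ZookeeperClient) -> None:
      # Announce this broker as live; the znode is ephemeral, so it vanishes if the broker dies.
      zk.create_ephemeral(f"/brokers/ids/{broker_id}")
      # Listen for state change requests that partition leaders enqueue for this broker.
      zk.watch_children(f"/brokers/state/{broker_id}", on_state_change)
      for topic, partition in zk.assigned_replicas(broker_id):
          # One watch per hosted partition on the leader path (see the leader-change listener).
          zk.watch_value(f"/brokers/topics/{topic}/{partition}/leader", on_leader_change)
          start_replica(topic, partition)
          # If the partition has no leader yet, take part in leader election.
          if zk.read(f"/brokers/topics/{topic}/{partition}/leader") is None:
              leader_election(topic, partition)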

Leader election

State change events

On every broker

Leader change

This leader change listener is registered on every broker hosting a partition p. Each time it is triggered, the following procedure is executed -
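
A hedged sketch of such a procedure, assuming that a missing leader triggers leader election and an existing leader is simply followed; all helper names are hypothetical.

  def leader_election(topic, partition): ...              # elect a new leader from the ISR
  def become_follower(topic, partition, leader): ...      # see "Become follower" below
  def zk_read(path): return None                          # stand-in for a Zookeeper read

  def on_leader_change(broker_id, topic, partition):
      # Triggered by a value change on /brokers/topics/[topic]/[partition_id]/leader.
      leader = zk_read(f"/brokers/topics/{topic}/{partition}/leader")
      if leader is None:
          # The old leader is gone: this replica may attempt to become the new leader.
          leader_election(topic, partition)
      elif leader != broker_id:
          # Someone else is (or became) the leader; make sure this replica follows it.
          become_follower(topic, partition, leader)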

On State change

Each broker has a ZK path that it listens to for state change requests from the leader
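
A sketch of how such a listener might drain the queue, assuming each request carries the topic, partition, operation and the leader's epoch, and that requests with a stale epoch are ignored (which is what confines state changes to the current leader); all names are hypothetical.

  def zk_children(path): return []          # stand-in: names of the queued request znodes
  def zk_read(path): return {}              # stand-in: payload of one request
  def zk_delete(path): ...                  # stand-in: remove a handled request
  def current_epoch(topic, partition): return 0
  def apply(request): ...                   # dispatch to start-replica / close-replica / become-follower

  def on_state_change(broker_id):
      # Triggered by a child change on /brokers/state/[broker_id].
      queue_path = f"/brokers/state/{broker_id}"
      for child in sorted(zk_children(queue_path)):       # handle requests in order
          request = zk_read(f"{queue_path}/{child}")
          # Act only on requests whose epoch is current; stale leaders are ignored.
          if request["epoch"] >= current_epoch(request["topic"], request["partition"]):
              apply(request)
          # Delete the request whether it was applied or dropped as stale.
          zk_delete(f"{queue_path}/{child}")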

On the leader

On reassignment of partitions

Each time a partition reassigned event is triggered on the leader, it calls onPartitionReassigned()
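
onPartitionReassigned() is not spelled out here. The sketch below shows one plausible shape of it, driven by the RAR set and the state change queue described above; the helper names and the per-partition reassignment znode are assumptions.

  def send_state_change(broker_id, request): ...     # see "State change communication" below
  def zk_read(path): return set()                    # stand-in for a Zookeeper read
  def zk_delete(path): ...                           # stand-in for a Zookeeper delete

  def on_partition_reassigned(topic, partition, state):
      # state holds the leader's AR / ISR / RAR sets and epoch for (topic, partition).
      new_replicas = zk_read(f"/brokers/partitions_reassigned/{topic}/{partition}")
      state.RAR = new_replicas - state.AR
      # Ask the brokers that will host the new replicas to start them.
      for broker_id in state.RAR:
          send_state_change(broker_id, {"topic": topic, "partition": partition,
                                        "epoch": state.epoch, "request": "start-replica"})
      # Once the new replicas have caught up (joined the ISR), retire the replicas that
      # are no longer assigned and remove the reassignment path to mark the move complete.
      if state.RAR and state.RAR <= state.ISR:
          for broker_id in state.AR - new_replicas:
              send_state_change(broker_id, {"topic": topic, "partition": partition,
                                            "epoch": state.epoch, "request": "close-replica"})
          state.AR = new_replicas
          zk_delete(f"/brokers/partitions_reassigned/{topic}/{partition}")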

State change communication

The leader uses this API to communicate a state change request to the followers
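
One minimal way to realize this, given that the state path is persistent and per-broker, is to enqueue each request as a sequential child znode so that requests survive a broker bounce and are processed in order; the payload layout and helper below are assumptions.

  import json

  def zk_create_sequential(path, value): ...   # stand-in for a sequential znode create

  def send_state_change(broker_id, topic, partition, epoch, request_type):
      # Enqueue the request under the target broker's persistent state path; because the
      # path is persistent, the request survives even if that broker is currently down.
      payload = json.dumps({"topic": topic, "partition": partition,
                            "epoch": epoch, "request": request_type})
      # Sequential children preserve the order in which the leader issued the requests.
      zk_create_sequential(f"/brokers/state/{broker_id}/request-", payload)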

State change operations

Start replica

This state change is requested by the leader or the admin command for a new replica assignment

Close replica

This state change is requested by the leader when a topic or partition is deleted or moved to another broker

Become follower

This state change is requested by the leader when the leader for a replica changes

Become leader

This state change is done by the new leader
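
The bodies of the four operations above are not reproduced here. The consolidated sketch below gives one plausible reading of each, based on the HW, commitQ and fetcher concepts described elsewhere on this page; every helper is hypothetical, and the real operations carry more detail (error handling, epoch checks, ZK updates).

  def start_fetcher(topic, partition, leader): ...    # start a ReplicaFetcherThread (see below)
  def stop_fetcher(topic, partition): ...
  def start_commit_thread(topic, partition): ...      # drains the leader's commitQ
  def stop_commit_thread(topic, partition): ...
  def create_local_log(topic, partition): ...
  def delete_local_log(topic, partition): ...
  def truncate_log_to_hw(topic, partition): ...
  def current_leader(topic, partition): return None

  def start_replica(topic, partition):
      # Requested by the leader, or by the admin command for a new replica assignment.
      create_local_log(topic, partition)
      leader = current_leader(topic, partition)
      if leader is not None:
          become_follower(topic, partition, leader)

  def close_replica(topic, partition):
      # Requested when the topic or partition is deleted or moved to another broker.
      stop_fetcher(topic, partition)
      delete_local_log(topic, partition)

  def become_follower(topic, partition, leader):
      # Requested when the leader for the replica changes.
      stop_commit_thread(topic, partition)        # in case this broker was the old leader
      truncate_log_to_hw(topic, partition)        # drop uncommitted messages before re-fetching
      start_fetcher(topic, partition, leader)

  def become_leader(topic, partition, epoch):
      # Performed by the newly elected leader itself.
      stop_fetcher(topic, partition)              # the leader no longer fetches from anyone
      start_commit_thread(topic, partition)       # start committing buffered produce requests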

Admin commands

This section describes the algorithms for various admin commands like create/delete topic, add/remove partition.

Create topic

The admin command does the following when creating a new topic.
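
A hedged sketch of those steps: the admin command picks an assignment of replicas to brokers and writes it to Zookeeper, after which the listeners described above bring the partitions online. The replica path and helpers are assumptions.

  def zk_create(path, value): ...   # stand-in for a persistent znode create

  def create_topic(topic, num_partitions, replication_factor, live_broker_ids):
      for partition in range(num_partitions):
          # Simple round-robin assignment; the first replica is the preferred replica.
          replicas = [live_broker_ids[(partition + i) % len(live_broker_ids)]
                      for i in range(replication_factor)]
          zk_create(f"/brokers/topics/{topic}/{partition}/replicas", replicas)
          # Leader election and start-replica are then driven by the listeners described above.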

Delete topic

Add partition to existing topic

Remove partition for existing topic

Handling produce requests

Produce request handler on the leader

Message replication

Commit thread on the leader

Follower fetching from leader

A follower keeps sending ReplicaFetcherRequests to the leader. The processes at the leader and the follower are described below -

At the leader

At the follower

ReplicaFetcherThread for Replica r:
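
The leader-side and follower-side procedures are not reproduced here. The sketch below shows the usual shape of the exchange implied by this design: the follower requests data from its log end offset, the leader records that offset (which feeds HW advancement and ISR maintenance) and returns messages together with its HW, and the follower appends the messages and adopts the leader's HW. Names and the wire format are illustrative, not the actual Kafka protocol.

  import time

  def read_local_log(topic, partition, offset): return []    # messages from offset onward
  def append_local_log(topic, partition, messages): ...
  def record_follower_offset(topic, partition, replica_id, offset): ...  # feeds HW / ISR maintenance
  def leader_hw(topic, partition): return 0
  def set_local_hw(topic, partition, hw): ...
  def log_end_offset(topic, partition): return 0
  def send_to_leader(request): return {"messages": [], "hw": 0}   # stand-in for the RPC

  # At the leader: handle one ReplicaFetcherRequest from replica r.
  def handle_replica_fetch(topic, partition, replica_id, offset):
      record_follower_offset(topic, partition, replica_id, offset)
      return {"messages": read_local_log(topic, partition, offset),
              "hw": leader_hw(topic, partition)}

  # At the follower: the ReplicaFetcherThread for replica r keeps pulling from the leader.
  def replica_fetcher_loop(topic, partition, replica_id):
      while True:
          offset = log_end_offset(topic, partition)
          response = send_to_leader({"topic": topic, "partition": partition,
                                     "replica": replica_id, "offset": offset})
          append_local_log(topic, partition, response["messages"])
          # The follower's HW never runs ahead of the leader's.
          set_local_hw(topic, partition, min(response["hw"], log_end_offset(topic, partition)))
          if not response["messages"]:
              time.sleep(0.5)   # back off briefly when the leader has nothing new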
