...
Paths stored in Zookeeper
Notation: When an element in a path is denoted \[xyz\], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example /topics/\[topic\] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it’s marked as ephemeral.
We store the following paths in Zookeeper:
...
- Partition-reassigned listener:
- child change on /brokers/partitions_reassigned
- child change on /brokers/partitions_reassigned/\[topic\]
Zookeeper listeners on all brokers
...
- Leader-change listener: value change on /brokers/topics/\[topic\]/\[partition_id\]/leader
- State-change listener: child change on /brokers/state/\[broker_id\]
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
...
Code Block |
---|
// Commit thread (pseudocode). For each produce request pr taken from commitQ it waits
// until every replica in the in-sync replica set (ISR) has reached pr.offset, dropping
// replicas that do not get there within KeepInSyncTime, and then acknowledges the client.
// NOTE(review): p, CUR, RAR, AR, epoch and sentCloseReplica are not defined in this
// section -- presumably p is the partition and CUR/RAR/AR are its catch-up, reassigned
// and assigned replica sets; confirm against the rest of the document.
while(true) {
    pr = commitQ.dequeue
    canCommit = false
    while(!canCommit) {
        canCommit = true
        for each r in ISR
            if(!offsetReached(r, pr.offset)) {
                canCommit = false
                break
            }
        if(!canCommit) {
            // r is the replica that failed to reach pr.offset in time:
            // move it from ISR to CUR and retry with the smaller ISR
            p.CUR.add(r)
            p.ISR.delete(r)
            write p.ISR to ZK
        }
    }
    // replicas in CUR that have reached pr.offset are moved back into ISR
    for each c in CUR
        if(c.leo >= pr.offset) {
            p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
        }
    checkReassignedReplicas(pr, p.RAR, p.ISR)
    checkLoadBalancing()
    // NOTE(review): r is only bound inside the ISR scan above; presumably this is
    // meant to be the HW of the leader's own replica -- confirm
    r.hw = pr.offset // increment the HW to indicate that pr is committed
    send ACK to the client that pr is committed
}

// Returns true if replica r's leo reaches `offset` within KeepInSyncTime, false otherwise.
offsetReached(r: Replica, offset: Long) {
    if(r.leo becomes equal or larger than offset within KeepInSyncTime)
        return true
    return false
}

// Hands leadership back to the preferred replica once that replica is in ISR.
checkLoadBalancing() {
    // see if we need to switch the leader to the preferred replica
    if(leader replica is not the preferred one & the preferred replica is in ISR) {
        delete /brokers/topics/[topic]/[partition_id]/leader in ZK
        stop this commit thread
        stop the HW checkpoint thread
    }
}

// Drives a partition reassignment in two phases: first ask the old replicas (AR - RAR)
// to close, then, once they have left ISR, switch over to the reassigned replicas.
checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica]) {
    // see if all reassigned replicas have fully caught up and older replicas have
    // stopped fetching, if so, switch to those replicas
    // optimization, do the check periodically
    if(every replica in RAR has its leo >= pr.offset) {
        if(!sentCloseReplica.get) {
            oldReplicas = AR - RAR
            for(oldReplica <- oldReplicas) {
                // skip this broker's own replica here; completePartitionReassignment
                // sends its close-replica when it is not in the new AR
                if(oldReplica.broker_id != broker_id)
                    sendStateChange("close-replica", oldReplica.broker_id, epoch)
            }
            sentCloseReplica.set(true)
        } else {
            // close replica is already sent. Wait until the replicas are closed
            // or probably timeout and raise error
            if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) {
                // leader is not in the reassigned replicas list
                completePartitionReassignment(RAR, ISR, AR, true)
                sentCloseReplica.set(false)
            } else if(every replica in (AR-RAR) is not in ISR anymore) {
                completePartitionReassignment(RAR, ISR, AR, false)
                sentCloseReplica.set(false)
            }
        }
    }
}

// Switches the partition over to the reassigned replicas.
//   RAR              - the reassigned (new) replicas
//   ISR              - the current in-sync replica set
//   AR               - the replicas assigned before the switch
//   stopCommitThread - when true, always give up leadership after the switch
completePartitionReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean) {
    // newly assigned replicas are in-sync, switch over to the new replicas
    // need (RAR + ISR) in case we fail right after here
    write (RAR + ISR) as the new ISR in ZK
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
    // NOTE(review): the source repeated the guard below twice with unbalanced braces
    // (apparently two merged revisions). The literal nesting is kept; confirm whether the
    // close-replica and the partitions_reassigned cleanup should instead be unconditional.
    if(stopCommitThread || (broker_id is not preferred replica)) {
        if(this broker_id is not in the new AR)
            sendStateChange("close-replica", broker_id, epoch)
        delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
        // triggers leader election
        delete /brokers/topics/[topic]/[partition_id]/leader in ZK
        stop this commit thread
    }
}
...