...

For LeaderAndISRCommand: it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_P from command.
2. For each partition P in set_P
2.0 if P doesn't exist locally, call startReplica()
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 Stop the fetcher to the current leader
2.1.2 call becomeLeader()
2.2 If the command asks this broker to follow a leader L and the broker is not already following L
2.2.1 stop the fetcher to the current leader
2.2.2 call becomeFollower()
3. If the command has a flag INIT, delete all local partitions not in set_P.
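
The steps above can be sketched as a small state machine. This is a minimal illustration in Python, not the broker's actual code: the Broker class, its fields, and the shape of the command (a mapping from partition to leader broker id plus an init flag) are all assumptions made for the example. becomeLeader(), becomeFollower() and startReplica() are reduced to entries in a trace list.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Toy model of one broker. All names here are hypothetical."""
    broker_id: int
    local: set = field(default_factory=set)        # partitions stored on this broker
    leading: set = field(default_factory=set)      # partitions this broker leads
    following: dict = field(default_factory=dict)  # partition -> leader being followed
    actions: list = field(default_factory=list)    # trace of the calls made

    def on_leader_and_isr_command(self, leaders, init=False):
        """`leaders` maps each partition in set_P to the broker id of its leader."""
        for p, leader in leaders.items():
            if p not in self.local:                        # step 2.0
                self.local.add(p)
                self.actions.append(("startReplica", p))
            if leader == self.broker_id:
                if p not in self.leading:                  # step 2.1
                    self.following.pop(p, None)            # 2.1.1 stop the fetcher
                    self.leading.add(p)
                    self.actions.append(("becomeLeader", p))
            elif self.following.get(p) != leader:          # step 2.2
                self.leading.discard(p)
                self.following[p] = leader                 # 2.2.1 old fetcher replaced
                self.actions.append(("becomeFollower", p))
        if init:                                           # step 3
            for p in sorted(self.local - set(leaders)):
                self.local.discard(p)
                self.leading.discard(p)
                self.following.pop(p, None)
                self.actions.append(("delete", p))
```

Because steps 2.1 and 2.2 check the current role first, replaying the same command leaves the trace unchanged in this sketch, which is the property the "not already" conditions in the steps are there to provide.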


becomeLeader(r: Replica, command)
{
  stop the ReplicaFetcherThread to the old leader   //after this, no more messages from the old leader can be appended to r
  r.leaderAndISRZKVersion = command.leaderAndISRZKVersion
  r.ISR = command.ISR
  r.isLeader = true                                 //enables reads/writes to this partition on this broker
}

Note that the new leader's HW could be behind the HW of the previous leader. Therefore, immediately after the leadership transition,
it is possible for a consumer (client) to ask for an offset larger than the new leader's HW. To handle this case, if the consumer is
asking for an offset between the leader's HW and LEO, the broker could just return an empty set. If the requested offset is larger
than LEO, the broker would still return OffsetOutofRangeException.
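
The fetch rule in the note above can be sketched as follows. This is a minimal illustration in Python, not the broker's actual code: handle_fetch, the OffsetOutOfRange class, and the list-backed log starting at offset 0 are assumptions made for the example, as is the convention that only messages below the HW are returned to consumers.

```python
class OffsetOutOfRange(Exception):
    """Hypothetical stand-in for the broker's OffsetOutofRangeException."""


def handle_fetch(log, hw, offset):
    """Return the messages a consumer may read starting at `offset`.

    log    -- list of messages; index i holds the message at offset i
    hw     -- the leader's high watermark
    offset -- the offset requested by the consumer
    """
    leo = len(log)  # log end offset: the offset the next append would get
    if offset < 0 or offset > leo:
        # Larger than LEO (or negative, an added assumption): reject.
        raise OffsetOutOfRange(f"offset {offset} is outside [0, {leo}]")
    if offset >= hw:
        # Between HW and LEO: a valid offset, but nothing is readable yet.
        return []
    return log[offset:hw]
```

Returning an empty set rather than an error lets a consumer that was reading up to the previous leader's HW simply retry until the new leader's HW advances past its offset.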


becomeFollower(r: Replica)
{

  stop the ReplicaFetcherThread to the old leader
  r.isLeader = false                                //disables reads/writes to this partition on this broker
  stop the commit thread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
}
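
The truncate-then-refetch part of becomeFollower() can be illustrated with plain lists. This is a sketch under stated assumptions, not the actual implementation: the function become_follower and its arguments are hypothetical, the ReplicaFetcherThread is replaced by a single copy from the leader's log, and messages below the HW are assumed to be identical on every replica.

```python
def become_follower(local_log, hw, leader_log):
    """Return the follower's log after truncation and one catch-up fetch.

    local_log  -- this replica's messages, index = offset
    hw         -- this replica's high watermark
    leader_log -- the current leader's messages, index = offset
    """
    truncated = list(local_log[:hw])    # truncate the log to r.hw
    leo = len(truncated)                # after truncation, r.leo equals r.hw
    truncated.extend(leader_log[leo:])  # stand-in for fetching from offset r.leo
    return truncated
```

For example, a former leader holding ["m0", "m1", "x2", "x3"] with hw = 2 drops "x2" and "x3" and then copies whatever the new leader has from offset 2 onward.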


startReplica(r: Replica)
{
  create the partition directory locally, if not present
  start the HW checkpoint thread for r
}




For StopReplicaCommand: it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 call stopReplica() on P



stopReplica(r: Replica)
{
  stop the ReplicaFetcherThread associated with r, if any.
  stop the HW checkpoint thread for r
  delete the partition directory locally, if present.
}

D. Handling controller failure.

...