This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • KIP-435: Incremental Partition Reassignment
Skip to end of metadata
Go to start of metadata

Status

Current state: Under Discussion

Discussion threadhere

JIRA KAFKA-6794 - Getting issue details... STATUS

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

If partition reassignment involves a lot of replicas, then it could put too much overhead on the brokers. 

Say you have a replication factor of 4 and you trigger a reassignment which moves all replicas to new brokers. Now 8 replicas are fetching at the same time which means you need to account for 8 times the current producer load plus the catch-up replication. To make matters worse, the replicas won't all become in-sync at the same time; in the worst case, you could have 7 replicas in-sync while one is still catching up. Currently, the old replicas won't be disabled until all new replicas are in-sync. This makes configuring the throttle tricky since ISR traffic is not subject to it.

Rather than trying to bring all 4 new replicas online at the same time, a friendlier approach would be to do it incrementally: bring one replica online, bring it in-sync, then remove one of the old replicas. Repeat until all replicas have been changed. This would reduce the impact of a reassignment and make configuring the throttle easier at the cost of a slower overall reassignment.

Public Interfaces

Two configs will be added.

Config nameTypeDefaultValid valuesImportanceDynamic update mode
reassignment.parallel.partition.countint2147483647[1,...]mediumcluster-wide
reassignment.parallel.replica.countint2147483647[1,...]mediumcluster-wide

Proposed Changes

As explained above, the goal would be to incrementally add new partitions, a batch at a time to avoid putting much pressure on the brokers. The only exception is the first step where (if needed) we add that many replicas that is enough to fulfil the min.insync.replicas requirement set on the broker. For instance if there are 4 brokers, min.insync.replicas set to 3 but there are only 1 in-sync replica, then we immediately add 2 other in one step, so the producers are able to continue.

Furthermore in the first step we'll elect the new preferred leader (if the reassignment requires it) to unload pressure from the current leader.

The two configs are aiming to control batching on partition and topic levels. We practically default to the current behaviour to remain backward compatible, although as a future work it might make sense to lower the defaults based on feedback. 

Batches will be created based on the input reassignment. That means that the order of the input could be used to have some control over how batches are created. For instance in case of a reassignment for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we would form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and would execute the reassignment in this order. For multiple partitions it would work in a similar fashion but the reassignment.parallel.replica.count would control how many replicas of that partition can be reassigned concurrently.

As an addition these values could be changed dynamically to somewhat "throttle" the reassignment. It might be better to throttle certain reassignment on a much more advanced way but it could also exceed the scope of this KIP.

Algorithm

Calculating a Reassignment Step

  1. For calculating a reassignment step, always the final target replica (FTR) set and the current replica (CR) set is used.
  2. Calculate the replicas to be dropped (DR):
    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) - size(CR))
    2. Filter those replicas from CR which are not in FTR, this is the excess replica (ER) set
    3. Take the first reassignment.parallel.replica.count replicas of ER, that will be the set of dropped replicas
  3. Calculate the new replicas (NR) to be added
    1. Calculate that if the partition has enough online replicas to fulfil the min.insync.replicas config so the producers are able to continue.
    2. If the preferred leader is different in FTR and it is not yet reassigned, then add it to the NR
    3. If size(NR) < min.insync.replicas then take min(min.insync.replicas, reassignment.parallel.replica.count) - size(NR) replicas from FTR
    4. Otherwise take as many replica as reassignment.parallel.replica.count allows
  4. Create the target replica (TR) set: CR + NR - DR
  5. If this is the last step, then order the replicas as specified by FTR. This means that the last step is always equals to FTR

Performing a Reassignment Step

Performing a reassignment step is somewhat similar in big picture to the currently existing algorithm.

  1. Calculate the next reassignment step.
  2. Wait until this step is not identical to the current assignment and there is at least one replica ISR.
    1. The calculation would result in identical steps if it's not able to add new replicas to the TR set. This could be because target brokers are offline.
    2. We are not able to reassign partitions if at least one replica is not in-sync.
  3. Set all new replicas to OnlineReplica when they are in ISR (if this is not the first increment we're in).
  4. Update CR in Zookeeper (/kafka/brokers/topics/{topic}) with TR for the given partition.
  5. Send LeaderAndIsr requests to all replicas in CR+NR.
  6. Start new replicas in NR by moving them into the NewReplica state.
  7. Set CR to TR in memory.
  8. Send LeaderAndIsr request with a potential new leader (if current leader not in TR) and a new CR (using TR) and same isr to every broker in TR

  9. Replicas in DR -> Offline (force those replicas out of isr)

  10. Replicas in DR -> NonExistentReplica (force those replicas to be deleted)

  11. Update the /admin/reassign_partitions path in ZK to remove this partition if the reassignment is completed.

Example

The following code block shows how a transition happens from (0, 1, 2, 3, 4) into (5, 6, 7, 8, 9) where the initial leader is 0.

Reassignment Example
 (0, 1, 2, 3, 4)     // starting assignment
        |
(5, 0, 1, 2, 3, 4)   // +5, new leader (5) is elected
        |
 (5, 6, 2, 3, 4)   // -[0,1] +[6]
        |
 (5, 6, 7, 8, 4)   // -[2,3] +[7,8]
        |
 (5, 6, 7, 8, 9)   // -[4] +[9], requested order is matched, reassignment finished

Compatibility, Deprecation, and Migration Plan

Since these changes won't affect any public interfaces, neither Zookeeper, there will be no compatibility issues.

Test Plan

  • The existing unit tests will be parameterized so they would run with both modes
  • Extra unit tests will be added to cover those cases that are not covered with the existing tests
  • Ducktape tests would be parameterized to run with both modes
  • If needed, extra ducktapes could be added to cover cases that are needed

Future Work

Replication Throttling

It would be useful to give an upper cap on the bandwith of the replication so users won't overwhelm their cluster. This throttle could be controlled overall for all partition and perhaps it would make sense to do it on a per partition basis and only specify a max capacity at the overall level.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels