Status

Current state: Under discussion

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Summary

Today, kafka-reassign-partitions.sh --execute submits all partition reassignments from the JSON in one AdminClient#alterPartitionReassignments call (unless the user manually splits work). For large clusters this can create large bursts of replication traffic and controller work.

This KIP proposes tool-only pacing controls:

  1. --reassignment-batch-size — caps how many topic partitions are submitted per step (semantics differ slightly depending on mode; see below). 0 preserves legacy behavior (single request for the entire plan).
  2. --incremental — optional mode used with --reassignment-batch-size > 0 to keep at most N partition reassignments in flight for this execution, submitting the next partition from a deterministic queue when a slot frees up.
  3. --reassignment-poll-interval-ms — how long the tool sleeps between polls while waiting for batch completion (non-incremental, between batches) and while driving incremental submissions; default 1000 ms.

Ordering for both modes is (topic name, partition index), not the order of entries in the JSON file.

No broker protocol, controller, or metadata changes are required; pacing is implemented entirely in the reassignment tool using existing Admin APIs (alterPartitionReassignments, listPartitionReassignments, metadata reads).

Motivation

Problem

  1. Operational risk: Submitting hundreds or thousands of partition reassignments in one RPC can stress replication links, disk, and the controller in ways that are hard to predict during maintenance windows.
  2. Limited operator control: Throttling (--throttle) limits bandwidth but does not limit how many partitions are simultaneously moving; operators often want serialised waves or bounded concurrency without hand-splitting JSON files.
  3. Manual workarounds: Teams split reassignment JSON by hand or wrap the tool in scripts that call alterPartitionReassignments in chunks — error-prone and inconsistent across deployments.

Goals

  • Provide first-class, documented pacing in the supported reassignment tool.
  • Preserve full backward compatibility when defaults are unchanged (--reassignment-batch-size 0, no --incremental).
  • Keep behavior deterministic and explainable (stable partition ordering).

Non-goals

  • Replacing cluster-wide replication quotas or broker-side limits.
  • Changing how the controller executes reassignments (same server-side semantics).
  • Global coordination across multiple concurrent tool processes (each execute remains independent).

Public interfaces

CLI (kafka-reassign-partitions.sh)

Option: --reassignment-batch-size <int>

Applies to: --execute only

Semantics:

  • Default 0: legacy behaviour; the entire plan is sent in one alterPartitionReassignments request, with no extra waits in the tool.
  • > 0 without --incremental: split the plan into contiguous batches of at most N partitions (sorted by topic, then partition id). After each batch except the last, the tool blocks until every partition in that batch reports complete (current ISR matches target and reassignment not active) before submitting the next batch.
  • > 0 with --incremental: N is the maximum number of partition reassignments from this JSON that may be active at once; when one completes, the tool submits the next partition from the sorted queue.

Option: --incremental

Applies to: --execute only

Semantics: Requires --reassignment-batch-size > 0. Changes the meaning of the batch size from a per-step batch size to an in-flight cap (see above).

Option: --reassignment-poll-interval-ms <long>

Applies to: --execute only

Semantics: Milliseconds to sleep between progress polls when the tool is waiting on reassignment state. Default 1000.

Validation and compatibility

  • --reassignment-batch-size must be ≥ 0. Negative values are rejected.
  • --incremental without --reassignment-batch-size > 0 is rejected at argument validation time.
  • --reassignment-batch-size, --incremental and --reassignment-poll-interval-ms are not permitted with --list, --generate, --verify, or --cancel (same pattern as other execute-only options).
  • --reassignment-poll-interval-ms must be > 0. Zero and negative values are rejected.
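
The validation rules above can be sketched as a small standalone check. This is an illustration only: the class name PacingArgsValidator, the validate signature, and the error strings are hypothetical and are not taken from the tool's code. A null value stands for "flag not given".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical validator mirroring the rules listed above; not the tool's actual code. */
final class PacingArgsValidator {
    private PacingArgsValidator() { }

    /**
     * @param action         one of "execute", "list", "generate", "verify", "cancel"
     * @param batchSize      value of --reassignment-batch-size, or null if the flag is absent
     * @param incremental    true if --incremental was given
     * @param pollIntervalMs value of --reassignment-poll-interval-ms, or null if absent
     * @return validation errors; an empty list means the combination is accepted
     */
    static List<String> validate(String action, Integer batchSize,
                                 boolean incremental, Long pollIntervalMs) {
        List<String> errors = new ArrayList<>();
        boolean anyPacingFlag = batchSize != null || incremental || pollIntervalMs != null;

        // Pacing flags are execute-only.
        if (!"execute".equals(action)) {
            if (anyPacingFlag) {
                errors.add("pacing options are only permitted with --execute");
            }
            return errors;
        }

        int effectiveBatchSize = batchSize == null ? 0 : batchSize; // default 0 = legacy
        if (effectiveBatchSize < 0) {
            errors.add("--reassignment-batch-size must be >= 0");
        }
        if (incremental && effectiveBatchSize <= 0) {
            errors.add("--incremental requires --reassignment-batch-size > 0");
        }
        if (pollIntervalMs != null && pollIntervalMs <= 0) {
            errors.add("--reassignment-poll-interval-ms must be > 0");
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(validate("execute", 3, true, 1000L));   // accepted: prints []
        System.out.println(validate("execute", null, true, null)); // --incremental without a batch size
        System.out.println(validate("verify", 3, false, null));    // execute-only flag used with --verify
    }
}
```

Collecting all errors rather than failing on the first is a choice made for this sketch; the tool may well stop at the first invalid option.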

Programmatic API

ReassignPartitionsCommand.executeAssignment gains parameters:

  • int reassignmentBatchSize
  • boolean incremental
  • long reassignmentPollIntervalMs

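Existing callers that do not use pacing pass reassignmentBatchSize = 0 and incremental = false to preserve legacy behaviour. The legacy path does not wait in the tool, so the poll interval has no effect there.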

Detailed design

Partition counting

One row in the JSON (one TopicPartition) = one partition reassignment for pacing purposes, regardless of how many brokers join or leave the replica set in that single move.

Batch construction (non-incremental, reassignmentBatchSize > 0)

  1. Build the map of TopicPartition → target replicas from the JSON.
  2. Split into batches: sort keys with compareTopicPartitions, then chunk in groups of N partitions.
  3. For each batch except the last:
    alterPartitionReassignments(batch) → wait until all partitions in that batch are complete → next batch.
  4. For the last batch:
    alterPartitionReassignments only — the tool does not wait for completion before printing success (same family as legacy “started” messaging). Operators should run --verify for full completion.

While waiting for a step to finish, the tool repeatedly uses the Admin client to read current reassignment and replica state for the partitions in that step, compares it to the JSON target, sleeps for --reassignment-poll-interval-ms, and repeats until every partition in the step satisfies the completion condition (or errors).
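
The sort-then-chunk step can be sketched as follows. The Tp class is a hypothetical stand-in for Kafka's TopicPartition so that the example has no dependencies, and the comparator only approximates what compareTopicPartitions is described as doing (topic name, then numeric partition id). Note that partition ids are compared as numbers, so foo-2 sorts before foo-10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of batch construction; not the tool's actual code. */
final class BatchPlanner {
    private BatchPlanner() { }

    /** Minimal stand-in for TopicPartition. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Deterministic order: topic name first, then numeric partition id. */
    static final Comparator<Tp> ORDER =
        Comparator.comparing((Tp tp) -> tp.topic).thenComparingInt(tp -> tp.partition);

    /** Sorts the plan and chunks it; batchSize <= 0 yields one batch (legacy single request). */
    static List<List<Tp>> split(List<Tp> plan, int batchSize) {
        List<Tp> sorted = new ArrayList<>(plan);
        sorted.sort(ORDER);

        List<List<Tp>> batches = new ArrayList<>();
        if (batchSize <= 0) {
            batches.add(sorted);
            return batches;
        }
        for (int i = 0; i < sorted.size(); i += batchSize) {
            int end = Math.min(i + batchSize, sorted.size());
            batches.add(new ArrayList<>(sorted.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Tp> plan = Arrays.asList(
            new Tp("foo", 10), new Tp("foo", 2), new Tp("bar", 0), new Tp("foo", 1));
        // JSON order is ignored; prints [[bar-0, foo-1, foo-2], [foo-10]]
        System.out.println(split(plan, 3));
    }
}
```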

Incremental mode (--incremental)

  1. Sort partitions deterministically (compareTopicPartitions).
  2. Maintain a pending deque and an in-flight map (submissions for this execute only).
  3. Loop until pending is empty: remove completed partitions from in-flight (using the same completion predicate as non-incremental wait paths), then submit new partitions up to the N in-flight cap.
  4. Poll interval between iterations when work remains: --reassignment-poll-interval-ms (default 1000 ms).

Semantics: The tool returns once every partition in this JSON has been successfully submitted through alterPartitionReassignments, not necessarily after all replication has finished. This matches legacy --execute, where success means the reassignments were started, not that they completed.
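
A minimal sketch of the incremental loop, under simplifying assumptions: partitions are plain strings, the completion check is passed in as a Predicate, and "submitting" only records an event instead of calling alterPartitionReassignments. The names IncrementalExecutor, execute, completesAfter, and demo are hypothetical, and a set is used where the design above mentions an in-flight map.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of the incremental loop; not the tool's actual code. */
final class IncrementalExecutor {
    private IncrementalExecutor() { }

    /** Returns one event per submission: the partition and what was in flight at that moment. */
    static List<String> execute(List<String> sortedPartitions, int maxInFlight,
                                Predicate<String> isComplete, long pollIntervalMs) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("incremental mode needs an in-flight cap > 0");
        }
        Deque<String> pending = new ArrayDeque<>(sortedPartitions);
        Set<String> inFlight = new LinkedHashSet<>();
        List<String> events = new ArrayList<>();

        while (!pending.isEmpty()) {
            inFlight.removeIf(isComplete);                   // free slots of finished moves
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                String next = pending.pollFirst();
                events.add(next + " with " + inFlight);      // stands in for the Admin call
                inFlight.add(next);
            }
            if (!pending.isEmpty()) {
                sleep(pollIntervalMs);                       // wait before polling again
            }
        }
        return events;                                       // returns once everything is submitted
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    /** Fake completion check: a partition is done after the given number of polls (default 1). */
    static Predicate<String> completesAfter(Map<String, Integer> pollsNeeded) {
        Map<String, Integer> polls = new HashMap<>();
        return p -> polls.merge(p, 1, Integer::sum) >= pollsNeeded.getOrDefault(p, 1);
    }

    static List<String> demo() {
        Map<String, Integer> pollsNeeded = new HashMap<>();
        pollsNeeded.put("P1", 3);                            // P1 and P3 are slow
        pollsNeeded.put("P3", 3);
        return execute(Arrays.asList("P1", "P2", "P3", "P4", "P5"), 3,
                       completesAfter(pollsNeeded), 1L);
    }

    public static void main(String[] args) {
        // prints [P1 with [], P2 with [P1], P3 with [P1, P2], P4 with [P1, P3], P5 with [P1, P3]]
        System.out.println(demo());
    }
}
```

In the demo, P4 and P5 are submitted while the slow P1 and P3 are still in flight, and the method returns as soon as the pending queue is empty, without waiting for P1, P3, or P5 to finish.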

Non-incremental wait between batches

  • Poll interval: reassignmentPollIntervalMs, set by --reassignment-poll-interval-ms (default 1000 ms).
  • Completion uses existing findPartitionReassignmentStates / PartitionReassignmentState logic; inconsistent terminal states produce TerseException (same class of errors as today’s verify path).
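
The submit-then-wait loop for non-incremental batching might look roughly like the sketch below. ReassignmentClient is a hypothetical interface standing in for the Admin calls and the completion predicate; FakeClient exists only to exercise the loop. Error handling for inconsistent terminal states (the TerseException case) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the wait between batches; not the tool's actual code. */
final class BatchedExecutor {
    private BatchedExecutor() { }

    /** Stand-in for the Admin calls the tool would make. */
    interface ReassignmentClient {
        void submit(List<String> partitions);   // plays the role of alterPartitionReassignments
        boolean isComplete(String partition);   // plays the role of the completion predicate
    }

    /** Submits each batch in order and waits for every batch except the last. */
    static List<String> execute(List<List<String>> batches, ReassignmentClient client,
                                long pollIntervalMs) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            List<String> batch = batches.get(i);
            client.submit(batch);
            events.add("submit " + batch);
            if (i == batches.size() - 1) {
                break;                          // last batch: submit only, no wait
            }
            while (!allComplete(batch, client)) {
                sleep(pollIntervalMs);          // no timeout here, as noted under limitations
            }
            events.add("waited " + batch);
        }
        return events;
    }

    private static boolean allComplete(List<String> batch, ReassignmentClient client) {
        for (String partition : batch) {
            if (!client.isComplete(partition)) {
                return false;
            }
        }
        return true;
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    /** In-memory fake: every partition reports complete from its second poll onwards. */
    static final class FakeClient implements ReassignmentClient {
        private final Map<String, Integer> polls = new HashMap<>();

        @Override
        public void submit(List<String> partitions) { }

        @Override
        public boolean isComplete(String partition) {
            return polls.merge(partition, 1, Integer::sum) >= 2;
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches =
            Arrays.asList(Arrays.asList("P1", "P2"), Arrays.asList("P3"));
        // prints [submit [P1, P2], waited [P1, P2], submit [P3]]
        System.out.println(execute(batches, new FakeClient(), 1L));
    }
}
```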

Interaction with --additional

--additional only bypasses the “existing reassignment on cluster” guard. It does not merge pacing across concurrent executes: --reassignment-batch-size applies per process invocation to that JSON. Multiple overlapping executes can each contribute up to N in-flight partition reassignments from their respective plans.

Interaction with throttles

Existing --throttle / --replica-alter-log-dirs-throttle behaviour is unchanged; pacing is orthogonal and can be combined.

Illustrative example (non-incremental vs incremental)

Assume the reassignment JSON affects ten partitions in tool order P1 through P10 (that is the deterministic sort: topic name, then partition id). Set --reassignment-batch-size to 3.

Non-incremental 

The plan runs in steps. Each step submits up to three partition reassignments in one request, then the tool waits until every partition in that step has finished before it starts the next step.

  • Step 1: start P1, P2, P3. Wait until all three are finished.
  • Step 2: start P4, P5, P6. Wait until all three are finished.
  • Step 3: start P7, P8, P9. Wait until all three are finished.
  • Step 4: start P10 only (this is the last step; the tool does not wait for completion after this submit in the current design—use --verify to confirm the cluster matches the plan).

If P3 is slow, P4 cannot start until P3 finishes, even if P1 and P2 are already done, because step 1 must fully complete before step 2 begins.

Incremental 

The tool keeps at most three partition reassignments from this plan active at a time. Whenever any one of them completes, the tool can submit the next partition from the queue so it continues to refill up to three active moves.

For example, after P1, P2, and P3 are submitted, if P2 finishes first, the tool can submit P4 while P1 and P3 are still running, still respecting the limit of three in flight. A slow partition occupies only its own slot: it does not stop later partitions from starting, as long as fewer than three moves from this plan are active.
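
To make the difference concrete, the sketch below computes when each of the ten partitions would start under both modes for one assumed set of durations (P3 takes 9 time units, every other partition takes 2). The durations, the time unit, and the class PacingTimeline are invented for illustration. The model ignores the poll interval, so real start times would be rounded up to the next poll.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

/** Hypothetical timeline model comparing the two pacing modes; not part of the tool. */
final class PacingTimeline {
    private PacingTimeline() { }

    /** Non-incremental: a batch starts only when the whole previous batch has finished. */
    static int[] batchedStarts(int[] durations, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("batch size must be > 0");
        }
        int[] starts = new int[durations.length];
        int batchStart = 0;
        for (int i = 0; i < durations.length; i += n) {
            int batchEnd = batchStart;
            for (int j = i; j < Math.min(i + n, durations.length); j++) {
                starts[j] = batchStart;
                batchEnd = Math.max(batchEnd, batchStart + durations[j]);
            }
            batchStart = batchEnd;              // the slowest member gates the next batch
        }
        return starts;
    }

    /** Incremental: the next partition starts as soon as any of the n slots frees up. */
    static int[] incrementalStarts(int[] durations, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("in-flight cap must be > 0");
        }
        int[] starts = new int[durations.length];
        PriorityQueue<Integer> finishTimes = new PriorityQueue<>(); // in-flight finish times
        int now = 0;
        for (int i = 0; i < durations.length; i++) {
            if (finishTimes.size() == n) {
                now = Math.max(now, finishTimes.poll());            // wait for the earliest finisher
            }
            starts[i] = now;
            finishTimes.add(now + durations[i]);
        }
        return starts;
    }

    public static void main(String[] args) {
        int[] durations = {2, 2, 9, 2, 2, 2, 2, 2, 2, 2};           // P1..P10, P3 is slow
        // prints [0, 0, 0, 9, 9, 9, 11, 11, 11, 13]
        System.out.println(Arrays.toString(batchedStarts(durations, 3)));
        // prints [0, 0, 0, 2, 2, 4, 4, 6, 6, 8]
        System.out.println(Arrays.toString(incrementalStarts(durations, 3)));
    }
}
```

Under these assumed durations P10 starts at time 13 in non-incremental mode and at time 8 in incremental mode, while neither mode ever has more than three moves from the plan active.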

How to choose

  • Use non-incremental when you want clear step boundaries: every partition in the current step finishes before any partition from the next step starts. Easier to reason about and to align with change windows that prefer discrete stages.
  • Use incremental when you want steadier utilization of replication capacity and less head-of-line blocking within the same cap on how many partition moves may run at once.

Compatibility

  • Default CLI: unchanged legacy path (reassignment-batch-size defaults to 0; incremental absent).
  • Brokers / ZK / KRaft: no change.
  • Wire protocol: unchanged (same Admin APIs).

Limitations and future work

  1. No timeout on batch-completion waits in the reference implementation; stuck reassignments can poll indefinitely until operator intervention (same class of risk as long-running admin operations without deadlines).
  2. Incremental + --additional: total cluster in-flight work can exceed N when multiple tool processes run overlapping plans; documentation / optional warnings are recommended.

Rejected alternatives

  1. Only documentation: “Split your JSON manually” — does not scale and yields inconsistent operations.
  2. New broker-side “max concurrent reassignments” quota: much larger scope; tool-side pacing addresses the common case without protocol work.
  3. Order = JSON file order only: rejected in favour of deterministic sorted order so behaviour is reproducible and independent of file editing.

Test plan

  • Unit tests: batch splitting; incremental ordering; failure on second batch; completion predicate shared between wait and incremental removal.
  • Args tests: invalid combinations; execute-only restriction for new flags.
  • Integration / cluster tests: execute with small batch sizes (including batch size 1) in KRaft and ZK modes where the project already runs ClusterTest.
  • Manual: run a large plan with --reassignment-batch-size, then use --list / --verify to observe waves and completion.

Documentation

  • Extend the kafka-reassign-partitions section of the Kafka documentation / ops guides: semantics of 0, non-incremental batching, incremental, --additional, and verification with --verify.