This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateRejected (modified existing assignor algorithm instead of adding new one, so no public APIs were changed)

Discussion thread: here

JIRA: KAFKA-9987

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KIP-429 we added the new CooperativeStickyAssignor which leverages on the underlying sticky assignment algorithm of the existing StickyAssignor (moved to AbstractStickyAssignor). The algorithm is fairly complex as it tries to optimize stickiness while satisfying perfect balance in the case individual consumers may be subscribed to different subsets of the topics. While it does a pretty good job at what it promises to do, it doesn't scale well with large numbers of consumers and partitions. To give a concrete example, users have reported that it takes 2.5 minutes for the assignment to complete with just 2100 consumers reading from 2100 partitions.

Since partitions revoked during the first of two cooperative rebalances will remain unassigned until the end of the second rebalance, it's important for the rebalance to be as fast as possible.

If we can constrain the problem a bit, we can simplify the algorithm greatly. In many cases the individual consumers won't be subscribed to some random subset of the total subscription, they will all be subscribed to the same set of topics and rely on the assignor to balance the partition workload. It would be nice to provide an additional cooperative assignor OOTB that performs efficiently for the many use cases that satisfy this constraint.

Public Interfaces

We will add a new assignor implementing the `ConsumerPartitionAssignor` interface, which can then be plugged in to get efficient cooperative rebalancing OOTB.

/**
 * A cooperative assignor that is optimized for the case with all consumers subscribed to the same set of topics.
 * If your group does not satisfy this constraint, you should use the {@link CooperativeStickyAssignor} instead.
 * <p>
 * This assignor guarantees a balanced partition assignment such that the number of assigned partitions will
 * differ by at most one across all members of the group. It aims to be as "sticky" as possible without violating
 * balance such that the same assignment will always be generated when there are no membership or metadata changes.
 * Note that as a cooperative assignor, any partitions that are transferring ownership will be removed from the 
 * assignment until they have been safely revoked. A followup rebalance will automatically be triggered to 
assign
 * such partitions to their new owner according to the cooperative rebalancing protocol.
 * <p>
 * IMPORTANT: if upgrading from 2.3 or earlier, you must follow a specific upgrade path in order to safely turn on
 * cooperative rebalancing. See the <a href="https://kafka.apache.org/documentation/#upgrade_240_notable">upgrade guide</a>
 * for details.
 */
public class ConstrainedCooperativeStickyAssignor extends AbstractStickyAssignor {

    @Override
    public String name() {
        return "constrained-cooperative-sticky";
    }

    @Override
    public List<RebalanceProtocol> supportedProtocols() {
        return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
    }
}


Proposed Changes

The constrained sticky algorithm is as follows.

Let N be the number of consumers participating in the rebalance, and P be the total number of partitions summed across all subscribed topics. Since we assume that all consumers are subscribed to all topics, we can assert that in the balanced assignment each consumer will have between (P/N)floor and (P/N)ceil  partitions. 

We can do the assignment in just a few linear passes. First define the following sets:

C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
		remove any partitions that are no longer in the subscription from its owned partitions
		remove all owned_partitions if the generation is old
		if member.owned_partitions.size < C_f
			assign all owned partitions to member and remove from unassigned_partitions
			add member to unfilled_members
		else if member.owned_partitions.size == C_f
			assign first C_f owned_partitions to member and remove from unassigned_partitions
		else
			assign first C_c owned_partitions to member and remove from unassigned_partitions
			add member to max_capacity_members

sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, t1_p0 .... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
	compute the remaining capacity as C = C_f - num_assigned_partitions
	pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled members:
	for member : unfilled_members
		poll for first member in max_capacity_members and remove one partition
		assign this partition to the unfilled member
	
// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
	for partition : unassigned_partitions
		assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members) 


Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Improving the existing sticky algorithm

As mentioned above, the existing algorithm is good at what it aims to do and is considerably more flexible than the proposed assignor here. It seems best to add a new assignor to meet the needs of some without preventing others from using the only current cooperative assignor that may meet their needs