...

We will introduce a new config that supplies an instance of TaskAssignor (discussed below). In the future, additional plugins can use the same partition.assignor prefix:

Code Block
languagejava
titleStreamsConfig
/** {@code partition.assignor.task.assignor.class} */
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "partition.assignor.task.assignor.class";
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the {@link TaskAssignor} interface. Defaults to the {@link HighAvailabilityTaskAssignor} class.";
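For illustration, opting in to a custom assignor might then look like the following fragment (a sketch; MyTaskAssignor is a hypothetical user class implementing the TaskAssignor interface described below, and topology is assumed to be defined elsewhere):

Code Block
languagejava
titleExample: enabling a custom assignor (sketch)
// Hypothetical usage: MyTaskAssignor is a user-supplied TaskAssignor implementation
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, MyTaskAssignor.class.getName());
final KafkaStreams streams = new KafkaStreams(topology, props); // topology defined elsewhere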

...

Note that the thread-level assignment will remain a non-configurable internal implementation detail of the partition assignor (see "Rejected Alternatives" for further thoughts and reasoning). See Consumer Assignments in the Proposed Changes section below for more details.

To enable users to actually plug something in by implementing TaskAssignor, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package, along with some of the supporting classes such as the assignment configs container class and ClientState which both appear in the TaskAssignor#assign method (although those will be heavily refactored, discussed below). All these new public APIs will be placed in a new non-internal public package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.

Both the input parameter and return value will be encapsulated in wrapper classes for the sake of forwards compatibility. This will let us avoid the cycle of adding, deprecating, and removing new #assign overloads if/when we want to evolve the assignor in the future, for example to pass in additional metadata or enable the assignor to output new kinds of information or instructions to the StreamsPartitionAssignor. The analogous ConsumerPartitionAssignor works similarly, returning a single GroupAssignment object that wraps the collection of individual consumer assignments for the same reason.

This KIP introduces a number of classes and interfaces. Some of these are intended for the user to implement, and some of them are "read-only" APIs for the user to access but not implement. We'll start with the former category of user-implemented APIs and then go into the read-only "support class" APIs.

User APIs

The following classes/interfaces will need to be implemented by the user in order to plug in a custom task assignor:

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationState the metadata for this Streams application
   * @return the assignment of active and standby tasks to KafkaStreams clients
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers.
   * @param assignment:   the final assignment returned to the kafka broker
   * @param subscription: the original subscription passed into the assignor
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription) {}

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {
    private final Collection<KafkaStreamsAssignment> kafkaStreamsAssignments;

    /**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();

    /**
     * @return the number of KafkaStreams clients to which tasks were assigned
     */
    public int numStreamsClients();

    /**
     * @return a String representation of the returned assignment, in processId order
     */
    @Override
    public String toString();
  }
}
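The overall shape of a user implementation would then look something like the following skeleton (a sketch only; MyTaskAssignor is hypothetical, and building the returned TaskAssignment is covered by the support classes below):

Code Block
languagejava
titleExample: TaskAssignor implementation skeleton (sketch)
public class MyTaskAssignor implements TaskAssignor {

    @Override
    public void configure(final Map<String, ?> configs) {
        // TaskAssignor extends Configurable, so the Streams configs are handed in here
    }

    @Override
    public TaskAssignment assign(final ApplicationState applicationState) {
        // inspect the ApplicationState and return the desired placement of active and
        // standby tasks; see the read-only support classes and TaskAssignmentUtils below
        throw new UnsupportedOperationException("sketch only");
    }
}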

Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each KafkaStreams client. The ClientState is however not just the input to the assignor, but also its output – the assignment of tasks to KafkaStreams clients is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.

...

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only KafkaStreamsState metadata class and an output-only KafkaStreamsAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which KafkaStreams client(s) should trigger a rejoin and when to request the subsequent rebalance

This gives us the following two new top-level public interfaces, KafkaStreamsState and KafkaStreamsAssignment:

...

KafkaStreamsAssignment

First we have the KafkaStreamsAssignment interface, representing the output of the assignment:

Code Block
languagejava
titleKafkaStreamsAssignment
package org.apache.kafka.streams.processor.assignment;

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public interface KafkaStreamsAssignment {
  ProcessID processId();

  Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  Instant followupRebalanceDeadline();

  static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        STATELESS,
        STATEFUL,
        STANDBY
    }
    
    public Type type();

    public TaskId id();
  }
}
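For illustration, an assignor could describe one client's desired tasks along these lines (hypothetical snippet; how the enclosing KafkaStreamsAssignment instance is constructed is left to the implementation):

Code Block
languagejava
titleExample: building AssignedTasks (sketch)
final Set<AssignedTask> tasks = new HashSet<>();
// give this client active ownership of stateful task 0_0...
tasks.add(new AssignedTask(new TaskId(0, 0), AssignedTask.Type.STATEFUL));
// ...plus a standby replica of task 0_1
tasks.add(new AssignedTask(new TaskId(0, 1), AssignedTask.Type.STANDBY));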

Read-only APIs

The following APIs are intended for users to read/use but do not need to be implemented in order to plug in a custom assignor.

ProcessID

The ProcessID is a new wrapper class around the UUID to make things easier to understand:

Code Block
languagejava
titleProcessID
package org.apache.kafka.streams.processor.assignment;

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public boolean equals(final Object other) {
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        return id.equals(((ProcessID) other).id);
    }
}

KafkaStreamsState

Next we have the KafkaStreamsState interface, representing the input to the assignor:

Code Block
languagejava
titleKafkaStreamsState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface KafkaStreamsState {
  /**
   * @return the processId of the application instance running on this KafkaStreams client
   */
  ProcessID processId();

  /**
   * Returns the number of consumer clients in the consumer group for this KafkaStreams client,
   * which represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a KafkaStreams client, or the minimum number of assigned
   * active tasks below which the KafkaStreams client will have been over-provisioned and unable to give every
   * available consumer an active task assignment
   *
   * @return the number of consumers on this KafkaStreams client
   */
  int numConsumerClients();

  /**
   * @return the set of consumer client ids for this KafkaStreams client
   */
  SortedSet<String> consumerClientIds();

  /**
   * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum
   *         Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumerClientId);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this KafkaStreams client, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this KafkaStreams client, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
}
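As an example of how this metadata can be used, here is a sketch of a helper that picks the most caught-up client for a given stateful task, using the Task.LATEST_OFFSET sentinel described in the javadocs above (clients is assumed to come from ApplicationState#kafkaStreamsStates with lags computed):

Code Block
languagejava
titleExample: using task lags (sketch)
static ProcessID mostCaughtUpClient(final TaskId task, final Map<ProcessID, KafkaStreamsState> clients) {
    ProcessID mostCaughtUp = null;
    long smallestLag = Long.MAX_VALUE;
    for (final Map.Entry<ProcessID, KafkaStreamsState> entry : clients.entrySet()) {
        final long lag = entry.getValue().lagFor(task);
        if (lag == Task.LATEST_OFFSET) {
            return entry.getKey(); // was previously an active running task, ie fully caught up
        }
        if (lag < smallestLag) {
            smallestLag = lag;
            mostCaughtUp = entry.getKey();
        }
    }
    return mostCaughtUp;
}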

ApplicationState

The KafkaStreamsState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationState. The methods on the ApplicationState are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing
     * in "true" will result in a remote call to fetch changelog topic end offsets, and you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     */
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a KafkaStreams client
     */
    Set<TaskId> allTasks();

    /**
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks();

    /**
     * @return the set of stateless or changelog-less tasks in this topology
     */
    Set<TaskId> statelessTasks();
}
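Inside a custom assignor, gathering these inputs might look like the following fragment (a sketch; failure handling for the lag computation is discussed at the end of this section):

Code Block
languagejava
titleExample: reading the ApplicationState (sketch)
// inside a hypothetical MyTaskAssignor#assign
final AssignmentConfigs configs = applicationState.assignmentConfigs();
final Set<TaskId> statefulTasks = applicationState.statefulTasks();
// only request task lags when they will actually be used, since computing them
// involves a remote call to fetch the changelog topic end offsets
final Map<ProcessID, KafkaStreamsState> clients =
    applicationState.kafkaStreamsStates(!statefulTasks.isEmpty());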

TaskAssignmentUtils

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of the new TaskAssignor. This will allow users to more easily adapt or modify pieces of the complex existing assignment algorithm, without having to re-implement the entire thing from scratch.

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default standby assignment
     */
    public static Map<ProcessID, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned to a KafkaStreams client
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default rack-aware assignment for active tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState,
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                                                      final SortedSet<TaskId> tasks);

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default rack-aware assignment for standby tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
                                                                                       final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);
}

TaskAssignmentUtils provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks API will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
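Put together, an assignor that only wants to customize one dimension of the assignment could lean on these utilities for everything else. A sketch (assuming, for illustration only, that TaskAssignment can be constructed from the collection of per-client assignments):

Code Block
languagejava
titleExample: composing TaskAssignmentUtils (sketch)
@Override
public TaskAssignment assign(final ApplicationState applicationState) {
    // start from the previous ("sticky") placement...
    Map<ProcessID, KafkaStreamsAssignment> assignments =
        TaskAssignmentUtils.identityAssignment(applicationState);
    // ...then layer on the built-in standby placement and rack-aware optimization
    assignments = TaskAssignmentUtils.defaultStandbyTaskAssignment(applicationState, assignments);
    assignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignments);
    return new TaskAssignment(assignments.values()); // hypothetical constructor
}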

AssignmentConfigs

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}
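For example, a custom assignor would consult these getters rather than re-parsing the raw configs (sketch):

Code Block
languagejava
titleExample: reading AssignmentConfigs (sketch)
final AssignmentConfigs configs = applicationState.assignmentConfigs();
final int numStandbys = configs.numStandbyReplicas();     // num.standby.replicas
final long recoveryLag = configs.acceptableRecoveryLag(); // acceptable.recovery.lag
// e.g. place numStandbys standby replicas per stateful task, preferring clients
// whose lagFor(task) is already below recoveryLag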


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates. Users who request the task lags should make sure to handle failures the way they desire, for example by returning a "fallback assignment" (ie one that returns the same assignment as the previous one) via the TaskAssignmentUtils#identityAssignment API, and scheduling a follow-up rebalance via the KafkaStreamsAssignment#followupRebalanceDeadline API.
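A sketch of that failure-handling pattern follows (the exact exception surfaced by a failed lag computation is not pinned down here, so a generic catch is shown):

Code Block
languagejava
titleExample: falling back when the lag computation fails (sketch)
// inside a hypothetical MyTaskAssignor#assign
Map<ProcessID, KafkaStreamsState> clients;
try {
    clients = applicationState.kafkaStreamsStates(true); // may fail on the end-offset fetch
} catch (final RuntimeException fetchFailure) {
    // keep every task where it is, and let a followup rebalance retry later
    final Map<ProcessID, KafkaStreamsAssignment> fallback =
        TaskAssignmentUtils.identityAssignment(applicationState);
    return new TaskAssignment(fallback.values()); // hypothetical constructor, as above
}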

Observing The Final Assignment

The custom assignor specifies a task to KafkaStreams client mapping, but the final task to group member mapping is controlled by the Streams partition assignor. It may be useful for implementors of the custom assignor to observe the final task to group member assignment that is returned back to Kafka. For example, it may be useful to record the mapping in an event that can be materialized externally to observe the current assignments. To support this, the TaskAssignor interface also includes the onAssignmentComputed callback, which is called with the final computed GroupAssignment.
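For example, an implementation might log or export the final mapping (sketch; log is an arbitrary logger):

Code Block
languagejava
titleExample: observing the final assignment (sketch)
@Override
public void onAssignmentComputed(final GroupAssignment assignment, final GroupSubscription subscription) {
    assignment.groupAssignment().forEach((memberId, memberAssignment) ->
        log.info("Member {} was assigned partitions {}", memberId, memberAssignment.partitions()));
}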

Proposed Changes

On the whole we are not introducing new functionality to Streams itself, but rather are refactoring the existing TaskAssignor interface so that it's (a) pluggable/publicly exposed, and (b) easier to understand, use, and implement. Code-wise the largest change is the breaking up of the ClientState into the new KafkaStreamsState and KafkaStreamsAssignment interfaces, but that will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

Consumer Assignments

One major decision in this KIP was whether to encompass the assignment of tasks to consumers/threads within each KafkaStreams client, or to leave that up to the StreamsPartitionAssignor and only carve out the KafkaStreams-level assignment for pluggability. Ultimately we decided on the latter, for several reasons:

  1. To keep this KIP looking towards the future: we are working on moving away from the consumer-per-StreamThread model and hope to soon have only a single consumer per KafkaStreams client. If we only have to assign tasks at the KafkaStreams client level, we can implement this new feature completely transparently and users who plug in a custom task assignor won't have to do anything to adapt, and we don't need to deprecate any APIs in the assignor after this change.
  2. To shield users from subtle, difficult, and/or internal protocol requirements and constraints: for example with cooperative rebalancing, it is vital that the assignor make sure not to revoke and reassign a partition to a new consumer within the same rebalance. This is a non-trivial constraint and would be easy for users to mess up, and further, we don't want users to have to worry about protocol-level details.
  3. To retain control over the final assignment: right now we apply some niche optimizations/bugfixes like assigning transient standbys to avoid losing in-memory state, as well as some other subtle logic that currently resides in the last leg of the StreamsPartitionAssignor's algorithm that tackles the distribution of KafkaStreams client tasks to threads. We don't want users to have to implement all of this again or else risk running into the same bugs and regressions that we've already found and fixed based on the past years of operating the StreamsPartitionAssignor.
  4. To keep things simple: this is frankly already a complicated enough feature, and adding consumer-level assignment only makes things worse. We didn't see any practical application for adjusting the thread-level assignment since it pretty much always makes sense to do the "sticky-but-balanced" assignment that Streams currently applies. In the end, while there have been many problems and complaints with the current KafkaStreams client assignment, we haven't seen or heard anyone request control over the thread-level assignment and could not think of any use case for this ourselves, and so it simply does not feel worth the added complexity.

This topic is touched on again in the Rejected Alternatives section.

Compatibility, Deprecation, and Migration Plan

Since this was formerly an internal config and not part of the public API, we don't need to go through the usual deprecation path. See "Rejected Alternatives" for some slightly more nuanced discussion here.

Test Plan

Mostly nothing to report here, as there should already be tests in place for this config; however, I will check the existing test coverage during implementation and fill in any gaps as needed to make sure it's possible to set either of the OOTB assignors (HA or Sticky) as well as a custom assignor.

Rejected Alternatives

  1. One obvious question here is whether we want to still deprecate the old internal config anyway, out of compassion for any who may already be using it despite it not being considered public. Personally I think this would be reasonable but don't feel strongly one way or another.
  2. Another possibility that was considered and ultimately decided against was whether to encompass the thread-level assignment in this KIP, and bring that into the public API and make it pluggable as well. We determined that this did not seem necessary to do as part of the initial KIP, especially considering the large scope we have already reached. However it's worth noting that a followup KIP that builds on the new public API(s) introduced here would become much more feasible should someone wish to customize the thread-level logic at some point in the future. If/when that question is brought up, we'll have to address a few other concerns we had that contributed to our decision to exclude this for now, such as validating the thread assignment for correctness according to the cooperative rebalancing protocol, or niche optimizations like transient standbys to avoid losing in-memory state, and some other subtle logic that currently resides in the last leg of the StreamsPartitionAssignor's algorithm that tackles the distribution of KafkaStreams client tasks to threads.