
Status

Current state: Adopted

Discussion thread: here 

JIRA: KAFKA-15045

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Finally, there are good reasons for a user to want to extend or modify the behavior of the Kafka Streams partition assignor beyond just changing the task assignment. For example, a user may want to implement their own initialization logic that sets up resources (much the same way the Streams Partition Assignor initializes internal topics).

...

Code Block
languagejava
titleStreamsConfig
public static class InternalConfig {
        // This will be removed
        public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

...

The following classes/interfaces must be implemented by the user in order to plug in a custom task assignor:

TaskAssignor

The first and most important interface for users to implement is the TaskAssignor itself:

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * NONE: no error detected
   * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
   * INVALID_STANDBY_TASK: stateless task assigned as a standby task
   * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
   * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
   * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
   */
  enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    INVALID_STANDBY_TASK,
    MISSING_PROCESS_ID,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance.
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that
   * were detected while processing the returned assignment. If any errors were found, the corresponding AssignmentError
   * will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will
   * be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application
   * has registered a {@link StreamsUncaughtExceptionHandler}.
   * <p>
   * Note: some kinds of errors will make it impossible for the StreamsPartitionAssignor to parse the TaskAssignment
   * that was returned from the TaskAssignor's {@link #assign}. If this occurs, the {@link GroupAssignment} passed
   * in to this callback will contain an empty map instead of the consumer assignments.
   *
   * @param assignment:   the final consumer assignments returned to the kafka broker, or an empty assignment map if
   *                      an error prevented the assignor from converting the TaskAssignment into a GroupAssignment
   * @param subscription: the original consumer subscriptions passed into the assignor
   * @param error:        the corresponding error type if one was detected while processing the returned assignment,
   *                      or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {

    /**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();
  }
}
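
The AssignmentError values listed above can be illustrated with a small validation pass. This is only a sketch of the kind of checks the error names imply, not the actual StreamsPartitionAssignor logic: plain strings stand in for ProcessId and TaskId, and the class AssignmentValidationSketch, its validate method, and the order in which errors are reported are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: plain strings stand in for ProcessId and TaskId.
final class AssignmentValidationSketch {

    enum AssignmentError {
        NONE,
        ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
        INVALID_STANDBY_TASK,
        MISSING_PROCESS_ID,
        UNKNOWN_PROCESS_ID,
        UNKNOWN_TASK_ID
    }

    // Returns the first error found, or NONE if the assignment passes every check.
    static AssignmentError validate(final Set<String> clients,
                                    final Set<String> statefulTasks,
                                    final Set<String> statelessTasks,
                                    final Map<String, Set<String>> active,
                                    final Map<String, Set<String>> standby) {
        final Set<String> assignedClients = new HashSet<>(active.keySet());
        assignedClients.addAll(standby.keySet());
        for (final String client : assignedClients) {
            if (!clients.contains(client)) return AssignmentError.UNKNOWN_PROCESS_ID;
        }
        for (final String client : clients) {
            if (!assignedClients.contains(client)) return AssignmentError.MISSING_PROCESS_ID;
        }
        final Set<String> seenActive = new HashSet<>();
        for (final Set<String> tasks : active.values()) {
            for (final String task : tasks) {
                if (!statefulTasks.contains(task) && !statelessTasks.contains(task)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                // Set#add returns false when the task was already seen as active elsewhere
                if (!seenActive.add(task)) return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
            }
        }
        for (final Set<String> tasks : standby.values()) {
            for (final String task : tasks) {
                if (statelessTasks.contains(task)) return AssignmentError.INVALID_STANDBY_TASK;
                if (!statefulTasks.contains(task)) return AssignmentError.UNKNOWN_TASK_ID;
            }
        }
        return AssignmentError.NONE;
    }

    // Demo input: task "0_0" is active on both client "A" and client "B".
    static AssignmentError duplicateActiveDemo() {
        final Set<String> clients = new HashSet<>(Arrays.asList("A", "B"));
        final Set<String> stateful = new HashSet<>(Arrays.asList("0_0"));
        final Set<String> stateless = new HashSet<>(Arrays.asList("1_0"));
        final Map<String, Set<String>> active = new HashMap<>();
        active.put("A", new HashSet<>(Arrays.asList("0_0")));
        active.put("B", new HashSet<>(Arrays.asList("0_0", "1_0")));
        return validate(clients, stateful, stateless, active, new HashMap<String, Set<String>>());
    }

    public static void main(final String[] args) {
        System.out.println(duplicateActiveDemo()); // prints ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
    }
}
```

A custom assignor could run a check of this shape on its own output before returning it, so that mistakes surface in the assignor rather than in the onAssignmentComputed callback.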

Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each KafkaStreams client. The ClientState is however not just the input to the assignor, but also its output – the assignment of tasks to KafkaStreams clients is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.

To solve these problems, we plan to refactor the interface with two goals in mind:

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only KafkaStreamsState metadata class and an output-only KafkaStreamsAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which KafkaStreams client(s) should trigger a rejoin and when to request the subsequent rebalance
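
The first goal can be sketched as an assignor that only reads its input and builds a fresh output. The example below is an illustration under stated assumptions, not the proposed implementation: ClientInput is a hypothetical stand-in for KafkaStreamsState, a plain map stands in for the KafkaStreamsAssignment results, and the sticky-then-least-loaded placement rule was chosen only to keep the example short.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of input/output separation; strings stand in for ProcessId and TaskId.
final class InputOutputSeparationSketch {

    // Input-only view of a client: the assignor can read it but cannot mutate it.
    static final class ClientInput {
        final String processId;
        final Set<String> previousActiveTasks;

        ClientInput(final String processId, final Set<String> previousActiveTasks) {
            this.processId = processId;
            this.previousActiveTasks = Collections.unmodifiableSet(new TreeSet<>(previousActiveTasks));
        }
    }

    // Builds a new output map instead of mutating the input objects.
    static Map<String, Set<String>> assign(final List<ClientInput> clients, final Set<String> allTasks) {
        final Map<String, Set<String>> output = new TreeMap<>();
        final Set<String> unassigned = new TreeSet<>(allTasks);
        // Step 1: keep previously owned tasks where they were (first claimant wins).
        for (final ClientInput client : clients) {
            final Set<String> tasks = new TreeSet<>();
            for (final String task : client.previousActiveTasks) {
                if (unassigned.remove(task)) tasks.add(task);
            }
            output.put(client.processId, tasks);
        }
        // Step 2: give each remaining task to the client with the fewest tasks so far.
        for (final String task : unassigned) {
            Set<String> leastLoaded = null;
            for (final Set<String> tasks : output.values()) {
                if (leastLoaded == null || tasks.size() < leastLoaded.size()) leastLoaded = tasks;
            }
            if (leastLoaded != null) leastLoaded.add(task);
        }
        return output;
    }

    static String demo() {
        final List<ClientInput> clients = Arrays.asList(
            new ClientInput("A", Collections.singleton("0_0")),
            new ClientInput("B", Collections.<String>emptySet()));
        final Set<String> allTasks = new TreeSet<>(Arrays.asList("0_0", "0_1", "0_2"));
        return assign(clients, allTasks).toString();
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints {A=[0_0, 0_2], B=[0_1]}
    }
}
```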

This gives us the following two new top-level public types, KafkaStreamsState and KafkaStreamsAssignment:

KafkaStreamsAssignment

Next we have the KafkaStreamsAssignment class, representing the output of the assignment to be created by the TaskAssignor:

Code Block
languagejava
titleKafkaStreamsAssignment
package org.apache.kafka.streams.processor.assignment;

/**
 * A simple container class for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public class KafkaStreamsAssignment {

  /**
   * Construct an instance of KafkaStreamsAssignment with this processId and the given set of
   * assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you
   * can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API.
   *
   * @param processId the processId for the KafkaStreams client that should receive this assignment
   * @param assignment the set of tasks to be assigned to this KafkaStreams client
   *
   * @return a new KafkaStreamsAssignment object with the given processId and assignment
   */
  public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment);

  /**
   * This API can be used to request that a followup rebalance be triggered by the KafkaStreams client
   * receiving this assignment. The followup rebalance will be initiated after the provided deadline
   * has passed, although it will always wait until it has finished the current rebalance before
   * triggering a new one. This request will last until the new rebalance, and will be erased if a
   * new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next
   * assignment must request the followup rebalance again if it still wants to schedule one for
   * the given instant, otherwise no additional rebalance will be triggered after that.
   *
   * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
   *
   * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
   */
  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

  public ProcessId processId();

  public Map<TaskId, AssignedTask> tasks();

  public void assignTask(final AssignedTask task);

  public void removeTask(final AssignedTask task);

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  public Instant followupRebalanceDeadline();

  public static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        ACTIVE,
        STANDBY
    }

    public Type type();

    public TaskId id();
  }
}
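
The followup rebalance deadline described above is an absolute point in time, equivalent to "now + delay". A minimal sketch of that behavior follows, assuming a simplified stand-in class: the nested Assignment type, the isFollowupDue helper, and the use of null for "no followup requested" are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of the "copy with a deadline" pattern; not the real KafkaStreamsAssignment.
final class FollowupRebalanceSketch {

    static final class Assignment {
        final String processId;                  // stands in for ProcessId
        final Set<String> tasks;                 // stands in for the assigned tasks
        final Instant followupRebalanceDeadline; // null means no followup rebalance was requested

        private Assignment(final String processId, final Set<String> tasks, final Instant deadline) {
            this.processId = processId;
            this.tasks = tasks;
            this.followupRebalanceDeadline = deadline;
        }

        static Assignment of(final String processId, final Set<String> tasks) {
            return new Assignment(processId, tasks, null);
        }

        // Returns a new object with the same processId and tasks but the given deadline.
        Assignment withFollowupRebalance(final Instant rebalanceDeadline) {
            return new Assignment(processId, tasks, rebalanceDeadline);
        }
    }

    // A followup is due once the current time has reached the deadline.
    static boolean isFollowupDue(final Assignment assignment, final Instant now) {
        return assignment.followupRebalanceDeadline != null
            && !now.isBefore(assignment.followupRebalanceDeadline);
    }

    // Requests a followup rebalance ten minutes after the given time.
    static Assignment demoAssignment(final Instant now) {
        return Assignment.of("A", Collections.singleton("0_0"))
            .withFollowupRebalance(now.plus(Duration.ofMinutes(10)));
    }

    public static void main(final String[] args) {
        final Instant now = Instant.parse("2024-01-01T00:00:00Z");
        final Assignment assignment = demoAssignment(now);
        System.out.println(assignment.followupRebalanceDeadline);
        System.out.println(isFollowupDue(assignment, now.plus(Duration.ofMinutes(5))));
        System.out.println(isFollowupDue(assignment, now.plus(Duration.ofMinutes(10))));
    }
}
```

Because the request is erased whenever a new rebalance begins, an assignor that still wants the followup would need to attach the deadline again on the next assignment.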

Read-only APIs

The following APIs are intended for users to read/use but do not need to be implemented in order to plug in a custom assignor

ProcessId

The ProcessId is a new wrapper class around the UUID to make things easier to understand:

Code Block
languagejava
titleProcessId
package org.apache.kafka.streams.processor.assignment;

import java.util.UUID;

/** A simple wrapper around UUID that abstracts a Process Id */
public class ProcessId {

    private final UUID id;

    public ProcessId(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }
}
 

KafkaStreamsState

Next we have the KafkaStreamsState interface, representing the input to the assignor:

Code Block
languagejava
titleKafkaStreamsState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface KafkaStreamsState {
  /**
   * @return the processId of the application instance running on this KafkaStreams client
   */
  ProcessId processId();

  /**
   * Returns the number of processing threads available to work on tasks for this KafkaStreams client, 
   * which represents its overall capacity for work relative to other KafkaStreams clients.
   *
   * @return the number of processing threads on this KafkaStreams client
   */
  int numProcessingThreads();

  /**
   * @return the set of consumer client ids for this KafkaStreams client
   */
  SortedSet<String> consumerClientIds();

  /**
   * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum, or
   *          Task.LATEST_OFFSET if this was previously an active running task on this client
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  SortedSet<TaskId> prevTasksByLag(final String consumerClientId);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this KafkaStreams client, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this KafkaStreams client, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();

  /**
   * @return the rackId for this KafkaStreams client, or {@link Optional#empty()} if none was configured
   */
  Optional<String> rackId();

  }
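
The lag arithmetic described in the Javadoc above ("end offset sum - offset sum", summed across all logged stores) can be worked through in a few lines. This is a sketch under stated assumptions rather than the Kafka Streams implementation: the class TaskLagSketch and its methods are hypothetical, offsets are passed in as plain arrays, and the Task.LATEST_OFFSET sentinel for running active tasks is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the lag computation; strings stand in for TaskId.
final class TaskLagSketch {

    // Lag = sum of changelog end offsets minus sum of the client's current offsets.
    // With no state on disk the current offsets are empty, so the lag equals the end offset sum.
    static long lagFor(final long[] changelogEndOffsets, final long[] currentOffsets) {
        long endOffsetSum = 0L;
        for (final long endOffset : changelogEndOffsets) endOffsetSum += endOffset;
        long offsetSum = 0L;
        for (final long offset : currentOffsets) offsetSum += offset;
        return endOffsetSum - offsetSum;
    }

    // Orders tasks from most caught-up to least caught-up; ties are broken by task id.
    static List<String> tasksByLag(final Map<String, Long> lagByTask) {
        final List<String> tasks = new ArrayList<>(lagByTask.keySet());
        tasks.sort((a, b) -> {
            final int byLag = Long.compare(lagByTask.get(a), lagByTask.get(b));
            return byLag != 0 ? byLag : a.compareTo(b);
        });
        return tasks;
    }

    static String demo() {
        final Map<String, Long> lags = new TreeMap<>();
        lags.put("0_0", lagFor(new long[] {100L, 50L}, new long[] {90L, 50L})); // lag 10
        lags.put("0_1", lagFor(new long[] {100L, 50L}, new long[0]));           // no state: lag 150
        lags.put("0_2", lagFor(new long[] {70L}, new long[] {70L}));            // caught up: lag 0
        return tasksByLag(lags).toString();
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints [0_2, 0_0, 0_1]
    }
}
```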

ApplicationState

The KafkaStreamsState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the next new interface, the ApplicationState. The methods on the ApplicationState are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing 
     * in "true" will result in a remote call to fetch changelog topic end offsets and you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return a map of task ids to all tasks in this topology to be  assigned
     */
    Map<TaskId, TaskInfo> allTasks();

}

TaskInfo

A small interface with metadata for each task to be assigned will be used to pass along information about stateful vs stateless tasks, the mapping of input and changelog topic partitions to tasks, and other essential info such as the rack ids for each topic partition belonging to a given task.

Code Block
languagejava
titleTaskInfo
/**
 * A simple container class corresponding to a given {@link TaskId}.
 * Includes metadata such as whether it's stateful and the names of all state stores
 * belonging to this task, the set of input topic partitions and changelog topic partitions
 * for all logged state stores, and the rack ids of all replicas of each topic partition
 * in the task.
 */
public interface TaskInfo {

    TaskId id();

    boolean isStateful();

    Set<String> stateStoreNames();

    Set<TaskTopicPartition> topicPartitions();
}

TaskAssignmentUtils provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.

AssignmentConfigs

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}

Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates. Users who request the task lags should make sure to handle failures the way they desire, for example by returning a "fallback assignment" (i.e., one that returns the same assignment as the previous one) via the TaskAssignmentUtils#fallbackAssignment API, and scheduling a followup rebalance via the followupRebalanceDeadline API.
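
The failure handling suggested above can be sketched as follows. Everything here is an assumption made for illustration: the class FallbackAssignmentSketch, the Placement result type, and the Supplier standing in for a lag-fetching call that may fail are hypothetical, and the sketch places a single task rather than a full assignment.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical sketch: keep the previous owner and schedule a retry when lags cannot be computed.
final class FallbackAssignmentSketch {

    static final class Placement {
        final String owner;                      // stands in for a ProcessId
        final Instant followupRebalanceDeadline; // null when no followup rebalance is requested

        Placement(final String owner, final Instant followupRebalanceDeadline) {
            this.owner = owner;
            this.followupRebalanceDeadline = followupRebalanceDeadline;
        }
    }

    static Placement placeTask(final String previousOwner,
                               final Supplier<Map<String, Long>> lagFetcher,
                               final Instant now,
                               final Duration retryDelay) {
        Map<String, Long> lagByClient;
        try {
            lagByClient = lagFetcher.get(); // may fail, e.g. on a remote call timeout
        } catch (final RuntimeException e) {
            // Fallback: repeat the previous placement and ask for a followup rebalance.
            return new Placement(previousOwner, now.plus(retryDelay));
        }
        // Normal path: move the task to the client with the smallest lag.
        String best = previousOwner;
        long bestLag = Long.MAX_VALUE;
        for (final Map.Entry<String, Long> entry : lagByClient.entrySet()) {
            if (entry.getValue() < bestLag) {
                best = entry.getKey();
                bestLag = entry.getValue();
            }
        }
        return new Placement(best, null);
    }

    static Placement demoFailure(final Instant now) {
        return placeTask("A", () -> {
            throw new IllegalStateException("end offset fetch timed out");
        }, now, Duration.ofSeconds(30));
    }

    static Placement demoSuccess(final Instant now) {
        final Map<String, Long> lags = new TreeMap<>();
        lags.put("A", 500L);
        lags.put("B", 20L);
        return placeTask("A", () -> lags, now, Duration.ofSeconds(30));
    }

    public static void main(final String[] args) {
        final Instant now = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(demoFailure(now).owner + " " + demoFailure(now).followupRebalanceDeadline);
        System.out.println(demoSuccess(now).owner + " " + demoSuccess(now).followupRebalanceDeadline);
    }
}
```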

Observing The Final Assignment

The custom assignor specifies a mapping of tasks to KafkaStreams clients, but the final mapping of tasks to group members is controlled by the Streams partition assignor. It may be useful for implementors of the custom assignor to observe the final task to group member assignment that is returned back to Kafka. For example, it may be useful to record the mapping in an event that can be materialized externally to observe the current assignments. To support this, the TaskAssignor interface also includes a method named onAssignmentComputed, which is called with the final computed GroupAssignment.

Proposed Changes

  

TaskTopicPartition

Another basic metadata container, this indicates whether the partition belongs to a source topic or a changelog topic (or in the case of a source-changelog topic, both) as well as the rack ids of replicas hosting this partition, if available:

Code Block
languagejava
titleTaskTopicPartition
package org.apache.kafka.streams.processor.assignment;
 
/**
 * This is a simple container class used during the assignment process to distinguish
 * the type of each TopicPartition. Since the assignment logic can depend on the type of topic we're
 * looking at, and the rack information of the partition, this container class should have
 * everything necessary to make informed task assignment decisions.
 */
public interface TaskTopicPartition {
    /**
     *
     * @return the {@code TopicPartition} for this task.
     */
    TopicPartition topicPartition();

    /**
     *
     * @return whether the underlying topic is a source topic or not. Source changelog topics
     *         are both source topics and changelog topics.
     */
    boolean isSource();

    /**
     *
     * @return whether the underlying topic is a changelog topic or not. Source changelog topics
     *         are both source topics and changelog topics.
     */
    boolean isChangelog();

    /**
     *
     * @return the broker rack ids on which this topic partition resides. If no information could
     *         be found, this will return an empty optional value.
     */
    Optional<Set<String>> rackIds();
}
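
One way the rack ids above might feed into an assignment decision is by counting how many of a task's partitions have no replica in a client's rack. The sketch below is an assumption-laden illustration, not the rack-aware assignor itself: the class CrossRackSketch is hypothetical, each partition is represented only by its optional rack id set (mirroring the return type of rackIds()), and partitions with unknown rack information are skipped rather than counted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: each list element mirrors the return value of TaskTopicPartition#rackIds.
final class CrossRackSketch {

    // Counts partitions whose replicas all live outside the client's rack.
    static int crossRackPartitions(final String clientRack, final List<Optional<Set<String>>> partitionRacks) {
        int count = 0;
        for (final Optional<Set<String>> racks : partitionRacks) {
            // Assumption of this sketch: unknown rack information is not counted as cross-rack.
            if (racks.isPresent() && !racks.get().contains(clientRack)) {
                count++;
            }
        }
        return count;
    }

    // Three partitions: replicas on {r1, r2}, replicas on {r2, r3}, and one with no rack information.
    static int demo(final String clientRack) {
        final List<Optional<Set<String>>> partitions = new ArrayList<>();
        partitions.add(Optional.<Set<String>>of(new HashSet<>(Arrays.asList("r1", "r2"))));
        partitions.add(Optional.<Set<String>>of(new HashSet<>(Arrays.asList("r2", "r3"))));
        partitions.add(Optional.<Set<String>>empty());
        return crossRackPartitions(clientRack, partitions);
    }

    public static void main(final String[] args) {
        System.out.println(demo("r1")); // prints 1
        System.out.println(demo("r2")); // prints 0
        System.out.println(demo("r4")); // prints 2
    }
}
```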

TaskAssignmentUtils

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of the new TaskAssignor. This will allow users to more easily adapt or modify pieces of the complex existing assignment algorithm, without having to re-implement the entire thing from scratch.

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the KafkaStreams client assignments to add standby tasks to
     */
    public static void defaultStandbyTaskAssignment(final ApplicationState applicationState,
                                                    final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based on the 
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
     * assignment, but can increase the number of tasks shuffled around between clients. 
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.    
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     * <p>
     * It is recommended to run this optimization before assigning any standby tasks, especially if you have configured
     * your KafkaStreams clients with assignment tags via the rack.aware.assignment.tags config since this method may
     * shuffle around active tasks without considering the client tags and can result in a violation of the original
     * client tag assignment's constraints.
     *
     * @param kafkaStreamsAssignments the assignment of tasks to KafkaStreams clients to be optimized
     * @param optimizationParams      optional configuration parameters to apply 
     */
    public static void optimizeRackAwareActiveTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                    final RackAwareOptimizationParams optimizationParams);      

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the 
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
     * assignment, but can increase the number of tasks shuffled around between clients. 
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for standby tasks only. For active task optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     * 
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param optimizationParams      optional configuration parameters to apply 
     */
    public static void optimizeRackAwareStandbyTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                     final RackAwareOptimizationParams optimizationParams);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);

    /**
     * Validate the passed-in {@link TaskAssignment} and return an {@link AssignmentError} representing the
     * first error detected in the assignment, or {@link AssignmentError#NONE} if the assignment passes the
     * verification check.
     * <p>
     * Note: this verification is performed automatically by the StreamsPartitionAssignor on the assignment
     * returned by the TaskAssignor, and the error returned to the assignor via the {@link TaskAssignor#onAssignmentComputed}
     * callback. Therefore, it is not required to call this manually from the {@link TaskAssignor#assign} method.
     * However, if an invalid assignment is returned it will fail the rebalance and kill the thread, so it may be useful to
     * utilize this method in an assignor to verify the assignment before returning it and fix any errors it finds.
     *
     * @param applicationState The application for which this task assignment is being assessed.
     * @param taskAssignment   The task assignment that will be validated.
     *
     * @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
     *         or another {@code AssignmentError} otherwise.
     */
    public static AssignmentError validateTaskAssignment(final ApplicationState applicationState,
                                                         final TaskAssignment taskAssignment);
}

TaskAssignmentUtils provides new APIs for pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either the basic default or the client-tag-based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
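
To make the trade-off between trafficCost and nonOverlapCost described in the javadocs above more concrete, here is a toy model for a single task. It assumes a per-placement cost of trafficCost times the number of cross-rack partitions, plus nonOverlapCost whenever the task would move away from its current owner. This formula, PlacementSketch, and RackCostModelSketch are illustrative assumptions only; the actual optimization is performed globally across all tasks by the RackAwareTaskAssignor and is not reproduced here.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical model of one candidate placement of a single task; not a Kafka Streams type.
record PlacementSketch(String client, int crossRackPartitions, boolean isCurrentOwner) {}

final class RackCostModelSketch {
    private RackCostModelSketch() {}

    // Assumed cost model: cross-rack reads are weighted by trafficCost, and moving the
    // task away from its current owner adds a flat nonOverlapCost.
    static int cost(final PlacementSketch p, final int trafficCost, final int nonOverlapCost) {
        return trafficCost * p.crossRackPartitions() + (p.isCurrentOwner() ? 0 : nonOverlapCost);
    }

    // Picks the cheapest placement; ties are broken in favor of the current owner.
    static PlacementSketch cheapest(final List<PlacementSketch> candidates,
                                    final int trafficCost,
                                    final int nonOverlapCost) {
        return candidates.stream()
            .min(Comparator.<PlacementSketch>comparingInt(p -> cost(p, trafficCost, nonOverlapCost))
                .thenComparing(p -> !p.isCurrentOwner()))
            .orElseThrow();
    }
}
```

Under this model, setting nonOverlapCost to 0 always selects the placement with the least cross-rack traffic, while setting trafficCost to 0 always keeps the task on its current owner, which mirrors the two extreme cases called out in the javadocs.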

RackAwareOptimizationParams

A simple config container for the necessary parameters and optional overrides to apply when running the active or standby task rack-aware optimizations.

Code Block
languagejava
titleRackAwareOptimizationParams
public static final class RackAwareOptimizationParams {
    private final ApplicationState applicationState;
    private final Optional<Integer> trafficCostOverride;
    private final Optional<Integer> nonOverlapCostOverride;
    private final Optional<SortedSet<TaskId>> tasksToOptimize;


    /**
     * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState
     */
    public static RackAwareOptimizationParams of(final ApplicationState applicationState);

    /**
     * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState
     */
    public RackAwareOptimizationParams forStatefulTasks();

    /**
     * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState
     */
    public RackAwareOptimizationParams forStatelessTasks();

    /**
     * Return a new config object with the provided tasksToOptimize
     */
    public RackAwareOptimizationParams forTasks(final SortedSet<TaskId> tasksToOptimize);

    /**
     * Return a new config object with the provided trafficCost override applied
     */ 
    public RackAwareOptimizationParams withTrafficCostOverride(final int trafficCostOverride);

    /**
     * Return a new config object with the provided nonOverlapCost override applied
     */
    public RackAwareOptimizationParams withNonOverlapCostOverride(final int nonOverlapCostOverride);
}
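
Since every method above returns a new config object, the container can be treated as immutable and shared safely between the active and standby optimization calls. The sketch below shows one way such copy-on-modify behavior could look. ParamsSketch is a hypothetical stand-in that reduces task ids to strings and models only a subset of the methods; it is not the proposed implementation.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for RackAwareOptimizationParams; task ids are plain strings.
final class ParamsSketch {
    private final SortedSet<String> tasksToOptimize;
    private final Optional<Integer> trafficCostOverride;

    private ParamsSketch(final SortedSet<String> tasks, final Optional<Integer> trafficCostOverride) {
        // Defensive copy so later changes to the caller's set cannot leak in.
        this.tasksToOptimize = Collections.unmodifiableSortedSet(new TreeSet<>(tasks));
        this.trafficCostOverride = trafficCostOverride;
    }

    // Starts with all tasks selected and no overrides.
    static ParamsSketch of(final SortedSet<String> allTasks) {
        return new ParamsSketch(allTasks, Optional.empty());
    }

    // Each modifier returns a fresh object and leaves the receiver untouched.
    ParamsSketch forTasks(final SortedSet<String> tasks) {
        return new ParamsSketch(tasks, trafficCostOverride);
    }

    ParamsSketch withTrafficCostOverride(final int trafficCost) {
        return new ParamsSketch(tasksToOptimize, Optional.of(trafficCost));
    }

    SortedSet<String> tasksToOptimize() {
        return tasksToOptimize;
    }

    Optional<Integer> trafficCostOverride() {
        return trafficCostOverride;
    }
}
```

With this shape, a caller can derive a narrowed config such as `base.forTasks(subset).withTrafficCostOverride(10)` for one optimization pass while still reusing `base` unchanged for another.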

AssignmentConfigs

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public OptionalInt trafficCost();
    public OptionalInt nonOverlapCost();
    public String rackAwareAssignmentStrategy();
}


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates. Users who request the task lags via the computeTaskLags input flag should make sure to handle failures the way they desire. They can rethrow a thrown TaskAssignmentException (or just not catch it in the first place) to have Kafka Streams automatically "retry" the rebalance by returning the same assignment and scheduling an immediate followup rebalance. Advanced users who want more control over the "fallback" assignment and/or the timing of followup rebalance(s) can instead swallow the TaskAssignmentException and use the followupRebalanceDeadline to schedule followup rebalances, e.g. to implement a retry/backoff policy.
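
As a rough sketch of the retry/backoff policy mentioned above, the helper below computes the deadline an assignor could use for its next followup rebalance after consecutive lag computation failures. It assumes the deadline is expressed as a java.time.Instant; FollowupBackoffSketch and its methods are hypothetical and not part of the proposed API, and how the deadline is attached to the returned assignment is outside the scope of this example.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: exponential backoff for followup rebalance deadlines.
final class FollowupBackoffSketch {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private int consecutiveFailures = 0;

    FollowupBackoffSketch(final Duration initialDelay, final Duration maxDelay) {
        this.initialDelayMs = initialDelay.toMillis();
        this.maxDelayMs = maxDelay.toMillis();
    }

    // Call after catching (and swallowing) a failure. The delay doubles with each
    // consecutive failure and is capped at maxDelay.
    Instant onFailure(final Instant now) {
        final long factor = 1L << Math.min(consecutiveFailures, 30);
        consecutiveFailures++;
        // Compare against maxDelayMs / factor first so the multiplication cannot overflow.
        final long delayMs = initialDelayMs > maxDelayMs / factor
            ? maxDelayMs
            : initialDelayMs * factor;
        return now.plusMillis(delayMs);
    }

    // Call after a successful lag computation to reset the backoff.
    void onSuccess() {
        consecutiveFailures = 0;
    }
}
```

An assignor following this approach would keep one such object across invocations of assign, call onFailure in its catch block to obtain the deadline for the fallback assignment, and call onSuccess once lags are available again.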


Proposed Changes

On the whole we are not introducing new functionality to Streams itself, but rather are refactoring the existing TaskAssignor interface so that it's (a) pluggable/publicly exposed, and (b) easier to understand, use, and implement. Code-wise the largest change is the breaking up of the ClientState into the new KafkaStreamsState and KafkaStreamsAssignment interfaces, but that will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

Observing The Final Assignment

The custom assignor specifies a task-to-KafkaStreams-client mapping, but the final task-to-group-member mapping is controlled by the Streams partition assignor. It may be useful for implementors of the custom assignor to observe the final task-to-group-member assignment that is returned to Kafka. For example, it may be useful to record the mapping in an event that can be materialized externally to observe the current assignments. To support this, the TaskAssignor interface also includes a method named #onAssignmentComputed, which is called with the final computed GroupAssignment.

This callback can also be used to return an error to the user and notify them in the case of an invalid assignment computed by their TaskAssignor. The specifics are discussed in the following section.

Assignment Validation

As noted in the TaskAssignor javadocs, the StreamsPartitionAssignor will verify the assignment returned by the task assignor and return an error via #onAssignmentComputed if any of the following cases are observed while processing the TaskAssignor's assignment:

  1. ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
  2. INVALID_STANDBY_TASK: stateless task assigned as a standby task
  3. MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
  4. UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
  5. UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned

If any of these errors is detected, the StreamsPartitionAssignor will throw an exception after returning the error code via the #onAssignmentComputed callback. This error will be bubbled up through the StreamThread to the uncaught exception handler, where the user can choose how to react, same as any other exception.

If no error is detected, the AssignmentError code NONE will be returned in the #onAssignmentComputed callback.
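
The five checks listed above can be sketched with simplified types as follows. This is only an illustration under stated assumptions: process ids and task ids are reduced to strings, AssignmentErrorSketch, AssignedTaskSketch, and ValidationSketch are hypothetical names, and the order in which the checks run is an arbitrary choice of this sketch rather than the order used by the StreamsPartitionAssignor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the error codes listed above.
enum AssignmentErrorSketch {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    INVALID_STANDBY_TASK,
    MISSING_PROCESS_ID,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID
}

// Hypothetical assigned task: active == false means it is a standby task.
record AssignedTaskSketch(String taskId, boolean active) {}

final class ValidationSketch {
    private ValidationSketch() {}

    // Returns the first problem found, or NONE if the assignment passes every check.
    static AssignmentErrorSketch validate(final Set<String> knownProcessIds,
                                          final Set<String> allTaskIds,
                                          final Set<String> statefulTaskIds,
                                          final Map<String, List<AssignedTaskSketch>> assignment) {
        if (!assignment.keySet().containsAll(knownProcessIds)) {
            return AssignmentErrorSketch.MISSING_PROCESS_ID;
        }
        if (!knownProcessIds.containsAll(assignment.keySet())) {
            return AssignmentErrorSketch.UNKNOWN_PROCESS_ID;
        }
        final Set<String> activeTasksSeen = new HashSet<>();
        for (final List<AssignedTaskSketch> tasks : assignment.values()) {
            for (final AssignedTaskSketch task : tasks) {
                if (!allTaskIds.contains(task.taskId())) {
                    return AssignmentErrorSketch.UNKNOWN_TASK_ID;
                }
                if (task.active()) {
                    // Set#add returns false if another client already owns this active task.
                    if (!activeTasksSeen.add(task.taskId())) {
                        return AssignmentErrorSketch.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                    }
                } else if (!statefulTaskIds.contains(task.taskId())) {
                    // Only stateful tasks may have standbys.
                    return AssignmentErrorSketch.INVALID_STANDBY_TASK;
                }
            }
        }
        return AssignmentErrorSketch.NONE;
    }
}
```

A custom assignor could run a check of this kind on its own output before returning it, so that an invalid assignment is repaired rather than failing the rebalance.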

Consumer Assignments

One major decision in this KIP was whether to encompass the assignment of tasks to consumers/threads within each KafkaStreams client, or to leave that up to the StreamsPartitionAssignor and only carve out the KafkaStreams-level assignment for pluggability. Ultimately we decided on the latter, for several reasons:

...