Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Accepted Adopted

Discussion thread: here 

JIRA: KAFKA-15045

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {    

   /**
     * NONE: no error detected
     * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
     * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
     * INVALID_STANDBY_TASK: stateless task assigned as a standby task
     * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
     * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
     * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
     */   
    enum AssignmentError {     
	    NONE,
	    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,     
	    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
	    INVALID_STANDBY_TASK,
        MISSING_PROCESS_ID,
        UNKNOWN_PROCESS_ID,
	    UNKNOWN_TASK_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients 
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance. 
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that 
   * were detected while processing the returned assignment. If any errors were found, the corresponding 
   * will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will
   * be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application
   * has registered a {@link StreamsUncaughtExceptionHandler}.
   * <p>
   * Note: some kinds of errors will make it impossible for the StreamsPartitionAssignor to parse the TaskAssignment
   * that was returned from the TaskAssignor's {@link #assign}. If this occurs, the {@link GroupAssignment} passed
   * in to this callback will contain an empty map instead of the consumer assignments.
   * 
   * @param assignment:   the final consumer assignments returned to the kafka broker, or an empty assignment map if
   *                      an error prevented the assignor from converting the TaskAssignment into a GroupAssignment
   * @param subscription: the original consumer subscriptions passed into the assignor
   * @param error:        the corresponding error type if one was detected while processing the returned assignment,  
   *                      or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standbys tasks to individual 
   * KafkaStreams clients
   */
  class TaskAssignment {

	/**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();
  }
}

...

Code Block
languagejava
titleKafkaStreamsAssignment
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple container class for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
  */
public class KafkaStreamsAssignment {

  /* 
   * Construct an instance of KafkaStreamsAssignment with this processId and the given set of
   * assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you
   * can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API.
   *
   * @param processId the processId for the KafkaStreams client that should receive this assignment
   * @param assignment the set of tasks to be assigned to this KafkaStreams client
   *
   * @return a new KafkaStreamsAssignment object with the given processId and assignment
   */
  public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment);

  /**
   * This API can be used to request that a followup rebalance be triggered by the KafkaStreams client 
   * receiving this assignment. The followup rebalance will be initiated after the provided deadline
   * has passed, although it will always wait until it has finished the current rebalance before 
   * triggering a new one. This request will last until the new rebalance, and will be erased if a
   * new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next
   * assignment must request the followup rebalance again if it still wants to schedule one for
   * the given instant, otherwise no additional rebalance will be triggered after that.
   * 
   * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
   *
   * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
   */
  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

  public ProcessID processId();

  public Set<AssignedTask> tasks(Map<TaskId, AssignedTask> tasks();

  public void assignTask(AssignedTask);

  public void removeTask(AssignedTask);
 
  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  public Instant followupRebalanceDeadline();

  public static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        ACTIVE,
        STANDBY
    }
    
    public Type type();

    public TaskId id();
  }
}

...

Code Block
languagejava
titleApplicationState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing 
     * in "true" will result in a remote call to fetch changelog topic end offsets and you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return thea setmap of task ids to all tasks in this topology which mustto be assigned
     */
    Map<TaskId, Set<TaskInfo>TaskInfo> allTasks();

}

TaskInfo

A small interface with metadata for each task to be assigned will be used to pass along information about stateful vs stateless tasks, the mapping of input and changelog topic partitions to tasks, and other essential info such as the rack ids for each topic partition belonging to a given task.

...

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param KafkaStreamsAssignments the currentKafkaStreams assignmentclient of tasksassignments to KafkaStreamsadd clients
standby     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignmenttasks to
     */
    public static Map<ProcessID, KafkaStreamsAssignment>void defaultStandbyTaskAssignment(final ApplicationState applicationState, 
                                                    final Map<ProcessId, KafkaStreamsAssignment> KafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is            final Map<ProcessID, KafkaStreamsAssignment> KafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based based on the 
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
     * assignment, but can increase the number of tasks shuffled around between clients. 
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.    
     * <p>
       * Note:This this method willoptimizes modifycross-rack thetraffic inputfor {@linkactive KafkaStreamsAssignment}tasks objectsonly. andFor returnstandby the same map.task optimization,
     * Ituse does not make a copy of the map or the KafkaStreamsAssignment objects.      {@link #optimizeRackAwareStandbyTasks}.
     * <p>
     * ThisIt methodis optimizesrecommended cross-rackto trafficrun forthis activeoptimization tasksbefore only.assigning Forany standby task optimizationtasks,
 especially if you have *configured
 use {@link #optimizeRackAwareStandbyTasks}.
  * your KafkaStreams *
clients with assignment tags via * @param applicationState    the rack.aware.assignment.tags config since this method may
    the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients* shuffle around active tasks without considering the client tags and can result in a violation of the original
     * client tag assignment's constraints.
     *
 @param tasks   * @param kafkaStreamsAssignments the assignment of tasks to KafkaStreams clients to be optimized
     * the set of tasks to reassign if possible. Must already be assigned to a KafkaStreams client
     *@param optimizationParams      optional configuration parameters to apply 
     */
    public static 
void optimizeRackAwareActiveTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
 *  @return  a  map  with  the  KafkaStreamsAssignments  updated  to  minimize  cross-rack  traffic  for  active  tasks
      */
        public  static  Map<ProcessID,  KafkaStreamsAssignment>  optimizeRackAwareActiveTasks(final final ApplicationState applicationState, 
  RackAwareOptimizationParams optimizationParams);      

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the 
     *      {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
    final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments* assignment, 
but can increase the number of tasks shuffled around between clients. 
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} to a   final SortedSet<TaskId> tasks);      

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the 
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG@code nonOverlapCost}
 to a positive value, *the configsresulting whichassignment balancewill crossbe rackidentical trafficto minimizationthe andinput task movementassignment.
     * Setting<p>
 {@code trafficCost} to a larger* numberThis reducesmethod the overalloptimizes cross -rack traffic offor thestandby resultingtasks 
only. For active task  * assignment, but can increase the number of tasks shuffled around between clients. optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     * Setting
 {@code nonOverlapCost} to a larger* number@param increasesKafkaStreamsAssignments the affinitycurrent assignment of tasks to theirKafkaStreams intended clientclients
       * and@param reducesoptimizationParams the amount by which the rack-awareoptional optimizationconfiguration canparameters shuffleto tasksapply around,
 at the cost of higher
   */
    public static void * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,optimizeRackAwareStandbyTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                     final RackAwareOptimizationParams optimizationParams);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
 and {@code nonOverlapCost} to a* positive@param value,applicationState the resultingmetadata assignmentand willother beinfo identicaldescribing tothe thecurrent inputapplication assignment.state
     * <p>
     * @return *a Thisnew methodmap optimizescontaining cross-rackan trafficassignment forthat standbyreplicates tasksexactly only.the Forprevious activeassignment taskreported optimization,
in the applicationState
     */
 use {@link #optimizeRackAwareActiveTasks}.
 public static Map<ProcessId, KafkaStreamsAssignment> * identityAssignment(final ApplicationState applicationState);

    /**
 *  @param KafkaStreamsAssignments the* currentValidate assignmentthe of tasks to KafkaStreams clients
     * @param applicationState        the metadata and other info describing the current application statepassed-in {@link TaskAssignment} and return an {@link AssignmentError} representing the
     * first error detected in the assignment, or {@link AssignorError.NONE} if the assignment passes the
     * verification check.
     * @return a* new<p>
 map containing the mappings from* KafkaStreamsAssignmentsNote: updatedthis withverification theis defaultperformed rack-awareautomatically assignmentby forthe standyStreamsPartitionAssignor tasks
on the    */assignment
    public static* Map<ProcessID,returned KafkaStreamsAssignment>by optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
           the TaskAssignor, and the error returned to the assignor via the {@link TaskAssignor#onAssignmentComputed}
     * callback. Therefore, it is not required to call this manually from the {@link TaskAssignor#assign} method.
     * However if an invalid assignment is returned it will fail the rebalance and kill the thread, so it may be useful to
     * utilize this method in an assignor to verify the assignment before returning it and fix any errors     it finds.
  final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return@param a "no-op" assignment that just copies the previousapplicationState The application for which this task assignment ofis tasks to KafkaStreams clientsbeing assessed.
     *
 @param taskAssignment   *The @paramtask applicationStateassignment thethat metadatawill andbe othervalidated.
 info describing the current application state*
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
     *         or another {@code AssignmentError} otherwise.
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignmentAssignmentError validateTaskAssignment(final ApplicationState applicationState);
 },
                                                         final TaskAssignment taskAssignment) {
  }

TaskAssignmentUtils  provides new APIs but pre-existing functionality, essentially presenting a clean TaskAssignmentUtils  provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks API will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.(either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks API will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.

RackAwareOptimizationParams

A simple config container for necessary paramaters and optional overrides to apply when running the active or standby task rack-aware optimizations.

Code Block
languagejava
titleRackAwareOptimizationParams
public static final class RackAwareOptimizationParams {
    private final ApplicationState applicationState;
    private final Optional<Integer> trafficCostOverride;
    private final Optional<Integer> nonOverlapCostOverride;
    private final Optional<SortedSet<TaskId>> tasksToOptimize;


    /**       
     * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState       
     */        
     public static RackAwareOptimizationParams of(final ApplicationState applicationState);        

    /**       
     * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState       
     */  
     public RackAwareOptimizationParams forStatefulTasks();  
     
    /**
     * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState
     */  
    public RackAwareOptimizationParams forStatelessTasks();

    /**
     * Return a new config object with the provided tasksToOptimize
     */ 
     public RackAwareOptimizationParams forTasks(final SortedSet<TaskId> tasksToOptimize);

    /**
     * Return a new config object with the provided trafficCost override applied
     */ 
    public RackAwareOptimizationParams withTrafficCostOverride(final int trafficCostOverride);

    /**
     * Return a new config object with the provided nonOverlapCost override applied
     */
    public RackAwareOptimizationParams withNonOverlapCostOverride(final int nonOverlapCostOverride);
}

AssignmentConfigs

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public intOptionalInt trafficCost();
    public intOptionalInt nonOverlapCost();
    public String rackAwareAssignmentStrategy();
 }


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates . Users who request the task lags via the computeTaskLags  input flag should make sure to handle failures the way they desire, and can rethrow a thrown TaskAssignmentException  (or just not catch it in the first place) to have Kafka Streams automatically "retry" the rebalance by returning the same assignment and scheduling an immediate followup rebalance. Advanced users who want more control over the "fallback" assignment and/or the timing of immediate followup rebalance(s) can simply swallow the TaskAssignmentException  and use the followupRebalanceDeadline  to schedule followup rebalances, eg to implement a retry/backoff policy

...

  1. ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES :  multiple KafkaStreams clients assigned with the same active taskACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS : active task and standby task assigned to the same KafkaStreams client
  2. INVALID_STANDBY_TASK: stateless task assigned as a standby task
  3. MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
  4. UNKNOWN_PROCESS_ID : unrecognized ProcessId  not matching any of the participating consumers
  5. UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned

...