...

Code Block (language: java, title: TaskAssignmentUtils)
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the KafkaStreams client assignments to add standby tasks to
     */
    public static void defaultStandbyTaskAssignment(final ApplicationState applicationState,
                                                    final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs, which balance cross-rack traffic minimization against task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross-rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross-rack traffic. If we set {@code trafficCost} to 0
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * Note: this method modifies the input {@link KafkaStreamsAssignment} objects in place.
     * It does not make a copy of the map or of the KafkaStreamsAssignment objects.
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     * <p>
     * It is recommended to run this optimization before assigning any standby tasks, especially if you have configured
     * your KafkaStreams clients with assignment tags via the rack.aware.assignment.tags config, since this method may
     * shuffle around active tasks without considering the client tags and can result in a violation of the original
     * client tag assignment's constraints.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the assignment of tasks to KafkaStreams clients to be optimized
     * @param optimizationParams      optional configuration parameters to apply
     */
    public static void optimizeRackAwareActiveTasks(final ApplicationState applicationState,
                                                    final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                    final RackAwareOptimizationParams optimizationParams);

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs, which balance cross-rack traffic minimization against task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross-rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross-rack traffic. If we set {@code trafficCost} to 0
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for standby tasks only. For active task optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     *
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param optimizationParams      optional configuration parameters to apply
     */
    public static void optimizeRackAwareStandbyTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                     final RackAwareOptimizationParams optimizationParams);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients.
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);

    /**
     * Validate the passed-in {@link TaskAssignment} and return an {@link AssignmentError} representing the
     * first error detected in the assignment, or {@link AssignmentError#NONE} if the assignment passes the
     * verification check.
     * <p>
     * Note: this verification is performed automatically by the StreamsPartitionAssignor on the assignment
     * returned by the TaskAssignor, and the error is returned to the assignor via the
     * {@link TaskAssignor#onAssignmentComputed} callback. Therefore, it is not required to call this manually
     * from the {@link TaskAssignor#assign} method. However, if an invalid assignment is returned, it will fail
     * the rebalance and kill the thread, so it may be useful to utilize this method in an assignor to verify
     * the assignment before returning it and fix any errors it finds.
     *
     * @param applicationState The application for which this task assignment is being assessed.
     * @param taskAssignment   The task assignment that will be validated.
     *
     * @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
     *         or another {@code AssignmentError} otherwise.
     */
    public static AssignmentError validateTaskAssignment(final ApplicationState applicationState,
                                                         final TaskAssignment taskAssignment);
}
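
The javadoc for validateTaskAssignment says it returns the first error detected, but does not list the exact checks. As a rough illustration only, the following self-contained sketch validates a toy model in which each client (keyed by a string process ID) holds sets of active and standby task IDs. The `ToyValidator` class, the `ToyError` constants, and the three checks (unknown task IDs, an active task placed on more than one client, a standby co-located with its own active task) are assumptions chosen for illustration; the real `AssignmentError` values and checks may differ.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ToyValidator {
    // Hypothetical error codes for this sketch; NOT the real AssignmentError constants.
    enum ToyError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, INVALID_STANDBY_TASK, UNKNOWN_TASK_ID }

    // Toy per-client assignment: the active and standby task IDs one client hosts.
    record ClientTasks(Set<String> active, Set<String> standby) { }

    // Returns the first problem found, or NONE if every check passes.
    static ToyError validate(Set<String> knownTasks, Map<String, ClientTasks> assignment) {
        Set<String> seenActive = new HashSet<>();
        for (ClientTasks client : assignment.values()) {
            for (String task : client.active()) {
                if (!knownTasks.contains(task)) {
                    return ToyError.UNKNOWN_TASK_ID;
                }
                if (!seenActive.add(task)) {
                    return ToyError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; // active on two clients
                }
            }
            for (String task : client.standby()) {
                if (!knownTasks.contains(task)) {
                    return ToyError.UNKNOWN_TASK_ID;
                }
                if (client.active().contains(task)) {
                    return ToyError.INVALID_STANDBY_TASK; // standby co-located with its active copy
                }
            }
        }
        return ToyError.NONE;
    }

    public static void main(String[] args) {
        Set<String> tasks = Set.of("0_0", "0_1");
        System.out.println(validate(tasks, Map.of(
                "p1", new ClientTasks(Set.of("0_0"), Set.of("0_1")),
                "p2", new ClientTasks(Set.of("0_1"), Set.of("0_0"))))); // NONE
        System.out.println(validate(tasks, Map.of(
                "p1", new ClientTasks(Set.of("0_0"), Set.of("0_0")),
                "p2", new ClientTasks(Set.of("0_1"), Set.of()))));      // INVALID_STANDBY_TASK
    }
}
```

Each check returns as soon as it fails, which mirrors the "first error detected" wording; an assignor could call such a check before returning and repair the offending client.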

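The optimizeRackAware{Active/Standby}Tasks javadoc describes how trafficCost and nonOverlapCost trade cross-rack traffic against task movement, including two extreme cases. The brute-force sketch below makes that trade-off concrete under a simplified cost model that is purely an assumption: each task reads from one rack, placing it on a client in another rack costs `trafficCost`, placing it on any client other than its original one costs `nonOverlapCost`, and each client holds at most `CAPACITY` tasks. `ToyRackCost`, `TASK_RACK`, `CLIENT_RACK`, `ORIGINAL`, and `CAPACITY` are hypothetical, and this exhaustive search is not how the KIP-925 RackAwareTaskAssignor is implemented.

```java
import java.util.Arrays;

class ToyRackCost {
    // Hypothetical instance: rack of each task's data, rack of each client,
    // and the client each task is originally assigned to (task i -> ORIGINAL[i]).
    static final String[] TASK_RACK = {"r1", "r1", "r2"};
    static final String[] CLIENT_RACK = {"r1", "r2", "r2"};
    static final int[] ORIGINAL = {1, 2, 0};
    static final int CAPACITY = 2; // max tasks per client (assumption)

    // Total cost of one complete assignment under the toy model.
    static long cost(int[] assignment, long trafficCost, long nonOverlapCost) {
        long total = 0;
        for (int task = 0; task < assignment.length; task++) {
            int client = assignment[task];
            if (!TASK_RACK[task].equals(CLIENT_RACK[client])) {
                total += trafficCost;    // cross-rack read
            }
            if (client != ORIGINAL[task]) {
                total += nonOverlapCost; // task moved away from its intended client
            }
        }
        return total;
    }

    // Enumerates every capacity-respecting assignment and returns the first cheapest one.
    static int[] optimize(long trafficCost, long nonOverlapCost) {
        int tasks = TASK_RACK.length;
        int clients = CLIENT_RACK.length;
        int[] best = null;
        long bestCost = Long.MAX_VALUE;
        int combos = (int) Math.pow(clients, tasks);
        for (int code = 0; code < combos; code++) {
            int[] candidate = new int[tasks];
            int[] load = new int[clients];
            int rest = code;
            boolean feasible = true;
            for (int task = 0; task < tasks; task++) {
                candidate[task] = rest % clients; // decode code as base-`clients` digits
                rest /= clients;
                if (++load[candidate[task]] > CAPACITY) {
                    feasible = false;
                }
            }
            if (!feasible) {
                continue;
            }
            long c = cost(candidate, trafficCost, nonOverlapCost);
            if (c < bestCost) {
                bestCost = c;
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // trafficCost = 0: any move costs something and traffic is free, so nothing moves.
        System.out.println(Arrays.toString(optimize(0, 1))); // [1, 2, 0]
        // nonOverlapCost = 0: only cross-rack traffic matters.
        int[] minTraffic = optimize(1, 0);
        System.out.println(Arrays.toString(minTraffic) + " cost=" + cost(minTraffic, 1, 0)); // [0, 0, 1] cost=0
    }
}
```

In this toy instance the original assignment puts every task in the wrong rack; with `nonOverlapCost` at 0 the search moves both r1 tasks onto the single r1 client (allowed because `CAPACITY` is 2), while with `trafficCost` at 0 it returns the input unchanged, matching the two extreme cases in the javadoc.
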
TaskAssignmentUtils provides new APIs, but for pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either the basic default assignor or the client-tag-based rack-aware standby assignor, depending on whether client tags are present in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
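
To illustrate the kind of dispatch #defaultStandbyTaskAssignment performs, here is a hypothetical, simplified sketch; it is not the Kafka Streams standby assignor code. It assumes a single client tag (modeled as `rackTag`), places each standby on the least-loaded client that does not already host the task, and, only when client tags are configured, prefers clients whose tag differs from every client already holding a copy of that task. `ToyStandbyAssignor`, `Client`, `assignStandbys`, and `pick` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class ToyStandbyAssignor {
    // Toy client; rackTag stands in for a client tag such as a rack or zone (hypothetical model).
    record Client(String id, String rackTag, Set<String> active, Set<String> standby) {
        Client(String id, String rackTag, Set<String> active) {
            this(id, rackTag, new HashSet<>(active), new HashSet<>());
        }
        int load() { return active.size() + standby.size(); }
        boolean hosts(String task) { return active.contains(task) || standby.contains(task); }
    }

    // Adds `replicas` standby copies of every active task; tag spreading only when tags are configured.
    static void assignStandbys(List<Client> clients, int replicas, boolean clientTagsConfigured) {
        Map<String, Client> activeOwner = new TreeMap<>(); // sorted for deterministic results
        for (Client c : clients) {
            for (String task : c.active()) {
                activeOwner.put(task, c);
            }
        }
        for (Map.Entry<String, Client> entry : activeOwner.entrySet()) {
            String task = entry.getKey();
            Set<String> usedTags = new HashSet<>();
            usedTags.add(entry.getValue().rackTag());
            for (int r = 0; r < replicas; r++) {
                Client target = pick(clients, task, usedTags, clientTagsConfigured);
                if (target == null) {
                    break; // fewer eligible clients than requested replicas
                }
                target.standby().add(task);
                usedTags.add(target.rackTag());
            }
        }
    }

    // Least-loaded client not already hosting the task; with tags, prefer tags not yet used for it.
    static Client pick(List<Client> clients, String task, Set<String> usedTags, boolean tagAware) {
        List<Client> eligible = new ArrayList<>();
        for (Client c : clients) {
            if (!c.hosts(task)) {
                eligible.add(c);
            }
        }
        if (tagAware) {
            List<Client> unusedTag = new ArrayList<>();
            for (Client c : eligible) {
                if (!usedTags.contains(c.rackTag())) {
                    unusedTag.add(c);
                }
            }
            if (!unusedTag.isEmpty()) {
                eligible = unusedTag; // otherwise fall back to any eligible client
            }
        }
        return eligible.stream()
                .min(Comparator.comparingInt(Client::load).thenComparing(Client::id))
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Client> clients = List.of(
                new Client("p1", "rackA", Set.of("0_0")),
                new Client("p2", "rackA", Set.of("0_1")),
                new Client("p3", "rackB", Set.of()));
        assignStandbys(clients, 1, true);
        for (Client c : clients) {
            System.out.println(c.id() + " standby=" + c.standby());
        }
    }
}
```

With tags enabled, both standbys in the example land on `p3`, the only client in a different rack from the actives; with `clientTagsConfigured` set to false, the same input spreads them purely by load (`p3` gets `0_0`, and `p1` wins the tie for `0_1` by ID), even though `p1` shares a rack with the active copy on `p2`.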

...