Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: TaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param KafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
     */
    public static Map<ProcessID, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState, 
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> KafkaStreamsAssignments);

    /**
     * Optimize the active task assignment for rack- awareness
. This optimization is based on *the 
     * @param applicationState  {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
      the metadata* and other info describing the current application state{@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * @paramconfigs kafkaStreamsAssignmentswhich thebalance currentcross assignmentrack oftraffic tasksminimization toand KafkaStreamstask clientsmovement.
     * @param tasks* Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
  the  set of* tasksassignment, tobut reassigncan ifincrease possible.the Mustnumber alreadyof betasks assignedshuffled toaround abetween KafkaStreamsclients. client
     *
 Setting    * @return{@code nonOverlapCost} to a newlarger mapnumber containingincreases the mappingsaffinity fromof KafkaStreamsAssignmentstasks updatedto withtheir theintended defaultclient
 rack-aware assignment for active tasks
* and    */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState, 
      reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned to a KafkaStreams client
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default rack-aware assignment for active tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState, 
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments, 
                                                                                      final SortedSet<TaskId> tasks);      /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the 
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost} 
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting 
     * assignment, but can increase the number of tasks shuffled around between clients. 
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
   final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments, * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic finalfor SortedSet<TaskId>standby tasks);

    /** only. For active task optimization,
     * Optimizeuse the standby task assignment for rack-awareness{@link #optimizeRackAwareActiveTasks}.
     * 
     * @param KafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param applicationState        the metadata and other info describing the current application state
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default rack-aware assignment for standy tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
                                                                                       final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);
 }

...