
Status

Current state: Adopted

Discussion thread: here 

JIRA: KAFKA-15045

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

```java
package org.apache.kafka.streams.processor.assignment;

import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the KafkaStreams client assignments to add standby tasks to
     */
    public static void defaultStandbyTaskAssignment(final ApplicationState applicationState,
                                                    final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs, which balance cross-rack traffic minimization against task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross-rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross-rack traffic. If we set {@code trafficCost} to 0
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * Note: this method modifies the input {@link KafkaStreamsAssignment} objects in place.
     * It does not make a copy of the map or the KafkaStreamsAssignment objects.
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     * <p>
     * It is recommended to run this optimization before assigning any standby tasks, especially if you have configured
     * your KafkaStreams clients with assignment tags via the rack.aware.assignment.tags config, since this method may
     * shuffle around active tasks without considering the client tags and can result in a violation of the original
     * client tag assignment's constraints.
     *
     * @param kafkaStreamsAssignments the assignment of tasks to KafkaStreams clients to be optimized
     * @param optimizationParams      optional configuration parameters to apply
     */
    public static void optimizeRackAwareActiveTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                    final RackAwareOptimizationParams optimizationParams);

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs, which balance cross-rack traffic minimization against task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross-rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross-rack traffic. If we set {@code trafficCost} to 0
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for standby tasks only. For active task optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     *
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param optimizationParams      optional configuration parameters to apply
     */
    public static void optimizeRackAwareStandbyTasks(final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                     final RackAwareOptimizationParams optimizationParams);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients.
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);

    /**
     * Validate the passed-in {@link TaskAssignment} and return an {@link AssignmentError} representing the
     * first error detected in the assignment, or {@link AssignmentError#NONE} if the assignment passes the
     * verification check.
     * <p>
     * Note: this verification is performed automatically by the StreamsPartitionAssignor on the assignment
     * returned by the TaskAssignor, and the error is returned to the assignor via the {@link TaskAssignor#onAssignmentComputed}
     * callback. Therefore, it is not required to call this manually from the {@link TaskAssignor#assign} method.
     * However, if an invalid assignment is returned, it will fail the rebalance and kill the thread, so it may be useful to
     * call this method in an assignor to verify the assignment before returning it and fix any errors it finds.
     *
     * @param applicationState The application for which this task assignment is being assessed.
     * @param taskAssignment   The task assignment that will be validated.
     *
     * @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
     *         or another {@code AssignmentError} otherwise.
     */
    public static AssignmentError validateTaskAssignment(final ApplicationState applicationState,
                                                         final TaskAssignment taskAssignment);
}
```
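The validateTaskAssignment javadoc suggests checking an assignment before returning it from an assignor. The sketch below shows that pattern in isolation, assuming the chosen fix for an invalid proposal is simply to fall back to the previous assignment (which is what identityAssignment returns). Every type in it is a hypothetical stand-in: client and task ids are plain strings, ClientAssignment replaces KafkaStreamsAssignment, and the two error codes are illustrative rather than the real AssignmentError values. The record requires Java 16 or later.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "validate, then fall back". Only two checks are modeled;
// a real validator covers more cases.
final class ValidateThenFallback {

    // Hypothetical error codes for this sketch; the real AssignmentError enum may differ.
    enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, ACTIVE_AND_STANDBY_ON_SAME_CLIENT }

    // Stand-in for KafkaStreamsAssignment: the active and standby task ids of one client.
    record ClientAssignment(Set<String> activeTasks, Set<String> standbyTasks) { }

    // Return the first error found, or NONE if the assignment passes both checks.
    static AssignmentError validate(Map<String, ClientAssignment> assignment) {
        Set<String> seenActive = new HashSet<>();
        for (ClientAssignment client : assignment.values()) {
            for (String task : client.activeTasks()) {
                if (!seenActive.add(task)) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
                if (client.standbyTasks().contains(task)) {
                    return AssignmentError.ACTIVE_AND_STANDBY_ON_SAME_CLIENT;
                }
            }
        }
        return AssignmentError.NONE;
    }

    // Keep the proposed assignment if it is valid; otherwise return the previous one unchanged.
    static Map<String, ClientAssignment> validOrFallback(Map<String, ClientAssignment> proposed,
                                                         Map<String, ClientAssignment> previous) {
        return validate(proposed) == AssignmentError.NONE ? proposed : previous;
    }

    public static void main(String[] args) {
        Map<String, ClientAssignment> previous = Map.of(
            "client-1", new ClientAssignment(Set.of("0_0"), Set.of("0_1")),
            "client-2", new ClientAssignment(Set.of("0_1"), Set.of("0_0")));
        // Invalid proposal: task 0_0 is active on both clients.
        Map<String, ClientAssignment> proposed = Map.of(
            "client-1", new ClientAssignment(Set.of("0_0"), Set.of()),
            "client-2", new ClientAssignment(Set.of("0_0", "0_1"), Set.of()));
        System.out.println(validate(proposed));                              // ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
        System.out.println(validOrFallback(proposed, previous) == previous); // true
    }
}
```

Falling back wholesale is the bluntest repair; an assignor could instead use the returned error to patch only the offending tasks.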

TaskAssignmentUtils provides new APIs but pre-existing functionality. It essentially offers users a clean way to take advantage of the current optimizations and algorithms used by the built-in assignors, so that they don't have to re-implement complex features such as rack awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor: either the basic default assignor or the client-tag-based rack-aware standby assignor, depending on whether client tags are configured. Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
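To make the trafficCost/nonOverlapCost trade-off described in the optimizeRackAware{Active/Standby}Tasks javadocs concrete, here is a toy, per-task version of the cost. It assumes that placing a task on a client costs trafficCost times the number of the task's input partitions that are cross-rack for that client, plus nonOverlapCost when the task leaves its original client. It picks each task's cheapest client independently, whereas the RackAwareTaskAssignor from KIP-925 optimizes all tasks together and keeps clients balanced. RackAwareCostSketch and its methods are hypothetical names.

```java
import java.util.Map;

// Toy, per-task model of the rack-aware cost (an assumption, not the KIP-925 solver):
//   cost(task, client) = trafficCost * crossRackPartitions(client)
//                        + (client != originalClient ? nonOverlapCost : 0)
// Balance constraints across clients are ignored here.
final class RackAwareCostSketch {

    static int cost(int crossRackPartitions, boolean moved, int trafficCost, int nonOverlapCost) {
        return trafficCost * crossRackPartitions + (moved ? nonOverlapCost : 0);
    }

    // Return the cheapest client for one task; on a tie with the original client, the task stays put.
    static String bestClient(String originalClient, Map<String, Integer> crossRackPartitionsByClient,
                             int trafficCost, int nonOverlapCost) {
        String best = originalClient;
        int bestCost = cost(crossRackPartitionsByClient.get(originalClient), false, trafficCost, nonOverlapCost);
        for (Map.Entry<String, Integer> entry : crossRackPartitionsByClient.entrySet()) {
            boolean moved = !entry.getKey().equals(originalClient);
            int candidate = cost(entry.getValue(), moved, trafficCost, nonOverlapCost);
            if (candidate < bestCost) {
                best = entry.getKey();
                bestCost = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // The task currently lives on client-a, where 3 of its input partitions are on another rack.
        Map<String, Integer> crossRack = Map.of("client-a", 3, "client-b", 0, "client-c", 2);
        System.out.println(bestClient("client-a", crossRack, 1, 0)); // client-b: minimum cross-rack traffic
        System.out.println(bestClient("client-a", crossRack, 0, 1)); // client-a: identical to the input
        System.out.println(bestClient("client-a", crossRack, 1, 5)); // client-a: moving costs more than it saves
    }
}
```

The first two calls reproduce the extremes stated in the javadoc. In the third, neither cost is zero, and the outcome depends on which term dominates.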

RackAwareOptimizationParams

A simple config container for the necessary parameters and optional overrides to apply when running the active or standby task rack-aware optimizations.

```java
public static final class RackAwareOptimizationParams {
    private final ApplicationState applicationState;
    private final Optional<Integer> trafficCostOverride;
    private final Optional<Integer> nonOverlapCostOverride;
    private final Optional<SortedSet<TaskId>> tasksToOptimize;

    /**
     * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState
     */
    public static RackAwareOptimizationParams of(final ApplicationState applicationState);

    /**
     * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState
     */
    public RackAwareOptimizationParams forStatefulTasks();

    /**
     * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState
     */
    public RackAwareOptimizationParams forStatelessTasks();

    /**
     * Return a new config object with the provided tasksToOptimize
     */
    public RackAwareOptimizationParams forTasks(final SortedSet<TaskId> tasksToOptimize);

    /**
     * Return a new config object with the provided trafficCost override applied
     */
    public RackAwareOptimizationParams withTrafficCostOverride(final int trafficCostOverride);

    /**
     * Return a new config object with the provided nonOverlapCost override applied
     */
    public RackAwareOptimizationParams withNonOverlapCostOverride(final int nonOverlapCostOverride);
}
```
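Every method on RackAwareOptimizationParams returns a new config object, so calls can be chained without mutating a shared instance. The class below is a hypothetical, trimmed-down stand-in that illustrates that copy-on-write style. It takes the full task set directly instead of an ApplicationState, uses strings for task ids, and omits forStatefulTasks/forStatelessTasks because those need per-task statefulness information.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for RackAwareOptimizationParams: immutable, every "for"/"with" call returns a new object.
final class OptimizationParamsSketch {
    private final SortedSet<String> tasksToOptimize;
    private final Optional<Integer> trafficCostOverride;
    private final Optional<Integer> nonOverlapCostOverride;

    private OptimizationParamsSketch(SortedSet<String> tasksToOptimize,
                                     Optional<Integer> trafficCostOverride,
                                     Optional<Integer> nonOverlapCostOverride) {
        // Defensive, read-only copy so later changes to the caller's set cannot leak in.
        this.tasksToOptimize = Collections.unmodifiableSortedSet(new TreeSet<>(tasksToOptimize));
        this.trafficCostOverride = trafficCostOverride;
        this.nonOverlapCostOverride = nonOverlapCostOverride;
    }

    // No overrides; optimize every task.
    static OptimizationParamsSketch of(SortedSet<String> allTasks) {
        return new OptimizationParamsSketch(allTasks, Optional.empty(), Optional.empty());
    }

    OptimizationParamsSketch forTasks(SortedSet<String> tasks) {
        return new OptimizationParamsSketch(tasks, trafficCostOverride, nonOverlapCostOverride);
    }

    OptimizationParamsSketch withTrafficCostOverride(int trafficCost) {
        return new OptimizationParamsSketch(tasksToOptimize, Optional.of(trafficCost), nonOverlapCostOverride);
    }

    OptimizationParamsSketch withNonOverlapCostOverride(int nonOverlapCost) {
        return new OptimizationParamsSketch(tasksToOptimize, trafficCostOverride, Optional.of(nonOverlapCost));
    }

    SortedSet<String> tasksToOptimize() { return tasksToOptimize; }
    Optional<Integer> trafficCostOverride() { return trafficCostOverride; }
    Optional<Integer> nonOverlapCostOverride() { return nonOverlapCostOverride; }

    public static void main(String[] args) {
        OptimizationParamsSketch base = OptimizationParamsSketch.of(new TreeSet<>(List.of("0_0", "0_1", "1_0")));
        OptimizationParamsSketch tuned = base.withTrafficCostOverride(10).forTasks(new TreeSet<>(List.of("0_0")));
        System.out.println(base.trafficCostOverride());  // Optional.empty
        System.out.println(tuned.trafficCostOverride()); // Optional[10]
        System.out.println(tuned.tasksToOptimize());     // [0_0]
    }
}
```

Because `base` is never modified, the same starting object can safely seed separate active and standby optimization runs with different overrides.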

AssignmentConfigs

Last, we have AssignmentConfigs, which is (and will remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP proposes a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

```java
package org.apache.kafka.streams.processor.assignment;

import java.util.List;
import java.util.OptionalInt;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public OptionalInt trafficCost();
    public OptionalInt nonOverlapCost();
    public String rackAwareAssignmentStrategy();
}
```
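RackAwareOptimizationParams carries its overrides as Optional<Integer>, while the AssignmentConfigs getters return OptionalInt, so an optimization run has to merge the two. This KIP does not confirm the following precedence, so treat it as an assumption: an override wins over the configured value, and a caller-supplied fallback applies when neither is set. CostResolution and resolve are hypothetical names.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Assumed precedence: override (Optional<Integer>) > configured value (OptionalInt) > fallback.
// The fallback value is supplied by the caller and is purely illustrative.
final class CostResolution {

    static int resolve(Optional<Integer> override, OptionalInt configured, int fallback) {
        // Optional<Integer> and OptionalInt don't compose directly, so nest the two lookups.
        return override.orElseGet(() -> configured.orElse(fallback));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of(5), OptionalInt.of(10), 1));    // 5
        System.out.println(resolve(Optional.empty(), OptionalInt.of(10), 1));  // 10
        System.out.println(resolve(Optional.empty(), OptionalInt.empty(), 1)); // 1
    }
}
```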

...