...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment.
 */
public final class TaskAssignmentUtils {

    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default
     *         standby assignment
     */
    public static Map<ProcessID, KafkaStreamsAssignment> defaultStandbyTaskAssignment(
        final ApplicationState applicationState,
        final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned
     *                                to a KafkaStreams client
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default
     *         rack-aware assignment for active tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(
        final ApplicationState applicationState,
        final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments,
        final SortedSet<TaskId> tasks);

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for standby tasks only. For active task optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default
     *         rack-aware assignment for standby tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(
        final ApplicationState applicationState,
        final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients.
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported
     *         in the applicationState
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);
} |
...