...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment.
 * <p>
 * All helpers are static and operate on a map from {@code ProcessID} to
 * {@code KafkaStreamsAssignment}.
 */
public final class TaskAssignmentUtils {

    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default
     *         standby assignment
     */
    public static Map<ProcessID, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Optimize active task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * Note: this method will modify the input {@link KafkaStreamsAssignment} objects and return the same map.
     * It does not make a copy of the map or the KafkaStreamsAssignment objects.
     * <p>
     * This method optimizes cross-rack traffic for active tasks only. For standby task optimization,
     * use {@link #optimizeRackAwareStandbyTasks}.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned
     *                                to a KafkaStreams client
     *
     * @return the input map, with the mappings from kafkaStreamsAssignments updated to minimize
     *         cross-rack traffic for active tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState,
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments,
                                                                                      final SortedSet<TaskId> tasks);

    /**
     * Optimize standby task assignment for rack awareness. This optimization is based on the
     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
     * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG nonOverlapCost}
     * configs which balance cross rack traffic minimization and task movement.
     * Setting {@code trafficCost} to a larger number reduces the overall cross rack traffic of the resulting
     * assignment, but can increase the number of tasks shuffled around between clients.
     * Setting {@code nonOverlapCost} to a larger number increases the affinity of tasks to their intended client
     * and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
     * cross-rack traffic.
     * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost} to a positive value,
     * the resulting assignment will have an absolute minimum of cross rack traffic. If we set {@code trafficCost} to 0,
     * and {@code nonOverlapCost} to a positive value, the resulting assignment will be identical to the input assignment.
     * <p>
     * This method optimizes cross-rack traffic for standby tasks only. For active task optimization,
     * use {@link #optimizeRackAwareActiveTasks}.
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from kafkaStreamsAssignments updated with the default
     *         rack-aware assignment for standby tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
                                                                                       final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported
     *         in the applicationState
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);
} |
...