Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  enum AssignmentError {     
	NONE,
	ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,     
	ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
	INVALID_STANDBY_TASK,
    UNKNOWN_PROCESS_ID,
	UNKNOWN_TASK_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients 
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance. 
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that 
   * were detected while processing the returned assignment. If any errors were found, the corresponding 
   * will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will
   * be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application
   * has registered a {@link StreamsUncaughtExceptionHandler}.
   * 
   * @param assignment:   the final assignment returned to the kafka broker
   * @param subscription: the original subscription passed into the assignor
   * @param error:        the corresponding error type if one was detected while processing the returned assignment,  
   *                      or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standbys tasks to individual 
   * KafkaStreams clients
   */
  class TaskAssignment {

	/**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();
  }
}

...

Code Block
languagejava
title: KafkaStreamsAssignment (previously NodeAssignment)
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
  */
public interface KafkaStreamsAssignment {
  ProcessID processId();

  Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  Instant followupRebalanceDeadline();

  static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        ACTIVE,
        STANDBY
    }
    
    public Type type();

    public TaskId id();
  }
}

...

Code Block
languagejava
title: ProcessId (previously ProcessID)
package org.apache.kafka.streams.processor.assignment; 

/** A simple wrapper around UUID that abstracts a Process IDId */
public class ProcessIDProcessId {

    public ProcessIDProcessId(final UUID id) {
        this.id = id;
    }

    public id() {
        return id;
    }
}
 

...

Code Block
languagejava
title: KafkaStreamsState (previously NodeState)
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at
 * least one StreamThread participating in this rebalance.
 */
public interface KafkaStreamsState {
  /**
   * @return the processId of the application instance running on this KafkaStreams client
   */
  ProcessId processId();

  /**
   * Returns the number of processing threads available to work on tasks for this KafkaStreams
   * client, which represents its overall capacity for work relative to other KafkaStreams clients.
   *
   * @return the number of processing threads on this KafkaStreams client
   */
  int numProcessingThreads();

  /**
   * @return the set of consumer client ids for this KafkaStreams client
   */
  SortedSet<String> consumerClientIds();

  /**
   * @return the set of all active tasks owned by consumers on this KafkaStreams client since the
   *         previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the
   *         previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this
   * client did not have any state for this task on disk.
   *
   * @param task the task whose lag should be returned
   *
   * @return the end offset sum minus the offset sum, or {@code Task.LATEST_OFFSET} if this was
   *         previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @param consumerClientId the client id of the consumer whose previous tasks should be returned
   *
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks
   *         that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumerClientId);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by
   * {@link TaskId}, mapped to its "offset lag sum". This is computed as the difference between the
   * changelog end offset and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this KafkaStreams client, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server}
   * config.
   *
   * @return the host info for this KafkaStreams client if configured, else
   *         {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this KafkaStreams client, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}.
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this KafkaStreams client's
   *         {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
}

...

Code Block
languagejava
title: TaskAssignmentUtils (previously ApplicationMetadata)
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to KafkaStreams clients according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param KafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
     */
    public static Map<ProcessID, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState, 
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> KafkaStreamsAssignments);

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param applicationState        the metadata and other info describing the current application state
     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned to a KafkaStreams client
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default rack-aware assignment for active tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState, 
                                                                                      final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments, 
                                                                                      final SortedSet<TaskId> tasks);

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param KafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
     * @param applicationState        the metadata and other info describing the current application state
     *
     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default rack-aware assignment for standy tasks
     */
    public static Map<ProcessID, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
                                                                                       final Map<ProcessID, KafkaStreamsAssignment> kafkaStreamsAssignments);

    /**
     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
     *
     * @param applicationState the metadata and other info describing the current application state
     *
     * @return a new map containing an assignment that replicates exactly the previous assignment reported in the applicationState
     */
    public static Map<ProcessID, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState);
 }

...

  1. ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: the same active task was assigned to multiple KafkaStreams clients
  2. ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: an active task and its standby task were assigned to the same KafkaStreams client
  3. INVALID_STANDBY_TASK: a stateless task was assigned as a standby task
  4. UNKNOWN_PROCESS_ID: an unrecognized ProcessId that does not match any of the participating KafkaStreams clients
  5. UNKNOWN_TASK_ID: an unrecognized TaskId that does not match any of the tasks to be assigned

If any of these errors are detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. In that case, the input assignment is kept unchanged as the new assignment, and the detected error is passed to the #onAssignmentComputed callback through its error parameter.

...