...

Code Block (Java): TaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * NONE: no error detected
   * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
   * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
   * INVALID_STANDBY_TASK: stateless task assigned as a standby task
   * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
   * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
   * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
   */
  enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
    INVALID_STANDBY_TASK,
    MISSING_PROCESS_ID,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients 
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance. 
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that
   * were detected while processing the returned assignment. If any errors were found, the corresponding
   * AssignmentError will be passed in to this callback, and a StreamsException will be thrown after the callback
   * returns. The StreamsException will be thrown up to kill the StreamThread and can be handled as any other
   * uncaught exception would if the application has registered a {@link StreamsUncaughtExceptionHandler}.
   * <p>
   * Note: some kinds of errors will make it impossible for the StreamsPartitionAssignor to parse the TaskAssignment
   * that was returned from the TaskAssignor's {@link #assign}. If this occurs, the {@link GroupAssignment} passed
   * in to this callback will contain an empty map instead of the consumer assignments.
   * 
   * @param assignment   the final consumer assignments returned to the Kafka broker, or an empty assignment map if
   *                     an error prevented the assignor from converting the TaskAssignment into a GroupAssignment
   * @param subscription the original consumer subscriptions passed into the assignor
   * @param error        the corresponding error type if one was detected while processing the returned assignment,
   *                     or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {

    /**
     * @return the assignment of tasks to KafkaStreams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();
  }
}
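
The contract of assign() can be illustrated with a minimal round-robin sketch. This is not the actual Kafka Streams implementation: plain Strings stand in for ProcessId and TaskId, a Map stands in for TaskAssignment, and the class and method names below are hypothetical. It only places active tasks and gives every participating client an entry, even an empty one, so that the result would not trip MISSING_PROCESS_ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in sketch: Strings replace ProcessId and TaskId,
// and a Map<processId, taskIds> replaces TaskAssignment.
public class RoundRobinAssignorSketch {

    /** Distributes active tasks over the clients in round-robin order; no standby tasks are placed. */
    static Map<String, List<String>> assign(final List<String> processIds, final List<String> taskIds) {
        if (processIds.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        // Every participating client gets an entry, even if it ends up with no tasks.
        for (final String processId : processIds) {
            assignment.put(processId, new ArrayList<>());
        }
        int next = 0;
        for (final String taskId : taskIds) {
            // Each task is handed to exactly one client, so no active task is assigned twice.
            final String owner = processIds.get(next % processIds.size());
            assignment.get(owner).add(taskId);
            next++;
        }
        return assignment;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> result =
            assign(List.of("client-a", "client-b"), List.of("0_0", "0_1", "0_2"));
        System.out.println(result); // prints {client-a=[0_0, 0_2], client-b=[0_1]}
    }
}
```

A real assignor would read the clients and tasks from the ApplicationState and return KafkaStreamsAssignment objects instead of raw lists.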

...

Code Block (Java): KafkaStreamsAssignment
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple container class for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public class KafkaStreamsAssignment {

  /**
   * Construct an instance of KafkaStreamsAssignment with this processId and the given set of
   * assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you
   * can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API.
   *
   * @param processId the processId for the KafkaStreams client that should receive this assignment
   * @param assignment the set of tasks to be assigned to this KafkaStreams client
   *
   * @return a new KafkaStreamsAssignment object with the given processId and assignment
   */
  public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment);

  /**
   * This API can be used to request that a followup rebalance be triggered by the KafkaStreams client 
   * receiving this assignment. The followup rebalance will be initiated after the provided deadline
   * has passed, although it will always wait until it has finished the current rebalance before 
   * triggering a new one. This request will last until the new rebalance, and will be erased if a
   * new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next
   * assignment must request the followup rebalance again if it still wants to schedule one for
   * the given instant, otherwise no additional rebalance will be triggered after that.
   * 
   * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
   *
   * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
   */
  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

  public ProcessId processId();

  public Map<TaskId, AssignedTask> tasks();

  public void assignTask(final AssignedTask task);

  public void removeTask(final AssignedTask task);
 
  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  public Instant followupRebalanceDeadline();

  public static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        ACTIVE,
        STANDBY
    }
    
    public Type type();

    public TaskId id();
  }
}
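
The followup rebalance API described above can be sketched as follows. This is a simplified stand-in, not the real KafkaStreamsAssignment: the nested Assignment class, the String ids, the Optional return type, and the deadlineFrom helper are all assumptions made for illustration. It shows that withFollowupRebalance returns a new object and leaves the original unchanged, and that the deadline is an absolute instant computed as now + delay.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;

public class FollowupRebalanceSketch {

    // Hypothetical stand-in for KafkaStreamsAssignment; Strings replace ProcessId and AssignedTask.
    static final class Assignment {
        private final String processId;
        private final Set<String> tasks;
        private final Optional<Instant> followupRebalanceDeadline;

        private Assignment(final String processId, final Set<String> tasks, final Optional<Instant> deadline) {
            this.processId = processId;
            this.tasks = tasks;
            this.followupRebalanceDeadline = deadline;
        }

        // No followup rebalance is requested unless withFollowupRebalance is called.
        static Assignment of(final String processId, final Set<String> tasks) {
            return new Assignment(processId, tasks, Optional.empty());
        }

        // Returns a copy with the deadline set; the receiver itself is not modified.
        Assignment withFollowupRebalance(final Instant rebalanceDeadline) {
            return new Assignment(processId, tasks, Optional.of(rebalanceDeadline));
        }

        // Assumption of this sketch: absence of a request is modeled with Optional.
        Optional<Instant> followupRebalanceDeadline() {
            return followupRebalanceDeadline;
        }
    }

    // The deadline is an absolute point in time: now + followupRebalanceDelay.
    static Instant deadlineFrom(final Instant now, final Duration delay) {
        return now.plus(delay);
    }

    public static void main(final String[] args) {
        final Instant now = Instant.parse("2024-01-01T00:00:00Z");
        final Assignment base = Assignment.of("client-a", Set.of("0_0"));
        final Assignment scheduled = base.withFollowupRebalance(deadlineFrom(now, Duration.ofMinutes(10)));
        System.out.println(base.followupRebalanceDeadline().isPresent()); // prints false
        System.out.println(scheduled.followupRebalanceDeadline().get());  // prints 2024-01-01T00:10:00Z
    }
}
```

Because the request is erased when a new rebalance begins, an assignor that still wants the followup would have to attach the deadline again on the next assignment.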

...

Code Block (Java): ApplicationState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing 
     * in "true" will result in a remote call to fetch changelog topic end offsets and you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return a map of task ids to all tasks in this topology to be assigned
     */
    Map<TaskId, TaskInfo> allTasks();

}

TaskInfo

TaskInfo is a small interface carrying metadata for each task to be assigned. It passes along information about stateful vs. stateless tasks, the mapping of input and changelog topic partitions to tasks, and other essential info such as the rack ids for each topic partition belonging to a given task.

...

  1. ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
  2. ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
  3. INVALID_STANDBY_TASK: stateless task assigned as a standby task
  4. MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
  5. UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
  6. UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
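
The error conditions listed above could be detected along the following lines. This is a rough sketch, not the actual validation in the StreamsPartitionAssignor: Strings stand in for ProcessId and TaskId, the validate method and its parameters are hypothetical, and the order in which errors are checked is an arbitrary choice of this example. For simplicity, it also flags an active task listed twice on the same client as ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in types: Strings replace ProcessId and TaskId.
public class AssignmentValidationSketch {

    enum AssignmentError {
        NONE,
        ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
        ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
        INVALID_STANDBY_TASK,
        MISSING_PROCESS_ID,
        UNKNOWN_PROCESS_ID,
        UNKNOWN_TASK_ID
    }

    enum Type { ACTIVE, STANDBY }

    static final class AssignedTask {
        final String id;
        final Type type;

        AssignedTask(final String id, final Type type) {
            this.id = id;
            this.type = type;
        }
    }

    static AssignedTask active(final String id) { return new AssignedTask(id, Type.ACTIVE); }

    static AssignedTask standby(final String id) { return new AssignedTask(id, Type.STANDBY); }

    /** Returns the first error found, or NONE. statefulByTaskId maps every known task id to "is stateful". */
    static AssignmentError validate(final Set<String> knownProcessIds,
                                    final Map<String, Boolean> statefulByTaskId,
                                    final Map<String, List<AssignedTask>> assignment) {
        // Every client from the input must appear in the output.
        for (final String processId : knownProcessIds) {
            if (!assignment.containsKey(processId)) {
                return AssignmentError.MISSING_PROCESS_ID;
            }
        }
        final Set<String> activeTasksSeen = new HashSet<>();
        for (final Map.Entry<String, List<AssignedTask>> entry : assignment.entrySet()) {
            if (!knownProcessIds.contains(entry.getKey())) {
                return AssignmentError.UNKNOWN_PROCESS_ID;
            }
            final Set<String> activeOnClient = new HashSet<>();
            final Set<String> standbyOnClient = new HashSet<>();
            for (final AssignedTask task : entry.getValue()) {
                final Boolean stateful = statefulByTaskId.get(task.id);
                if (stateful == null) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                if (task.type == Type.ACTIVE) {
                    // add() returns false if this active task was already assigned somewhere.
                    if (!activeTasksSeen.add(task.id)) {
                        return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                    }
                    activeOnClient.add(task.id);
                } else {
                    // Only stateful tasks may have standbys.
                    if (!stateful) {
                        return AssignmentError.INVALID_STANDBY_TASK;
                    }
                    standbyOnClient.add(task.id);
                }
            }
            // A client must not hold both the active and a standby copy of the same task.
            activeOnClient.retainAll(standbyOnClient);
            if (!activeOnClient.isEmpty()) {
                return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
            }
        }
        return AssignmentError.NONE;
    }

    public static void main(final String[] args) {
        final Set<String> clients = Set.of("a", "b");
        final Map<String, Boolean> tasks = Map.of("0_0", true, "1_0", false);
        final Map<String, List<AssignedTask>> valid =
            Map.of("a", List.of(active("0_0")), "b", List.of(active("1_0"), standby("0_0")));
        final Map<String, List<AssignedTask>> duplicated =
            Map.of("a", List.of(active("0_0")), "b", List.of(active("0_0")));
        System.out.println(validate(clients, tasks, valid));      // prints NONE
        System.out.println(validate(clients, tasks, duplicated)); // prints ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
    }
}
```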

...