Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * 
   */
  enum AssignmentError {     
	NONE,
	ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,     
	ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
	INVALID_STANDBY_TASK,
    UNKNOWN_PROCESS_ID,
	UNKNOWN_TASK_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients 
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance. 
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that 
   * were detected while processing the returned assignment. If any errors were found, the corresponding 
   * will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will
   * be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application
   * has registered a {@link StreamsUncaughtExceptionHandler}.
   * 
   * @param assignment:   the final assignment returned to the kafka broker
   * @param subscription: the original subscription passed into the assignor
   * @param error:        the corresponding error type if one was detected while processing the returned assignment,  
   *                      or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standbys tasks to individual 
   * KafkaStreams clients
   */
  class TaskAssignment {

	/**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment();
  }
}

...

As noted in the TaskAssignor  javadocs, the StreamsPartitionAssignor will verify the assignment returned by the task assignor and return an error via #onAssignmentComputed  if any of the following cases are observed while processing the TaskAssignor 's assignment:

...

If any of these errors are detected, the StreamsPartitionAssignor will throw an exception after returning the error code via the #onAssignmentComputed  callback. This error will be bubbled up through the StreamThread to the uncaught exception handler where the user can choose how to react from there, same as any other exception.

If no error is detected, the AssignmentError  code NONE  will be returned in the #onAssignmentComputed  callback.

Consumer Assignments

One major decision in this KIP was whether to encompass the assignment of tasks to consumers/threads within each KafkaStreams client, or to leave that up to the StreamsPartitionAssignor and only carve out the KafkaStreams-level assignment for pluggability. Ultimately we decided on the latter, for several reasons:

...