...

The following classes and interfaces will need to be implemented by the user in order to plug in a custom task assignor:

TaskAssignor

The first and most important interface for users to implement is the TaskAssignor itself:

```java
package org.apache.kafka.streams.processor.assignment;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TaskAssignmentException;

public interface TaskAssignor extends Configurable {

  enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
    UNKNOWN_PROCESS_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance.
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers and check for any errors that
   * were detected while processing the returned assignment. If any errors were found, the corresponding
   * AssignmentError will be passed to this callback and a StreamsException will be thrown after this callback
   * returns. The StreamsException will be thrown up to kill the StreamThread and can be handled as any other
   * uncaught exception would if the application has registered a {@link StreamsUncaughtExceptionHandler}.
   *
   * @param assignment   the final assignment returned to the kafka broker
   * @param subscription the original subscription passed into the assignor
   * @param error        the corresponding error type if one was detected while processing the returned assignment,
   *                     or AssignmentError.NONE if the returned assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {
    private final Collection<KafkaStreamsAssignment> assignment;

    public TaskAssignment(final Collection<KafkaStreamsAssignment> assignment) {
      this.assignment = assignment;
    }

    /**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment() {
      return assignment;
    }
  }
}
```
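To make the assign contract concrete, here is a minimal sketch of an assignor that hands out every task as an active task, round-robin across the participating clients, with no standbys. It is not part of this proposal: to stay self-contained it replaces TaskId, ProcessID, AssignedTask and KafkaStreamsAssignment with simplified, hypothetical records, takes the client and task lists directly instead of an ApplicationState, and leaves out Configurable, followupRebalanceDeadline() and onAssignmentComputed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified stand-ins for the proposed types (not the real Kafka Streams API).
record TaskId(int subtopology, int partition) {}
record ProcessID(UUID id) {}
enum Type { ACTIVE, STANDBY }
record AssignedTask(TaskId id, Type type) {}
record KafkaStreamsAssignment(ProcessID processId, Set<AssignedTask> assignment) {}

final class RoundRobinAssignorSketch {
    // Assigns each task as ACTIVE to exactly one client, cycling through the clients in list order.
    static List<KafkaStreamsAssignment> assign(List<ProcessID> clients, List<TaskId> tasks) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        // LinkedHashMap keeps the output in the same order as the input client list.
        Map<ProcessID, Set<AssignedTask>> byClient = new LinkedHashMap<>();
        for (ProcessID client : clients) {
            byClient.put(client, new HashSet<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            ProcessID owner = clients.get(i % clients.size());
            byClient.get(owner).add(new AssignedTask(tasks.get(i), Type.ACTIVE));
        }
        List<KafkaStreamsAssignment> result = new ArrayList<>();
        byClient.forEach((client, assigned) -> result.add(new KafkaStreamsAssignment(client, assigned)));
        return result;
    }
}
```

Round-robin ignores task state and lag entirely; an assignor built on the real interface would typically want to consult ApplicationState#statefulTasks() and the per-client KafkaStreamsState before placing stateful tasks.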

...

This gives us the following two new top-level public interfaces, KafkaStreamsState and KafkaStreamsAssignment:

KafkaStreamsAssignment

Next we have the KafkaStreamsAssignment interface, representing the output of the assignment:

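```java
package org.apache.kafka.streams.processor.assignment;

import java.time.Instant;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public interface KafkaStreamsAssignment {
  ProcessID processId();

  Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  Instant followupRebalanceDeadline();

  static class AssignedTask {
    private final TaskId id;
    private final Type taskType;

    public AssignedTask(final TaskId id, final Type taskType) {
      this.id = id;
      this.taskType = taskType;
    }

    public enum Type {
      ACTIVE,
      STANDBY
    }

    public Type type() {
      return taskType;
    }

    public TaskId id() {
      return id;
    }

    // equals/hashCode so that AssignedTask behaves correctly inside a Set
    @Override
    public boolean equals(final Object o) {
      if (!(o instanceof AssignedTask)) {
        return false;
      }
      final AssignedTask other = (AssignedTask) o;
      return id.equals(other.id) && taskType == other.taskType;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, taskType);
    }
  }
}
```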
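The followupRebalanceDeadline() javadoc defines the deadline as now + followupRebalanceDelay. A minimal sketch of building such a value is below; KafkaStreamsAssignmentSketch and withFollowupRebalance are hypothetical names, and the records are simplified stand-ins rather than the proposed interfaces.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified stand-ins for the proposed types (not the real Kafka Streams API).
record TaskId(int subtopology, int partition) {}
record ProcessID(UUID id) {}
enum Type { ACTIVE, STANDBY }
record AssignedTask(TaskId id, Type type) {}

// Mirrors the three accessors of the proposed KafkaStreamsAssignment as a plain value type.
record KafkaStreamsAssignmentSketch(ProcessID processId,
                                    Set<AssignedTask> assignment,
                                    Instant followupRebalanceDeadline) {

    // Computes the deadline as 'now + followupRebalanceDelay'.
    static KafkaStreamsAssignmentSketch withFollowupRebalance(ProcessID processId,
                                                              Set<AssignedTask> tasks,
                                                              Instant now,
                                                              Duration followupRebalanceDelay) {
        return new KafkaStreamsAssignmentSketch(
            processId, Set.copyOf(tasks), now.plus(followupRebalanceDelay));
    }
}
```

Taking `now` as a parameter instead of calling Instant.now() inside keeps the computation deterministic and easy to test.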

...

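ProcessID

```java
package org.apache.kafka.streams.processor.assignment;

import java.util.UUID;

/** A simple wrapper around UUID that abstracts a Process ID; usable as a Map key. */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    @Override
    public boolean equals(final Object o) {
        return o instanceof ProcessID && id.equals(((ProcessID) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
```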
 

KafkaStreamsState

Next we have the KafkaStreamsState interface, representing the input to the assignor:

...

The KafkaStreamsState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the next new interface, the ApplicationState. The methods on ApplicationState are essentially just the current inputs to the #assign method:

```java
package org.apache.kafka.streams.processor.assignment;

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing
     * in "true" will result in a remote call to fetch changelog topic end offsets, so you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned
     */
    Set<TaskId> allTasks();

    /**
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks();

    /**
     * @return the set of stateless or changelog-less tasks in this topology
     */
    Set<TaskId> statelessTasks();
}
```
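Because kafkaStreamsStates(true) triggers a remote fetch of changelog end offsets, an assignor may prefer to decide the computeTaskLags flag per rebalance instead of hard-coding it. As a minimal sketch, assuming simplified stand-in types (KafkaStreamsState is left opaque because its accessors are not shown in this section), the hypothetical helper below requests lags only when statefulTasks() is non-empty, since stateless tasks have no changelog to lag behind. It catches nothing, so a TaskAssignmentException thrown by the real method would simply propagate, which matches the javadoc's advice to re-throw it.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified stand-ins for the proposed types (not the real Kafka Streams API).
record TaskId(int subtopology, int partition) {}
record ProcessID(UUID id) {}
interface KafkaStreamsState {}  // opaque: its accessors are not shown in this section

interface ApplicationState {
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);
    Set<TaskId> allTasks();
    Set<TaskId> statefulTasks();
    Set<TaskId> statelessTasks();
}

final class ClientStateLookupSketch {
    // Illustrative policy (an assumption, not part of the proposal): only pay for the
    // remote end-offset fetch when there are stateful tasks whose placement might use lag.
    static Map<ProcessID, KafkaStreamsState> clientStates(ApplicationState state) {
        boolean needLags = !state.statefulTasks().isEmpty();
        return state.kafkaStreamsStates(needLags);
    }
}
```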

TaskAssignmentUtils

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of the new TaskAssignor . This will allow users to more easily adapt or modify pieces of the complex existing assignment algorithm, without having to re-implement the entire thing from scratch. 

...

As noted in the TaskAssignor javadocs, the StreamsPartitionAssignor will verify the assignment returned by the task assignor and report an error if any of the following cases are observed while processing the TaskAssignor's assignment:

  1. ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients are assigned the same active task
  2. ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: an active task and a standby of that same task are assigned to the same KafkaStreams client
  3. UNKNOWN_PROCESS_ID: a ProcessID that does not match any of the participating consumers
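The three checks above can be approximated with a single validation pass. The sketch below is a hypothetical reimplementation, not the StreamsPartitionAssignor's actual code: it uses simplified record stand-ins for the proposed types, takes the set of participating process IDs as an explicit parameter, and returns the first AssignmentError it finds (the order of the checks is an assumption).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified stand-ins for the proposed types (not the real Kafka Streams API).
record TaskId(int subtopology, int partition) {}
record ProcessID(UUID id) {}
enum Type { ACTIVE, STANDBY }
record AssignedTask(TaskId id, Type type) {}
record KafkaStreamsAssignment(ProcessID processId, Set<AssignedTask> assignment) {}

enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
    UNKNOWN_PROCESS_ID
}

final class AssignmentValidatorSketch {
    // Returns the first error detected, or NONE if the assignment passes all three checks.
    static AssignmentError validate(List<KafkaStreamsAssignment> assignments, Set<ProcessID> participants) {
        Set<TaskId> activeTasksSeen = new HashSet<>();
        for (KafkaStreamsAssignment client : assignments) {
            if (!participants.contains(client.processId())) {
                return AssignmentError.UNKNOWN_PROCESS_ID;
            }
            Set<TaskId> activeHere = new HashSet<>();
            Set<TaskId> standbyHere = new HashSet<>();
            for (AssignedTask task : client.assignment()) {
                if (task.type() == Type.ACTIVE) {
                    activeHere.add(task.id());
                    // add() returns false if an earlier client already owns this active task.
                    if (!activeTasksSeen.add(task.id())) {
                        return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                    }
                } else {
                    standbyHere.add(task.id());
                }
            }
            // Any task that is both active and standby on this client is a conflict.
            standbyHere.retainAll(activeHere);
            if (!standbyHere.isEmpty()) {
                return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
            }
        }
        return AssignmentError.NONE;
    }
}
```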

...