...

Code Block
languagejava
titleApplicationState
package org.apache.kafka.streams.processor.assignment;

import java.util.Map;
import java.util.Set;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing 
     * in "true" will result in a remote call to fetch changelog topic end offsets and you should pass in "false" unless
     * you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned
     */
    Set<TaskInfo> allTasks();

}
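
As a rough illustration of how an assignor might consume this interface, the sketch below spreads all tasks round-robin over the clients returned by kafkaStreamsStates(false). It is only a sketch under stated assumptions: the nested ApplicationState is a simplified, hypothetical stand-in (process ids and tasks are plain strings, and the per-client state is an opaque Object), and assignRoundRobin and fixedState are illustrative helpers, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class RoundRobinSketch {

    /** Hypothetical, simplified stand-in for the proposed ApplicationState. */
    public interface ApplicationState {
        Map<String, Object> kafkaStreamsStates(boolean computeTaskLags);
        Set<String> allTasks();
    }

    /** Assigns every task to a client in sorted round-robin order. */
    public static Map<String, List<String>> assignRoundRobin(ApplicationState state) {
        // Pass false: lag information is not needed here, so skip the remote end-offset fetch.
        List<String> clients = new ArrayList<>(new TreeSet<>(state.kafkaStreamsStates(false).keySet()));
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        if (clients.isEmpty()) {
            return assignment;
        }
        int next = 0;
        // Sort the tasks so the result is deterministic across invocations.
        for (String task : new TreeSet<>(state.allTasks())) {
            assignment.get(clients.get(next % clients.size())).add(task);
            next++;
        }
        return assignment;
    }

    /** Illustrative fixed-content ApplicationState for trying out the sketch. */
    public static ApplicationState fixedState(final Set<String> clients, final Set<String> tasks) {
        return new ApplicationState() {
            @Override
            public Map<String, Object> kafkaStreamsStates(boolean computeTaskLags) {
                Map<String, Object> states = new TreeMap<>();
                for (String client : clients) {
                    states.put(client, new Object());
                }
                return states;
            }

            @Override
            public Set<String> allTasks() {
                return tasks;
            }
        };
    }

    public static void main(String[] args) {
        Set<String> clients = new TreeSet<>(List.of("client-a", "client-b"));
        Set<String> tasks = new TreeSet<>(List.of("0_0", "0_1", "0_2"));
        System.out.println(assignRoundRobin(fixedState(clients, tasks)));
    }
}
```

Sorting both clients and tasks keeps the sketch deterministic. A real assignor would presumably also weigh the per-client state and the assignment configs.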

TaskInfo

A small interface carrying metadata for each task to be assigned. It passes along whether a task is stateful or stateless, the mapping of input and changelog topic partitions to tasks, and other essential information such as the rack ids for each topic partition belonging to a given task.

Code Block
languagejava
titleTaskInfo
/**
 * A simple container class corresponding to a given {@link TaskId}.
 * Includes metadata such as whether it's stateful and the names of all state stores
 * belonging to this task, the set of input topic partitions and changelog topic partitions
 * for all logged state stores, and the rack ids of all replicas of each topic partition
 * in the task.
 */
public interface TaskInfo {

    TaskId id();

    boolean isStateful();

    Set<String> stateStoreNames();

    Set<TopicPartition> inputTopicPartitions();

    Set<TopicPartition> changelogTopicPartitions();

    Map<TopicPartition, Set<String>> partitionToRackIds();
}
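
An assignor that needs separate sets of stateful and stateless task ids can derive them from isStateful(). The following is a minimal sketch, assuming a trimmed-down, hypothetical stand-in for TaskInfo whose id() returns a String rather than a TaskId; the task factory and the two filter methods are illustrative helpers only.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class TaskInfoPartitionSketch {

    /** Hypothetical, trimmed-down stand-in for the proposed TaskInfo (String id instead of TaskId). */
    public interface TaskInfo {
        String id();
        boolean isStateful();
    }

    /** Illustrative factory for a fixed TaskInfo; not part of the proposal. */
    public static TaskInfo task(final String id, final boolean stateful) {
        return new TaskInfo() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public boolean isStateful() {
                return stateful;
            }
        };
    }

    /** Ids of all tasks that report isStateful() == true. */
    public static Set<String> statefulTaskIds(Set<TaskInfo> allTasks) {
        return filter(allTasks, true);
    }

    /** Ids of all tasks that report isStateful() == false. */
    public static Set<String> statelessTaskIds(Set<TaskInfo> allTasks) {
        return filter(allTasks, false);
    }

    private static Set<String> filter(Set<TaskInfo> allTasks, boolean stateful) {
        Set<String> ids = new TreeSet<>();
        for (TaskInfo info : allTasks) {
            if (info.isStateful() == stateful) {
                ids.add(info.id());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Set<TaskInfo> tasks = Set.of(task("0_0", true), task("0_1", false), task("1_0", true));
        System.out.println("stateful:  " + statefulTaskIds(tasks));
        System.out.println("stateless: " + statelessTaskIds(tasks));
    }
}
```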


TaskAssignmentUtils

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of the new TaskAssignor. This will allow users to more easily adapt or modify pieces of the complex existing assignment algorithm without having to re-implement the whole algorithm from scratch.

...