...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.TaskAssignmentException;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance.
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers.
   *
   * @param assignment   the final assignment returned to the Kafka broker
   * @param subscription the original subscription passed into the assignor
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription) {}

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {
    private final Collection<KafkaStreamsAssignment> assignment;

    public TaskAssignment(final Collection<KafkaStreamsAssignment> assignment) {
      this.assignment = assignment;
    }

    /**
     * @return the assignment of tasks to KafkaStreams clients
     */
    public Collection<KafkaStreamsAssignment> assignment() {
      return assignment;
    }
  }
}
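As a rough illustration of the #assign contract (read the application state, return a fresh assignment), here is a minimal round-robin sketch. It does not use the real Kafka Streams classes: SimpleAppState, ClientAssignment and RoundRobinAssignor are hypothetical stand-ins that mirror only the parts of ApplicationState, KafkaStreamsAssignment and TaskAssignor needed here, with task ids modeled as plain strings and process ids as UUIDs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical stand-in for ApplicationState: only the inputs this sketch needs.
record SimpleAppState(SortedSet<UUID> processIds, SortedSet<String> allTasks) { }

// Hypothetical stand-in for KafkaStreamsAssignment: the tasks placed on one client.
record ClientAssignment(UUID processId, Set<String> activeTasks) { }

// Hypothetical stand-in for a TaskAssignor; assign(...) plays the role of TaskAssignment#assignment().
final class RoundRobinAssignor {
    Collection<ClientAssignment> assign(final SimpleAppState state) {
        final List<UUID> clients = new ArrayList<>(state.processIds());
        if (clients.isEmpty()) {
            throw new IllegalStateException("no clients to assign tasks to");
        }
        // Build a new output per client; the input state is only read, never modified.
        final Map<UUID, Set<String>> tasksByClient = new LinkedHashMap<>();
        for (final UUID client : clients) {
            tasksByClient.put(client, new HashSet<>());
        }
        int next = 0;
        for (final String task : state.allTasks()) {
            tasksByClient.get(clients.get(next)).add(task);
            next = (next + 1) % clients.size();
        }
        final List<ClientAssignment> result = new ArrayList<>();
        tasksByClient.forEach((client, tasks) -> result.add(new ClientAssignment(client, tasks)));
        return result;
    }
}
```

A real assignor would also account for stateful tasks, standbys and lag; the point here is only the input-in, output-out shape.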

Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is not fit for public consumption. Currently, the TaskAssignor is given a set of ClientState objects, one per KafkaStreams client. However, the ClientState is not just the input to the assignor but also its output: the assignment of tasks to KafkaStreams clients is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.
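To make the contrast concrete, the sketch below writes the same trivial placement (all tasks go to the client with the most capacity) in both shapes. MutableClientState, legacyAssign and pureAssign are hypothetical, heavily simplified illustrations, not Kafka classes. In the legacy shape the result lives inside the objects that were passed in and the only return value is the probing flag; in the proposed shape the input is only read and the assignment is the return value.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of the internal ClientState: input AND output in one object.
final class MutableClientState {
    final int capacity;                                  // input
    final Set<String> assignedActive = new HashSet<>();  // output, written by the assignor

    MutableClientState(final int capacity) {
        this.capacity = capacity;
    }
}

final class AssignmentStyles {
    // Legacy shape: mutate the inputs, return only "should we schedule a probing rebalance?".
    static boolean legacyAssign(final Map<String, MutableClientState> clients, final List<String> tasks) {
        final String target = Collections.max(clients.entrySet(),
                Comparator.comparingInt((Map.Entry<String, MutableClientState> e) -> e.getValue().capacity)).getKey();
        clients.get(target).assignedActive.addAll(tasks);
        return false;
    }

    // Proposed shape: read-only input, the assignment is a freshly built return value.
    static Map<String, Set<String>> pureAssign(final Map<String, Integer> capacities, final List<String> tasks) {
        final String target = Collections.max(capacities.entrySet(), Map.Entry.comparingByValue()).getKey();
        return Map.of(target, Set.copyOf(tasks));
    }
}
```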

To solve these problems, we plan to refactor the interface with two goals in mind:

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only KafkaStreamsState metadata class and an output-only KafkaStreamsAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which KafkaStreams client(s) should trigger a rejoin and when to request the subsequent rebalance
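One way to picture the second goal: each per-client output carries its own optional followup deadline, so the assignor can request a rejoin from just the clients that need one. The ClientOutput record and the earliestFollowup helper below are hypothetical, a sketch of how such per-client requests could be summarized into a single "when", not the StreamsPartitionAssignor's actual logic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical per-client output: assigned tasks plus an optional followup rebalance deadline.
record ClientOutput(UUID processId, Set<String> tasks, Optional<Instant> followupDeadline) { }

final class Followups {
    // Earliest deadline requested by any client, or empty if no client asked for a followup.
    static Optional<Instant> earliestFollowup(final List<ClientOutput> outputs) {
        return outputs.stream()
                .map(ClientOutput::followupDeadline)
                .flatMap(Optional::stream)
                .min(Instant::compareTo);
    }
}
```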

This gives us the following two new top-level public interfaces, KafkaStreamsState and KafkaStreamsAssignment:

KafkaStreamsAssignment

First we have the KafkaStreamsAssignment interface, representing the output of the assignor:

Code Block
languagejava
titleNodeAssignment
package org.apache.kafka.streams.processor.assignment;

import java.time.Instant;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public interface KafkaStreamsAssignment {
  ProcessID processId();

  Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  Instant followupRebalanceDeadline();

  static class AssignedTask {
    private final TaskId id;
    private final Type taskType;

    public AssignedTask(final TaskId id, final Type taskType) {
      this.id = id;
      this.taskType = taskType;
    }

    public enum Type {
      STATELESS,
      STATEFUL,
      STANDBY
    }

    public Type type() {
      return taskType;
    }

    public TaskId id() {
      return id;
    }
  }
}
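A minimal sketch of what an implementation of this interface could look like, using stand-in types so it compiles on its own: TaskId is modeled as a plain string, ProcessID as a UUID, and SimpleKafkaStreamsAssignment and tasksOfType are hypothetical names. The deadline is computed once as "now + followupRebalanceDelay" from an injected java.time.Clock, matching the javadoc's description while keeping the value testable.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Stand-in for KafkaStreamsAssignment.AssignedTask (TaskId simplified to a String).
record AssignedTask(String id, Type type) {
    enum Type { STATELESS, STATEFUL, STANDBY }
}

// Hypothetical implementation of the KafkaStreamsAssignment shape.
final class SimpleKafkaStreamsAssignment {
    private final UUID processId;
    private final Set<AssignedTask> assignment;
    private final Instant followupRebalanceDeadline;

    SimpleKafkaStreamsAssignment(final UUID processId, final Set<AssignedTask> assignment,
                                 final Clock clock, final Duration followupRebalanceDelay) {
        this.processId = processId;
        this.assignment = Set.copyOf(assignment);
        // "now + followupRebalanceDelay", evaluated once when the assignment is built
        this.followupRebalanceDeadline = clock.instant().plus(followupRebalanceDelay);
    }

    UUID processId() { return processId; }

    Set<AssignedTask> assignment() { return assignment; }

    Instant followupRebalanceDeadline() { return followupRebalanceDeadline; }

    // Convenience view: ids of all tasks of one type placed on this client.
    Set<String> tasksOfType(final AssignedTask.Type type) {
        return assignment.stream()
                .filter(t -> t.type() == type)
                .map(AssignedTask::id)
                .collect(Collectors.toSet());
    }
}
```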

Read-only APIs

The following APIs are intended for users to read and use, but do not need to be implemented in order to plug in a custom assignor.

ProcessID

The ProcessID is a new wrapper class around the UUID to make things easier to understand:

Code Block
languagejava
titleProcessID
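package org.apache.kafka.streams.processor.assignment;

import java.util.Objects;
import java.util.UUID;

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    // Value-based equality, since ProcessID is used as a Map key (see ApplicationState#kafkaStreamsStates)
    @Override
    public boolean equals(final Object o) {
        return o instanceof ProcessID && Objects.equals(id, ((ProcessID) o).id);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(id);
    }
}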
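Because ProcessID is the key of the map returned by ApplicationState#kafkaStreamsStates, two wrappers around the same UUID must compare equal for lookups to work. A Java record gives that behavior for free; ProcessIdKey and ProcessIdLookup below are hypothetical illustrations of the property a map lookup relies on, not part of the proposed API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical value-based wrapper: the record derives equals/hashCode/toString from its UUID.
record ProcessIdKey(UUID id) {
    ProcessIdKey {
        if (id == null) {
            throw new IllegalArgumentException("process id must not be null");
        }
    }
}

final class ProcessIdLookup {
    // Index a per-client value by process id, in the way kafkaStreamsStates(...) indexes client state.
    static Map<ProcessIdKey, Integer> threadsByProcess(final Map<UUID, Integer> raw) {
        final Map<ProcessIdKey, Integer> result = new HashMap<>();
        raw.forEach((uuid, threads) -> result.put(new ProcessIdKey(uuid), threads));
        return result;
    }
}
```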
 

KafkaStreamsState

Next we have the KafkaStreamsState interface, representing the input to the assignor:

Code Block
languagejava
titleNodeState
package org.apache.kafka.streams.processor.assignment;

import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface KafkaStreamsState {
  /**
   * @return the processId of the application instance running on this KafkaStreams client
   */
  ProcessID processId();

  /**
   * Returns the number of processing threads available to work on tasks for this KafkaStreams client,
   * which represents its overall capacity for work relative to other KafkaStreams clients.
   *
   * @return the number of processing threads on this KafkaStreams client
   */
  int numProcessingThreads();

  /**
   * @return the set of consumer client ids for this KafkaStreams client
   */
  SortedSet<String> consumerClientIds();

  /**
   * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum, or
   *         {@code Task.LATEST_OFFSET} if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumerClientId);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this KafkaStreams client, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this KafkaStreams client, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
}
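As an example of how an assignor might consume this metadata, the sketch below picks, for one stateful task, the client whose lagFor value is smallest, breaking ties by process id for determinism. ClientView and LagAwarePlacement are hypothetical stand-ins. Two assumptions are labeled in the code: the Task.LATEST_OFFSET sentinel is modeled as -2 (so a previously-active client sorts ahead of any real lag), and a missing entry is treated as Long.MAX_VALUE, whereas the real lagFor returns the end-offset sum when the client has no state for the task.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for KafkaStreamsState: process id, thread count and per-task lags.
record ClientView(UUID processId, int numProcessingThreads, Map<String, Long> lags) {
    // Assumption: models Task.LATEST_OFFSET ("was the active owner last time") as -2.
    static final long LATEST_OFFSET = -2L;

    // Simplified lagFor(task): a missing entry counts as maximally lagging in this sketch.
    long lagFor(final String task) {
        return lags.getOrDefault(task, Long.MAX_VALUE);
    }
}

final class LagAwarePlacement {
    // Picks the client with the smallest lag for the task; ties go to the lower process id.
    static Optional<UUID> leastLagging(final List<ClientView> clients, final String task) {
        return clients.stream()
                .min(Comparator.comparingLong((ClientView c) -> c.lagFor(task))
                        .thenComparing(ClientView::processId))
                .map(ClientView::processId);
    }
}
```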

ApplicationState

The KafkaStreamsState will be wrapped up, along with the other inputs to the assignor (such as the configuration, the set of tasks to be assigned, and various utilities that may be useful), in the final new interface, the ApplicationState. The methods on the ApplicationState correspond to the current inputs to the #assign method:

Code Block
languagejava
titleApplicationState
package org.apache.kafka.streams.processor.assignment;

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;

/**
 * A read-only metadata class wrapping all the inputs to the assignor: the current state of each KafkaStreams
 * client with at least one StreamThread participating in this rebalance, the assignment configs, and the
 * set of tasks to be assigned.
 */
public interface ApplicationState {
    /**
     * @param computeTaskLags whether or not to include task lag information in the returned metadata. Note that passing
     *                        in "true" will result in a remote call to fetch changelog topic end offsets, so you should
     *                        pass in "false" unless you specifically need the task lag information.
     *
     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
     *
     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
     *                                 assignment and scheduling an immediate followup rebalance
     */
    Map<ProcessID, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned
     */
    Set<TaskId> allTasks();

    /**
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks();

    /**
     * @return the set of stateless or changelog-less tasks in this topology
     */
    Set<TaskId> statelessTasks();
}
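The computeTaskLags flag trades lag information for a remote end-offset fetch. A minimal sketch of the decision an assignor might make, using a hypothetical AppStateView interface that mirrors only a few ApplicationState methods (TaskId simplified to String, KafkaStreamsState reduced to a task-to-lag map): request lags only when there is at least one stateful task to place. The partition check reflects the javadoc definitions above, where "stateful and changelogged" and "stateless or changelog-less" are complementary.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, trimmed-down mirror of ApplicationState.
interface AppStateView {
    Map<UUID, Map<String, Long>> kafkaStreamsStates(boolean computeTaskLags);
    Set<String> allTasks();
    Set<String> statefulTasks();
    Set<String> statelessTasks();
}

final class LagPolicy {
    // Only pay for the remote end-offset fetch when some stateful task needs placing.
    static boolean shouldComputeLags(final AppStateView state) {
        return !state.statefulTasks().isEmpty();
    }

    static Map<UUID, Map<String, Long>> fetchStates(final AppStateView state) {
        return state.kafkaStreamsStates(shouldComputeLags(state));
    }

    // Sanity check: every task is either stateful or stateless, never both.
    static boolean tasksArePartitioned(final AppStateView state) {
        final Set<String> union = new HashSet<>(state.statefulTasks());
        union.addAll(state.statelessTasks());
        final Set<String> overlap = new HashSet<>(state.statefulTasks());
        overlap.retainAll(state.statelessTasks());
        return overlap.isEmpty() && union.equals(state.allTasks());
    }
}
```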

...