...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationState the metadata for this Streams application
   * @return the assignment of active and standby tasks to Streams client nodes
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This method allows the assignor plugin to observe the final assignment returned
   * to the brokers.
   * @param assignment the final assignment returned to the Kafka broker
   * @param subscription the original subscription passed into the assignor
   */
  void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription);

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual Streams
   * client nodes
   */
  class TaskAssignment {
    private final Collection<StreamsClientAssignment> streamsClientAssignments;

    /**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<StreamsClientAssignment> assignment();

    /**
     * @return the number of Streams client nodes to which tasks were assigned
     */
    public int numStreamsClients();

    /**
     * @return a String representation of the returned assignment, in processId order
     */
    @Override
    public String toString();
  }
}

Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each client node. However, the ClientState is not just the input to the assignor but also its output: the assignment of tasks to nodes is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.
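To make the difference in shape concrete, here is a minimal sketch contrasting the two styles. It uses simplified stand-in types (plain strings for process ids and task ids, and a hypothetical MutableClientState class), not the real Kafka Streams classes, and the round-robin placement is only a placeholder for real assignment logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentStyleSketch {

    // Hypothetical stand-in for the legacy ClientState, which is both input and output.
    static final class MutableClientState {
        final String processId;
        final List<String> assignedTasks = new ArrayList<>();

        MutableClientState(final String processId) {
            this.processId = processId;
        }
    }

    // Legacy shape: the assignment is written into the inputs, and the boolean
    // return value only signals whether a followup probing rebalance is wanted.
    // Assumes at least one client.
    static boolean assignByMutation(final List<MutableClientState> clients, final List<String> tasks) {
        for (int i = 0; i < tasks.size(); i++) {
            clients.get(i % clients.size()).assignedTasks.add(tasks.get(i));
        }
        return false; // this sketch never requests a probing rebalance
    }

    // Proposed shape: inputs are only read, and the assignment is the return value.
    // Assumes at least one process id.
    static Map<String, List<String>> assignByReturnValue(final List<String> processIds, final List<String> tasks) {
        final Map<String, List<String>> result = new LinkedHashMap<>();
        for (final String processId : processIds) {
            result.put(processId, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            result.get(processIds.get(i % processIds.size())).add(tasks.get(i));
        }
        return Collections.unmodifiableMap(result);
    }
}
```

In the second form the caller can inspect, validate, or discard the result without the inputs having been touched, which is the property the new TaskAssignment return type is after.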

...

Code Block
languagejava
titleNodeAssignment
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on Streams client nodes 
 */
public interface StreamsClientAssignment {
  ProcessID processId();

  Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  Instant followupRebalanceDeadline();

  static class AssignedTask {
    enum Type {
      STATELESS,
      STATEFUL,
      STANDBY
    }

    public Type type();

    public TaskId id();
  }
}
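The Javadoc for followupRebalanceDeadline() describes the deadline as 'now + followupRebalanceDelay'. A minimal sketch of that arithmetic using java.time is shown here; the class and method names (FollowupDeadlineSketch, deadlineFrom, isDue) are hypothetical helpers for illustration and are not part of the proposed API.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

final class FollowupDeadlineSketch {

    // Converts a relative delay into an absolute deadline: now + followupRebalanceDelay.
    // The Clock is passed in so the computation can be tested with a fixed time.
    static Instant deadlineFrom(final Clock clock, final Duration followupRebalanceDelay) {
        return clock.instant().plus(followupRebalanceDelay);
    }

    // True once the current time has reached or passed the deadline.
    static boolean isDue(final Clock clock, final Instant deadline) {
        return !clock.instant().isBefore(deadline);
    }
}
```

Expressing the deadline as an Instant rather than a delay means it keeps its meaning however long it takes for the assignment to reach the client.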

ProcessID

The ProcessID is a new wrapper class around the UUID to make things easier to understand:

...

Code Block
languagejava
titleNodeState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface StreamsClientState {
  /**
   * @return the processId of the application instance running on this node
   */
  ProcessID processId();

  /**
   * Returns the number of StreamThreads on this client, which is equal to the number of main consumers
   * and represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a node, or the minimum number of assigned
   * active tasks below which the node will have been over-provisioned and unable to give every
   * available StreamThread an active task assignment
   *
   * @return the number of consumers on this node
   */
  int numStreamThreads();

  /**
   * @return the set of consumer client ids for all StreamThreads on the given node
   */
  SortedSet<String> consumers();

  /**
   * @return the set of all active tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum, or
   *          Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumer);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this node, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this node if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this client node, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this node's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
}
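As an illustration of how the lag information exposed by lagFor might be used, the following sketch picks the client that is most caught up on a given stateful task. It works on a plain map from a client identifier to that client's lag for the task, which is a stand-in for calling lagFor on each StreamsClientState. The LATEST_OFFSET constant and its value are assumptions of this sketch, standing in for Task.LATEST_OFFSET.

```java
import java.util.Map;
import java.util.Optional;

final class LagBasedPlacementSketch {

    // Assumed sentinel meaning "this task was previously active and running on this client".
    // The concrete value is an assumption made for this sketch.
    static final long LATEST_OFFSET = -2L;

    // Returns the client best placed to take over the task: a client that was
    // previously running it as an active task wins outright, otherwise the
    // client with the smallest lag. Ties go to whichever entry is iterated first.
    static Optional<String> mostCaughtUpClient(final Map<String, Long> lagByClient) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (final Map.Entry<String, Long> entry : lagByClient.entrySet()) {
            final long lag = entry.getValue();
            if (lag == LATEST_OFFSET) {
                return Optional.of(entry.getKey());
            }
            if (best == null || lag < bestLag) {
                best = entry.getKey();
                bestLag = lag;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

A real assignor would weigh this against balance and capacity (for example numStreamThreads), so this is only the lag-comparison step in isolation.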

...

ApplicationState

The StreamsClientState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationState. The methods on the ApplicationState are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface ApplicationState {
    /**
     * @return a map from the {@code processId} to {@link StreamsClientState} for all Streams client nodes in this app
     */
    Map<ProcessID, StreamsClientState> streamsClientStates();

    /**
     * Makes a remote call to fetch changelog topic end offsets and, if successful, uses the results to compute
     * task lags for each {@link StreamsClientState}.
     *
     * @return whether the end offset fetch and lag computation was successful
     */
    boolean computeTaskLags();

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a node
     */
    Set<TaskId> allTasks();

    /**
     *
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks(); 
}

...

Code Block
languagejava
titleApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to nodes according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     * @return a new map containing the mappings from streamsClientAssignments updated with the default standby assignment
     */
    public static Map<ProcessID, StreamsClientAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments) {...}

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     * @param tasks the set of tasks to reassign if possible. Must already be assigned to a node
     * @return a new map containing the mappings from streamsClientAssignments updated with the default rack-aware assignment for active tasks
     */
    public static Map<ProcessID, StreamsClientAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments, final SortedSet<TaskId> tasks) {...}

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     * @return a new map containing the mappings from streamsClientAssignments updated with the default rack-aware assignment for standby tasks
     */
    public static Map<ProcessID, StreamsClientAssignment> optimizeRackAwareStandbyTasks(final ApplicationState applicationState, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments) {...}
}

...

Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState.computeTaskLags. This means we need some way to communicate to the streams partition assignor that it should retain the same assignment and schedule a follow-up rebalance. To do this, we will add the exception type StreamsAssignorRetryableException. If the TaskAssignor throws this exception, StreamsPartitionAssignor catches it, does fallback assignment (i.e., returns the same assignment as the previous one), and schedules a follow-up rebalance.
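The catch-and-fall-back flow described above can be sketched roughly as follows. Everything here is a simplified stand-in: the exception class is redeclared locally so the example compiles on its own, the assignor is modeled as a Supplier, assignments are plain maps of strings, and Outcome and assignWithFallback are hypothetical names. How StreamsPartitionAssignor actually schedules the follow-up rebalance is not shown.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class RetryableAssignmentSketch {

    // Local stand-in for the proposed StreamsAssignorRetryableException type.
    static final class StreamsAssignorRetryableException extends RuntimeException {
        StreamsAssignorRetryableException(final String message) {
            super(message);
        }
    }

    // Hypothetical holder for what the caller decides: which assignment to
    // hand out and whether a follow-up rebalance should be scheduled.
    static final class Outcome {
        final Map<String, List<String>> assignment;
        final boolean followupRebalanceScheduled;

        Outcome(final Map<String, List<String>> assignment, final boolean followupRebalanceScheduled) {
            this.assignment = assignment;
            this.followupRebalanceScheduled = followupRebalanceScheduled;
        }
    }

    // Runs the assignor; if it signals a retryable failure, keeps the previous
    // assignment unchanged and flags that a follow-up rebalance is needed.
    static Outcome assignWithFallback(final Supplier<Map<String, List<String>>> taskAssignor,
                                      final Map<String, List<String>> previousAssignment) {
        try {
            return new Outcome(taskAssignor.get(), false);
        } catch (final StreamsAssignorRetryableException e) {
            return new Outcome(previousAssignment, true);
        }
    }
}
```

Only the retryable exception type triggers the fallback in this sketch; any other exception propagates to the caller.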

...