Comment: change Node to StreamsClient

...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationMetadata the metadata for this Streams application
   * @return the assignment of active and standby tasks to Streams client nodes
   */
  TaskAssignment assign(ApplicationMetadata applicationMetadata);

  /**
   * This method allows the assignor plugin to observe the final assignment returned
   * to the brokers.
   * @param assignment the final assignment returned to the Kafka broker
   * @param subscription the original subscription passed into the assignor
   */
  void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription);

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual Streams
   * client nodes
   */
  class TaskAssignment {
    private final Collection<StreamsClientAssignment> streamsClientAssignments;

    /**
     * @return the assignment of tasks to Streams clients
     */
    public Collection<StreamsClientAssignment> assignment();

    /**
     * @return the number of Streams client nodes to which tasks were assigned
     */
    public int numStreamsClients();

    /**
     * @return a String representation of the returned assignment, in processId order
     */
    @Override
    public String toString();
  }
}
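
As a rough illustration of what an assign implementation might compute, the sketch below deals tasks out to clients round-robin. It is not the built-in assignor's algorithm: plain Strings stand in for ProcessID and TaskId, and the class name RoundRobinSketch and its assign helper are hypothetical names chosen so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: Strings stand in for ProcessID and TaskId.
final class RoundRobinSketch {

    /** Deals the sorted tasks out to the sorted clients, one task at a time. */
    static Map<String, List<String>> assign(final SortedSet<String> clients,
                                            final SortedSet<String> tasks) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        // Every client gets an entry, even if it ends up with no tasks.
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (final String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        final List<String> clientOrder = new ArrayList<>(clients);
        int next = 0;
        for (final String task : tasks) {
            assignment.get(clientOrder.get(next)).add(task);
            next = (next + 1) % clientOrder.size();
        }
        return assignment;
    }

    public static void main(final String[] args) {
        final SortedSet<String> clients = new TreeSet<>(Arrays.asList("client-a", "client-b"));
        final SortedSet<String> tasks = new TreeSet<>(Arrays.asList("0_0", "0_1", "0_2"));
        System.out.println(assign(clients, tasks));
    }
}
```

A real assignor would return a TaskAssignment built from one StreamsClientAssignment per client rather than a plain map, and would normally weigh state and lag rather than dealing tasks out blindly.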

...

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only StreamsClientState metadata class and an output-only StreamsClientAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which node(s) should trigger a rejoin and when to request the subsequent rebalance

This gives us the following two new top-level public interfaces, StreamsClientState and StreamsClientAssignment:

...

StreamsClientAssignment

First we have the StreamsClientAssignment interface, representing the output of the assignment:

Code Block
languagejava
titleStreamsClientAssignment
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on Streams client nodes 
 */
public interface StreamsClientAssignment {
  ProcessID processId();

  Map<TaskId, AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, using ms since the epoch, after which the
   * followup rebalance will be attempted. Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  long followupRebalanceDeadline();

  static class AssignedTask {
    public final boolean isActive;
    public final boolean isStateful;
  }
}
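
The Javadoc describes the followup rebalance deadline as "now + followupRebalanceDelay" in milliseconds since the epoch. A minimal sketch of that arithmetic follows; the class FollowupRebalanceSketch and its helpers deadlineMs and isDue are hypothetical names, not part of the proposed API.

```java
import java.time.Duration;

// Hypothetical helper illustrating the deadline arithmetic only.
final class FollowupRebalanceSketch {

    /** Converts a relative delay into an absolute deadline in ms since the epoch. */
    static long deadlineMs(final long nowMs, final Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return Math.addExact(nowMs, delay.toMillis());
    }

    /** True once the wall-clock time has reached the deadline. */
    static boolean isDue(final long deadlineMs, final long nowMs) {
        return nowMs >= deadlineMs;
    }

    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();
        final long deadline = deadlineMs(now, Duration.ofMinutes(10));
        System.out.println("followup rebalance due now? " + isDue(deadline, now));
    }
}
```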

...

Code Block
languagejava
titleProcessID
package org.apache.kafka.streams.processor.assignment;

import java.util.UUID;

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    // Must take Object (not ProcessID) to override Object#equals
    @Override
    public boolean equals(final Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        return id.equals(((ProcessID) other).id);
    }
}
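
Since ProcessID serves as a map key elsewhere in this proposal (for example in Map<ProcessID, ...> parameters), value-based equals and hashCode are what make lookups with a freshly constructed ID succeed. The sketch below shows this with a hypothetical stand-in class named Key; it is an illustration, not the proposed ProcessID itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class ProcessIdKeySketch {

    // Hypothetical stand-in for ProcessID with value-based equality.
    static final class Key {
        private final UUID id;

        Key(final UUID id) {
            this.id = id;
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }

        @Override
        public boolean equals(final Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || getClass() != other.getClass()) {
                return false;
            }
            return id.equals(((Key) other).id);
        }
    }

    /** Stores a value under one Key and looks it up with a distinct but equal Key. */
    static boolean lookupWithEqualCopy(final UUID uuid) {
        final Map<Key, String> states = new HashMap<>();
        states.put(new Key(uuid), "client state");
        return states.containsKey(new Key(uuid));
    }

    public static void main(final String[] args) {
        System.out.println(lookupWithEqualCopy(UUID.randomUUID()));
    }
}
```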
 

...

StreamsClientState

Next we have the StreamsClientState interface, representing the input to the assignor:

Code Block
languagejava
titleStreamsClientState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface StreamsClientState {
  /**
   * @return the processId of the application instance running on this node
   */
  ProcessID processId();

  /**
   * Returns the number of StreamThreads on this client, which is equal to the number of main consumers
   * and represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a node, or the minimum number of assigned
   * active tasks below which the node will have been over-provisioned and unable to give every
   * available StreamThread an active task assignment
   *
   * @return the number of consumers on this node
   */
  int numStreamThreads();

  /**
   * @return the set of consumer client ids for all StreamThreads on the given node
   */
  SortedSet<String> consumers();

  /**
   * @return the set of all active tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum, or
   *          Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumer);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this node, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this node if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this client node, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this node's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
}
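
The lagFor Javadoc describes three cases: no state on disk, a previously active running task, and the ordinary difference of offset sums. The sketch below is one hedged reading of those rules. The class TaskLagSketch is hypothetical, an empty OptionalLong is an assumed way to model "no state on disk", and the LATEST_OFFSET constant is an illustrative stand-in for Task.LATEST_OFFSET.

```java
import java.util.OptionalLong;

final class TaskLagSketch {

    // Assumed sentinel standing in for Task.LATEST_OFFSET; the value is illustrative.
    static final long LATEST_OFFSET = -2L;

    /**
     * @param offsetSum    the client's offset sum for the task, empty if it has no state on disk
     * @param endOffsetSum the sum of changelog end offsets for the task
     */
    static long lagFor(final OptionalLong offsetSum, final long endOffsetSum) {
        if (!offsetSum.isPresent()) {
            // No local state: the whole changelog would have to be restored.
            return endOffsetSum;
        }
        final long sum = offsetSum.getAsLong();
        if (sum == LATEST_OFFSET) {
            // Previously an active running task on this client.
            return LATEST_OFFSET;
        }
        return endOffsetSum - sum;
    }

    public static void main(final String[] args) {
        System.out.println(lagFor(OptionalLong.of(40L), 100L));
    }
}
```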

ApplicationMetadata

The StreamsClientState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationMetadata. The methods on the ApplicationMetadata are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface ApplicationMetadata {
    /**
     * @return a map from the {@code processId} to {@link StreamsClientState} for all Streams client nodes in this app
     */
    Map<ProcessID, StreamsClientState> streamsClientStates();

    /**
     * Makes a remote call to fetch changelog topic end offsets and, if successful, uses the results to compute
     * task lags for each {@link StreamsClientState}.
     *
     * @return whether the end offset fetch and lag computation was successful
     */
    boolean computeTaskLags();

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a node
     */
    Set<TaskId> allTasks();

    /**
     *
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks(); 
}
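
Once task lags have been computed, one thing an assignor could do with the per-client state is pick the most caught-up client for each stateful task. The following is only a sketch of that idea under simplifying assumptions: Strings stand in for ProcessID and TaskId, a nested map stands in for the per-client lag lookups, and MostCaughtUpSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class MostCaughtUpSketch {

    /**
     * @param task         the task to place
     * @param lagsByClient client id -> (task id -> lag); clients without an entry for the task are skipped
     * @return the client with the smallest lag for the task, ties broken by client id order
     */
    static Optional<String> mostCaughtUpClient(final String task,
                                               final Map<String, Map<String, Long>> lagsByClient) {
        String best = null;
        long bestLag = 0L;
        // TreeMap gives a deterministic iteration order, so ties resolve the same way every time.
        for (final Map.Entry<String, Map<String, Long>> entry : new TreeMap<>(lagsByClient).entrySet()) {
            final Long lag = entry.getValue().get(task);
            if (lag != null && (best == null || lag < bestLag)) {
                best = entry.getKey();
                bestLag = lag;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(final String[] args) {
        final Map<String, Map<String, Long>> lags = new HashMap<>();
        lags.put("client-a", Map.of("0_0", 50L));
        lags.put("client-b", Map.of("0_0", 10L));
        System.out.println(mostCaughtUpClient("0_0", lags));
    }
}
```

If computeTaskLags() returns false, no lag information is available, so an assignor following this approach would need a fallback such as a lag-agnostic placement.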

...

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to nodes according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     */
    public static void defaultStandbyTaskAssignment(final ApplicationMetadata applicationMetadata, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments) {...}

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     * @param tasks the set of tasks to reassign if possible. Must already be assigned to a node
     */
    public static void optimizeRackAwareActiveTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments, final SortedSet<TaskId> tasks) {...}

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param streamsClientAssignments the current assignment of tasks to nodes
     */
    public static void optimizeRackAwareStandbyTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, StreamsClientAssignment> streamsClientAssignments) {...}
}

TaskAssignmentUtils  provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks API will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.

...

There are no actual changes to functionality: this mainly moves an internal config to the public part of StreamsConfig and brings a few currently-internal classes into the public API as some new interfaces and a new public assignment package. Code-wise the largest change is really the breaking up of the ClientState into the new StreamsClientState and StreamsClientAssignment interfaces, but that will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

...