...

Next we have the KafkaStreamsAssignment class, representing the output of the assignment to be created by the TaskAssignor:

Code Block
languagejava
titleKafkaStreamsAssignment
package org.apache.kafka.streams.processor.assignment; 

/**
 * A simple container class for the assignor to return the desired placement of active and standby tasks on KafkaStreams clients
 */
public class KafkaStreamsAssignment {

  /**
   * Construct an instance of KafkaStreamsAssignment with this processId and the given set of
   * assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you
   * can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API.
   *
   * @param processId the processId for the KafkaStreams client that should receive this assignment
   * @param assignment the set of tasks to be assigned to this KafkaStreams client
   *
   * @return a new KafkaStreamsAssignment object with the given processId and assignment
   */
  public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment);

  /**
   * This API can be used to request that a followup rebalance be triggered by the KafkaStreams client 
   * receiving this assignment. The followup rebalance will be initiated after the provided deadline
   * has passed, although it will always wait until it has finished the current rebalance before 
   * triggering a new one. This request will last until the new rebalance, and will be erased if a
   * new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next
   * assignment must request the followup rebalance again if it still wants to schedule one for
   * the given instant, otherwise no additional rebalance will be triggered after that.
   * 
   * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
   *
   * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
   */
  public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline);

  public ProcessId processId();

  public Set<AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
   * Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  public Instant followupRebalanceDeadline();

  public static class AssignedTask {

    public AssignedTask(final TaskId id, final Type taskType);

    enum Type {
        ACTIVE,
        STANDBY
    }
    
    public Type type();

    public TaskId id();
  }
}
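
The copy-on-write contract described in the Javadoc above (`of` builds the assignment, `withFollowupRebalance` returns a new object with the same processId and tasks) can be illustrated with a minimal sketch. This is not the Kafka implementation: `AssignmentSketch` is a hypothetical stand-in, plain `String`s replace `ProcessId` and `AssignedTask`, and returning an `Optional` from `followupRebalanceDeadline()` is an assumption made here to represent "no followup requested" (the signature above returns `Instant`).

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for KafkaStreamsAssignment (illustration only).
final class AssignmentSketch {
    private final String processId;                   // stand-in for ProcessId
    private final Set<String> tasks;                  // stand-in for Set<AssignedTask>
    private final Instant followupRebalanceDeadline;  // null when no followup was requested

    private AssignmentSketch(final String processId,
                             final Set<String> tasks,
                             final Instant followupRebalanceDeadline) {
        this.processId = processId;
        this.tasks = tasks;
        this.followupRebalanceDeadline = followupRebalanceDeadline;
    }

    // Factory: defensively copies the task set and requests no followup rebalance.
    static AssignmentSketch of(final String processId, final Set<String> tasks) {
        return new AssignmentSketch(
            processId, Collections.unmodifiableSet(new HashSet<>(tasks)), null);
    }

    // Returns a new object with the same processId and tasks; the receiver is unchanged.
    AssignmentSketch withFollowupRebalance(final Instant rebalanceDeadline) {
        return new AssignmentSketch(processId, tasks, rebalanceDeadline);
    }

    String processId() {
        return processId;
    }

    Set<String> assignment() {
        return tasks;
    }

    // Assumption of this sketch: Optional signals whether a followup was requested.
    Optional<Instant> followupRebalanceDeadline() {
        return Optional.ofNullable(followupRebalanceDeadline);
    }
}
```

Because `withFollowupRebalance` returns a copy, an assignor written against this shape would have to keep the returned object rather than the original in order for the followup request to take effect.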

...

Code Block
languagejava
titleKafkaStreamsState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
 */
public interface KafkaStreamsState {
  /**
   * @return the processId of the application instance running on this KafkaStreams client
   */
  ProcessId processId();

  /**
   * Returns the number of processing threads available to work on tasks for this KafkaStreams client, 
   * which represents its overall capacity for work relative to other KafkaStreams clients.
   *
   * @return the number of processing threads on this KafkaStreams client
   */
  int numProcessingThreads();

  /**
   * @return the set of consumer client ids for this KafkaStreams client
   */
  SortedSet<String> consumerClientIds();

  /**
   * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum, or
   *         Task.LATEST_OFFSET if this was previously an active running task on this client
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  SortedSet<TaskId> prevTasksByLag(final String consumerClientId);

  /**
   * Returns a map containing all (and only) stateful tasks in the topology by {@link TaskId},
   * each mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   * @throws UnsupportedOperationException if the user did not request task lags be computed.
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this KafkaStreams client, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this KafkaStreams client, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();

  /**
   * @return the rackId for this KafkaStreams client, or {@link Optional#empty()} if none was configured
   */
  Optional<String> rackId();

}
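
As an illustration of how the lag accessors above might be consumed, the following sketch picks the client that is most caught up on a given task. It is a hedged example under stated assumptions, not part of the proposal: `LagAwarePlacementSketch` and `mostCaughtUpClient` are hypothetical names, client and task ids are plain `String`s, the nested map stands in for calling `statefulTasksToLagSums()` on each client, and the value `-2L` used for the `Task.LATEST_OFFSET` sentinel is an assumption.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (illustration only): chooses a client for a task by lag.
final class LagAwarePlacementSketch {

    // Assumed stand-in for Task.LATEST_OFFSET; the actual value is not stated in the text above.
    static final long LATEST_OFFSET_SENTINEL = -2L;

    /**
     * @param taskId          the task to place (stand-in for TaskId)
     * @param lagSumsByClient per-client lag sums, keyed by client id, then by task id
     * @return the client with the smallest lag for the task, or empty if no client reports it
     */
    static Optional<String> mostCaughtUpClient(final String taskId,
                                               final Map<String, Map<String, Long>> lagSumsByClient) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (final Map.Entry<String, Map<String, Long>> entry : lagSumsByClient.entrySet()) {
            final Long lag = entry.getValue().get(taskId);
            if (lag == null) {
                continue; // this client reports no lag for the task
            }
            if (lag == LATEST_OFFSET_SENTINEL) {
                // The task was previously active and running here, so nothing needs restoring.
                return Optional.of(entry.getKey());
            }
            if (best == null || lag < bestLag) {
                best = entry.getKey();
                bestLag = lag;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

A real assignor would also need to weigh capacity (`numProcessingThreads()`) and standby placement; this sketch only shows the lag comparison.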

ApplicationState

The KafkaStreamsState will be wrapped up along with the other inputs to the assignor (such as the configuration and the set of tasks to be assigned, as well as various utilities that may be useful) in the next new interface, the ApplicationState. The methods on the ApplicationState correspond to the current inputs to the #assign method:

...