...

Code Block
languagejava
titleTaskAssignor
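package org.apache.kafka.streams.processor.assignment;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.TaskAssignmentException;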

public interface TaskAssignor extends Configurable {

  enum AssignmentError {
    OVERLAPPING_CLIENT,
    OVERLAPPING_STANDBY,
    UNKNOWN_PROCESS_ID
  }

  /**
   * @param applicationState the metadata for this Kafka Streams application
   *
   * @return the assignment of active and standby tasks to KafkaStreams clients 
   *
   * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
   *                                 you can throw this exception to keep the assignment unchanged and automatically
   *                                 schedule an immediate followup rebalance. 
   */
  TaskAssignment assign(ApplicationState applicationState);

  /**
   * This callback can be used to observe the final assignment returned to the brokers, and to observe any errors that
   * were detected while processing the returned assignment. If any errors were found, the rebalance will be
   * automatically retried by returning the same assignment as the input, scheduling an immediate followup rebalance,
   * and passing in the corresponding AssignmentError in this callback.
   *
   * @param assignment   the final assignment returned to the kafka broker
   * @param subscription the original subscription passed into the assignor
   * @param error        the corresponding error type if one was detected while processing the returned assignment,
   *                     or null if the assignment was valid
   */
  default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

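  /**
   * Wrapper class for the final assignment of active and standby tasks to individual
   * KafkaStreams clients
   */
  class TaskAssignment {

    private final Collection<KafkaStreamsAssignment> assignment;

    public TaskAssignment(final Collection<KafkaStreamsAssignment> assignment) {
      this.assignment = assignment;
    }

    /**
     * @return the assignment of tasks to kafka streams clients
     */
    public Collection<KafkaStreamsAssignment> assignment() {
      return assignment;
    }
  }
}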

Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each KafkaStreams client. The ClientState is however not just the input to the assignor, but also its output: the assignment of tasks to KafkaStreams clients is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.
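
To make the contrast concrete, here is a minimal sketch of the style the new API moves toward: the assignor reads an immutable description of the clients and returns a fresh assignment instead of mutating its inputs. Everything in it is a simplified, hypothetical stand-in (process ids and task ids are plain strings, and ClientSnapshot and RoundRobinAssignorSketch are not real Kafka Streams classes); it does not model standbys, lags, or rack awareness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, read-only view of one KafkaStreams client (stand-in for KafkaStreamsState).
record ClientSnapshot(String processId) {}

final class RoundRobinAssignorSketch {
    // Pure function: the inputs are never modified; a fresh client -> tasks map is returned,
    // loosely mirroring "return a TaskAssignment" instead of "mutate the ClientStates".
    static Map<String, List<String>> assign(List<ClientSnapshot> clients, List<String> tasks) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("no clients to assign to");
        }
        // Sort copies of the inputs so the result does not depend on input order.
        List<String> processIds = new ArrayList<>();
        for (ClientSnapshot c : clients) {
            processIds.add(c.processId());
        }
        Collections.sort(processIds);
        List<String> sortedTasks = new ArrayList<>(tasks);
        Collections.sort(sortedTasks);

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String id : processIds) {
            assignment.put(id, new ArrayList<>());
        }
        // Deal tasks out one at a time, cycling through the clients.
        for (int i = 0; i < sortedTasks.size(); i++) {
            String owner = processIds.get(i % processIds.size());
            assignment.get(owner).add(sortedTasks.get(i));
        }
        return assignment;
    }
}
```

Because the function has no side effects, the caller can validate or discard its result without worrying about partially mutated state.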

...

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

import java.util.List;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}
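
As an illustration of how an assignor might consult one of these settings, the sketch below applies acceptableRecoveryLag in its usual Kafka Streams sense: a client whose lag on a stateful task is at most this value counts as caught up enough to receive that task as an active task. ConfigsSketch, CaughtUpSketch, and the plain map of lags per process id are hypothetical simplifications, not the real AssignmentConfigs or KafkaStreamsState API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, trimmed-down stand-in for AssignmentConfigs: only the field used below.
record ConfigsSketch(long acceptableRecoveryLag) {}

final class CaughtUpSketch {
    // A client counts as caught up on a task if its lag (assumed non-negative here)
    // does not exceed acceptableRecoveryLag.
    static boolean isCaughtUp(long lag, ConfigsSketch configs) {
        return lag <= configs.acceptableRecoveryLag();
    }

    // Returns, in sorted order, the process ids whose lag on a single task is acceptable.
    static List<String> caughtUpClients(Map<String, Long> lagByProcessId, ConfigsSketch configs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(lagByProcessId).entrySet()) {
            if (isCaughtUp(e.getValue(), configs)) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```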


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates. Users who request the task lags via the computeTaskLags input flag should make sure to handle failures the way they desire, and can rethrow a thrown TaskAssignmentException (or simply not catch it in the first place) to have Kafka Streams automatically "retry" the rebalance by returning the same assignment and scheduling an immediate followup rebalance. Advanced users who want more control over the "fallback" assignment and/or the timing of immediate followup rebalance(s) can instead swallow the TaskAssignmentException and use the followupRebalanceDeadline to schedule followup rebalances, e.g. to implement a retry/backoff policy.
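
For the retry/backoff policy mentioned above, a minimal sketch of computing the next followup rebalance deadline with capped exponential backoff. The helper name nextFollowupDeadlineMs, the base and maximum delays, and the idea that the assignor tracks its own attempt count are all assumptions for illustration; the resulting absolute timestamp is the kind of value such an assignor might hand to followupRebalanceDeadline, and how that deadline is attached to the returned assignment is not shown here.

```java
// Hypothetical helper: capped exponential backoff for scheduling followup rebalances.
final class FollowupBackoff {
    // attempt = 0 for the first retry; the delay doubles per attempt until it reaches maxDelayMs.
    // Assumes realistic millisecond values, so doubling below maxDelayMs cannot overflow.
    static long nextFollowupDeadlineMs(long nowMs, int attempt, long baseDelayMs, long maxDelayMs) {
        if (attempt < 0 || baseDelayMs <= 0 || maxDelayMs < baseDelayMs) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay = Math.min(maxDelayMs, delay * 2);
        }
        return nowMs + delay;
    }
}
```

Capping the delay keeps a persistently failing lag computation from pushing the next rebalance arbitrarily far into the future.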


Proposed Changes

On the whole we are not introducing new functionality to Streams itself, but rather are refactoring the existing TaskAssignor interface so that it's (a) pluggable/publicly exposed, and (b) easier to understand, use, and implement. Code-wise the largest change is the breaking up of the ClientState into the new KafkaStreamsState and KafkaStreamsAssignment interfaces, but that will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

Observing The Final Assignment

The custom assignor specifies a task-to-KafkaStreams-client mapping, but the final task-to-group-member mapping is controlled by the StreamsPartitionAssignor. It may be useful for implementors of a custom assignor to observe the final task-to-group-member assignment that is returned to Kafka, for example to record the mapping in an event that can be materialized externally to track the current assignments. To support this, the TaskAssignor interface also includes a callback named #onAssignmentComputed, which is called with the final computed GroupAssignment.
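
As an illustration of recording the final mapping for external materialization, a minimal sketch that flattens a member-to-tasks mapping into one event line per group member. The real callback receives a GroupAssignment; here it is replaced by a plain map from member id to task ids, and AssignmentEventRecorder, its rebalance counter, and its line format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical observer: turns the final member -> tasks mapping into event lines
// that could later be written to an external store or topic.
final class AssignmentEventRecorder {
    private final List<String> events = new ArrayList<>();
    private long rebalanceCount = 0;

    void onAssignmentComputed(Map<String, List<String>> tasksByMember) {
        rebalanceCount++;
        // Iterate members in sorted order so the emitted events are deterministic.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(tasksByMember).entrySet()) {
            events.add("rebalance=" + rebalanceCount + " member=" + e.getKey()
                    + " tasks=" + String.join(",", e.getValue()));
        }
    }

    // Immutable snapshot of everything recorded so far.
    List<String> events() {
        return List.copyOf(events);
    }
}
```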

This callback can also be used to return an error to the user and notify them in the case of an invalid assignment computed by their TaskAssignor. The specifics are discussed in the following section.

Assignment Validation

As noted in the TaskAssignor javadocs, the StreamsPartitionAssignor will verify the assignment returned by the task assignor and return an error if any of the following cases is observed while processing the TaskAssignor's assignment:

  1. OVERLAPPING_CLIENT: the same active task is assigned to multiple KafkaStreams clients
  2. OVERLAPPING_STANDBY: an active task and a standby replica of that same task are assigned to the same KafkaStreams client
  3. UNKNOWN_PROCESS_ID: the assignment contains a ProcessId that does not match any of the participating consumers
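
The three checks above can be sketched as follows, assuming a simplified model in which each client's assignment is a pair of active and standby task-id sets keyed by process id. ClientTasks, AssignmentValidator, and ValidationError are hypothetical stand-ins (the enum constants mirror TaskAssignor.AssignmentError), and this is not the StreamsPartitionAssignor's actual validation code.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Mirrors the constants of TaskAssignor.AssignmentError.
enum ValidationError { OVERLAPPING_CLIENT, OVERLAPPING_STANDBY, UNKNOWN_PROCESS_ID }

// Hypothetical stand-in for one client's KafkaStreamsAssignment.
record ClientTasks(Set<String> active, Set<String> standby) {}

final class AssignmentValidator {
    // Returns the first error found, or null if the assignment is valid. When several
    // errors are present, which one is reported depends on the map's iteration order.
    static ValidationError validate(Map<String, ClientTasks> assignment, Set<String> knownProcessIds) {
        Set<String> seenActive = new HashSet<>();
        for (Map.Entry<String, ClientTasks> e : assignment.entrySet()) {
            if (!knownProcessIds.contains(e.getKey())) {
                return ValidationError.UNKNOWN_PROCESS_ID;
            }
            ClientTasks tasks = e.getValue();
            for (String task : tasks.active()) {
                // add() returns false if another client already owns this active task.
                if (!seenActive.add(task)) {
                    return ValidationError.OVERLAPPING_CLIENT;
                }
                // The same client must not also hold a standby for its own active task.
                if (tasks.standby().contains(task)) {
                    return ValidationError.OVERLAPPING_STANDBY;
                }
            }
        }
        return null;
    }
}
```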

If any of these errors is detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. If this occurs, the input assignment will be used as the new assignment, and the corresponding error will be passed to the #onAssignmentComputed callback.
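
The fallback flow described above can be sketched as follows, under simplifying assumptions: assignments are plain maps, the validator is passed in as a function returning an error name or null, and "scheduling an immediate followup rebalance" is modeled as a boolean flag on the result. FallbackSketch, Outcome, Callback, and finalizeAssignment are hypothetical names, not part of the proposed API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FallbackSketch {
    // What is sent to the brokers, whether an immediate followup rebalance is requested,
    // and the detected error (or null if the proposed assignment was valid).
    record Outcome(Map<String, List<String>> assignment, boolean immediateFollowup, String error) {}

    // Hypothetical stand-in for TaskAssignor#onAssignmentComputed.
    interface Callback {
        void onAssignmentComputed(Map<String, List<String>> assignment, String error);
    }

    static Outcome finalizeAssignment(Map<String, List<String>> inputAssignment,
                                      Map<String, List<String>> proposed,
                                      Function<Map<String, List<String>>, String> validator,
                                      Callback callback) {
        String error = validator.apply(proposed);
        Outcome outcome = (error == null)
                ? new Outcome(proposed, false, null)
                // Invalid: keep the input assignment and ask for an immediate retry.
                : new Outcome(inputAssignment, true, error);
        // The callback always fires, with the error (if any) passed in.
        callback.onAssignmentComputed(outcome.assignment(), outcome.error());
        return outcome;
    }
}
```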

Consumer Assignments

One major decision in this KIP was whether to encompass the assignment of tasks to consumers/threads within each KafkaStreams client, or to leave that up to the StreamsPartitionAssignor and only carve out the KafkaStreams-level assignment for pluggability. Ultimately we decided on the latter, for several reasons:

...