...

Code Block
languagejava
titleStreamsConfig
/** {@code partition.assignor.task.assignor.class} */
public static final String PARTITION_ASSIGNOR_TASK_ASSIGNOR_CLASS_CONFIG = "partition.assignor.task.assignor.class";
private static final String PARTITION_ASSIGNOR_TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class.";
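
As a rough illustration of how an application might opt into a custom assignor, the sketch below sets the proposed key on a plain java.util.Properties object. The class name com.example.MyTaskAssignor and the helper TaskAssignorConfigExample are hypothetical, and the key is written as a string literal because the StreamsConfig constant is only proposed here.

```java
import java.util.Properties;

final class TaskAssignorConfigExample {
    // Key proposed in this KIP, spelled out literally (assumption: the constant is not yet available).
    static final String TASK_ASSIGNOR_CLASS_KEY = "partition.assignor.task.assignor.class";

    // Builds a Streams-style configuration that names a custom task assignor class.
    static Properties withCustomAssignor(final String assignorClassName) {
        final Properties props = new Properties();
        props.put("application.id", "example-app");          // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");    // placeholder broker address
        props.put(TASK_ASSIGNOR_CLASS_KEY, assignorClassName);
        return props;
    }
}
```

Calling TaskAssignorConfigExample.withCustomAssignor("com.example.MyTaskAssignor") yields properties that could be handed to a Streams application; if the key is left unset, the default described above would apply.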

...

Code Block
languagejava
titleTaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationMetadata the metadata for this Streams application
   * @return the assignment of active and standby tasks to Streams client nodes
   */
  TaskAssignment assign(final ApplicationMetadata applicationMetadata);

  /**
   * This method allows the assignor plugin to observe the final assignment returned
   * to the brokers.
   *
   * @param assignment the final assignment returned to the Kafka broker
   * @param subscription the original subscription passed into the assignor
   */
  void finalAssignment(GroupAssignment assignment, GroupSubscription subscription);

  /**
   * Wrapper class for the final assignment of active and standby tasks to individual Streams
   * client nodes
   */
  class TaskAssignment {

    private final Collection<NodeAssignment> nodeAssignments;

    /**
     * @return the assignment of tasks to nodes
     */
    public Collection<NodeAssignment> assignment();

    /**
     * @return the number of Streams client nodes to which tasks were assigned
     */
    public int numNodes();

    /**
     * @return a String representation of the returned assignment, in processId order
     */
    @Override
    public String toString();
  }
}

Another reason for introducing the new TaskAssignment and ApplicationMetadata classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each client node. The ClientState is however not just the input to the assignor, but also its output – the assignment of tasks to nodes is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.

To solve these problems, we plan to refactor the interface with two goals in mind:

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only NodeState metadata class and an output-only NodeAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which node(s) should trigger a rejoin and when to request the subsequent rebalance
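
The two goals above can be sketched with a toy round-robin assignor. Everything in this block is a simplified, hypothetical stand-in (NodeInput, NodeOutput, RoundRobinSketch, String task ids, and the NO_FOLLOWUP sentinel are assumptions for illustration), not the interfaces proposed in this KIP. It only shows the shape of the idea: inputs are read, never mutated, and the returned value carries both the task placement and the per-node followup deadline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input-only view of a client node: the assignor reads it but never mutates it.
final class NodeInput {
    final String processId;

    NodeInput(final String processId) {
        this.processId = processId;
    }
}

// Hypothetical output-only value returned by the assignor for one node.
final class NodeOutput {
    static final long NO_FOLLOWUP = -1L;   // assumed sentinel: this node requests no followup rebalance

    final String processId;
    final List<String> tasks;
    final long followupRebalanceDeadlineMs;

    NodeOutput(final String processId, final List<String> tasks, final long followupRebalanceDeadlineMs) {
        this.processId = processId;
        this.tasks = tasks;
        this.followupRebalanceDeadlineMs = followupRebalanceDeadlineMs;
    }
}

final class RoundRobinSketch {
    // Builds a fresh output per node instead of mutating the inputs.
    static List<NodeOutput> assign(final List<NodeInput> nodes,
                                   final List<String> taskIds,
                                   final long followupDeadlineMs) {
        final List<NodeOutput> result = new ArrayList<>();
        for (int i = 0; i < nodes.size(); i++) {
            final List<String> tasks = new ArrayList<>();
            // Node i takes every nodes.size()-th task, starting at index i.
            for (int t = i; t < taskIds.size(); t += nodes.size()) {
                tasks.add(taskIds.get(t));
            }
            // Only the first node is asked to trigger the followup rebalance.
            final long deadline = (i == 0) ? followupDeadlineMs : NodeOutput.NO_FOLLOWUP;
            result.add(new NodeOutput(nodes.get(i).processId, tasks, deadline));
        }
        return result;
    }
}
```

Because the assignor returns new objects, the caller can validate or discard the result without worrying about half-mutated input state, which is the main practical benefit of the split.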

...

Code Block
languagejava
titleNodeAssignment
package org.apache.kafka.streams.processor.assignment; 
 
/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on Streams client nodes 
 */
public interface NodeAssignment {
  ProcessID processId();

  Map<TaskId, AssignedTask> assignment();

  /**
   * @return the actual deadline in objective time, using ms since the epoch, after which the
   * followup rebalance will be attempted. Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  long followupRebalanceDeadline();

  static class AssignedTask {
    public final boolean isActive;
    public final boolean isStateful;
  }
}
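
To make the shape of this return value more concrete, here is a minimal sketch of how a consumer of the Map<TaskId, AssignedTask> might derive per-category task sets from the isActive and isStateful flags, and how the deadline relates to 'now + followupRebalanceDelay'. The types AssignedTaskSketch and NodeAssignmentSketch, the String task ids, and the helper method names are all hypothetical stand-ins, not part of the proposed API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for AssignedTask, with a constructor added so it can be instantiated.
final class AssignedTaskSketch {
    final boolean isActive;
    final boolean isStateful;

    AssignedTaskSketch(final boolean isActive, final boolean isStateful) {
        this.isActive = isActive;
        this.isStateful = isStateful;
    }
}

final class NodeAssignmentSketch {
    // Deadline in ms since the epoch: 'now + followupRebalanceDelay'.
    static long followupRebalanceDeadline(final long nowMs, final long followupRebalanceDelayMs) {
        return nowMs + followupRebalanceDelayMs;
    }

    // Standby tasks are the entries that are not active.
    static Set<String> standbyTasks(final Map<String, AssignedTaskSketch> assignment) {
        final Set<String> standbys = new TreeSet<>();
        for (final Map.Entry<String, AssignedTaskSketch> entry : assignment.entrySet()) {
            if (!entry.getValue().isActive) {
                standbys.add(entry.getKey());
            }
        }
        return standbys;
    }

    // Active stateful tasks are the entries flagged both active and stateful.
    static Set<String> activeStatefulTasks(final Map<String, AssignedTaskSketch> assignment) {
        final Set<String> result = new TreeSet<>();
        for (final Map.Entry<String, AssignedTaskSketch> entry : assignment.entrySet()) {
            if (entry.getValue().isActive && entry.getValue().isStateful) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

A single map keyed by task keeps each task's role in one place, so the category views can be computed on demand rather than maintained as separate, possibly inconsistent, sets.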

ProcessID

The ProcessID is a new wrapper class around the UUID to make things easier to understand:

...

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationMetadata.computeTaskLags. This means we need some way to communicate to the Streams partition assignor that it should retain the same assignment and schedule a follow-up rebalance. To do this, we will add the exception type StreamsAssignorRetryableException. If the TaskAssignor throws this exception, the StreamsPartitionAssignor catches it, performs a fallback assignment (i.e. returns the same assignment as the previous one), and schedules a follow-up rebalance.
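
The catch-and-fall-back flow described above might look roughly like the following sketch. The exception class here is a local stand-in that merely shares the proposed name, and AssignmentResult, FallbackSketch, and the Map<String, String> task-to-node representation are hypothetical simplifications; the actual handling inside StreamsPartitionAssignor may differ.

```java
import java.util.Map;
import java.util.function.Supplier;

// Local stand-in for the proposed exception type (assumption: unchecked).
class StreamsAssignorRetryableException extends RuntimeException {
    StreamsAssignorRetryableException(final String message) {
        super(message);
    }
}

// Hypothetical result holder: the chosen assignment plus whether a follow-up rebalance was scheduled.
final class AssignmentResult {
    final Map<String, String> taskToNode;
    final boolean followupRebalanceScheduled;

    AssignmentResult(final Map<String, String> taskToNode, final boolean followupRebalanceScheduled) {
        this.taskToNode = taskToNode;
        this.followupRebalanceScheduled = followupRebalanceScheduled;
    }
}

final class FallbackSketch {
    // Runs the task assignor; on a retryable failure, keeps the previous assignment
    // and flags that a follow-up rebalance should be scheduled.
    static AssignmentResult assignWithFallback(final Supplier<Map<String, String>> taskAssignor,
                                               final Map<String, String> previousAssignment) {
        try {
            return new AssignmentResult(taskAssignor.get(), false);
        } catch (final StreamsAssignorRetryableException e) {
            return new AssignmentResult(previousAssignment, true);
        }
    }
}
```

Only the retryable exception triggers the fallback in this sketch; any other exception propagates to the caller unchanged.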

Observing The Final Assignment

The custom assignor specifies a task-to-node mapping, but the final task-to-group-member mapping is controlled by the Streams partition assignor. It may be useful for implementors of a custom assignor to observe the final task-to-group-member assignment that is returned to Kafka. For example, it may be useful to record the mapping in an event that can be materialized externally to observe the current assignments. To support this, the TaskAssignor interface also includes a method named finalAssignment, which is called with the final computed GroupAssignment and the original GroupSubscription.

Proposed Changes

This KIP makes no actual changes to functionality. It mainly moves an internal config to the public part of StreamsConfig and brings a few currently-internal classes into the public API as new interfaces in a new public assignment package. Code-wise, the largest change is the breaking up of the ClientState into the new NodeState and NodeAssignment interfaces, but that will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

...