...
```java
package org.apache.kafka.streams.processor.assignment;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Configurable;

public interface TaskAssignor extends Configurable {

    enum AssignmentError {
        OVERLAPPING_CLIENT,
        OVERLAPPING_STANDBY,
        UNKNOWN_PROCESS_ID
    }

    /**
     * @param applicationState the metadata for this Kafka Streams application
     *
     * @return the assignment of active and standby tasks to KafkaStreams clients
     *
     * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried,
     *                                 you can throw this exception to keep the assignment unchanged and automatically
     *                                 schedule an immediate followup rebalance.
     */
    TaskAssignment assign(ApplicationState applicationState);

    /**
     * This callback can be used to observe the final assignment returned to the brokers, and to observe any errors
     * that were detected while processing the returned assignment. If any errors were found, the rebalance will be
     * automatically retried by returning the same assignment as the input, scheduling an immediate followup
     * rebalance, and passing in the corresponding AssignmentError in this callback.
     *
     * @param assignment   the final assignment returned to the kafka broker
     * @param subscription the original subscription passed into the assignor
     * @param error        the corresponding error type if one was detected while processing the returned assignment,
     *                     or null if the assignment was valid
     */
    default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}

    /**
     * Wrapper class for the final assignment of active and standby tasks to individual
     * KafkaStreams clients
     */
    class TaskAssignment {
        /**
         * @return the assignment of tasks to kafka streams clients
         */
        public Collection<KafkaStreamsAssignment> assignment();
    }
}
```
Another reason for introducing the new TaskAssignment and ApplicationState classes is to clean up the way assignment is performed today, as the current API is not fit for public consumption. Currently, the TaskAssignor is given a set of ClientState objects, one per KafkaStreams client. However, a ClientState is not just the input to the assignor but also its output: tasks are assigned to KafkaStreams clients by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean telling the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.
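To illustrate the difference in style, the sketch below follows the proposed shape: it treats its inputs as read-only and returns a freshly built assignment, instead of mutating per-client state objects and returning a boolean. Task and client IDs are plain strings and the `SketchAssignor` class is hypothetical; this is not the real `TaskAssignor`, `ApplicationState`, or `KafkaStreamsAssignment` API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical assignor in the "return a value" style: inputs are never mutated,
// and the result is a new, read-only map from client ID to its active task IDs.
final class SketchAssignor {

    static Map<String, Set<String>> assign(List<String> clients, List<String> tasks) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (String client : clients) {
            result.put(client, new TreeSet<>());
        }
        // Round-robin: task i goes to client (i mod number of clients).
        for (int i = 0; i < tasks.size(); i++) {
            String client = clients.get(i % clients.size());
            result.get(client).add(tasks.get(i));
        }
        // Wrap each set so callers cannot modify the returned assignment.
        Map<String, Set<String>> view = new LinkedHashMap<>();
        result.forEach((client, owned) -> view.put(client, Collections.unmodifiableSet(owned)));
        return Collections.unmodifiableMap(view);
    }
}
```

Because the result is a separate value, the caller can inspect or validate it without worrying that the assignor has already changed shared state.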
...
```java
package org.apache.kafka.streams.processor.assignment;

import java.util.List;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}
```
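As an illustration of how an assignor might consume `numStandbyReplicas()`, the sketch below places up to that many standby replicas of each task on clients other than the one hosting the active copy, which also avoids the OVERLAPPING_STANDBY error described later. The `StandbyPlacer` class and its string-based task and client IDs are hypothetical; a real assignor would read the value from `AssignmentConfigs` and work with the KIP's own task and process identifiers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: given the active owner of each task, choose standby hosts.
final class StandbyPlacer {

    // activeOwner maps task ID -> client ID hosting the active copy.
    // Returns client ID -> standby task IDs placed on that client.
    // If fewer eligible clients exist than numStandbyReplicas, fewer standbys are placed.
    static Map<String, Set<String>> placeStandbys(List<String> clients,
                                                  Map<String, String> activeOwner,
                                                  int numStandbyReplicas) {
        Map<String, Set<String>> standbys = new LinkedHashMap<>();
        for (String client : clients) {
            standbys.put(client, new TreeSet<>());
        }
        int offset = 0;
        for (Map.Entry<String, String> entry : activeOwner.entrySet()) {
            String task = entry.getKey();
            String owner = entry.getValue();
            int placed = 0;
            // Visit each client at most once per task, starting at a rotating offset to spread load.
            for (int i = 0; i < clients.size() && placed < numStandbyReplicas; i++) {
                String candidate = clients.get((offset + i) % clients.size());
                if (!candidate.equals(owner)) { // never co-locate a standby with its active task
                    standbys.get(candidate).add(task);
                    placed++;
                }
            }
            offset++;
        }
        return standbys;
    }
}
```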
Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationState#kafkaStreamsStates. Users who request the task lags via the computeTaskLags input flag should make sure to handle failures the way they desire. They can rethrow a thrown TaskAssignmentException (or just not catch it in the first place) to have Kafka Streams automatically "retry" the rebalance by returning the same assignment and scheduling an immediate followup rebalance. Advanced users who want more control over the "fallback" assignment and/or the timing of immediate followup rebalance(s) can simply swallow the TaskAssignmentException and use the followupRebalanceDeadline to schedule followup rebalances, e.g. to implement a retry/backoff policy.
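For users who choose to swallow the TaskAssignmentException, one possible retry/backoff policy is exponential backoff on consecutive failures. In the sketch below, `TaskAssignmentException` is a local stand-in, the lag source is modeled as a `Supplier` of task-ID-to-lag values, and `BackoffPolicy` with its base and cap parameters is hypothetical; how the resulting delay is handed to followupRebalanceDeadline is not shown.

```java
import java.util.Map;
import java.util.function.Supplier;

// Local stand-in for the TaskAssignmentException named in the KIP.
class TaskAssignmentException extends RuntimeException {
    TaskAssignmentException(String message) {
        super(message);
    }
}

// Hypothetical policy: exponential backoff on consecutive lag-computation failures.
final class BackoffPolicy {
    private final long baseMs;
    private final long maxMs;
    private int consecutiveFailures = 0;

    BackoffPolicy(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Returns the lags on success (resetting the failure count),
    // or null after recording a failure so the caller can fall back.
    Map<String, Long> tryComputeLags(Supplier<Map<String, Long>> lagSource) {
        try {
            Map<String, Long> lags = lagSource.get();
            consecutiveFailures = 0;
            return lags;
        } catch (TaskAssignmentException e) {
            consecutiveFailures++;
            return null;
        }
    }

    // Delay before the next followup rebalance: 0 with no failures,
    // otherwise baseMs * 2^(failures - 1), capped at maxMs.
    long backoffMs() {
        if (consecutiveFailures == 0) {
            return 0L;
        }
        int shift = Math.min(consecutiveFailures - 1, 30); // bound the shift to avoid overflow
        return Math.min(maxMs, baseMs << shift);
    }
}
```

A caller that gets null back would return its fallback assignment and request a followup rebalance roughly `backoffMs()` from now.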
Proposed Changes
On the whole, we are not introducing new functionality to Streams itself; rather, we are refactoring the existing TaskAssignor interface so that it is (a) pluggable and publicly exposed, and (b) easier to understand, use, and implement. Code-wise, the largest change is splitting the ClientState into the new KafkaStreamsState and KafkaStreamsAssignment interfaces, but this will be handled transparently for all existing built-in assignors, which will continue to work the same as before. As described above, the computation of lags for stateful tasks moves into the implementation of ApplicationState#kafkaStreamsStates, so this step can now fail inside the task assignor. Users who request the task lags should handle such failures the way they desire, for example by returning a "fallback assignment" (i.e. one that keeps the previous assignment) via the TaskAssignmentUtils#fallbackAssignment API, and scheduling a followup rebalance via the followupRebalanceDeadline API.
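The idea of a fallback assignment can be sketched as copying each client's previously owned active and standby tasks into the new assignment unchanged and pairing it with a followup deadline. The `ClientSnapshot` and `FallbackSketch` types and the fixed `retryDelayMs` are hypothetical; this is not the implementation of `TaskAssignmentUtils#fallbackAssignment`, whose behavior may differ in detail.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical view of the tasks one client owned in the previous assignment.
final class ClientSnapshot {
    final Set<String> activeTasks;
    final Set<String> standbyTasks;

    ClientSnapshot(Set<String> activeTasks, Set<String> standbyTasks) {
        // Defensive, read-only copies so the snapshot cannot change after creation.
        this.activeTasks = Collections.unmodifiableSet(new TreeSet<>(activeTasks));
        this.standbyTasks = Collections.unmodifiableSet(new TreeSet<>(standbyTasks));
    }
}

final class FallbackSketch {

    // Fallback assignment plus the time at which a followup rebalance should happen.
    static final class Result {
        final Map<String, ClientSnapshot> assignment;
        final long followupDeadlineMs;

        Result(Map<String, ClientSnapshot> assignment, long followupDeadlineMs) {
            this.assignment = assignment;
            this.followupDeadlineMs = followupDeadlineMs;
        }
    }

    // Every client keeps exactly the active and standby tasks it owned before,
    // and a followup rebalance is requested retryDelayMs after nowMs.
    static Result fallback(Map<String, ClientSnapshot> previous, long nowMs, long retryDelayMs) {
        Map<String, ClientSnapshot> copy = new LinkedHashMap<>();
        previous.forEach((client, snapshot) ->
                copy.put(client, new ClientSnapshot(snapshot.activeTasks, snapshot.standbyTasks)));
        return new Result(Collections.unmodifiableMap(copy), nowMs + retryDelayMs);
    }
}
```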
Observing The Final Assignment
The custom assignor specifies a mapping from tasks to KafkaStreams clients, but the final mapping from tasks to group members is controlled by the StreamsPartitionAssignor. It may be useful for implementors of a custom assignor to observe this final task-to-member assignment that is returned to Kafka, for example to record the mapping in an event that can be materialized externally to observe the current assignments. To support this, the TaskAssignor interface also includes a method named #onAssignmentComputed, which is called with the final computed GroupAssignment.
This callback is also used to notify the user, via the corresponding AssignmentError, when their TaskAssignor has computed an invalid assignment. The specifics are discussed in the following section.
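A sketch of how an implementor might use the callback to publish the final mapping as an event: the group assignment is simplified here to a map from member ID to task IDs, and `AssignmentError` is redeclared locally. The real callback receives Kafka's `GroupAssignment` and `GroupSubscription`, which are not modeled, and `AssignmentRecorder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Local copy of the error enum proposed in the KIP.
enum AssignmentError { OVERLAPPING_CLIENT, OVERLAPPING_STANDBY, UNKNOWN_PROCESS_ID }

// Hypothetical observer that turns each computed assignment into an event string.
final class AssignmentRecorder {
    final List<String> events = new ArrayList<>();

    // memberTasks: simplified stand-in for GroupAssignment (member ID -> task IDs).
    // error: null when the assignment was valid; otherwise the input assignment was
    // reused and a followup rebalance scheduled, so the mapping is still recorded but tagged.
    void onAssignmentComputed(Map<String, List<String>> memberTasks, AssignmentError error) {
        StringJoiner joiner = new StringJoiner(";");
        // Sort by member ID so the event text is deterministic.
        new TreeMap<>(memberTasks).forEach((member, tasks) -> joiner.add(member + "=" + tasks));
        String prefix = (error == null) ? "ok " : "retried(" + error + ") ";
        events.add(prefix + joiner);
    }
}
```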
Assignment Validation
As noted in the TaskAssignor javadocs, the StreamsPartitionAssignor will verify the assignment returned by the task assignor and report an error if any of the following cases are observed while processing it:
OVERLAPPING_CLIENT: multiple KafkaStreams clients were assigned the same active task
OVERLAPPING_STANDBY: an active task and a standby replica of that same task were assigned to the same KafkaStreams client
UNKNOWN_PROCESS_ID: a ProcessId in the assignment does not match any of the participating consumers
If any of these errors are detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. In this case, the input assignment will be used as the new assignment, and the corresponding error will be passed to the #onAssignmentComputed callback.
Consumer Assignments
One major decision in this KIP was whether to encompass the assignment of tasks to consumers/threads within each KafkaStreams client, or to leave that up to the StreamsPartitionAssignor and only carve out the KafkaStreams-level assignment for pluggability. Ultimately we decided on the latter, for several reasons:
...