...
Finally, there are good reasons for a user to want to extend or modify the behaviour behavior of the Kafka Streams partition assignor beyond just changing the task assignment. For example, a user may want to implement their own initialization logic that initializes resources (much the same way the Streams Partition Assignor initializes internal topics).
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.processor.assignment; public interface TaskAssignor extends Configurable { /** * NONE: no error detected * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client * INVALID_STANDBY_TASK: stateless task assigned as a standby task * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned */ enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS, INVALID_STANDBY_TASK, MISSING_PROCESS_ID, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID } /** * @param applicationState the metadata for this Kafka Streams application * * @return the assignment of active and standby tasks to KafkaStreams clients * * @throws TaskAssignmentException If an error occurs during assignment and you wish for the rebalance to be retried, * you can throw this exception to keep the assignment unchanged and automatically * schedule an immediate followup rebalance. */ TaskAssignment assign(ApplicationState applicationState); /** * This callback can be used to observe the final assignment returned to the brokers and check for any errors that * were detected while processing the returned assignment. If any errors were found, the corresponding * will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will * be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application * has registered a {@link StreamsUncaughtExceptionHandler}. * <p> * Note: some kinds of errors will make it impossible for the StreamsPartitionAssignor to parse the TaskAssignment * that was returned from the TaskAssignor's {@link #assign}. If this occurs, the {@link GroupAssignment} passed * in to this callback will contain an empty map instead of the consumer assignments. * * @param assignment: the final consumer assignmentassignments returned to the kafka broker, or an empty assignment map if * an error prevented the assignor from converting the TaskAssignment into a GroupAssignment * @param subscription: the original consumer subscriptionsubscriptions passed into the assignor * @param error: the corresponding error type if one was detected while processing the returned assignment, * or AssignmentError.NONE if the returned assignment was valid */ default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {} /** * Wrapper class for the final assignment of active and standbys tasks to individual * KafkaStreams clients */ class TaskAssignment { /** * @return the assignment of tasks to kafka streams clients */ public Collection<KafkaStreamsAssignment> assignment(); } } |
...