Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Note that this is joint work by Lucas Brutschy and Bruno Cadonna.
Motivation
Kafka Streams clients use a protocol to distribute stream processing tasks among Streams clients. Since the inception of Streams, this has worked by piggybacking custom byte payloads on the pre-KIP-848 consumer rebalance protocol. A custom partition assignor on one of the Streams clients would perform task assignment and various other centralized tasks, such as internal topic initialization. While this approach has served its purpose well and the streams partition assignor has achieved impressive flexibility and complexity over the years, it has also had a number of deficiencies:
A large fraction of the production incidents in Kafka Streams are caused by bugs in the rebalancing logic or misconfiguration thereof. With the rebalance logic being run on the processing node, the implementer of the business logic is also typically responsible for configuring, upgrading and debugging the rebalance logic.
When rebalance problems occur, the timeline of events and information that caused the rebalance and influenced the assignment is spread across all clients, making it difficult to debug.
Since streams versions on the client are notoriously slow to be picked up by teams building the processing logic, bugs in the rebalance logic are prevalent for years after being fixed on trunk.
Since assignment logic is controlled by the streams application, tuning the parameters of the assignment logic requires redeployment of the application, which in itself triggers rebalances (and reassignments), creating a vicious cycle.
With the rebalance logic being based on the metadata on one of the Streams clients, metadata inconsistencies among clients have caused a variety of issues.
Particularly, the use of raw byte arrays in the protocol means that brokers cannot understand or validate the payload that is propagated among Streams clients.
In the implementation, piggybacking the task assignment logic on the consumer partition assignment and injecting the required Streams metadata and functionality into the general-purpose KafkaConsumer interface has required a number of workarounds, increasing code complexity and hurting the maintainability of the code. This has affected both the Streams code, which had to work around the restrictions of the consumer interface, and the consumer, whose interface had to be padded to account for Kafka Streams requirements.
Because the protocol is not Kafka Streams native, advanced features like “smooth scale out” or “application shutdown on error”, among others, require workarounds like probing rebalances, which are disruptive for the running application and often hard for users to reason about.
KIP-848 introduced the next generation of the consumer rebalance protocol, which also requires a redefinition of the Streams rebalance protocol. We propose to use this change in the protocol to solve or alleviate the above problems by introducing a dedicated protocol for Kafka Streams, which uses dedicated RPCs for task rebalancing and in which centralized logic, such as topic initialization and task assignment, is performed by the brokers' group coordinator.
Design Goals
We propose to introduce new RPCs designed to make Streams initialization and task assignment first-class citizens of the Kafka protocol. The proposed RPCs are built on top of the following design goals:
We want to inherit all the advantages of the new consumer rebalance protocol introduced in KIP-848, such as greatly simplified clients, task assignment on the broker by default, truly incremental and cooperative rebalancing without global synchronization, and the ability of each instance to initiate a metadata update to the rebalance logic.
Move central initialization tasks, such as the creation of internal topics, to the group coordinator.
Make all data required for initialization and assignment explicit in the protocol and deserializable by the broker, with corresponding compatibility guarantees.
Establish a single source of truth for stream application metadata such as the internal topic configuration, the topology and the assignment configuration to detect inconsistencies early by persisting them on the broker.
Make the assignment logic parameters centrally tunable, without the need to redeploy the clients.
Make active tasks, standby tasks and warm-up tasks first-class citizens in assignment and reconciliation.
Proposed Changes
Overview
We introduce the concept of a Streams group in parallel to a consumer group. Streams clients use a dedicated heartbeat RPC, StreamsGroupHeartbeat, to join a group, leave a group, and update the group coordinator about their currently owned tasks and their client-specific metadata. The group coordinator manages a Streams group similarly to a consumer group, continuously updating the group member metadata via heartbeat responses and running assignment logic when changes are detected.
To assign tasks among Streams clients, the group coordinator uses topology metadata that is initialized when the first member joins the group and persisted in the consumer offsets topic. To trigger topology initialization and/or detect topology inconsistencies, each heartbeat contains a unique ID of the topology (which can be derived automatically from the topology via hashing).
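To illustrate, here is a minimal Java sketch, not the actual Streams implementation, of how a client could derive such a topology ID by hashing a canonical rendering of its subtopology metadata. The SubtopologySpec type and the exact serialization order are assumptions made for this example.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical sketch: derive a topology ID by hashing a deterministic,
// order-sensitive rendering of the subtopology metadata.
public final class TopologyIdSketch {

    record SubtopologySpec(String id, List<String> sourceTopics, List<String> changelogTopics) {}

    static String topologyId(List<SubtopologySpec> subtopologies) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (SubtopologySpec sub : subtopologies) {
                digest.update(sub.id().getBytes(StandardCharsets.UTF_8));
                for (String topic : sub.sourceTopics()) {
                    digest.update(topic.getBytes(StandardCharsets.UTF_8));
                }
                for (String topic : sub.changelogTopics()) {
                    digest.update(topic.getBytes(StandardCharsets.UTF_8));
                }
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString(); // same topology metadata -> same ID on every client
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}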
Streams groups
We will introduce a new group type called streams to the group coordinator. For each Streams group, we define new record key and value types for the group metadata, topology metadata and group member metadata. The records are persisted in the __consumer_offsets topic. A group can either be a streams group, a share group or a consumer group, defined by the first heartbeat request using the corresponding GroupId.
Joining, leaving and staying in a Streams group
During the complete lifetime of a Streams client, its members continuously send StreamsGroupHeartbeatRequests to the group coordinator. The requests contain the following information:
- All non-assignment metadata as defined in the ConsumerGroupHeartbeatRequest, such as the group ID, member ID, member epoch, instance ID, rack ID and rebalance timeout.
- The currently assigned active, standby and warm-up tasks. Each task is identified by a subtopology ID (which can be an arbitrary string in the protocol for future extensibility, even though subtopologies are currently numbered) and a partition number.
- The process ID to identify multiple members running in the same process (i.e. the same Streams client).
- The user-defined endpoint to be used for running interactive queries.
- Topology metadata needed for creating internal topics and computing the assignment.
- The topology epoch, which is defined by the client and is 0 by default. The epoch can be bumped by the client to deploy a new version of the topology metadata.
- For each task, the current sum of task changelog offsets and task changelog end offsets. These are used to determine whether a warm-up task is below acceptable.recovery.lag and can be used to optimize the assignment in the assignor. For more details on these offsets, see the "Cumulative task changelog offsets and task changelog end offsets" section below.
- A flag to request shutdown of the whole application (only if the member epoch is the group leave epoch).
Group ID, member ID, member epoch, and instance ID (if defined by the client) are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted, unless an error occurred.
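The following Java sketch illustrates this delta-encoding convention. The HeartbeatFields record is hypothetical and mirrors only a small subset of the StreamsGroupHeartbeatRequest fields defined later in this KIP; null stands for "unchanged since the last heartbeat".

import java.util.List;

// Hypothetical, simplified view of a heartbeat request; null = unchanged.
public final class HeartbeatDeltaSketch {

    record HeartbeatFields(String groupId, String memberId, int memberEpoch,
                           List<String> activeTasks, String processId) {}

    public static void main(String[] args) {
        // First heartbeat (memberEpoch = 0): all fields must be set; owned tasks are empty.
        HeartbeatFields join = new HeartbeatFields(
            "word-count-app", "member-1", 0, List.of(), "process-uuid-1");

        // Steady-state heartbeat: only group ID, member ID and member epoch are sent;
        // everything that did not change since the last heartbeat is omitted.
        HeartbeatFields steady = new HeartbeatFields(
            "word-count-app", "member-1", 5, null, null);

        System.out.println(join);
        System.out.println(steady);
    }
}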
Heartbeat handling & response
If the heartbeat is valid and any data in the heartbeat has changed, the group coordinator will update its internal representation of the member correspondingly and persist the updated member metadata in a record on the __consumer_offsets topic. It then potentially updates the group assignment and reconciles member assignments, as described in the section "Rebalance Process".
The group coordinator responds with a StreamsGroupHeartbeatResponse, which contains:
- All non-assignment metadata as defined in the ConsumerGroupHeartbeatResponse, such as the member ID (when joining as a new member), the new member epoch and the heartbeat interval in milliseconds.
- The current target assignment for this member, as sets of active, standby and warm-up task IDs.
- For each client in the group, the user-defined endpoint and the assigned tasks, for interactive queries (IQ).
Again, fields that don’t change will be omitted from the heartbeat response.
Initializing and updating topology metadata on the broker
Whenever a member joins the Streams group, the first heartbeat request contains metadata of the topology. The metadata describes the topology as a set of subtopologies, each identified by a unique string identifier and containing the following metadata that is relevant for creation of internal topics and assignment:
- The set of source topics. Here, the source topics can be defined through a Google RE2/J regular expression.
- The set of repartition sink topics. We do not include non-repartition sink topics, nor output topics that the topology produces to using dynamic routing.
- The set of changelog topics, with replication factors and other topic-level configurations.
- The set of repartition topics, with the number of partitions for each of them (optional), replication factors and other topic-level configurations.
- The set of copartition groups, that is, sets of source topics (user source topics or internal repartition source topics) that must have the same number of partitions, e.g. because they are used inside a join.
Note that the topology metadata does not specify the number of input partitions of the source topics, and similarly does not specify the number of partitions for changelog topics; it may also omit the number of partitions for some repartition topics (if it can be inferred). By leaving the number of partitions parametric, the broker can allow changing the number of partitions for certain topologies without reinitializing the group. In particular, stateless topologies without repartitioning steps can support any number of input partitions without reinitializing the broker-side topology.
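As a minimal sketch of this parametric derivation, the following Java snippet shows how a changelog topic's partition count could be inferred from the broker-side partition counts of the subtopology's source topics (the number of tasks of a subtopology is the maximum partition count over its source topics). The helper and its inputs are illustrative, not the coordinator's actual types.

import java.util.Collections;
import java.util.Map;

public final class PartitionDerivationSketch {

    // Changelog topics get one partition per task, i.e. the maximum partition
    // count over the subtopology's source topics.
    static int changelogPartitions(Map<String, Integer> sourceTopicPartitions) {
        return Collections.max(sourceTopicPartitions.values());
    }

    public static void main(String[] args) {
        // Source topics with 3 and 6 partitions -> the changelog topic needs 6 partitions.
        System.out.println(changelogPartitions(Map.of("orders", 3, "orders-repartition", 6)));
    }
}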
The metadata sent to the broker via the heartbeat is persisted as a new record type in the __consumer_offsets topic and is part of the group coordinator state in subsequent requests. If one or more source topics or internal topics do not exist, the group will enter the state NOT_READY, and the group coordinator will attempt to create any missing internal topics (see more details below). Only after the topology metadata is initialized and all source topics and internal topics exist does the group coordinator start assigning tasks to the members of the group.
Topology updates
To avoid unintentional topology updates or rollbacks, updating the broker-side topology after it has been initialized for a group requires bumping the client-side configuration topology.epoch, which defaults to 0. By default, whenever a Streams member joins the group with a topology that is different from the current topology of the streams group and the topology epoch is not bumped, the group coordinator will respond with an error, indicating that the member is incorrectly updating the broker-side topology.
Any member can update the topology by sending a topology with a bumped topology epoch when joining. The group coordinator then updates the broker-side topology with the topology metadata sent by the member. If the topology epoch of an existing member does not correspond to the current topology epoch of the group anymore (i.e., the topology of the member is stale with respect to the group's topology), this will be indicated in the heartbeat response (status STALE_TOPOLOGY) sent to that member (for informational purposes, e.g., logging a warning).
The topology epoch of the group and the topology epoch of each member are provided to the assignor, which will make sure that members with a topology epoch that does not correspond to the group's topology epoch will not get any new tasks assigned, avoiding problems caused by topology incompatibilities.
Example
As an example of a rolling bounce to upgrade the topology, let's assume a sticky assignor with stateless tasks, no standby tasks, and one thread per process for illustration.
Assume we have 3 clients A, B, and C with a uniform assignment (#0 is the topology epoch):
A(#0): T1, T2, T3
B(#0): T4, T5, T6
C(#0): T7, T8, T9
We shut down C to update its topology. Once we shut down C, the assignment is the following:
A(#0): T1, T2, T3, T7
B(#0): T4, T5, T6, T8, T9
Then, C rejoins the group with topology epoch #1. This bumps the group's topology epoch, and C also includes the information to initialize the new topology in its first heartbeat. From now on, A and B will not get new tasks assigned, but C will, because #1 is now the group's topology epoch. The rest of the assignment is unchanged: A and B cannot get any new tasks, but they can retain some of the tasks they have. So, since the sticky assignor balances out the assignment, we should end up with something like this:
A(#0): T2, T3, T7
B(#0): T5, T6, T8
C(#1): T1, T4, T9
B is shut down for the topology update. Since, again, A cannot get new tasks but C can, the new assignment is:
A(#0): T2, T3, T7
C(#1): T1, T4, T9, T5, T6, T8
Client B comes back with the new topology epoch, and gets new tasks assigned:
A(#0): T2, T3, T7
B(#1): T1, T4, T9
C(#1): T5, T6, T8
And so on.
The broker will reject members that attempt to join with a stale topology epoch, or with the current topology epoch but different topology metadata.
Handling topic topology mismatches
It can happen that the group is initialized with a topology, but the source, sink or internal topics required by the topology do not exist or differ in their configuration from what is required for the topology to execute successfully. This is typically detected during the handling of the streams group heartbeat in the group coordinator, where we detect changes in either the topology or the topic metadata on the broker, triggering the "topology configuration" process, in which the group coordinator performs the following steps:
- Check that all source topics exist; resolve source topic regular expressions and check that each of them resolves to at least one topic.
- Check that "copartition groups" are satisfied, that is, all source topics that are supposed to be copartitioned are indeed copartitioned.
- Derive the required number of partitions for all internal topics from the source topic configuration.
- Check that all internal topics exist with the right configuration.
If any source topics or internal topics are missing, the group enters a state NOT_READY.
In NOT_READY, all heartbeats will be handled as usual (so they typically should not fail), but the heartbeat response status will indicate which kind of problem exists; all members get an empty assignment while the group is in the NOT_READY state. Below, we describe the behavior of the protocol when any mismatch is detected during topic/topology configuration (a sketch of the precedence check follows the list). The behaviors are ordered by precedence; for example, if both source topics and internal topics are missing, the group takes on the behavior for missing source topics, not the behavior for missing internal topics.
- Source topics missing
Condition: A source topic is missing, or a source topic regex resolves to zero topics.
Behavior: The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status MISSING_SOURCE_TOPICS. In the status detail, we specify all missing source topics and all regular expressions matching zero topics.
- Topics incorrectly partitioned
Condition: Two topics that are supposed to be copartitioned according to the topology have a different number of partitions in the current topic metadata on the broker, or the number of partitions in a changelog topic does not correspond to the maximal number of source topic partitions for that subtopology.
Behavior: The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status INCORRECTLY_PARTITIONED_TOPICS. In the status detail, we specify at least one incorrectly partitioned topic.
- Internal topics are missing
Condition: One or more internal topics are missing.
Behavior: If discovered during a heartbeat, the group coordinator will attempt to create the internal topics by sending a corresponding topic creation request to the KRaft controller. There can only be one such request in flight at a time, and an appropriate back-off mechanism will be used to prevent too many attempts to create the topics. If the appropriate ACLs for topic creation are not assigned to the principal executing the heartbeat, no such attempt will be made. The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status MISSING_INTERNAL_TOPICS. In the status detail, we specify whether an attempt to create the topics was made, whether and why a previous attempt failed, and whether sufficient ACLs to create the topics are available.
- Topic configuration mismatches
Condition: An internal topic exists but does not have the same configuration as defined in the topology (all parameters of the topic besides the number of partitions, that is, replication factor, retention time, etc.).
Behavior: The mismatch will be logged on the broker, but otherwise be ignored.
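The following Java sketch summarizes the precedence rule above. The boolean inputs are hypothetical stand-ins for the coordinator's individual checks; only the highest-precedence problem determines the status reported in heartbeat responses.

public final class TopologyConfigurationSketch {

    enum GroupStatus {
        MISSING_SOURCE_TOPICS,
        INCORRECTLY_PARTITIONED_TOPICS,
        MISSING_INTERNAL_TOPICS,
        READY
    }

    static GroupStatus classify(boolean sourceTopicsMissing,
                                boolean incorrectlyPartitioned,
                                boolean internalTopicsMissing) {
        // Ordered by precedence, as described above.
        if (sourceTopicsMissing) return GroupStatus.MISSING_SOURCE_TOPICS;
        if (incorrectlyPartitioned) return GroupStatus.INCORRECTLY_PARTITIONED_TOPICS;
        if (internalTopicsMissing) return GroupStatus.MISSING_INTERNAL_TOPICS;
        // Topic configuration mismatches are only logged, so they do not affect the status.
        return GroupStatus.READY;
    }
}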
Describing and listing streams groups
Streams groups will be returned in ListGroupsResponse with GroupType equal to the string streams. The group ID of a streams group can be used in OffsetFetchRequest and OffsetCommitRequest as usual. Sending a ConsumerGroupHeartbeatRequest to a streams group will return a GROUP_ID_NOT_FOUND error. Sending a StreamsGroupHeartbeatRequest to a consumer group will similarly return a GROUP_ID_NOT_FOUND error.
There will be a new RPC, StreamsGroupDescribe, that returns, given the group ID, all metadata related to the streams group, such as:
- The topology metadata of the group. This topology metadata is the result of the above "topology configuration" process, so it contains the topology metadata as initialized by one of the streams group members, but with a concrete number of partitions for each topic and with source topic regular expressions resolved to a specific set of topics.
- The topology epoch of the group.
- The latest member metadata that each member provided through the StreamsGroupHeartbeat API.
- The current target assignment generated by the assignor.
We also include extensions of the Admin API and the command-line tools to describe and modify streams groups.
Rebalance Process
As in KIP-848 consumer groups, the rebalance process is entirely driven by the group coordinator and based on the same three epochs: group epoch, assignment epoch, and member epoch. The main difference is the assigned resource: instead of assigning partitions to consumers, we assign tasks consisting of a subtopology ID and a partition number, and each task can be assigned to a client in one of three roles: as an active task, as a standby task or as a warm-up task.
Here, we only explain the main differences to the rebalance process of regular consumer groups.
The partition metadata for the group, which is tracked by the group coordinator, is the partition metadata of all input topics (i.e., user source topics and internal repartition topics we read from).
The group epoch is bumped:
- When a member joins or leaves the group.
- When a member is fenced or removed from the group by the group coordinator.
- When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
- When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
- When a member updates its topology metadata, rack ID, client tags or process ID. Note: typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
- When an assignment configuration for the group is updated.
Assignment
Every time the group epoch is bumped, a new task assignment is computed by calling the task assignor, which assigns tasks as active, standby or warm-up tasks to members of the group. The task assignor is configured on the broker side, both through a global static configuration that defines the default assignor for all groups and through a dynamic group-level configuration that sets the assignor for a specific group.
The AK implementation will provide the following assignors:
- highly_available - similar to the current HighAvailabilityTaskAssignor
- sticky - similar to the current StickyTaskAssignor
Which assignors are provided by the group coordinator is not part of the protocol and is specific to each implementation. In this KIP, we do not introduce a pluggable interface for task assignors, but we leave it open to do so in the future.
If no task assignor is configured globally or on the group level, the broker picks the assignor. The broker is free to make a dynamic decision here; the AK implementation will choose highly_available for stateful topologies and sticky for stateless topologies (see the sketch below).
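A minimal sketch of this resolution order, assuming the assignor names from this KIP; the method and its Optional-based inputs are illustrative:

import java.util.Optional;

public final class AssignorSelectionSketch {

    // Group-level dynamic config wins over the global static default; if neither
    // is set, the AK implementation picks based on statefulness of the topology.
    static String resolveAssignor(Optional<String> groupLevelConfig,
                                  Optional<String> globalConfig,
                                  boolean topologyIsStateful) {
        return groupLevelConfig
            .or(() -> globalConfig)
            .orElse(topologyIsStateful ? "highly_available" : "sticky");
    }
}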
Each task assignor must make sure to fulfill the following invariants:
- Each warm-up task that has reached acceptable.recovery.lag must be turned into an active task or a standby task, or be unassigned from the member.
- A stateful task (in any of its roles) cannot be assigned to two clients with the same processId.
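The second invariant can be checked mechanically; here is an illustrative Java sketch with hypothetical Member and task-ID types standing in for the coordinator's internal model:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class AssignmentInvariantSketch {

    record Member(String memberId, String processId, Set<String> allAssignedTasks) {}

    // True if any stateful task, in any role, is assigned to two members
    // sharing the same processId, which a valid assignment must never do.
    static boolean violatesProcessIdInvariant(List<Member> members, Set<String> statefulTasks) {
        Map<String, Set<String>> seenPerProcess = new HashMap<>();
        for (Member member : members) {
            Set<String> seen = seenPerProcess.computeIfAbsent(member.processId(), p -> new HashSet<>());
            for (String task : member.allAssignedTasks()) {
                if (statefulTasks.contains(task) && !seen.add(task)) {
                    return true;
                }
            }
        }
        return false;
    }
}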
Cumulative task changelog offsets and task changelog end offsets
Whenever a stateful task is added to or removed from a Streams client, when a warm-up task reaches acceptable.recovery.lag, and at regular intervals defined by a broker-side configuration task.offset.interval.ms, each client reports two sets of offsets: the sums of task changelog offsets and the sums of task changelog end offsets. These sets can be used by the assignors, in particular the highly_available assignor, to determine which tasks are caught up to acceptable.recovery.lag and to optimize the assignment when multiple clients with a partial copy of the state exist.
Cumulative task changelog offsets
The cumulative changelog offset of a task is the sum of all positions in the changelog topics of the local state stores belonging to the task. A Streams member reports the cumulative changelog offsets for all tasks with local state. That is:
- For each processing (active) task, the sum of the offsets last fully replicated to the broker in the changelog topic of each local state store. Under at-least-once guarantees, this corresponds to the high watermark; under exactly-once semantics, it is the last stable offset.
- For each restoring task (active, standby or warm-up), the sum of the positions in the changelog topics, as checkpointed to the local state directory.
- For each dormant task (a task which is not owned, but which has state locally on disk), the sum of the checkpointed positions in the changelog topics present in its state directory. The client only reports the offsets for dormant tasks whose state directory lock it manages to acquire.
Members that run in the same process (and use the same state directory) may report offsets for overlapping sets of dormant tasks. These offsets can conflict (since they are recorded at different points in time), but these conflicts can easily be resolved by taking the most recently received offsets. The current assignors will be updated to take this into account.
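A sketch of this last-writer-wins resolution, with illustrative types rather than the coordinator's actual bookkeeping:

import java.util.Map;

public final class DormantOffsetMergeSketch {

    record OffsetReport(long cumulativeOffset, long receivedAtMs) {}

    // Conflicting reports for the same dormant task are resolved by keeping
    // the most recently received one.
    static void merge(Map<String, OffsetReport> current, String taskId, OffsetReport incoming) {
        current.merge(taskId, incoming,
            (old, fresh) -> fresh.receivedAtMs() >= old.receivedAtMs() ? fresh : old);
    }
}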
Cumulative task changelog end offsets
Similarly, the member reports the sum of the end offsets in all changelog topics for the currently owned tasks, if available. This simplifies the broker-side task assignment, since the broker doesn’t need to fetch the current end-offsets. Specifically:
- For active processing tasks, the cumulative task changelog end offset is the same as the cumulative task changelog offset.
- For all restoring tasks (active, standby or warm-up), the cumulative end offset is the sum of the last end offsets cached by the restore consumer. If an end offset of a topic partition is unknown, no end offset is reported.
- For dormant tasks, no end offset is reported.
Using cumulative task changelog offsets and cumulative task changelog end offsets
By reporting cumulative task changelog offsets and cumulative task changelog end offsets instead of the cumulative task lag, we can determine most task lags in the group coordinator without the group coordinator or the Streams client having to make additional requests, even if the end offset of a topic partition is not known to a particular member, as long as it is known to another member in the group. However, it may happen that no end offset is known to the group coordinator, for example for tasks that are dormant on at least one client while no member currently owns that task as an active, standby or warm-up task. In these cases, tasks with unknown end offsets are never considered to be caught up; however, the cumulative task changelog offset can still be used when deciding where to place a new active, standby or warm-up task, typically by selecting the client with the maximal cumulative task changelog offset that fulfills all other requirements.
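A sketch of the resulting caught-up check, with illustrative signatures (the lag is the difference of the two reported sums; an unknown end offset means the task is never considered caught up):

import java.util.OptionalLong;

public final class TaskLagSketch {

    static boolean isCaughtUp(long cumulativeOffset,
                              OptionalLong cumulativeEndOffset,
                              long acceptableRecoveryLag) {
        if (cumulativeEndOffset.isEmpty()) {
            return false; // unknown end offset: never treated as caught up
        }
        return cumulativeEndOffset.getAsLong() - cumulativeOffset <= acceptableRecoveryLag;
    }
}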
Reconciliation of the group
Once a new target assignment is installed, each member will independently reconcile their current assignment with their new target assignment. Ultimately, each member will converge to their target epoch and assignment. The reconciliation process handles active tasks and standby/warm-up tasks differently:
Active tasks are reconciled like topic partitions in the KIP-848 consumer group protocol; that is, the reconciliation of the target assignment follows three phases:
- The group coordinator first asks the member to revoke any active tasks that are no longer assigned to that member in the target assignment, by removing those active tasks from the set of tasks sent to the member in the heartbeat response.
- The member must first confirm the revocation of these active tasks by removing them from the set of active tasks sent in the heartbeat request.
- Now, active tasks are incrementally assigned to the member. An active task is assigned as soon as no other client owns it anymore, that is, once the previous owner (if any) has confirmed the revocation of the corresponding active task.
Standby and warm-up tasks are reconciled in parallel with active tasks, but following slightly different logic:
- As for active tasks, standby and warm-up tasks removed from the member's target assignment are removed from the assignment sent in the heartbeat response immediately, that is, with the next heartbeat. The member confirms the revocation of the tasks as soon as the state directory (and any other resource related to the task) is closed.
- Newly assigned standby and warm-up tasks await that any member with the same processId owning that task (as active or standby) confirms the revocation of the task by removing it from the corresponding set in a heartbeat request. That is, a task that is not an active task or a standby task on any other member with the same processId can be assigned immediately. Otherwise, it is assigned as soon as the blocking task is revoked. Target assignments that assign a task to two members with the same processId are invalid. (A sketch of both gating rules follows this list.)
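The following Java sketch contrasts the two gating rules, using a hypothetical map from task ID to the process IDs of members that still own the task and have not yet confirmed its revocation:

import java.util.Map;
import java.util.Set;

public final class ReconciliationGatingSketch {

    // An active task may be assigned once no member at all still owns it.
    static boolean canAssignActive(String taskId, Map<String, Set<String>> unrevokedOwners) {
        return unrevokedOwners.getOrDefault(taskId, Set.of()).isEmpty();
    }

    // A standby or warm-up task may be assigned once no member with the
    // same processId still owns it; owners on other processes do not block it.
    static boolean canAssignStandbyOrWarmup(String taskId, String targetProcessId,
                                            Map<String, Set<String>> unrevokedOwners) {
        return !unrevokedOwners.getOrDefault(taskId, Set.of()).contains(targetProcessId);
    }
}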
Streams Group States
The possible states of a streams group are EMPTY, ASSIGNING, RECONCILING, STABLE, NOT_READY and DEAD, as for consumer groups.
Global & Dynamic Group Configuration
We make core assignment options configurable centrally on the broker, without relying on each client's configuration. This allows tuning a streams group without redeploying the streams application. Three core assignment options will be introduced on the broker side: acceptable.recovery.lag, num.warmup.replicas and num.standby.replicas. They can be configured both globally on the broker and dynamically for specific streams groups through the IncrementalAlterConfigs and DescribeConfigs RPCs.
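For illustration, here is a sketch of tuning a group dynamically through the Admin client, assuming the group-level config names proposed in this KIP are exposed via the GROUP config resource type:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public final class AlterGroupConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "word-count-app");
            // Assumed config name from this KIP; raising it takes effect without
            // redeploying or restarting any Streams client.
            AlterConfigOp setStandbys = new AlterConfigOp(
                new ConfigEntry("num.standby.replicas", "1"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(setStandbys))).all().get();
        }
    }
}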
Online Migration from a Classic Consumer Group to a Streams Group
As in KIP-848, upgrading from a classic consumer group currently used in Kafka Streams to the new streams group is possible by rolling the Streams clients, assuming the Streams protocol is enabled on the brokers. When the first consumer of a Streams client using the new Streams rebalance protocol joins the group, the group is converted from a classic group to a streams group. When the last consumer of a Streams client using the new Streams rebalance protocol leaves the group, the group is converted back to a classic group. Note that the group epoch starts at the current group generation. During the migration, all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new Streams protocol.
Let's recapitulate how the classic rebalance protocol works in Streams. First, the consumers in a Streams client join or re-join the group with the JoinGroup API. The JoinGroup request contains the subscribed topics, the owned partitions, the generation ID, the Streams-specific subscription info, and some other fields. The Streams-specific subscription info contains the process ID of the Streams client, the owned active and standby tasks, the task offset sums, the user endpoints for Interactive Query, and some more fields. When all the consumers of the Streams clients of the Streams application have joined, the group coordinator picks a leader for the group out of the consumers and sends back the JoinGroup response to all the members of the group (i.e., consumers that joined). The JoinGroup response contains the member ID, the generation ID, and the member ID of the leader (to make the leader aware of its leadership) as well as all the consumer and Streams-specific metadata about subscribed topics, owned partitions, tasks, etc. The leader uses the data in the JoinGroup response for computing the assignment.
Second, all the members in the Streams clients collect their assignment, computed by the leader, by using the SyncGroup API. The leader sends the computed assignment with the SyncGroup request to the group coordinator, and the group coordinator distributes the assignment to the members through the SyncGroup response. In parallel, the members heartbeat with the Heartbeat API in order to maintain their session. The Heartbeat API is also used by the group coordinator to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumptions about the generation ID; it basically uses what it receives from the group coordinator.
The classic rebalance protocol used in Streams supports two modes: eager and cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. However, it revokes the partitions that it does not own anymore when it receives its new assignment, and it rejoins immediately if it had to revoke any partitions.
The Streams rebalance protocol relies on the StreamsGroupHeartbeat API to do all of the above. Concretely, the API updates the group state, provides the active, standby, and warm-up tasks owned by the Streams client, gets back the assignment, and updates the session. We can remap those to the classic protocol as follows: the JoinGroup API updates the group state and provides the active and standby tasks owned (warm-ups are standby tasks in the classic protocol), the SyncGroup API gets back the task assignment, and the Heartbeat API updates the session. The main difference here is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.
The implementation of the new Streams protocol in the group coordinator will handle the JoinGroup and SyncGroup APIs of the classic protocol. The group coordinator will ensure that a rebalance is triggered when the assignment of a Streams client on the classic protocol needs to be updated by returning a REBALANCE_IN_PROGRESS error in the heartbeat response.
When the first consumer of a Streams client that uses the new Streams rebalance protocol joins a classic group, the classic group in the group coordinator is transformed into a streams group. The generation ID becomes the group epoch. The consumer of the Streams client initializes the group with the topology metadata. A rebalance is triggered by the group coordinator for all consumers still on the classic protocol by sending the REBALANCE_IN_PROGRESS error in the heartbeat. The consumers on the classic protocol send their owned active and standby tasks in the JoinGroup request to the group coordinator. The consumers on the Streams protocol send their owned active, standby, and warm-up tasks (which should be empty, since they just joined) through the StreamsGroupHeartbeat request to the group coordinator. In contrast to the classic protocol, the group coordinator does not pick a leader for computing the assignment but computes the assignment itself. That is the target assignment. The group epoch is increased. From the target assignment, the current member assignment is computed depending on the owned tasks. If members do not need to revoke any tasks, their member epoch is increased to the group epoch. If members need to revoke a task, their member epoch stays the same. The group coordinator sends the new member epoch alongside the current member assignment through the StreamsGroupHeartbeat response to the members on the Streams protocol. The members still on the classic protocol receive the member epoch through the JoinGroup response, but they still need to wait for the SyncGroup response for their current assignment. Basically, the group coordinator translates the JoinGroup and SyncGroup APIs to the Streams protocol internally and communicates with members still on the classic protocol via JoinGroup, SyncGroup and Heartbeat, and with the members on the Streams protocol via the StreamsGroupHeartbeat.
The Streams protocol also assigns warm-up tasks. However, the classic protocol does not have any notion of a warm-up task. If the group coordinator assigns a warm-up task to a Streams client on the classic protocol, that warm-up task is translated to a standby task in the assignment for the Streams client on the classic protocol. The group coordinator chooses one of the Streams clients on the classic protocol to trigger a probing rebalance.
A more detailed description of this process can be found in KIP-848 in the section "Supporting Online Consumer Group Upgrade".
Example
- classic group (generation=23)
- A
- B
- assignment
- A - active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
- B - active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]
C joins using the Streams protocol. The classic group is transformed to a streams group.
- streams group (group epoch=24)
- A (classic)
- B (classic)
- C (streams)
- target assignment (epoch=24)
- A - active=[0_0, 0_2, 0_3], standby=[0_1, 0_5], warm-up=[]
- B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
- C - active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]
- member assignment
- A
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
- JoinGroupResponse: generation ID=24
- SyncGroupResponse: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5]
- B
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]
- JoinGroupResponse: generation ID=24
- SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
- C
- StreamsGroupHeartbeat: epoch=24, active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]
C's warm-up task 0_2 is caught up. A is requested to revoke active task 0_2, thus A does not increase its generation ID.
- streams group (group epoch=25)
- A (classic)
- B (classic)
- C (streams)
- target assignment (epoch=25)
- A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
- B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
- C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
- member assignment
- A
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5], warm-up=[]
- JoinGroupResponse: generation ID=24
- SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
- B
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
- JoinGroupResponse: generation ID=25
- SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
- C
- StreamsGroupHeartbeat: epoch=25, active=[], standby=[0_0, 0_4], warm-up=[0_5]
A follow-up rebalance is triggered so that A can report the revoked active task 0_2. Since A does not need to revoke tasks anymore, the generation ID is increased.
- streams group (group epoch=25)
- A (classic)
- B (classic)
- C (streams)
- target assignment (epoch=25)
- A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
- B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
- C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
- member assignment
- A
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
- JoinGroupResponse: generation ID=25
- SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
- B
- Receives REBALANCE_IN_PROGRESS error in heartbeat response
- JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
- JoinGroupResponse: generation ID=25
- SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
- C
- StreamsGroupHeartbeat: epoch=25, active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
If C leaves the group now, the group coordinator transforms the group back to a classic group and only uses JoinGroup, SyncGroup, and Heartbeat to communicate with the members. The records of the Streams protocol are deleted from the __consumer_offsets topic.
Public Interfaces
This section lists the changes impacting the public interfaces.
KRPC
New Errors
The conditions under which these errors are returned are stated further down.
- STREAMS_INVALID_TOPOLOGY - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants; see the section "Request Validation" below.
- STREAMS_INVALID_TOPOLOGY_EPOCH - The client provided an invalid topology epoch with respect to the streams group state; for example, the topology was changed but the epoch was not bumped.
- STREAMS_TOPOLOGY_FENCED - A client attempted to join with an outdated topology epoch.
StreamsGroupHeartbeat
The StreamsGroupHeartbeat API is the new core API used by Streams applications to form a group. The API allows members to initialize a topology and to advertise their state and their owned tasks. The group coordinator uses it to assign/revoke tasks to/from members. This API is also used as a liveness check.
Request Schema
The member must set all the (top-level) fields, with the exception of RackId and InstanceId, when it joins for the first time or when an error occurs (e.g. the request timed out). Otherwise, it is expected to only fill in the fields which have changed since the last heartbeat.
{ "apiKey": TBD, "type": "request", "listeners": ["broker"], "name": "StreamsGroupHeartbeatRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." }, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of the member otherwise." }, { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its tasks otherwise." }, { "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The topology metadata of the streams application. Used to initialize the topology of the group and to check if the topology corresponds to the topology initialized for the group. Only sent when memberEpoch = 0, must be non-empty. Null otherwise.", "fields": [ { "name": "Epoch", "type": "int32", "versions": "0+", "about": "The epoch of the topology. Used to check if the topology corresponds to the topology initialized on the brokers." }, { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "String to uniquely identify the subtopology. Deterministically generated from the topology" }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", "about": "The regular expressions identifying topics the subtopology reads from." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Created automatically." }, { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+", "about": "A subset of source topics that must be copartitioned.", "fields": [ { "name": "SourceTopics", "type": "[]int16", "versions": "0+", "about": "The topics the topology reads from. Index into the array on the subtopology level." }, { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+", "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." 
}, { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." } ]} ]} ] }, { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Currently owned active tasks for this client. Null if unchanged since last heartbeat." }, { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Currently owned standby tasks for this client. Null if unchanged since last heartbeat." }, { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Currently owned warm-up tasks for this client. Null if unchanged since last heartbeat." }, { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." }, { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "User-defined endpoint for Interactive Queries. Null if unchanged since last heartbeat or if not defined on the client." }, { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." }, { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Cumulative changelog offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." }, { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Cumulative changelog end-offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." }, { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false, "about": "Whether all Streams clients in the group should shut down." } ], "commonStructs": [ { "name": "KeyValue", "versions": "0+", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "about": "key of the config" }, { "name": "Value", "type": "string", "versions": "0+", "about": "value of the config" } ]}, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, { "name": "ReplicationFactor", "type": "int16", "versions": "0+", "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "about": "Topic-level configurations as key-value pairs." 
} ]}, { "name": "Endpoint", "versions": "0+", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "host of the endpoint" }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "port of the endpoint" } ]}, { "name": "TaskOffset", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition." }, { "name": "Offset", "type": "int64", "versions": "0+", "about": "The offset." } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } ]} ] }
Required ACL
- READ on group
- CREATE on cluster resource, or CREATE on all topics in StateChangelogTopics and RepartitionSourceTopics. Note that these ACLs are only required if the group coordinator should create the internal topics implicitly. If the internal topics are created explicitly, this ACL is not needed for the streams group heartbeat.
- DESCRIBE_CONFIGS on all topics included in the message
Request Validation
INVALID_REQUEST is returned should the request not obey the following invariants:
- GroupId must be non-empty.
- Either MemberId is non-empty or MemberEpoch is 0.
- MemberEpoch must be >= -2.
- InstanceId, if not null, must be non-empty.
- RebalanceTimeoutMs must be larger than zero in the first heartbeat request.
- ActiveTasks, StandbyTasks and WarmupTasks have to be disjoint sets (see the sketch after this list).
- Each element of ActiveTasks, StandbyTasks and WarmupTasks has to be a valid task ID in the topology initialized for the group ID.
- ActiveTasks, StandbyTasks and WarmupTasks have to be non-null and empty when joining (member epoch is 0).
- If MemberEpoch is 0, Topology should be non-null. Otherwise, it should be null.
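As an illustration, the disjointness invariant could be checked as follows; task IDs are flattened to strings here for brevity:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class HeartbeatValidationSketch {

    // A task ID may appear in at most one of ActiveTasks, StandbyTasks and
    // WarmupTasks; otherwise the request is rejected with INVALID_REQUEST.
    static boolean tasksAreDisjoint(List<String> active, List<String> standby, List<String> warmup) {
        Set<String> seen = new HashSet<>();
        for (List<String> role : List.of(active, standby, warmup)) {
            for (String taskId : role) {
                if (!seen.add(taskId)) {
                    return false;
                }
            }
        }
        return true;
    }
}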
STREAMS_INVALID_TOPOLOGY is returned when the request contains a new topology and the topology does not obey the following invariants:
- A StateChangelogTopic must not have a defined partition number.
- A RepartitionSourceTopic cannot be in SourceTopics or StateChangelogTopics of any subtopology.
- A StateChangelogTopic cannot be in SourceTopics, RepartitionSinkTopics or RepartitionSourceTopics of any subtopology.
- A RepartitionSourceTopic of one subtopology must be a RepartitionSinkTopic of at least one other subtopology.
- All indices in CopartitionGroups must be valid indices into the corresponding topic arrays.
Request Handling
When the group coordinator handles a StreamsGroupHeartbeat request, it:
- Performs request validation.
- If the member joins the group (i.e. the member epoch is 0):
  - Looks up or creates the group. If the group is created, the topology epoch of the group is set to the topology epoch sent by the member. GROUP_ID_NOT_FOUND is returned if the group ID is associated with a group type that is not streams or classic (the latter will be allowed for migration).
  - Compares the topology epoch of the request (ER) to the topology epoch of the group (EG), as sketched after this list:
    - If ER=EG, checks whether the topology metadata in the request is equal to the topology metadata of the group; otherwise fails with STREAMS_INVALID_TOPOLOGY_EPOCH.
    - If ER<EG, fails with STREAMS_TOPOLOGY_FENCED.
    - If ER>EG+1, fails with STREAMS_INVALID_TOPOLOGY_EPOCH.
    - If ER=EG+1, updates the topology by writing the new topology record to the offset topic.
  - Creates the member.
- If the member is already part of the group (i.e. the member epoch is greater than 0):
  - Looks up the group. GROUP_ID_NOT_FOUND is returned if the group ID does not exist anymore.
  - If the member does not exist, returns UNKNOWN_MEMBER_ID.
  - Checks whether the member epoch matches the member epoch in its current assignment; FENCED_MEMBER_EPOCH is returned otherwise, and the member is removed from the group.
    - There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about, and its request will be rejected with FENCED_MEMBER_EPOCH. This will be handled as in KIP-848.
  - If the topology epoch in the request is less than the topology epoch of the group, sets status STALE_TOPOLOGY in the response.
- Updates information of the member if needed. The group epoch is incremented if there is any change.
- If the topology or topic metadata changed, detect any topology / topic mismatches as described earlier in this document.
- Reconcile the member assignments as explained earlier in this document.
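The topology-epoch comparison from the join path above can be summarized in a small sketch (ER is the epoch in the request, EG the group's current epoch; the outcome names map to the new errors defined in this KIP):

public final class TopologyEpochCheckSketch {

    enum Outcome { ACCEPT_SAME, ACCEPT_UPDATE, INVALID_TOPOLOGY_EPOCH, TOPOLOGY_FENCED }

    static Outcome check(int requestEpoch, int groupEpoch, boolean topologiesEqual) {
        if (requestEpoch == groupEpoch) {
            // Same epoch: the topology metadata must match exactly.
            return topologiesEqual ? Outcome.ACCEPT_SAME : Outcome.INVALID_TOPOLOGY_EPOCH;
        }
        if (requestEpoch < groupEpoch) {
            return Outcome.TOPOLOGY_FENCED;       // stale topology epoch
        }
        if (requestEpoch == groupEpoch + 1) {
            return Outcome.ACCEPT_UPDATE;         // legal topology update
        }
        return Outcome.INVALID_TOPOLOGY_EPOCH;    // ER > EG + 1: epoch skipped
    }
}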
Response Schema
The group coordinator will only set the ActiveTasks, StandbyTasks and WarmupTasks fields until the member acknowledges that it has converged to the desired assignment. This is done to ensure that the members converge to the target assignment.
{ "apiKey": TBD, "type": "response", "name": "StreamsGroupHeartbeatResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_MEMBER_ID (version 0+) // - FENCED_MEMBER_EPOCH (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - CLUSTER_AUTHORIZATION_FAILED (version 0+) // - STREAMS_INVALID_TOPOLOGY (version 0+) // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+) // - STREAMS_TOPOLOGY_FENCED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error" }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member id is always generated by the streams consumer."}, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", "about": "The heartbeat interval in milliseconds." }, { "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+", "about": "The maximal lag a warm-up task can have to be considered caught-up." }, { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+", "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." }, { "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Indicate zero or more status for the group. Null if unchanged since last heartbeat." }, // The streams app knows which partitions to fetch from given this information { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned active tasks for this client. Null if unchanged since last heartbeat." }, { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned standby tasks for this client. Null if unchanged since last heartbeat." }, { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, // IQ-related information { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." 
, "fields": [ { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "about": "User-defined endpoint to connect to the node" }, { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+", "about": "All partitions available on the node" } ] } ], "commonStructs": [ { "name": "Status", "versions": "0+", "fields": [ // Possible status codes // 0 - STALE_TOPOLOGY - The topology epoch supplied is lower than the topology epoch for this streams group. // 1 - MISSING_SOURCE_TOPICS - One or more source topics are missing or a source topic regex resolves to zero topics. // Missing topics are indicated in the StatusDetail. // 2 - INCORRECTLY_PARTITIONED_TOPICS - One or more topics are incorrectly partitioned, that is, they are not copartitioned despite being // part of a copartition group, or the number of partitions in a changelog topic does not correspond // to the maximal number of source topic partition for that subtopology. // Incorrectly partitioned topics are indicated in the StatusDetail. // 3 - MISSING_INTERNAL_TOPICS - One or more internal topics are missing. // Missing topics are indicated in the StatusDetail. // The group coordinator will attempt to create all missing internal topics, if any errors occur during // topic creation, this will be indicated in StatusDetail. // 4 - SHUTDOWN_APPLICATION - A client requested the shutdown of the whole application. { "name": "StatusCode", "type": "int8", "versions": "0+", "about": "A code to indicate that a particular status is active for the group membership" }, { "name": "StatusDetail", "type": "string", "versions": "0+", "about": "A string representation of the status." } ]}, { "name": "TopicPartition", "versions": "0+", "fields": [ { "name": "Topic", "type": "string", "versions": "0+", "about": "topic name" }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "partitions" } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } ]}, { "name": "Endpoint", "versions": "0+", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "host of the endpoint" }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "port of the endpoint" } ]} ] }
StreamsGroupDescribe API
Request Schema
{ "apiKey": TBD, "type": "request", "listeners": ["broker"], "name": "StreamsGroupDescribeRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include authorized operations." } ] }
Required ACL
Read Group
Request Validation
INVALID_REQUEST is returned if the request does not conform to the schema definition.
INVALID_GROUP_ID is returned if the group ID is empty.
Request Handling
When the group coordinator handles a StreamsGroupDescribe request, it:
- Checks whether the group IDs exist. If a group ID does not exist, GROUP_ID_NOT_FOUND is returned. The same error is returned if the group ID is associated with a group type that is not streams.
- Looks up the groups and returns the response.
Response Schema
{ "apiKey": TBD, "type": "response", "name": "StreamsGroupDescribeResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", "about": "Each described group.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The describe error, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID string." }, { "name": "GroupState", "type": "string", "versions": "0+", "about": "The group state string, or the empty string." }, { "name": "GroupEpoch", "type": "int32", "versions": "0+", "about": "The group epoch." }, { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", "about": "The assignment epoch." }, { "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The topology metadata currently initialized for the streams application. Can be null in case of a describe error.", "fields": [ { "name": "Epoch", "type": "int32", "versions": "0+", "about": "The epoch of the currently initialized topology for this group." }, { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The subtopologies of the streams application. This contains the configured subtopologies, where the number of partitions are set and any regular expressions are resolved to actual topics. Null if the group is uninitialized, source topics are missing or incorrectly partitioned.", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "String to uniquely identify the subtopology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the subtopology reads from." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the subtopology writes to." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Created automatically." } ]} ]}, { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The member instance ID for static membership." 
}, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The rack ID." }, { "name": "ClientId", "type": "string", "versions": "0+", "about": "The client ID." }, { "name": "ClientHost", "type": "string", "versions": "0+", "about": "The client host." }, { "name": "TopologyEpoch", "type": "int32", "versions": "0+", "about": "The epoch of the topology on the client." }, { "name": "ProcessId", "type": "string", "versions": "0+", "about": "Identity of the streams instance that may have multiple clients. " }, { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "User-defined endpoint for Interactive Queries. Null if not defined for this client." }, { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "about": "Used for rack-aware assignment algorithm." }, { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "about": "Cumulative changelog offsets for tasks." }, { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "about": "Cumulative changelog end offsets for tasks." }, { "name": "Assignment", "type": "Assignment", "versions": "0+", "about": "The current assignment." }, { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", "about": "The target assignment." }, { "name": "IsClassic", "type": "bool", "versions": "0+", "about": "True for classic members that have not been upgraded yet." } ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } ] } ], "commonStructs": [ { "name": "Endpoint", "versions": "0+", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "host of the endpoint" }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "port of the endpoint" } ]}, { "name": "TaskOffset", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition." }, { "name": "Offset", "type": "int64", "versions": "0+", "about": "The offset." } ]}, { "name": "TopicPartitions", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "Assignment", "versions": "0+", "fields": [ { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "about": "Active tasks for this client." }, { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "about": "Standby tasks for this client." }, { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "about": "Warm-up tasks for this client. " } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." 
} ]}, { "name": "KeyValue", "versions": "0+", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "about": "key of the config" }, { "name": "Value", "type": "string", "versions": "0+", "about": "value of the config" } ]}, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, { "name": "ReplicationFactor", "type": "int16", "versions": "0+", "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } ]} ] }
Response Handling
Nothing particular.
Records
This section describes the new record types required for the new protocol. Similar to KIP-848, we define new record types that persist all data related to groups, their members and their assignments in the __consumer_offsets compacted topic. Each record has a dedicated key type, whose version differentiates these records from the records introduced in KIP-848 and from those used to persist committed offsets.
Compared to consumer groups, we introduce one extra record type that stores information about the topology of the group.
The size of the persisted member metadata in the __consumer_offsets topic will have to respect the max message size. Since those records are persisted separately for each member, this should be less of a concern; the same problem affects plain consumer groups in KIP-848 when they grow very large. Eventually, we may implement a way to split the records into multiple messages, and there are currently plans to add transaction support to the group coordinator to ensure the records can be updated atomically. For the first version of the protocol, we will limit those records by the max message size.
Group Metadata
The assignment-related metadata for a group with N members is stored with N+1 records. One metadata record per member and one metadata record for the group.
- The member metadata record, keyed by the group ID and the member ID, stores all relevant metadata that is sent by a member to the coordinator in a heartbeat request. Notable exceptions are the task changelog offsets and task changelog end offsets, which are not persisted, as they are constantly changing.
- The group metadata, keyed by the group ID, contains the group epoch.
The records evolve in the following way:
- When the first member joins the group, the group metadata and the member metadata records for the member are first created.
- Every time a new member joins the group, another member metadata record is added.
- Whenever a member updates its member metadata through the heartbeat, a new version of the member metadata record is appended.
- When a member leaves the group, a tombstone is written for the member metadata key.
- When the group has lost all members (or never had any) and offsets.retention.minutes expires, the group metadata tombstone is written. This can only happen in group state EMPTY.
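To make the record lifecycle concrete, the following toy model treats the compacted topic as a map from record key to latest value and replays the steps above. All types and values are illustrative stand-ins, not the coordinator's actual internals.

import java.util.HashMap;
import java.util.Map;

public class GroupMetadataLifecycle {
    // Toy stand-ins for the record keys defined in the following subsections.
    record GroupKey(String groupId) {}
    record MemberKey(String groupId, String memberId) {}

    public static void main(String[] args) {
        // A compacted topic retains only the latest value per key; a record
        // with a null value (tombstone) eventually removes the key, which the
        // map models as a remove.
        Map<Object, String> compacted = new HashMap<>();

        // First member joins: group metadata plus one member record (N + 1 = 2).
        compacted.put(new GroupKey("app"), "epoch=1");
        compacted.put(new MemberKey("app", "m1"), "metadata of m1");

        // A second member joins: one more member record.
        compacted.put(new MemberKey("app", "m2"), "metadata of m2");

        // m1 updates its metadata via heartbeat: same key, new value appended.
        compacted.put(new MemberKey("app", "m1"), "updated metadata of m1");

        // Members leave: tombstones for their member keys.
        compacted.remove(new MemberKey("app", "m1"));
        compacted.remove(new MemberKey("app", "m2"));

        // Group is EMPTY and offsets.retention.minutes expires:
        compacted.remove(new GroupKey("app"));
        System.out.println(compacted); // {}
    }
}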
StreamsGroupMemberMetadataKey
{ "type": "data", "name": "StreamsGroupMemberMetadataKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." }, { "name": "MemberId", "type": "string", "versions": TBD, "about": "The member id." } ] }
StreamsGroupMemberMetadataValue
{ "type": "data", "name": "StreamsGroupMemberMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string", "about": "The (optional) instance ID for static membership." }, { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string", "about": "The (optional) rack id." }, { "name": "ClientId", "versions": "0+", "type": "string", "about": "The client id." }, { "name": "ClientHost", "versions": "0+", "type": "string", "about": "The client host." }, { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "The rebalance timeout." }, { "name": "TopologyEpoch", "type": "int32", "versions": "0+", "about": "The epoch of the topology. Must be non-zero." }, { "name": "ProcessId", "type": "string", "versions": "0+", "about": "Identity of the streams instance that may have multiple consumers." }, { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "User-defined endpoint for running interactive queries on this instance." }, { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "about": "Used for rack-aware assignment algorithm." } ], "commonStructs": [ { "name": "Endpoint", "versions": "0+", "fields": [ { "name": "Host", "type": "string", "versions": "0+", "about": "host of the endpoint" }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "port of the endpoint" } ]}, { "name": "KeyValue", "versions": "0+", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "about": "key of the config" }, { "name": "Value", "type": "string", "versions": "0+", "about": "value of the config" } ] } ] }
StreamsGroupMetadataKey
{ "type": "data", "name": "StreamsGroupMetadataKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." } ] }
StreamsGroupMetadataValue
{ "type": "data", "name": "StreamsGroupMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "Epoch", "versions": "0+", "type": "int32", "about": "The group epoch." } ] }
Group Topology Metadata
The topology for the group, and the metadata for input topic partitions consumed in the topology, are persisted in two records.
- The partition metadata record, keyed by the group ID, contains metadata of all topic partitions consumed by the current topology
- The topology record, keyed by the group ID, stores the topology metadata sent in the heartbeat request when members join.
The records evolve in the following way:
- When the group coordinator receives valid topology metadata in a heartbeat request, both records are created, using the topology sent in the request and the topic metadata currently known to the group coordinator.
- When, during the handling of a group heartbeat, the group coordinator detects that the partition metadata for one of the input partitions used in the topology changed (by comparing the previous partition metadata record to the topic metadata currently known on the broker), the streams group partition metadata record is updated and a new assignment is computed.
- When the group has lost all members and offsets.retention.minutes expires, tombstones are written to remove both records.
StreamsGroupPartitionMetadataKey
{ "type": "data", "name": "StreamsGroupPartitionMetadataKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." } ] }
StreamsGroupPartitionMetadataValue
{ "type": "data", "name": "StreamsGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata", "about": "The list of topic metadata.", "fields": [ { "name": "TopicId", "versions": "0+", "type": "uuid", "about": "The topic id." }, { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", "about": "The number of partitions of the topic." }, { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ { "name": "Partition", "versions": "0+", "type": "int32", "about": "The partition number." }, { "name": "Racks", "versions": "0+", "type": "[]string", "about": "The set of racks that the partition is mapped to." } ]} ]} ] }
StreamsGroupTopologyKey
{ "type": "data", "name": "StreamsGroupTopologyKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." } ] }
StreamsGroupTopologyValue
{ "type": "data", "name": "StreamsGroupTopologyValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "Epoch", "type": "int32", "versions": "0+", "about": "The epoch of the topology. " }, { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "String to uniquely identify the subtopology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", "about": "Regular expressions identifying topics the subtopology reads from." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics." }, { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+", "about": "A subset of source topics that must be copartitioned.", "fields": [ { "name": "SourceTopics", "type": "[]int16", "versions": "0+", "about": "The topics the topology reads from. Index into the array on the subtopology level." }, { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+", "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." }, { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." } ] } ] } ], "commonStructs": [ { "name": "TopicConfig", "versions": "0+", "fields": [ { "name": "key", "type": "string", "versions": "0+", "about": "The key of the topic-level configuration." }, { "name": "value", "type": "string", "versions": "0+", "about": "The value of the topic-level configuration," } ] }, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, { "name": "ReplicationFactor", "type": "int16", "versions": "0+", "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } ]} ] }
Current Member Assignment
For each member, there is a record to store the current member assignment. The record is first created when a member joins (with an empty assignment). Every time an assignment is computed for the member during reconciliation, it is compared to the previous assignment for the member. If the assignment changed, a new version of the current member assignment record is appended to the consumer offset topic. When the member leaves the group, a tombstone is written to remove the record.
StreamsGroupCurrentMemberAssignmentKey
{ "type": "data", "name": "StreamsGroupCurrentMemberAssignmentKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." }, { "name": "MemberId", "type": "string", "versions": TBD, "about": "The member id." } ] }
StreamsGroupCurrentMemberAssignmentValue
{ "type": "data", "name": "StreamsGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "MemberEpoch", "versions": "0+", "type": "int32", "about": "The current member epoch that is expected from the member in the heartbeat request." }, { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32", "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, { "name": "State", "versions": "0+", "type": "int8", "about": "The member state. See StreamsGroupMember.MemberState for the possible values." }, { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned active tasks for this streams client." }, { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned standby tasks for this streams client." }, { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned warm-up tasks for this streams client." }, { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds", "about": "The active tasks that must be revoked by this member." }, { "name": "StandbyTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds", "about": "The standby tasks that must be revoked by this member." }, { "name": "WarmupTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds", "about": "The warmup tasks that must be revoked by this member." } ], "commonStructs": [ { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } ]} ] }
Streams Group Target Assignment
The target assignment is stored in N + 1 records, where N is the number of members in the group. When a new target assignment is computed, the group coordinator will compare it with the current target assignment and only write the difference between the two assignments to the __consumer_offsets partition. That also means that if a member left the group, a corresponding tombstone record is written. The assignment must be atomic, so the group coordinator will ensure that all the records are written in a single batch, limiting the size to the maximum batch size (default 1 MB), as in consumer groups.
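The following sketch shows the diff computation described above, under the assumption of a simplified map representation of target assignments; the record and task types are illustrative, not actual coordinator code.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TargetAssignmentDiff {
    // null tasks represent a tombstone for a removed member.
    record AssignmentRecord(String memberId, Set<String> tasks) {}

    // Returns the records to append: one record per member whose assignment
    // changed, plus tombstones for members missing from the new assignment.
    static List<AssignmentRecord> diff(Map<String, Set<String>> current,
                                       Map<String, Set<String>> target) {
        List<AssignmentRecord> records = new ArrayList<>();
        target.forEach((member, tasks) -> {
            if (!tasks.equals(current.get(member)))
                records.add(new AssignmentRecord(member, tasks));
        });
        current.keySet().forEach(member -> {
            if (!target.containsKey(member))
                records.add(new AssignmentRecord(member, null)); // tombstone
        });
        return records;
    }

    public static void main(String[] args) {
        var current = Map.of("m1", Set.of("0_0", "0_1"), "m2", Set.of("1_0"));
        var target = Map.of("m1", Set.of("0_0"), "m3", Set.of("1_0", "0_1"));
        // m1 changed, m2 was removed (tombstone), m3 is new: three records,
        // all written in a single batch to keep the assignment atomic.
        diff(current, target).forEach(System.out::println);
    }
}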
StreamsGroupTargetAssignmentMemberKey
{ "type": "data", "name": "StreamsGroupTargetAssignmentMemberKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." }, { "name": "MemberId", "type": "string", "versions": TBD, "about": "The member id." } ] }
StreamsGroupTargetAssignmentMemberValue
{ "type": "data", "name": "StreamsGroupTargetAssignmentMemberValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned active tasks for this streams client." }, { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned standby tasks for this streams client." }, { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned warm-up tasks for this streams client." } ], "commonStructs": [ { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } ]} ] }
StreamsGroupTargetAssignmentMetadataKey
{ "type": "data", "name": "StreamsGroupTargetAssignmentMetadataKey", "validVersions": TBD, "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": TBD, "about": "The group id." } ] }
StreamsGroupTargetAssignmentMetadataValue
{ "type": "data", "name": "StreamsGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "AssignmentEpoch", "versions": "0+", "type": "int32", "about": "The assignment epoch." } ] }
Broker Metrics
The existing group metrics are extended to differentiate between streams groups and consumer groups and account for streams group states.
Number of groups based on the type of the protocol, where the list of protocols is extended by the protocol=streams variant.
kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic|streams}
Number of streams groups based on state
kafka.server:type=group-coordinator-metrics,name=streams-group-count,state={empty|not_ready|assigning|reconciling|stable|dead}
Streams group rebalances sensor
kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-rate
kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-count
Broker Configurations
The new group protocol would be guarded by enabling a new version of the group.version
feature flag on the broker side.
Updated properties
Configuration | Description | Values |
---|---|---|
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. | streams will be added to the default value of this configuration property once this feature is complete. |
New properties
Name | Type | Default | Doc |
---|---|---|---|
group.streams.session.timeout.ms | int | 45s | The timeout to detect client failures when using the streams group protocol. |
group.streams.min.session.timeout.ms | int | 45s | The minimum session timeout. |
group.streams.max.session.timeout.ms | int | 60s | The maximum session timeout. |
group.streams.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.streams.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
group.streams.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
group.streams.max.size | int | MaxValue | The maximum number of streams clients that a single streams group can accommodate. |
group.streams.acceptable.recovery.lag | long | 10000 | The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment. |
group.streams.num.warmup.replicas | int | 2 | The maximum number of warmup replicas. |
group.streams.max.warmup.replicas | int | 20 | Maximum for dynamic configurations of the warmup replica configuration |
group.streams.num.standby.replicas | int | 0 | The number of standby replicas for each task. |
group.streams.max.standby.replicas | int | 2 | Maximum for dynamic configurations of the standby replica configuration |
group.streams.task.offset.interval.ms | int | 60s | The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed. |
group.streams.min.task.offset.interval.ms | int | 15s | Minimum for dynamic configurations of the task offset interval. Used to restrict users of the cluster from sending the task changelog offsets too often. |
group.streams.assignor.name | string | null | The name of the task assignor used for all streams groups. Can be |
Group Configurations
We will add new configurations for the resource type GROUP in DescribeConfigs and IncrementalAlterConfigs to override the default broker configurations dynamically for specific groups (see the sketch after the table).
Name | Type | Default | Doc |
---|---|---|---|
group.streams.session.timeout.ms | int | 45s | The timeout to detect client failures when using the streams group protocol. |
group.streams.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.streams.acceptable.recovery.lag | long | 10000 | The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment. |
group.streams.num.warmup.replicas | int | 2 | The maximum number of warmup replicas. |
group.streams.num.standby.replicas | int | 0 | The number of standby replicas for each task. |
group.streams.task.offset.interval.ms | int | 60s | The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed. |
group.streams.assignor.name | string | null | The name of the task assignor used for this streams group. Can be |
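As sketched below, group-level overrides use the existing IncrementalAlterConfigs machinery with the GROUP config resource type introduced for KIP-848 group configurations; the group.streams.* names are the ones proposed above and are an assumption of this sketch.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class StreamsGroupConfigOverride {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Override the number of standby replicas for group "my-app" only.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-app");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("group.streams.num.standby.replicas", "1"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
        }
    }
}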
Streams API
In a future major version, when the classic group protocol is deprecated, the interfaces introduced in KIP-924 will be deprecated as well, unless an integration with this KIP is proposed in a follow-up KIP.
Streams Configurations
New configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | classic | A flag that indicates which protocol should be used. It can be: classic or streams. |
topology.epoch | int | 0 | The epoch of the topology for the streams group. Ignored if |
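With these configurations, opting an application into the new protocol becomes a pure configuration change. A minimal sketch, assuming the configuration names proposed in the table above; topology is any existing Topology.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class StreamsProtocolOptIn {
    static KafkaStreams start(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // doubles as the group ID
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // New configurations proposed in this KIP:
        props.put("group.protocol", "streams");
        props.put("topology.epoch", "0");
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}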
Deprecations
In a future major version, when the classic group protocol is deprecated, the following configuration options will be deprecated:
acceptable.recovery.lag
max.warmup.replicas
num.standby.replicas
probing.rebalance.interval.ms
rack.aware.assignment.tags
rack.aware.assignment.strategy
rack.aware.assignment.traffic_cost
rack.aware.assignment.non_overlap_cost
task.assignor.class
Note that both the rack-aware task assignor and customizable client-side task assignment may be introduced into the new protocol in follow-up KIPs (see Future Work section). In this case, the latter 5 configuration options would not be deprecated in the future major version.
Admin API
Add the following methods to the org.apache.kafka.clients.admin.Admin interface, mostly backed by the same implementations as the consumer group API.
Method signature | Description | RPC used |
---|---|---|
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) | Alter offset information for a streams group. | OffsetCommit |
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterStreamsGroupOffsetsOptions options) | Alter offset information for a streams group. | OffsetCommit |
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics) | Delete offset information for a set of topics in a streams group. | OffsetDelete |
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics, DeleteStreamsGroupOffsetsOptions options) | Delete offset information for a set of topics in a streams group. | OffsetDelete |
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) | Delete streams groups from the cluster. | DeleteGroups |
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) | Delete streams groups from the cluster. | DeleteGroups |
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) | Describe some streams groups in the cluster. | StreamsGroupDescribe |
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) | Describe some streams groups in the cluster. | StreamsGroupDescribe |
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) | List the streams group offsets available in the cluster for the specified Streams groups. | OffsetFetch |
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) | List the streams group offsets available in the cluster for the specified Streams groups. | OffsetFetch |
These correspond to existing APIs for consumer groups, and some of the code will be shared. The main reason for duplicating them is naming: the current API uses "consumerGroup" in the method names, even though the underlying RPCs just use "group". There are some differences:
- describeStreamsGroups uses the new StreamsGroupDescribe RPC and returns different information than the consumer group equivalent.
- A streams group has an extra state, NOT_READY, and none of the legacy states from the classic protocol.
- removeMembersFromConsumerGroup will not have a corresponding API in this first version, as it uses the LeaveGroup RPC of classic consumer groups, which is not available for KIP-848-style groups.
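Once implemented, the new methods should feel identical to the consumer group API. A sketch using the proposed describeStreamsGroups; the all() accessor is assumed to mirror DescribeConsumerGroupsResult.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;

public class DescribeStreamsGroupsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Proposed API from the table above; all() is assumed to return a
            // map from group ID to StreamsGroupDescription.
            var groups = admin.describeStreamsGroups(List.of("my-app")).all().get();
            groups.forEach((groupId, description) ->
                System.out.println(groupId + ": " + description));
        }
    }
}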
Admin
The following new classes are added to support these methods:
AlterStreamsGroupOffsetsResult
AlterStreamsGroupOffsetsOptions
DeleteStreamsGroupOffsetsResult
DeleteStreamsGroupOffsetsOptions
DeleteStreamsGroupsResult
DeleteStreamsGroupsOptions
DescribeStreamsGroupsResult
StreamsGroupDescription
StreamsGroupMemberDescription
StreamsGroupMemberAssignment
StreamsGroupSubtopologyDescription
DescribeStreamsGroupsOptions
ListStreamsGroupOffsetsResult
ListStreamsGroupOffsetsOptions
ListStreamsGroupOffsetsSpec
GroupType
Another case is added to the org.apache.kafka.common.GroupType
enum:
Enum constant | Description |
---|---|
STREAMS("Streams") | Streams group |
GroupState
Another case is added to the org.apache.kafka.common.GroupState
enum:
Enum constant |
---|
NOT_READY |
Exceptions
The following new exceptions are added to the org.apache.kafka.common.errors
package, corresponding to the new error codes in the Kafka protocol.
StreamsInvalidTopologyException - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants.
StreamsInvalidTopologyEpochException - The client provided an invalid topology epoch with respect to the streams group state, for example, the topology was changed but the epoch was not bumped.
StreamsTopologyFencedException - Returned when a client attempts to join with an outdated topology epoch.
Command-line tools
kafka-groups.sh
We add an option --streams to the above command-line tool, which filters the groups to show only streams groups. Furthermore, streams becomes one of the possible values for --group-type.
kafka-streams-groups.sh
A new tool called kafka-streams-groups.sh is added for working with streams groups. It has the following options:
Option | Description |
---|---|
--version | Display Kafka version. |
--verbose | Use with '--describe' (in combination with '--offsets', '--members', or '--state') to show additional details. |
--all-input-topics | Use with --reset-offsets or --delete-offsets. If specified, includes all input topics of the streams group, as stored by the topology metadata on the broker. |
--input-topics <String: topics> | Use with --reset-offsets or --delete-offsets. Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset, or delete the offsets. Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5. |
--internal-topics <String: topics> | Use with --delete. Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics). |
--to-offset <Long: offset> | Reset input topic offsets to a specific offset. |
--to-latest | Reset input topic offsets to latest offset. |
--to-earliest | Reset input topic offsets to earliest offset. |
--by-duration <String: duration> | Reset input topic offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS' |
--to-datetime <String: datetime> | Reset input topic offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'. |
--from-file | Reset input topic offsets to values defined in CSV file. |
--shift-by <Long: n> | Reset input topic offsets shifting current offset by 'n', where 'n' can be positive or negative. |
--timeout <Long: timeout (ms)> | The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000) |
--state [String] | When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Not Ready', 'Reconciling', 'Assigning', 'Stable' and 'Dead'. |
--reset-offsets | Reset input topic offsets of streams group. Supports one streams group at a time, and instances should be inactive. You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset. To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'. Fails if neither '--dry-run' nor '--execute' is specified. |
--offsets | Describe the group and list all input topic partitions in the group along with their offset lag. This is the default sub-action of --describe and may be used with the '--describe' option only. |
--members | Describe members of the group. This option may be used with the '--describe' option only. |
--list | List all streams groups. |
--help | Print usage information. |
--group <String: streams group ID> | The group ID (application ID) of the streams group we wish to act on. |
--execute | Execute operation. Supported operations: reset-offsets. |
--dry-run | Only show results without executing changes on streams groups. Supported operations: reset-offsets. |
--describe | Describe streams group and list offset lag (number of records not yet processed) related to given group. |
--delete-offsets | Delete offsets of streams group. Supports one streams group at a time. To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'. |
--delete | Pass in a group to delete the entire streams group. For instance --group g1. Deletes offsets, internal topics of the streams group, and topology and ownership information on the broker. Instances should be inactive. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
Here are some examples.
To display a list of all streams groups:
$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --list
To delete the inactive streams group S1, which deletes all offsets, internal topics, assignment and topology metadata on the broker:
$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete
To delete the offsets from the input topics of the inactive streams group S1, which essentially resets the consumption of these topics in the streams group:
$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete-offsets --all-input-topics
To set the starting offset for the input topics of the inactive streams group S1 to a specific date and time:
$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --reset-offsets --all-input-topics --to-datetime 1999-12-31T23:57:00.000 --execute
kafka-streams-application-reset.sh
The streams application reset tool will only work with the classic protocol, and will be deprecated when the classic protocol is deprecated.
Compatibility, Deprecation and Migration Plan
The new broker-side group management will require more information than the current UserData to compute the assignment, so existing Streams versions cannot join a streams group.
To enable rolling upgrades from the old protocol to the new protocol, we will have to extend the current UserData with the missing (topology-derived) data. We can then introduce a compatibility layer in the new group coordinator that translates RPCs from the legacy rebalance protocol into requests in the streams rebalance protocol by deserializing the UserData payload. The migration path for the user is then to first update to a new version of Streams, before switching from the legacy consumer-based protocol to the streams rebalance protocol in a second step. The first time a consumer sends a StreamsGroupHeartbeat with the group ID of an existing legacy consumer group, the group will be converted into a streams group, similar to how the migration is handled for legacy consumer groups in KIP-848.
Named topologies will not be supported in the new protocol. A Streams application using named topologies will throw an error on start-up when configured with the new protocol. While noteworthy, this is not strictly a compatibility concern, since the configuration to enable named topologies is not part of the public API.
Test Plan
Our primary method of validation will be the execution of the existing test infrastructure for Kafka Streams on the new protocol, including porting all integration tests and system tests. Furthermore, we will add new integration tests for offline and online migration from the classic protocol to the streams protocol.
Rejected Alternatives
KIP-848 originally envisioned that Kafka Streams would continue to use a byte-array metadata protocol on top of the regular consumer group RPCs with a client-side assignor, and provided a rough proposal for how to do so. The described approach was never implemented. In this KIP, we propose to use broker-side assignment by default for several reasons (mostly already mentioned in the motivation section):
Client-side assignment makes it hard to debug and tune the assignment logic, where a simple parameter change requires redeployment of all streams clients.
Using opaque byte arrays to exchange the required metadata makes it hard to understand the state of a streams group at the protocol level. One has to piece the information together from logs spread across all client machines. Central mechanisms of the Kafka protocol, such as versioning of RPCs, need to be reimplemented on top of the opaque payloads.
One could argue that the Kafka protocol is simpler without Kafka Streams specific RPCs, but we think the opposite is true. The KIP-848 consumer group protocol was extended with client-side assignment, byte-array metadata and metadata versioning just for Kafka Streams. Having the concerns of Kafka Streams dealt with in separate RPCs mainly introduces a clearer separation and gives the Kafka community the ability to simplify the upcoming consumer protocol.
Instead of using the heartbeat RPC during joining to initialize the topology, we considered using a dedicated group initialization RPC. Using the heartbeat RPC for initialization turned out to be simpler, and we realized that the drawbacks we wanted to avoid with a dedicated initialization RPC were not that significant. For example, we thought that we could save on the data sent during initialization with a dedicated initialization call. However, sending the topology metadata in the heartbeat when joining does not add too much overhead, and we can ensure all clients use the same topology metadata. Also, with two different calls, we would have had the overhead of coordinating the initialization and heartbeat requests.
An earlier draft of this KIP proposed introducing optional client-side task assignment, which would allow customization of the task assignment similar to KIP-924. We decided to remove it from this KIP to limit its scope and not introduce features into the protocol whose use we cannot fully envision yet. The RPCs defined in this KIP are, however, designed in a way that they could easily be extended with client-side assignment in a follow-up KIP, possibly together with a public interface for adding custom task assignors on the client side.
Future Work
The classic streams rebalance protocol will remain fully functional with the introduction of the streams protocol; however, the aim is to eventually deprecate it. There are some use cases not yet covered by this KIP that would have to be added to the protocol in follow-up KIPs, if there is enough interest:
- Customizable client-side task assignment in the style of KIP-924
- Optimization of the assignment in cases where not every rack has a replica of the partition, as in KIP-925: Rack aware task assignment in Kafka Streams
- Injection of a customized implementation of the streams rebalance protocol using the KafkaClientSupplier.
There are also several improvements of long-standing Kafka Streams issues that can be built on top of this protocol, in particular:
- Topology updates
  - Validate topology compatibility broker-side
  - Automatically drain repartition topics on a topology update
- In addition to the existing shutdown flag, a global pause/resume functionality could be built on top of the protocol
There is an opportunity to improve EOSv2 in the new protocol. One may add the current transactional.id to the heartbeat request, and a list of transactional IDs that the client should fence to the heartbeat response, to avoid the need to rely on a low transaction.timeout.ms config.