Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note that this is joint work by Lucas Brutschy and Bruno Cadonna.

Motivation

Kafka Streams clients use a protocol to distribute stream processing tasks among themselves. Since the inception of Streams, this has worked by piggybacking custom byte payloads on the pre-KIP-848 consumer rebalance protocol. A custom partition assignor on one of the Streams clients performs task assignment and various other centralized tasks, such as internal topic initialization. This approach has served its purpose well, and the Streams partition assignor has grown impressively in flexibility and complexity over the years. Even so, it has a number of deficiencies:

  • A large fraction of the production incidents in Kafka Streams are caused by bugs in the rebalancing logic or misconfiguration thereof. With the rebalance logic being run on the processing node, the implementer of the business logic is also typically responsible for configuring, upgrading and debugging the rebalance logic. 

    • When rebalance problems occur, the timeline of events and information that caused the rebalance and influenced the assignment is spread across all clients, making it difficult to debug.

    • Since teams building the processing logic are notoriously slow to pick up new Streams versions, bugs in the rebalance logic remain prevalent for years after being fixed on trunk.

    • Since assignment logic is controlled by the streams application, tuning the parameters of the assignment logic requires redeployment of the application, which in itself triggers rebalances (and reassignments), creating a vicious cycle.

  • With the rebalance logic being based on the metadata on one of the Streams clients, metadata inconsistencies among clients have caused a variety of issues.

  • In particular, the use of raw byte arrays in the protocol means that brokers cannot understand or validate the payload that is propagated among Streams clients.

  • In the implementation, piggybacking the task assignment logic on the consumer partition assignment and injecting the required Streams metadata and functionality into the general-purpose KafkaConsumer interface has required a number of workarounds, increasing code complexity and hurting the maintainability of the code. This has affected both the streams code, which had to work around the restrictions of the consumer interface, and the consumer, whose interface had to be padded to account for Kafka Streams requirements.

  • Because the protocol is not Kafka Streams native, advanced features like “smooth scale out” or “application shutdown on error” and others require workarounds like probing rebalances. These workarounds are disruptive for the running application and often hard for users to reason about.

KIP-848 introduced the next generation of the consumer rebalance protocol, which also requires a redefinition of the Streams rebalance protocol. We propose to use this change to solve or alleviate the above problems by introducing a dedicated protocol for Kafka Streams. The new protocol uses dedicated RPCs for task rebalancing. Centralized logic, such as topic initialization and task assignment, is performed by the brokers’ group coordinator.

Design Goals

We propose to introduce new RPCs designed to make Streams initialization and task assignment first-class citizens of the Kafka protocol. The proposed RPCs are built on top of the following design goals:

  • We want to inherit all the advantages of the new consumer rebalance protocol introduced in KIP-848, such as greatly simplified clients, task assignment on the broker by default, truly incremental and cooperative rebalancing without global synchronization, and the ability of each instance to initiate a metadata update to the rebalance logic.

  • Move central initialization tasks, such as the creation of internal topics, to the group coordinator.

  • Make all data required for initialization and assignment explicit in the protocol and deserializable by the broker, with corresponding compatibility guarantees.

  • Establish a single source of truth for Streams application metadata, such as the internal topic configuration, the topology and the assignment configuration, by persisting it on the broker, so that inconsistencies are detected early.

  • Make the assignment logic parameters centrally tunable, without the need to redeploy the clients.

  • Make active tasks, standby tasks and warm-up tasks first-class citizens in assignment and reconciliation.

Proposed Changes

Overview

We introduce the concept of a Streams group in parallel to a consumer group. Streams clients use a dedicated heartbeat RPC, StreamsGroupHeartbeat, for three purposes: to join a group, to leave a group, and to update the group coordinator about their currently owned tasks and their client-specific metadata. The group coordinator manages a Streams group similarly to a consumer group. It continuously updates the group member metadata via heartbeat responses and runs assignment logic when changes are detected.

To assign tasks among Streams clients, the group coordinator uses topology metadata that is initialized when a member joins the group and is persisted in the __consumer_offsets topic. To trigger topology initialization and/or detect topology inconsistencies, each heartbeat contains a unique ID of the topology (which can be derived automatically from the topology via hashing).
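The text leaves open how the topology ID is computed. The following minimal Python sketch makes two assumptions:

  • The topology is described as a nested dictionary.
  • The ID is computed with SHA-256 over a canonical JSON encoding of that dictionary.

The helper topology_id and the dictionary layout are hypothetical, not part of the protocol. Sorting both keys and set members makes the ID independent of the order in which a client enumerates its topology.

```python
import hashlib
import json
from typing import Any


def _encode_set(obj: Any) -> Any:
    # Sets (e.g. of topic names) have no defined order; sort them for a stable encoding.
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)
    raise TypeError(f"cannot encode {type(obj).__name__}")


def topology_id(topology: dict) -> str:
    """Derive a deterministic ID from a topology description (hypothetical helper)."""
    canonical = json.dumps(topology, sort_keys=True, separators=(",", ":"),
                           default=_encode_set)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Usage: two clients listing the same topics in a different order get the same ID.
example = {"subtopologies": {"0": {"source_topics": {"orders", "customers"},
                                   "changelog_topics": {"app-store-changelog"}}}}
print(topology_id(example))  # a 64-character hex digest
```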

Streams groups

We will introduce a new group type called streams to the group coordinator. For each Streams group, we define new record key and value types for the group metadata, topology metadata and group member metadata. The records are persisted in the __consumer_offsets topic. A group can be a streams group, a share group or a consumer group. Its type is defined by the first heartbeat request that uses the corresponding group ID.

Joining, leaving and staying in a Streams group

During the complete lifetime of a Streams client, its members continuously send StreamsGroupHeartbeatRequests to the group coordinator. The requests contain the following information:

  • All non-assignment metadata as defined in the ConsumerGroupHeartbeatRequest such as the group ID, member ID, member epoch, instance ID, rack ID and rebalance timeout.

  • The currently assigned active, standby and warm-up tasks. Each task is identified by a subtopology ID and a partition number. For future extensibility, the subtopology ID can be an arbitrary string in the protocol, even though subtopologies are currently numbered.

  • The process ID to identify multiple members running in the same process (i.e. the same Streams client).

  • The user-defined endpoint to be used for running interactive queries.

  • Topology metadata needed for creating internal topics and computing the assignment.
  • The topology epoch, which is defined by the client, and will be 0 by default. The epoch can be bumped by the client to deploy a new version of the topology metadata.

  • For each task, the current sum of task changelog offsets and task changelog end offsets. These are used to determine whether the lag of a warm-up task is below acceptable.recovery.lag, and they can be used by the assignor to optimize the assignment. For more details on these offsets, see the section "Cumulative task changelog offsets and task changelog end offsets" below.

  • A flag to request shutdown of the whole application (only if the member epoch is the group leave epoch).

Group ID, member ID, member epoch, and instance ID (if defined by the client) are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted, unless an error occurred.
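The rule that only changed fields are resent can be illustrated with a hypothetical client-side sketch. The sketch makes the following simplifications:

  • Requests are plain dictionaries with made-up snake_case field names, not the real StreamsGroupHeartbeatRequest schema.
  • On the first heartbeat, or after an error, every field is sent.

```python
from typing import Any, Dict, Optional

# Fields sent with every heartbeat (instance_id only if the client defines one).
ALWAYS_SENT = ("group_id", "member_id", "member_epoch", "instance_id")


def build_heartbeat(current: Dict[str, Any],
                    last_sent: Optional[Dict[str, Any]],
                    had_error: bool = False) -> Dict[str, Any]:
    """Return the fields to put into the next heartbeat (illustrative only)."""
    if last_sent is None or had_error:
        # First heartbeat or previous failure: send the complete state.
        return dict(current)
    request = {k: current[k] for k in ALWAYS_SENT if current.get(k) is not None}
    for key, value in current.items():
        # Any other field is only included if it changed since the last heartbeat.
        if key not in ALWAYS_SENT and last_sent.get(key) != value:
            request[key] = value
    return request
```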

Heartbeat handling & response

If the heartbeat is valid and any data in the heartbeat has changed, the group coordinator will update its internal representation of the member correspondingly and persist the updated member metadata in a record on the __consumer_offsets topic. It then potentially updates the group assignment and reconciles member assignments, which is described in the section "Rebalance Process".

The group coordinator responds with a StreamsGroupHeartbeatResponse, which contains:

  • All non-assignment metadata as defined in the ConsumerGroupHeartbeatResponse, such as the member ID (when joining as a new member), the new member epoch and the heartbeat interval in milliseconds.

  • The current target assignment for this member, as sets of active, standby and warm-up task IDs.

  • For each client in the group, the user-defined endpoint and the assigned tasks, for use by interactive queries (IQ).

Again, fields that don’t change will be omitted from the heartbeat response.

Initializing and updating topology metadata on the broker

Whenever a member joins the Streams group, the first heartbeat request contains metadata of the topology. The metadata describes the topology as a set of subtopologies, each identified by a unique string identifier and containing the following metadata that is relevant for creation of internal topics and assignment:

  • The set of source topics. Here, the source topics can be defined through a Google RE2/J regular expression.

  • The set of repartition sink topics. We do not include non-repartition sink topics, or output topics that the topology produces to using dynamic routing.

  • The set of changelog topics, replication factors and other topic-level configurations.

  • The set of repartition topics, the number of partitions for each of them (optional), replication factors and other topic-level configurations.

  • The set of copartition groups, that is, the set of source topics (user source topics, or internal repartition source topics) that have to have the same number of partitions, e.g. because they are being used inside a join.

Note that the topology metadata omits several partition counts:

  • It does not specify the number of partitions of the source topics.
  • It does not specify the number of partitions of changelog topics.
  • It may omit the number of partitions of some repartition topics, if that number can be inferred.

By leaving the number of partitions parametric, the broker can allow changing the number of partitions for certain topologies without reinitializing the group. In particular, stateless topologies without repartitioning steps can support any number of input partitions without reinitializing the broker-side topology.

The metadata sent to the broker via the heartbeat is persisted as a new record type in the __consumer_offsets topic. It is part of the group coordinator state in subsequent requests. If one or more source topics or internal topics do not exist, the group enters state NOT_READY and attempts to create any missing internal topics (see more details below). The group coordinator only starts assigning tasks to the members of the group once the topology metadata is initialized and all source topics and internal topics exist.

Topology updates

To avoid unintentional topology updates or rollbacks, updating the broker-side topology after it has been initialized for a group requires bumping the client-side configuration topology.epoch, which defaults to 0. By default, whenever a Streams member joins the group with a topology that is different from the current topology of the streams group, and the topology epoch is not bumped, the group coordinator will respond with an error, indicating that the member is incorrectly updating the broker-side topology. 

Any member can update the topology by sending a topology with a bumped topology epoch when joining. The group coordinator then updates the broker-side topology with the topology metadata sent by the member. An existing member's topology can become stale, meaning its topology epoch no longer corresponds to the current topology epoch of the group. In that case, the heartbeat response sent to that member indicates status STALE_TOPOLOGY. This status is informational, for example for logging a warning.

The topology epoch of the group and the topology epoch of each member are provided to the assignor, which will make sure that members with a topology epoch that does not correspond to the group's topology epoch will not get any new tasks assigned, avoiding problems caused by topology incompatibilities.
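As a toy illustration of this rule, the sketch below implements a simplified sticky assignor. It is not the AK sticky or highly_available assignor. The sketch works in two steps:

  1. Every member keeps previously owned tasks up to a fair-share cap.
  2. Leftover tasks go only to members whose topology epoch equals the group's.

The function name and data layout are assumptions for illustration.

```python
import math
from typing import Dict, List, Set, Tuple

Task = Tuple[str, int]  # (subtopology ID, partition number)


def sticky_assign(tasks: List[Task],
                  previous: Dict[str, Set[Task]],
                  member_epochs: Dict[str, int],
                  group_epoch: int) -> Dict[str, Set[Task]]:
    """Toy sticky assignment honouring topology epochs (hypothetical)."""
    members = sorted(member_epochs)
    cap = math.ceil(len(tasks) / len(members)) if members else 0
    assignment: Dict[str, Set[Task]] = {m: set() for m in members}
    taken: Set[Task] = set()
    # Step 1, stickiness: each member keeps previously owned tasks up to the cap.
    for m in members:
        for t in sorted(previous.get(m, set())):
            if t in tasks and t not in taken and len(assignment[m]) < cap:
                assignment[m].add(t)
                taken.add(t)
    # Step 2: remaining tasks go only to members on the group's topology epoch,
    # least loaded first. These members may exceed the cap, because stale
    # members are not allowed to take over new tasks.
    eligible = [m for m in members if member_epochs[m] == group_epoch]
    for t in tasks:
        if t in taken or not eligible:
            continue
        target = min(eligible, key=lambda m: (len(assignment[m]), m))
        assignment[target].add(t)
        taken.add(t)
    return assignment
```

Applied to the example below, this toy assignor lets A and B keep three tasks each once C rejoins, and gives C all remaining tasks once B leaves. The specific tasks retained can differ from the example, which only says "something like this".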

Example

As an example of a rolling bounce to upgrade the topology, let’s assume, for illustration, a sticky assignor, stateless tasks, no standby tasks and one thread per process.

Assume we have 3 clients A, B, and C with a uniform assignment (#0 is the topology epoch):

  • A(#0): T1, T2, T3

  • B(#0): T4, T5, T6

  • C(#0): T7, T8, T9

We shut down C to update its topology. Once we shut down C, the assignment is the following:

  • A(#0): T1, T2, T3, T7

  • B(#0): T4, T5, T6, T8, T9

Then, C rejoins the group with topology epoch #1. This bumps the group's topology epoch, and C also includes the information to initialize the new topology in its first heartbeat. From now on, A and B will not get new tasks assigned, but C will, because #1 is now the group's topology epoch. The rest of the assignment logic is unchanged: A and B cannot get any new tasks, but they can retain some of the tasks they have. Since the sticky assignor balances out the assignment, we should end up with something like this:

  • A(#0): T2, T3, T7

  • B(#0): T5, T6, T8

  • C(#1): T1, T4, T9

B is shut down for the topology update. Since, again, A cannot get new tasks but C can, the new assignment is:

  • A(#0): T2, T3, T7

  • C(#1): T1, T4, T9, T5, T6, T8

Client B comes back with the new topology epoch, and gets new tasks assigned:

  • A(#0): T2, T3, T7

  • B(#1): T1, T4, T9

  • C(#1): T5, T6, T8

And so on.

The broker will reject members that attempt to join with a stale topology epoch, or with the current topology epoch but different topology metadata.
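The join-time rules above can be summarized in a small validation function. This is a sketch under the assumption that the group stores only its topology epoch and a topology ID (for example, a hash of the topology metadata). The names GroupTopology, validate_join and TopologyError are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


class TopologyError(Exception):
    """Raised when a joining member's topology is rejected (hypothetical error type)."""


@dataclass
class GroupTopology:
    epoch: Optional[int] = None        # None until the group topology is initialized
    topology_id: Optional[str] = None


def validate_join(group: GroupTopology, member_epoch: int, member_topology_id: str) -> bool:
    """Apply the join rules; return True if the group topology was (re)initialized."""
    if group.epoch is None or member_epoch > group.epoch:
        # First member, or a bumped epoch: the member's topology becomes the group's.
        group.epoch = member_epoch
        group.topology_id = member_topology_id
        return True
    if member_epoch < group.epoch:
        raise TopologyError(f"stale topology epoch {member_epoch} < {group.epoch}")
    if member_topology_id != group.topology_id:
        raise TopologyError(f"topology changed without bumping epoch {member_epoch}")
    return False
```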

Handling topic topology mismatches

It can happen that the group is initialized with a topology, but source, sink or internal topics required by the topology do not exist. They may also differ in their configuration from what the topology requires to execute successfully. This is typically detected while the group coordinator handles a streams group heartbeat and detects changes in either the topology or the topic metadata on the broker. Such a change triggers the "topology configuration" process, in which the group coordinator performs the following steps:

  • Check that all source topics exist, resolve source topic regular expressions and check that each of them resolves to at least one topic.
  • Check that "copartition groups" are satisfied, that is, all source topics that are supposed to be copartitioned are indeed copartitioned.
  • Derive the required number of partitions for all internal topics from the source topic configuration.
  • Check that all internal topics exist with the right configuration.
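A simplified sketch of these checks for a single subtopology follows. It rests on three assumptions:

  • Topic metadata is available as a mapping from topic name to partition count.
  • Python's re module stands in for RE2/J.
  • Repartition topics are ignored. In reality, their partition counts would additionally have to be propagated between subtopologies.

All names are hypothetical.

```python
import re
from typing import Dict, List, Set


def configure_subtopology(source_topics: Set[str],
                          source_patterns: List[str],
                          changelog_topics: Set[str],
                          copartition_groups: List[Set[str]],
                          broker_topics: Dict[str, int]) -> Dict[str, object]:
    """Toy topology configuration for one subtopology (hypothetical helper)."""
    # Step 1: literal source topics must exist; each regex must match >= 1 topic.
    missing = sorted(t for t in source_topics if t not in broker_topics)
    resolved = set(source_topics) - set(missing)
    for pattern in source_patterns:
        matches = {t for t in broker_topics if re.fullmatch(pattern, t)}
        if not matches:
            missing.append(pattern)
        resolved |= matches
    if missing:
        return {"missing_sources": missing}
    # Step 2: every copartition group must have a single partition count.
    bad_groups = [g for g in copartition_groups
                  if len({broker_topics[t] for t in g if t in broker_topics}) > 1]
    # Step 3: the number of tasks is the maximum source partition count, and
    # each changelog topic needs exactly that many partitions.
    num_tasks = max((broker_topics[t] for t in resolved), default=0)
    required = {c: num_tasks for c in changelog_topics}
    # Step 4: find internal topics that are missing or wrongly partitioned.
    missing_internal = sorted(c for c in required if c not in broker_topics)
    wrong = sorted(c for c, n in required.items()
                   if c in broker_topics and broker_topics[c] != n)
    return {"num_tasks": num_tasks, "bad_copartition_groups": bad_groups,
            "missing_internal": missing_internal, "wrongly_partitioned": wrong}
```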

The group enters the state NOT_READY if any source topics or internal topics are missing, or if topics are incorrectly partitioned. In NOT_READY, all heartbeats are handled as usual, so they typically should not fail. However, the status in the heartbeat response indicates which kind of problem exists, and all members get an empty assignment. Below, we describe the behavior of the protocol when a mismatch is detected during topic/topology configuration. The behaviors are ordered by precedence. For example, if both source topics and internal topics are missing, the group takes on the behavior for missing source topics, not the behavior for missing internal topics.

  1. Source topics missing
    Condition: A source topic is missing or a source topic regex resolves to zero topics.
    Behavior: The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status MISSING_SOURCE_TOPICS. In the status detail, we specify all missing source topics and all regular expressions matching zero topics.
  2. Topics incorrectly partitioned
    Condition: Some topics are incorrectly partitioned. This covers two cases. First, two topics that are supposed to be copartitioned according to the topology have different numbers of partitions in the current topic metadata on the broker. Second, the number of partitions of a changelog topic does not correspond to the maximum number of source topic partitions of its subtopology.
    Behavior: The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status INCORRECTLY_PARTITIONED_TOPICS. In the status detail, we specify at least one incorrectly partitioned topic.
  3. Internal topics are missing
    Condition: One or more internal topics are missing.
    Behavior: If missing internal topics are discovered during a heartbeat, the group coordinator will attempt to create them by sending a corresponding topic creation request to the KRaft controller. Only one such request can be in flight at a time, and an appropriate back-off mechanism will be used to prevent too many attempts to create the topics. If the principal executing the heartbeat has not been granted the ACLs required for topic creation, no such attempt will be made. The group will enter/remain in state NOT_READY. Heartbeat responses will indicate status MISSING_INTERNAL_TOPICS. The status detail specifies:
      • whether an attempt to create the topics was made;
      • whether a previous attempt failed and, if so, why;
      • whether sufficient ACLs to create the topics are available.
  4. Topic configuration mismatches
    Condition: An internal topic exists, but does not have the same configuration as defined in the topology. The comparison covers all topic parameters besides the number of partitions, such as replication factor and retention time.
    Behavior: The mismatch is logged on the broker but otherwise ignored.
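The precedence rules can be expressed as an ordered lookup. The sketch below assumes the configuration step has already produced a set of detected problems, named after the statuses above. The function heartbeat_status and its return shape are illustrative only.

```python
import logging
from typing import Optional, Set, Tuple

log = logging.getLogger("streams-group")

# Blocking problems, in the order of precedence listed above.
PRECEDENCE = (
    "MISSING_SOURCE_TOPICS",
    "INCORRECTLY_PARTITIONED_TOPICS",
    "MISSING_INTERNAL_TOPICS",
)


def heartbeat_status(problems: Set[str],
                     config_mismatches: Set[str]) -> Tuple[bool, Optional[str]]:
    """Return (assign_tasks, status) after topology configuration (hypothetical)."""
    # Topic configuration mismatches are only logged and never block assignment.
    for topic in sorted(config_mismatches):
        log.warning("internal topic %s differs from the topology's configuration", topic)
    for status in PRECEDENCE:
        if status in problems:
            return False, status  # NOT_READY: all members get an empty assignment
    return True, None
```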

Describing and listing streams groups

Streams groups will be returned in ListGroupsResponse with GroupType equal to the string streams. The group ID of a streams group can be used in OffsetFetchRequest and OffsetCommitRequest as usual. Sending a ConsumerGroupHeartbeatRequest to a streams group will return a GROUP_ID_NOT_FOUND error. Sending a StreamsGroupHeartbeatRequest to a consumer group will similarly return a GROUP_ID_NOT_FOUND error.
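A minimal sketch of how a coordinator might enforce group types, assuming an in-memory map from group ID to type. The first heartbeat fixes the type, and a later heartbeat of the wrong kind gets GROUP_ID_NOT_FOUND. The map and check_heartbeat are hypothetical, and migration from classic groups is ignored.

```python
from typing import Dict

# Which heartbeat RPC belongs to which group type.
HEARTBEAT_FOR_TYPE = {
    "ConsumerGroupHeartbeat": "consumer",
    "ShareGroupHeartbeat": "share",
    "StreamsGroupHeartbeat": "streams",
}


def check_heartbeat(groups: Dict[str, str], group_id: str, rpc: str) -> str:
    """Return the error code for a heartbeat; register the group type on first use."""
    expected = HEARTBEAT_FOR_TYPE[rpc]
    existing = groups.get(group_id)
    if existing is None:
        groups[group_id] = expected  # the first heartbeat defines the group type
        return "NONE"
    return "NONE" if existing == expected else "GROUP_ID_NOT_FOUND"
```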

There will be a new RPC DescribeStreamsGroup that returns, given the group ID, all metadata related to the streams group, such as

  • The topology metadata of the group. This topology metadata is the result of the above "topology configuration" process, so it contains the topology metadata as initialized by one of the streams group members, but with a concrete number of partitions for each topic, and with source topic regular expressions resolved to a specific set of topics.

  • The topology epoch of the group.
  • The latest member metadata that each member provided through the StreamsGroupHeartbeat API.

  • The current target assignment generated by the assignor.

We also include extensions of the Admin API and the command-line tools to describe and modify streams groups.

Rebalance Process

As in KIP-848 consumer groups, the rebalance process is entirely driven by the group coordinator and is based on the same three epochs: group epoch, assignment epoch, and member epoch. The main difference is the assigned resource. Instead of assigning partitions to consumers, we assign tasks, each consisting of a subtopology ID and a partition number. Each task can be assigned to a client in one of three roles: as an active task, as a standby task or as a warm-up task.

Here, we only explain the main differences from the rebalance process of regular consumer groups.

  • The partition metadata for the group, which is tracked by the group coordinator, is the partition metadata of all input topics (i.e., user source topics and internal repartition topics we read from).

  • The group epoch is bumped:

    • When a member joins or leaves the group.
    • When a member is fenced or removed from the group by the group coordinator.
    • When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
    • When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
    • When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
    • When an assignment configuration for the group is updated.

Assignment

Every time the group epoch is bumped, a new task assignment is computed by calling the task assignor, which assigns tasks as active, standby or warm-up tasks to members of the group. The task assignor is configured on the broker side in two ways. A global static configuration defines the default assignor for all groups, and a dynamic group-level configuration sets the assignor for a specific group.

The AK implementation will provide the following assignors

  • highly_available - Similar to the current HighAvailabilityTaskAssignor

  • sticky - similar to the current StickyTaskAssignor

Which assignors are provided by the group coordinator is not part of the protocol and is specific to each implementation. In this KIP, we do not introduce a pluggable interface for task assignors, but we leave it open to do so in the future.

If no task assignor is configured globally or on the group-level, the broker picks the assignor. The broker is free to make a dynamic decision here - the AK implementation will choose highly_available for stateful topologies and sticky for stateless topologies.
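The selection order can be sketched as follows. The parameters standing in for the group-level and broker-wide configurations are hypothetical, since the text here does not name these configuration keys. The sketch also assumes that whether the topology is stateful is already known from the topology metadata.

```python
from typing import Optional


def choose_assignor(group_level: Optional[str],
                    broker_default: Optional[str],
                    stateful: bool) -> str:
    """Pick the task assignor for a group (hypothetical parameters)."""
    if group_level:
        return group_level      # dynamic group-level configuration wins
    if broker_default:
        return broker_default   # global static broker configuration
    # Neither is configured: mirror the AK default choice described above.
    return "highly_available" if stateful else "sticky"
```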

Each task assignor must make sure to fulfill the following invariants:

  • Each warm-up task that has reached acceptable.recovery.lag must be turned into an active task or a standby task, or be unassigned from the member.

  • A stateful task (in the various roles) cannot be assigned to two clients with the same processId.
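A checker for these two invariants could look like the sketch below. It relies on three assumptions:

  • An assignment maps each member ID to its active, standby and warm-up task sets.
  • The set of stateful tasks is known from the topology.
  • The coordinator tracks which (member, task) warm-up pairs have caught up.

All names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Task = Tuple[str, int]      # (subtopology ID, partition number)
Roles = Dict[str, Set[Task]]  # "active" / "standby" / "warmup" -> tasks


def invariant_violations(assignment: Dict[str, Roles],
                         process_of: Dict[str, str],
                         stateful: Set[Task],
                         caught_up_warmups: Set[Tuple[str, Task]]) -> List[str]:
    """Return human-readable violations of the two assignor invariants."""
    errors: List[str] = []
    # Invariant 1: a caught-up warm-up task may not remain a warm-up on that member.
    for member, task in sorted(caught_up_warmups):
        if task in assignment.get(member, {}).get("warmup", set()):
            errors.append(f"{member} still has caught-up warm-up task {task}")
    # Invariant 2: a stateful task appears at most once per processId, across all roles.
    seen: Dict[Tuple[str, Task], List[str]] = defaultdict(list)
    for member, roles in assignment.items():
        for tasks in roles.values():
            for task in tasks & stateful:
                seen[(process_of[member], task)].append(member)
    for (process, task), members in sorted(seen.items()):
        if len(members) > 1:
            errors.append(f"stateful task {task} assigned {len(members)} times in process {process}")
    return errors
```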

Cumulative task changelog offsets and task changelog end offsets

Each client reports two sets of offsets: the sums of task changelog offsets and the sums of task changelog end offsets. A client reports them in three situations:

  • whenever a stateful task is added to or removed from the client;
  • when a warm-up task reaches acceptable.recovery.lag;
  • at regular intervals defined by the broker-side configuration task.offset.interval.ms.

The assignors, in particular the highly_available assignor, can use these sets to determine which tasks are caught up to within acceptable.recovery.lag. They can also use them to optimize the assignment when multiple clients hold a partial copy of the state.

Cumulative task changelog offsets

The cumulative changelog offset of a task is the sum of all positions in the changelog topics of the local state stores belonging to the task. A Streams member reports the cumulative changelog offsets for all tasks with local state. That is:

  • For each processing (active) task, the sum of the offsets last fully replicated to the broker in the changelog topic of each local state store. Under at-least-once semantics, this corresponds to the high watermark; under exactly-once semantics, it is the last stable offset.

  • For each restoring task (active, standby or warm-up), the sum of the positions in the changelog topics that have been checkpointed to the local state directory.

  • For each dormant task (a task which is not owned, but which has state locally on disk), the sum of the checkpointed positions in the changelog topics present in its state directory. The client only reports offsets for dormant tasks whose state directory lock it manages to acquire.

Members that run in the same process (and use the same state directory) may report offsets for overlapping sets of dormant tasks. These offsets can conflict (since they are recorded at different points in time), but these conflicts can easily be resolved by taking the most recently received offsets. The current assignors will be updated to take this into account.
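Resolving such conflicts by recency can be sketched as below. The sketch assumes the coordinator records when each member's heartbeat carrying offsets was received. merge_offsets and its input layout are hypothetical.

```python
from typing import Dict, Tuple

Task = Tuple[str, int]  # (subtopology ID, partition number)


def merge_offsets(reports: Dict[str, Tuple[float, Dict[Task, int]]]) -> Dict[Task, int]:
    """Combine offset reports of members in one process, keeping the newest value per task.

    reports maps member ID -> (time the heartbeat was received,
                               {task: cumulative changelog offset}).
    """
    latest: Dict[Task, Tuple[float, int]] = {}
    for received_at, offsets in reports.values():
        for task, offset in offsets.items():
            if task not in latest or received_at > latest[task][0]:
                latest[task] = (received_at, offset)
    return {task: offset for task, (_, offset) in latest.items()}
```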

Cumulative task changelog end offsets

Similarly, the member reports the sum of the end offsets in all changelog topics for the currently owned tasks, if available. This simplifies the broker-side task assignment, since the broker doesn’t need to fetch the current end-offsets. Specifically:

  • For active processing tasks, the cumulative task changelog end offset is the same as the cumulative task changelog offset

  • For all restoring tasks (active, standby or warm-up), the cumulative end-offset is the sum of the last end offsets cached by the restore consumer. If an end offset of a topic partition is unknown, no end-offset is reported.

  • For dormant tasks, no end-offset is reported.

Using cumulative task changelog offsets and cumulative task changelog end offsets

Members report cumulative task changelog offsets and cumulative task changelog end offsets instead of the cumulative task lag. This lets the group coordinator determine most task lags itself, without the group coordinator or the Streams client having to make additional requests. It works even if the end offset of a changelog topic partition is not known to a member, as long as it is known to another member of the group. However, it may happen that no end offset is known in the group coordinator. For example, a task may be dormant on some clients while no member currently owns it as an active, standby or warm-up task. Tasks with unknown end offsets are never considered caught up. The cumulative task changelog offset can still be used when deciding where to place a new active, standby or warm-up task, typically by selecting the client with the maximal cumulative task changelog offset that fulfils all other requirements.
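The following sketch shows how lags might be derived from these reports:

  • End offsets are pooled across members, keeping the highest, on the assumption that changelog end offsets only grow.
  • A task without any known end offset is never considered caught up.
  • Placement falls back to the largest cumulative changelog offset.

The comparison uses a strict "less than", following the group epoch bump rule above. The function names are hypothetical.

```python
from typing import Dict, Optional, Tuple

Task = Tuple[str, int]  # (subtopology ID, partition number)


def known_end_offsets(end_reports: Dict[str, Dict[Task, int]]) -> Dict[Task, int]:
    """Pool cumulative end offsets reported by any member, keeping the highest per task."""
    ends: Dict[Task, int] = {}
    for per_member in end_reports.values():
        for task, end in per_member.items():
            ends[task] = max(end, ends.get(task, end))
    return ends


def task_lag(offset: int, task: Task, ends: Dict[Task, int]) -> Optional[int]:
    """Lag of a client's cumulative changelog offset, or None if no end offset is known."""
    end = ends.get(task)
    return None if end is None else max(end - offset, 0)


def is_caught_up(offset: int, task: Task, ends: Dict[Task, int],
                 acceptable_recovery_lag: int) -> bool:
    lag = task_lag(offset, task, ends)
    # Unknown end offsets mean the task is never considered caught up.
    return lag is not None and lag < acceptable_recovery_lag


def best_client(offsets_by_client: Dict[str, int]) -> str:
    """Client with the largest cumulative changelog offset (ties broken by client ID)."""
    return max(offsets_by_client, key=lambda client: (offsets_by_client[client], client))
```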

Reconciliation of the group

Once a new target assignment is installed, each member will independently reconcile their current assignment with their new target assignment. Ultimately, each member will converge to their target epoch and assignment. The reconciliation process handles active tasks and standby/warm-up tasks differently:

  • Active tasks are reconciled like topic partitions in the KIP-848 consumer group protocol, that is, for reconciling the target assignment, the reconciliation follows three phases:

    • The group coordinator first asks the member to revoke any active tasks that are not assigned to that member in the target assignment any more, by removing those active tasks from the set of tasks sent to the member in the heartbeat response

    • The member must first confirm revocation of these active tasks by removing them from the set of active tasks sent in the heartbeat request

    • Now, active tasks will be incrementally assigned to the member. An active task is assigned as soon as no other client owns it anymore, that is, once the previous owner (if any) has confirmed the revocation of the corresponding active task.

  • Standby and warm-up tasks are reconciled in parallel with active tasks, but following slightly different logic:

    • As for active tasks, standby and warm-up tasks removed from the member's target assignment are removed immediately from the assignment sent in the heartbeat response, that is, with the next heartbeat. The member confirms the revocation of the tasks as soon as the state directory (or any other resource related to the task) is closed.

    • A newly assigned standby or warm-up task is held back while another member with the same processId owns that task as active or standby. The hold lasts until that member confirms revocation of the task by removing it from the corresponding set in a heartbeat request. That is, a task that is not an active task or a standby task on any other member with the same processId can be assigned immediately. Otherwise, it is assigned as soon as the blocking task is revoked. Target assignments that assign a task to two members with the same processId are invalid.
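The two reconciliation rules can be sketched as pure functions that compute what the next heartbeat response would contain. The sketch assumes the coordinator knows which tasks other members still report as active, and which tasks each member reports as active or standby. next_active and next_standby are hypothetical names, and member epochs are omitted.

```python
from typing import Dict, Set, Tuple

Task = Tuple[str, int]  # (subtopology ID, partition number)


def next_active(owned: Set[Task], target: Set[Task], owned_by_others: Set[Task]) -> Set[Task]:
    """Active tasks to send to a member in the next heartbeat response.

    owned: active tasks the member reported in its last heartbeat request.
    owned_by_others: active tasks that some other member still reports.
    """
    if owned - target:
        # Drop tasks that must be revoked; hand out nothing new until the
        # member confirms the revocation in a later heartbeat request.
        return owned & target
    # Add target tasks whose previous owner has confirmed revocation.
    return owned | {t for t in target - owned if t not in owned_by_others}


def next_standby(member: str, target: Set[Task], process_of: Dict[str, str],
                 owned_by: Dict[str, Set[Task]]) -> Set[Task]:
    """Standby/warm-up tasks to send to a member in the next heartbeat response.

    owned_by maps each member to the tasks it reports as active or standby.
    Tasks removed from the target are dropped immediately; new ones wait while
    another member in the same process still owns them.
    """
    blocked: Set[Task] = set()
    for other, tasks in owned_by.items():
        if other != member and process_of[other] == process_of[member]:
            blocked |= tasks
    return {t for t in target if t not in blocked}
```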

Streams Group States

The possible states of a streams group are EMPTY, ASSIGNING, RECONCILING, STABLE and DEAD, as for consumer groups, plus the additional state NOT_READY.

Global & Dynamic Group Configuration

We make core assignment options configurable centrally on the broker, without relying on each client's configuration. This allows tuning a streams group without redeploying the Streams application. Three core assignment options will be introduced on the broker side: acceptable.recovery.lag, num.warmup.replicas and num.standby.replicas. They can be configured both globally on the broker and dynamically for specific streams groups through the IncrementalAlterConfigs and DescribeConfigs RPCs.

Online Migration from a Classic Consumer Group to a Streams Group

As in KIP-848, upgrading from the classic consumer group currently used in Kafka Streams to the new streams group is possible by rolling the Streams clients, assuming the Streams protocol is enabled on the brokers. When the first consumer of a Streams client using the new Streams rebalance protocol joins the group, the group is converted from a classic group to a streams group. When the last consumer of a Streams client using the new Streams rebalance protocol leaves the group, the group is converted back to a classic group. Note that the group epoch starts at the current group generation. During the migration, all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new Streams protocol.

Let's recapitulate how the classic rebalance protocol works in Streams.

First, the consumers in a Streams client join or re-join the group with the JoinGroup API. The JoinGroup request contains the subscribed topics, the owned partitions, the generation ID, the Streams-specific subscription info, and some other fields. The Streams-specific subscription info contains the process ID of the Streams client, the owned active and standby tasks, the task offset sums, the user endpoints for Interactive Queries, and some more fields. Once all the consumers of the application's Streams clients have joined, the group coordinator picks one of them as the group leader. It then sends the JoinGroup response to all the members of the group (i.e., the consumers that joined). The JoinGroup response contains the member ID, the generation ID, and the member ID of the leader (to make the leader aware of its leadership). It also contains all the consumer and Streams-specific metadata about subscribed topics, owned partitions, tasks, etc. The leader uses the data in the JoinGroup response to compute the assignment.

Second, all the members in the Streams clients collect their assignment, computed by the leader, by using the SyncGroup API. The leader sends the computed assignment with the SyncGroup request to the group coordinator, and the group coordinator distributes the assignment to the members through the SyncGroup response.

In parallel, the members heartbeat with the Heartbeat API to maintain their session. The Heartbeat API is also used by the group coordinator to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumption about the generation ID; it simply uses what it receives from the group coordinator.

The classic rebalance protocol used in Streams supports two modes: eager and cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. Instead, it revokes the partitions that it no longer owns when it receives its new assignment, and rejoins immediately if it had to revoke any partitions.

The Streams rebalance protocol relies on the StreamsGroupHeartbeat API to do all of the above. Concretely, the API updates the group state, provides the active, standby, and warm-up tasks owned by the Streams client, gets back the assignment, and updates the session. We can map these functions to the classic protocol as follows:

  • The JoinGroup API updates the group state and provides the owned active and standby tasks (warm-up tasks are standby tasks in the classic protocol).
  • The SyncGroup API gets back the task assignment.
  • The Heartbeat API updates the session.

The main difference is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.

The implementation of the new Streams protocol in the group coordinator will handle the JoinGroup and SyncGroup APIs of the classic protocol. The group coordinator will ensure that a rebalance is triggered when the assignment of a Streams client on the classic protocol needs to be updated by returning a REBALANCE_IN_PROGRESS error in the heartbeat response.

When the first consumer of a Streams client that uses the new Streams rebalance protocol joins a classic group, the classic group in the group coordinator is transformed into a streams group. The generation ID becomes the group epoch. The consumer of the Streams client initializes the group with the topology metadata. The group coordinator then triggers a rebalance for all consumers still on the classic protocol by returning the REBALANCE_IN_PROGRESS error in the heartbeat response. The consumers on the classic protocol send their owned active and standby tasks to the group coordinator in the JoinGroup request. The consumers on the Streams protocol send their owned active, standby, and warm-up tasks (which should be empty since they just joined) to the group coordinator in the StreamsGroupHeartbeat request.

In contrast to the classic protocol, the group coordinator does not pick a leader to compute the assignment but computes the assignment itself. This is the target assignment, and the group epoch is increased. From the target assignment and the owned tasks, the group coordinator computes the current assignment of each member. If a member does not need to revoke any tasks, its member epoch is increased to the group epoch. If it needs to revoke a task, its member epoch stays the same. The group coordinator sends the new member epoch alongside the current member assignment to the members on the Streams protocol in the StreamsGroupHeartbeat response. The members still on the classic protocol receive the member epoch in the JoinGroup response, but they still need to wait for the SyncGroup response to get their current assignment. Essentially, the group coordinator translates the JoinGroup and SyncGroup APIs to the Streams protocol internally. It communicates with members still on the classic protocol via JoinGroup, SyncGroup, and Heartbeat, and with members on the Streams protocol via StreamsGroupHeartbeat.
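The epoch rule described above can be sketched as follows, modeling only active tasks as plain strings. The rule that target tasks still owned by another member are withheld until that member has revoked them is an assumption carried over from KIP-848; it matches the example below, where C does not yet receive 0_2 while A still owns it. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch: deriving a member's current assignment and epoch from its target
// assignment. Only active tasks are modeled; names are hypothetical, not coordinator classes.
class ReconcileSketch {

    record MemberState(int memberEpoch, Set<String> assigned, Set<String> toRevoke) { }

    static MemberState reconcile(int groupEpoch,
                                 int memberEpoch,
                                 Set<String> owned,
                                 Set<String> target,
                                 Map<String, Set<String>> ownedByOtherMembers) {
        // Tasks the member owns that are no longer in its target must be revoked first.
        Set<String> toRevoke = new HashSet<>(owned);
        toRevoke.removeAll(target);
        if (!toRevoke.isEmpty()) {
            // The member keeps its epoch and retains only owned tasks that are still targeted.
            Set<String> keep = new HashSet<>(owned);
            keep.removeAll(toRevoke);
            return new MemberState(memberEpoch, keep, toRevoke);
        }
        // Nothing to revoke: the member advances to the group epoch. Target tasks still
        // owned by another member are withheld until that member has revoked them.
        Set<String> assigned = new HashSet<>(target);
        for (Set<String> others : ownedByOtherMembers.values()) {
            assigned.removeAll(others);
        }
        return new MemberState(groupEpoch, assigned, Set.of());
    }
}
```

With the example's numbers at group epoch 25, A (owning 0_0, 0_2, 0_3 with target 0_0, 0_3) stays at epoch 24 and must revoke 0_2, while C advances to epoch 25 without receiving 0_2 yet.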

The Streams protocol also assigns warm-up tasks. However, the classic protocol does not have any notion of a warm-up task. If the group coordinator assigns a warm-up task to a Streams client on the classic protocol, that warm-up task is translated to a standby task in the assignment for the Streams client on the classic protocol. The group coordinator chooses one of the Streams clients on the classic protocol to trigger a probing rebalance.
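A minimal sketch of the warm-up translation, assuming a simple record-based model of the assignments; the type and method names are hypothetical, and the probing-rebalance trigger is not modeled.

```java
import java.util.Set;
import java.util.TreeSet;

// Sketch: presenting a Streams-protocol assignment to a member on the classic protocol,
// which only knows active and standby tasks. Names are hypothetical.
class ClassicTranslationSketch {

    record StreamsAssignment(Set<String> active, Set<String> standby, Set<String> warmup) { }

    record ClassicAssignment(Set<String> active, Set<String> standby) { }

    static ClassicAssignment toClassic(StreamsAssignment a) {
        // Warm-up tasks are folded into the standby tasks.
        Set<String> standby = new TreeSet<>(a.standby());
        standby.addAll(a.warmup());
        return new ClassicAssignment(new TreeSet<>(a.active()), standby);
    }
}
```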

A more detailed description of this process can be found in KIP-848, in the section "Supporting Online Consumer Group Upgrade".

Example

  • classic group (generation=23)
    • A
    • B
  • assignment
    • A - active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
    • B - active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]

C joins using the Streams protocol. The classic group is transformed to a streams group.

  • streams group (group epoch=24)
    • A (classic)
    • B (classic)
    • C (streams)
  • target assignment (epoch=24)
    • A - active=[0_0, 0_2, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=24, active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]

C's warm-up task 0_2 has caught up. A is asked to revoke active task 0_2, so A does not increase its generation ID.

  • streams group (group epoch=25)
    • A (classic)
    • B (classic)
    • C (streams)
  • target assignment (epoch=25)
    • A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=25, active=[], standby=[0_0, 0_4], warm-up=[0_5]

A follow-up rebalance is triggered so that A can report that it has revoked active task 0_2. Since A no longer needs to revoke any tasks, its generation ID is increased.

  • streams group (group epoch=25)
    • A (classic)
    • B (classic)
    • C (streams)
  • target assignment (epoch=25)
    • A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_3], standby=[0_1, 0_5]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=25, active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]

If C leaves the group now, the group coordinator transforms the group back into a classic group and uses only JoinGroup, SyncGroup, and Heartbeat to communicate with the members. The records of the Streams protocol are deleted from the __consumer_offsets topic.

Public Interfaces

This section lists the changes impacting the public interfaces.

KRPC

New Errors

The conditions in which these errors are returned are stated further down.

  • STREAMS_INVALID_TOPOLOGY - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants; see the section "Request Validation" below.

  • STREAMS_INVALID_TOPOLOGY_EPOCH - The client provided an invalid topology epoch with respect to the streams group state, for example, the topology was changed but the epoch was not bumped.
  • STREAMS_TOPOLOGY_FENCED - Returned when a client attempts to join with an outdated topology epoch.

StreamsGroupHeartbeat

The StreamsGroupHeartbeat API is the new core API used by Streams applications to form a group. The API allows members to initialize a topology and to advertise their state and their owned tasks. The group coordinator uses it to assign tasks to members and to revoke tasks from them. The API is also used as a liveness check.

Request Schema

The member must set all top-level fields, except RackId and InstanceId, when it joins for the first time or when an error occurs (e.g., the request timed out). Otherwise, it is expected to fill in only the fields that have changed since the last heartbeat.
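The "send only what changed" rule can be sketched per field as follows. HeartbeatFieldSketch is a hypothetical helper, not part of the Kafka client; it assumes the client remembers the last value it sent and falls back to sending the full value after joining or after an error.

```java
import java.util.Objects;
import java.util.Set;

// Sketch of the delta rule for one nullable heartbeat field (e.g. ActiveTasks).
// A full value is sent on join or after an error; otherwise null means "unchanged".
// HeartbeatFieldSketch is hypothetical and not part of the Kafka client.
class HeartbeatFieldSketch<T> {
    private T lastSent;
    private boolean sendFull = true;

    // Called when the member (re)joins or a request failed, e.g. timed out.
    void resetToFull() {
        sendFull = true;
    }

    // Returns the value to put in the next request, or null if it is unchanged.
    T nextValue(T current) {
        if (sendFull || !Objects.equals(current, lastSent)) {
            sendFull = false;
            lastSent = current;
            return current;
        }
        return null;
    }
}
```

For example, a member that keeps owning Set.of("0_0") sends it once and then null on every following heartbeat until its owned tasks change or an error forces a full request.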


{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },     
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." },
    { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of the member otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its tasks otherwise." },

    { "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The topology metadata of the streams application. Used to initialize the topology of the group and to check if the topology corresponds to the topology initialized for the group. Only sent when memberEpoch = 0, must be non-empty. Null otherwise.",
      "fields": [
        { "name": "Epoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the topology. Used to check if the topology corresponds to the topology initialized on the brokers." },
        { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
          "about": "The sub-topologies of the streams application.",
          "fields": [
            { "name": "SubtopologyId", "type": "string", "versions": "0+",
              "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." },
            { "name": "SourceTopics", "type": "[]string", "versions": "0+",
              "about": "The topics the subtopology reads from." },
            { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
              "about": "The regular expressions identifying topics the subtopology reads from." },
            { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
              "about": "The set of state changelog topics associated with this subtopology. Created automatically." },
            { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
              "about": "The repartition topics the subtopology writes to." },
            { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
              "about": "The set of source topics that are internally created repartition topics. Created automatically." },
            { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+",
              "about": "A subset of source topics that must be copartitioned.",
              "fields": [
                { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
                  "about": "The source topics that must be copartitioned. Indices into the SourceTopics array on the subtopology level." },
                { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
                  "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." },
                { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+",
                  "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." }
              ]}
          ]}
      ]
    },

    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned active tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned standby tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned warm-up tasks for this client. Null if unchanged since last heartbeat." },

    { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." },
    { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "User-defined endpoint for Interactive Queries. Null if unchanged since last heartbeat or if not defined on the client." },
    { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." },

    { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Cumulative changelog offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
    { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Cumulative changelog end-offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
    { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false,
      "about": "Whether all Streams clients in the group should shut down." }
  ],

  "commonStructs": [
    { "name": "KeyValue", "versions": "0+", "fields": [
      { "name": "Key", "type": "string", "versions": "0+",
        "about": "key of the config" },
      { "name": "Value", "type": "string", "versions": "0+",
        "about": "value of the config" }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
        "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." },
      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]},
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "port of the endpoint" }
    ]},
    { "name": "TaskOffset", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition." },
      { "name": "Offset", "type": "int64", "versions": "0+",
        "about": "The offset." }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}
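The TaskIds struct groups partitions by subtopology. The sketch below converts a flat set of tasks, written in the `<subtopology>_<partition>` notation used in the example above, into TaskIds entries. The notation and the parsing logic are assumptions for illustration; the record only mirrors the shape of the schema struct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch: grouping tasks written as "<subtopologyId>_<partition>" into TaskIds entries.
// TaskIds mirrors the schema struct; the string notation is an illustrative assumption.
class TaskIdsSketch {

    record TaskIds(String subtopologyId, List<Integer> partitions) { }

    static List<TaskIds> toTaskIds(Set<String> tasks) {
        Map<String, TreeSet<Integer>> bySubtopology = new TreeMap<>();
        for (String task : tasks) {
            // Split at the last underscore so subtopology IDs may themselves contain '_'.
            int sep = task.lastIndexOf('_');
            String subtopology = task.substring(0, sep);
            int partition = Integer.parseInt(task.substring(sep + 1));
            bySubtopology.computeIfAbsent(subtopology, k -> new TreeSet<>()).add(partition);
        }
        List<TaskIds> result = new ArrayList<>();
        bySubtopology.forEach((id, partitions) -> result.add(new TaskIds(id, new ArrayList<>(partitions))));
        return result;
    }
}
```

Grouping by subtopology keeps the message compact: a member owning many partitions of the same subtopology sends the subtopology ID only once.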

Required ACL

  • READ on group

  • CREATE on cluster resource, or CREATE on all topics in StateChangelogTopics and RepartitionSourceTopics

    • Note that these ACLs are only required if the group coordinator should create the internal topics implicitly. If the internal topics are created explicitly, these ACLs are not needed for the Streams group heartbeat.
  • DESCRIBE_CONFIGS on all topics included in the message

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupId must be non-empty.

  • Either MemberId is non-empty or MemberEpoch is 0.

  • MemberEpoch must be >= -2.

  • InstanceId, if not null, must be non-empty.

  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.

  • ActiveTasks, StandbyTasks and WarmupTasks must be disjoint sets.

  • Each element of ActiveTasks, StandbyTasks and WarmupTasks has to be a valid task ID in the topology initialized for the group ID.

  • ActiveTasks, StandbyTasks and WarmupTasks must be non-null and empty when joining (i.e., when MemberEpoch is 0).

  • If MemberEpoch is 0, Topology must be non-null. Otherwise, it must be null.
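Most of these invariants can be checked in a few lines. The sketch below uses a hypothetical, flattened Request record, returns a message for the first violated invariant, compares only the task sets present in the request, and omits the check that each task ID exists in the group's topology.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of most INVALID_REQUEST checks for StreamsGroupHeartbeat. Request is a hypothetical,
// flattened view of the request; tasks are plain strings and the topology is left opaque.
class HeartbeatValidationSketch {

    record Request(String groupId, String memberId, int memberEpoch, String instanceId,
                   int rebalanceTimeoutMs, Object topology,
                   Set<String> active, Set<String> standby, Set<String> warmup) { }

    // Returns null if the request is valid, otherwise the first violated invariant.
    static String validate(Request r) {
        if (r.groupId() == null || r.groupId().isEmpty()) {
            return "GroupId must be non-empty";
        }
        boolean joining = r.memberEpoch() == 0;
        if ((r.memberId() == null || r.memberId().isEmpty()) && !joining) {
            return "MemberId must be non-empty unless MemberEpoch is 0";
        }
        if (r.memberEpoch() < -2) {
            return "MemberEpoch must be >= -2";
        }
        if (r.instanceId() != null && r.instanceId().isEmpty()) {
            return "InstanceId, if not null, must be non-empty";
        }
        if (joining) {
            if (r.rebalanceTimeoutMs() <= 0) {
                return "RebalanceTimeoutMs must be > 0 when joining";
            }
            if (r.topology() == null) {
                return "Topology must be non-null when joining";
            }
            for (Set<String> tasks : Arrays.asList(r.active(), r.standby(), r.warmup())) {
                if (tasks == null || !tasks.isEmpty()) {
                    return "Task sets must be non-null and empty when joining";
                }
            }
        } else if (r.topology() != null) {
            return "Topology must be null unless joining";
        }
        // Null task sets mean "unchanged"; the sets that are present must be pairwise disjoint.
        Set<String> seen = new HashSet<>();
        for (Set<String> tasks : Arrays.asList(r.active(), r.standby(), r.warmup())) {
            if (tasks == null) {
                continue;
            }
            for (String task : tasks) {
                if (!seen.add(task)) {
                    return "ActiveTasks, StandbyTasks and WarmupTasks must be disjoint";
                }
            }
        }
        return null;
    }
}
```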

STREAMS_INVALID_TOPOLOGY is returned if the request contains a new topology that does not obey the following invariants:

  • A StateChangelogTopic must not define a number of partitions.

  • A RepartitionSourceTopic cannot be in SourceTopics or StateChangelogTopics of any subtopology.

  • A StateChangelogTopic cannot be in SourceTopics, RepartitionSinkTopics, or RepartitionSourceTopics of any subtopology.

  • A RepartitionSourceTopic of one subtopology must be a RepartitionSinkTopic of at least one other subtopology.

  • All indices in CopartitionGroups must be valid indices in the corresponding topic arrays.
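The first four topology invariants can be sketched over a simplified, hypothetical model in which each internal topic is reduced to its name and partition count (0 meaning "not defined"). The copartition index check is omitted, and the record names only loosely mirror the schema.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of STREAMS_INVALID_TOPOLOGY checks over a simplified, hypothetical topology model.
class TopologyValidationSketch {

    record TopicInfo(String name, int partitions) { }

    record Subtopology(String id, List<String> sourceTopics, List<String> repartitionSinkTopics,
                       List<TopicInfo> stateChangelogTopics, List<TopicInfo> repartitionSourceTopics) { }

    // Returns null if the topology is valid, otherwise a description of the first violation.
    static String validate(List<Subtopology> subtopologies) {
        Set<String> sources = new HashSet<>();
        Set<String> changelogs = new HashSet<>();
        for (Subtopology s : subtopologies) {
            sources.addAll(s.sourceTopics());
            for (TopicInfo t : s.stateChangelogTopics()) {
                if (t.partitions() != 0) {
                    return "Changelog topic " + t.name() + " must not define partitions";
                }
                changelogs.add(t.name());
            }
        }
        for (Subtopology s : subtopologies) {
            for (TopicInfo t : s.repartitionSourceTopics()) {
                if (sources.contains(t.name()) || changelogs.contains(t.name())) {
                    return "Repartition source topic " + t.name() + " reused as source or changelog topic";
                }
                // It must be written by at least one *other* subtopology.
                boolean producedElsewhere = subtopologies.stream()
                        .anyMatch(o -> !o.id().equals(s.id()) && o.repartitionSinkTopics().contains(t.name()));
                if (!producedElsewhere) {
                    return "Repartition source topic " + t.name() + " has no sink in another subtopology";
                }
            }
            for (String sink : s.repartitionSinkTopics()) {
                if (changelogs.contains(sink)) {
                    return "Changelog topic " + sink + " used as repartition sink";
                }
            }
        }
        if (changelogs.stream().anyMatch(sources::contains)) {
            return "Changelog topic used as source topic";
        }
        return null;
    }
}
```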

Request Handling

When the group coordinator handles a StreamsGroupHeartbeat request:

  1. Performs request validation.
  2. If the member joins the group (i.e. member epoch is 0):

    1. Looks up or creates the group.
      1. If the group is created, the topology epoch of the group is set to the topology epoch sent by the member.
    2. GROUP_ID_NOT_FOUND is returned if the group ID is associated with a group type that is neither streams nor classic (the latter is allowed for migration).
    3. Compares the topology epoch of the request, ER, to the topology epoch of the group, EG:
      1. If ER=EG, checks whether the topology metadata in the request equals the topology metadata of the group; otherwise, fails with STREAMS_INVALID_TOPOLOGY_EPOCH.
      2. If ER<EG, fails with STREAMS_TOPOLOGY_FENCED.
      3. If ER>EG+1, fails with STREAMS_INVALID_TOPOLOGY_EPOCH.
      4. If ER=EG+1, updates the topology by writing the new topology record to the __consumer_offsets topic.
    4. Creates the member.
  3. If the member is already part of the group (i.e. member epoch is greater than 0):
    1. Looks up the group.
    2. GROUP_ID_NOT_FOUND is returned if the group ID does not exist anymore.
    3. If the member does not exist, returns UNKNOWN_MEMBER_ID
    4. Checks whether the member epoch matches the member epoch in its current assignment. Otherwise, FENCED_MEMBER_EPOCH is returned and the member is removed from the group.
      • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This will be handled as in KIP-848.
    5. If the topology epoch in the request is less than the topology epoch of the group, sets the status STALE_TOPOLOGY in the response.
  4. Updates information of the member if needed. The group epoch is incremented if there is any change.
  5. If the topology or topic metadata changed, detects any topology/topic mismatches as described earlier in this document.
  6. Reconciles the member assignments as explained earlier in this document.
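The topology-epoch comparison in step 2.3 can be summarized as a small decision function. The enum and method names are hypothetical, and sameTopology stands for the metadata equality check. For a newly created group, the group's topology epoch is taken from the request, so the ER=EG branch applies.

```java
// Sketch of the topology-epoch comparison for a joining member. The enum and method
// names are hypothetical; sameTopology stands for the topology metadata equality check.
class TopologyEpochSketch {

    enum Outcome { ACCEPT, UPDATE_TOPOLOGY, STREAMS_INVALID_TOPOLOGY_EPOCH, STREAMS_TOPOLOGY_FENCED }

    static Outcome onJoin(int requestEpoch, int groupEpoch, boolean sameTopology) {
        if (requestEpoch < groupEpoch) {
            // The member runs an older topology than the one initialized for the group.
            return Outcome.STREAMS_TOPOLOGY_FENCED;
        }
        if (requestEpoch == groupEpoch) {
            // Same epoch must mean same topology; otherwise it changed without an epoch bump.
            return sameTopology ? Outcome.ACCEPT : Outcome.STREAMS_INVALID_TOPOLOGY_EPOCH;
        }
        if (requestEpoch == groupEpoch + 1) {
            // The member brings the next topology: write the new topology record.
            return Outcome.UPDATE_TOPOLOGY;
        }
        // requestEpoch > groupEpoch + 1: the epoch skipped ahead.
        return Outcome.STREAMS_INVALID_TOPOLOGY_EPOCH;
    }
}
```

Allowing only EG+1 as an upgrade means a topology change is always an explicit, single-step epoch bump, which makes accidental divergence between clients detectable.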

Response Schema

The group coordinator keeps setting the ActiveTasks, StandbyTasks and WarmupTasks fields in its responses until the member acknowledges that it has converged to the desired assignment. This ensures that the members converge to the target assignment.


{
  "apiKey": TBD,
  "type": "response",
  "name": "StreamsGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - FENCED_MEMBER_EPOCH (version 0+)
  // - UNRELEASED_INSTANCE_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+) 
  // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
  // - STREAMS_INVALID_TOPOLOGY (version 0+)
  // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
  // - STREAMS_TOPOLOGY_FENCED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },      
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id is always generated by the streams consumer."},     
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+",
      "about": "The maximum lag a warm-up task can have to be considered caught-up." },
    { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
      "about": "The interval at which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },

    { "name": "Status", "type":  "[]Status", "versions":  "0+",  "nullableVersions": "0+", "default": "null",
      "about": "Zero or more statuses for the group. Null if unchanged since last heartbeat." },

    // The streams app knows which partitions to fetch from given this information
    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned active tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned standby tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },

    // IQ-related information
    { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." ,
      "fields": [
        { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
          "about": "User-defined endpoint to connect to the node" },
        { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
          "about": "All partitions available on the node" }
      ]
    }
  ],
  "commonStructs": [
    { "name": "Status", "versions": "0+", "fields": [
      // Possible status codes
      //  0 - STALE_TOPOLOGY                 - The topology epoch supplied is lower than the topology epoch for this streams group.
      //  1 - MISSING_SOURCE_TOPICS          - One or more source topics are missing or a source topic regex resolves to zero topics.
      //                                       Missing topics are indicated in the StatusDetail.
      //  2 - INCORRECTLY_PARTITIONED_TOPICS - One or more topics are incorrectly partitioned, that is, they are not copartitioned despite being
      //                                       part of a copartition group, or the number of partitions in a changelog topic does not correspond
      //                                       to the maximum number of source topic partitions for that subtopology.
      //                                       Incorrectly partitioned topics are indicated in the StatusDetail.
      //  3 - MISSING_INTERNAL_TOPICS        - One or more internal topics are missing.
      //                                       Missing topics are indicated in the StatusDetail.
      //                                       The group coordinator will attempt to create all missing internal topics; any errors that occur
      //                                       during topic creation are indicated in the StatusDetail.
      //  4 - SHUTDOWN_APPLICATION           - A client requested the shutdown of the whole application.
      { "name": "StatusCode", "type": "int8", "versions": "0+",
        "about": "A code to indicate that a particular status is active for the group membership" },
      { "name": "StatusDetail", "type": "string", "versions": "0+",
        "about": "A string representation of the status." }
    ]},
    { "name": "TopicPartition", "versions": "0+", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+",
        "about": "topic name" },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "partitions" }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]},
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "port of the endpoint" }
    ]}
  ]
}
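The two conditions behind INCORRECTLY_PARTITIONED_TOPICS can be sketched for a single subtopology as follows. The method is hypothetical; it assumes the coordinator knows the partition count of each existing topic, treats every topic passed in sourceTopics (including repartition source topics) as a source topic, and skips missing topics, which are reported via MISSING_SOURCE_TOPICS or MISSING_INTERNAL_TOPICS instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Sketch of the INCORRECTLY_PARTITIONED_TOPICS conditions for one subtopology, given the
// partition count of each existing topic (topic -> partitions). Names are hypothetical.
class PartitioningCheckSketch {

    static List<String> incorrectlyPartitioned(List<String> sourceTopics,
                                               List<List<String>> copartitionGroups,
                                               List<String> changelogTopics,
                                               Map<String, Integer> partitionCounts) {
        List<String> problems = new ArrayList<>();
        // All existing topics in one copartition group must have the same number of partitions.
        for (List<String> group : copartitionGroups) {
            long distinct = group.stream()
                    .map(partitionCounts::get)
                    .filter(Objects::nonNull)
                    .distinct()
                    .count();
            if (distinct > 1) {
                problems.addAll(group);
            }
        }
        // An existing changelog topic must have as many partitions as the largest source topic.
        int maxSource = sourceTopics.stream()
                .map(partitionCounts::get)
                .filter(Objects::nonNull)
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
        for (String changelog : changelogTopics) {
            Integer count = partitionCounts.get(changelog);
            if (count != null && count != maxSource) {
                problems.add(changelog);
            }
        }
        return problems;
    }
}
```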

StreamsGroupDescribe API

Request Schema


{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Required ACL

  • READ on group

Request Validation

INVALID_REQUEST is returned if the request does not obey the schema definition.

INVALID_GROUP_ID is returned if a group ID is empty.

Request Handling

When the group coordinator handles a StreamsGroupDescribe request:

  • Checks whether each group ID exists. If it does not, GROUP_ID_NOT_FOUND is returned. GROUP_ID_NOT_FOUND is also returned if the group ID is associated with a group type other than streams.

  • Looks up the groups and returns the response.

Response Schema


{
  "apiKey": TBD,
  "type": "response",
  "name": "StreamsGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The describe error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },

        { "name":  "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The topology metadata currently initialized for the streams application. Can be null in case of a describe error.",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the currently initialized topology for this group." },
            { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The subtopologies of the streams application. This contains the configured subtopologies, where the number of partitions is set and any regular expressions are resolved to actual topics. Null if the group is uninitialized, or if source topics are missing or incorrectly partitioned.",
              "fields": [
                { "name": "SubtopologyId", "type": "string", "versions": "0+",
                  "about": "String to uniquely identify the subtopology." },
                { "name": "SourceTopics", "type": "[]string", "versions": "0+",
                  "about": "The topics the subtopology reads from." },
                { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
                  "about": "The repartition topics the subtopology writes to." },
                { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
                  "about": "The set of state changelog topics associated with this subtopology. Created automatically." },
                { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
                  "about": "The set of source topics that are internally created repartition topics. Created automatically." }
              ]}
          ]},
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The member epoch." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID for static membership." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The rack ID." },

            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },

            { "name": "TopologyEpoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the topology on the client." },

            { "name": "ProcessId", "type": "string", "versions": "0+",
              "about": "Identity of the streams instance that may have multiple clients." },
            { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "User-defined endpoint for Interactive Queries. Null if not defined for this client." },
            { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
              "about": "Used for rack-aware assignment algorithm." },
            { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+",
              "about": "Cumulative changelog offsets for tasks." },
            { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+",
              "about": "Cumulative changelog end offsets for tasks." },

            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." },
            { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
              "about": "The target assignment." },
            { "name": "IsClassic", "type": "bool", "versions": "0+",
              "about": "True for classic members that have not been upgraded yet." }
        ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "port of the endpoint" }
    ]},
    { "name": "TaskOffset", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition." },
      { "name": "Offset", "type": "int64", "versions": "0+",
        "about": "The offset." }
    ]},
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Active tasks for this client." },
      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Standby tasks for this client." },
      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Warm-up tasks for this client." }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]},
    { "name": "KeyValue", "versions": "0+", "fields": [
      { "name": "Key", "type": "string", "versions": "0+",
        "about": "key of the config" },
      { "name": "Value", "type": "string", "versions": "0+",
        "about": "value of the config" }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
        "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." },
      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]}
  ]
}

Response Handling

No special handling is required.

Records

This section describes the new record types required for the new protocol. Similar to KIP-848, we define new record types that persist all data related to groups, their members, and their assignments in the compacted __consumer_offsets topic. Each record has a dedicated key type whose version distinguishes it from the records introduced in KIP-848 and from the records used to persist committed offsets.

Compared to consumer groups, we introduce one extra record type that stores information about the topology of the group.

The size of the persisted member metadata in the __consumer_offsets topic must respect the maximum message size. However, these records are persisted separately for each member, so this should rarely be a problem. The same limitation affects plain consumers in KIP-848 for very large consumer groups. Eventually, we may implement a way to split the records into multiple messages, and there are currently plans to add transaction support to the group coordinator so that the records can be updated atomically. For the first version of the protocol, the size of these records is limited by the maximum message size.

Group Metadata

The assignment-related metadata for a group with N members is stored in N+1 records: one metadata record per member and one metadata record for the group.

  • The member metadata record, keyed by the group ID and the member ID, stores all relevant metadata that a member sends to the coordinator in a heartbeat request. Notable exceptions are the task changelog offsets and task changelog end offsets, which are not persisted because they change constantly.
  • The group metadata, keyed by the group ID, contains the group epoch.

The records evolve in the following way:

  • When the first member joins the group, the group metadata and the member metadata records for the member are first created.
  • Every time a new member joins the group, another member metadata record is added.
  • Whenever a member updates its member metadata through the heartbeat, a new version of the member metadata record is appended.
  • When a member leaves the group, a tombstone is written for the member metadata key.

When the group has lost all members (or never had any) and offsets.retention.minutes expires, a tombstone is written for the group metadata record. This can only happen in the EMPTY group state.
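To make the record lifecycle above concrete, the following is a minimal sketch, assuming a simplified in-memory model in which each record is a key (group ID plus an optional member ID) with a string value, and a null value stands for a tombstone. The class and method names (MetadataReplay, apply, memberCount) are hypothetical and not part of the group coordinator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified replay of streams group metadata records.
// A null value stands for a tombstone, as in the compacted __consumer_offsets topic.
final class MetadataReplay {

    // Record key: memberId == null denotes the group metadata record,
    // otherwise it is the member metadata record of that member.
    record Key(String groupId, String memberId) {}

    // Latest value per key; a newer version replaces the older one,
    // which is what log compaction eventually retains.
    private final Map<Key, String> state = new HashMap<>();

    void apply(Key key, String value) {
        if (value == null) {
            state.remove(key);      // tombstone deletes the key
        } else {
            state.put(key, value);  // new version supersedes the previous one
        }
    }

    // Number of live member metadata records (the N in N+1 records).
    long memberCount(String groupId) {
        return state.keySet().stream()
            .filter(k -> k.groupId().equals(groupId) && k.memberId() != null)
            .count();
    }

    boolean hasGroupRecord(String groupId) {
        return state.containsKey(new Key(groupId, null));
    }

    String value(Key key) {
        return state.get(key);
    }
}
```

The string values are placeholders; the actual records carry the StreamsGroupMemberMetadataValue and StreamsGroupMetadataValue fields defined below.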

StreamsGroupMemberMetadataKey


{
  "type": "data",
  "name": "StreamsGroupMemberMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupMemberMetadataValue


{
  "type": "data",
  "name": "StreamsGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) instance ID for static membership." },
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "The rebalance timeout." },

    { "name": "TopologyEpoch", "type": "int32", "versions": "0+",
      "about": "The epoch of the topology. Must be non-zero." },

    { "name": "ProcessId", "type": "string", "versions": "0+",
      "about": "Identity of the streams instance that may have multiple consumers." },
    { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "User-defined endpoint for running interactive queries on this instance." },
    { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
      "about": "Used for rack-aware assignment algorithm." }
  ],
  "commonStructs": [
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "port of the endpoint" }
    ]},
    { "name": "KeyValue", "versions": "0+",
      "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "key of the config" },
        { "name": "Value", "type": "string", "versions": "0+",
          "about": "value of the config" }
      ]
    }
  ]
}

StreamsGroupMetadataKey


{
  "type": "data",
  "name": "StreamsGroupMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupMetadataValue


{
  "type": "data",
  "name": "StreamsGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." }
  ]
}

Group Topology Metadata

The topology for the group, and the metadata for input topic partitions consumed in the topology, are persisted in two records.

  • The partition metadata record, keyed by the group ID, contains the metadata of all topic partitions consumed by the current topology.
  • The topology record, keyed by the group ID, stores the topology metadata sent in the heartbeat request when members join.

The records evolve in the following way:

  • When the group coordinator receives valid topology metadata in a heartbeat request, both records are created, using the topology sent in the request and the topic metadata currently known to the group coordinator.
  • When the group coordinator detects, while handling a group heartbeat, that the partition metadata of one of the input partitions used in the topology has changed (by comparing the previous partition metadata record to the topic metadata currently known to the broker), the streams group partition metadata record is updated and a new assignment is computed.

When the group has lost all members and offsets.retention.minutes expires, tombstones are written to remove the records.
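The change detection described above can be sketched as follows, assuming partition metadata is reduced to a map from topic name to partition count (the real record also carries topic IDs and rack information). PartitionMetadataCheck and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of the change detection performed while handling a
// heartbeat. Partition metadata is reduced to topic name -> partition count.
final class PartitionMetadataCheck {

    // True if any input topic of the topology was created, deleted, or
    // changed its partition count compared to the persisted record.
    static boolean changed(Set<String> inputTopics,
                           Map<String, Integer> persisted,
                           Map<String, Integer> current) {
        for (String topic : inputTopics) {
            // Objects.equals treats "absent in both" as unchanged.
            if (!Objects.equals(persisted.get(topic), current.get(topic))) {
                return true;
            }
        }
        return false;
    }

    // Content of the new partition metadata record: the currently known
    // metadata restricted to the topology's input topics.
    static Map<String, Integer> snapshot(Set<String> inputTopics,
                                         Map<String, Integer> current) {
        Map<String, Integer> result = new HashMap<>();
        for (String topic : inputTopics) {
            Integer partitions = current.get(topic);
            if (partitions != null) {
                result.put(topic, partitions);
            }
        }
        return result;
    }
}
```

In this sketch, when changed returns true the coordinator would persist snapshot as the new partition metadata record and trigger a new assignment; changes to topics outside the topology are ignored.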

StreamsGroupPartitionMetadataKey


{
  "type": "data",
  "name": "StreamsGroupPartitionMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupPartitionMetadataValue


{
  "type": "data",
  "name": "StreamsGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [
          { "name": "Partition", "versions": "0+", "type": "int32",
            "about": "The partition number." },
          { "name": "Racks", "versions": "0+", "type": "[]string",
            "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}

StreamsGroupTopologyKey


{
  "type": "data",
  "name": "StreamsGroupTopologyKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupTopologyValue


{
  "type": "data",
  "name": "StreamsGroupTopologyValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The epoch of the topology." },
    { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
      "about": "The sub-topologies of the streams application.",
      "fields": [
        { "name": "SubtopologyId", "type": "string", "versions": "0+",
          "about": "String to uniquely identify the subtopology." },
        { "name": "SourceTopics", "type": "[]string", "versions": "0+",
          "about": "The topics the subtopology reads from." },
        { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
          "about": "Regular expressions identifying topics the subtopology reads from." },
        { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
          "about": "The set of state changelog topics associated with this subtopology." },
        { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
          "about": "The repartition topics the subtopology writes to." },
        { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
          "about": "The set of source topics that are internally created repartition topics." },
        { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+",
          "about": "A subset of source topics that must be copartitioned.",
          "fields": [
            { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
              "about": "The topics the subtopology reads from. Index into the array on the subtopology level." },
            { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
              "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." },
            { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+",
              "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." }
          ]
        }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicConfig", "versions": "0+", "fields": [
      { "name": "key", "type": "string", "versions": "0+",
        "about": "The key of the topic-level configuration." },
      { "name": "value", "type": "string", "versions": "0+",
        "about": "The value of the topic-level configuration." }
    ]
    },
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
        "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." },
      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]}
  ]
}
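The CopartitionGroups entries refer to topics by int16 index into the arrays of their subtopology instead of repeating topic names. The sketch below resolves such a group back to names, assuming a simplified in-memory mirror of the record in which RepartitionSourceTopics holds plain names rather than TopicInfo structs. Subtopology, CopartitionGroup and CopartitionResolver are hypothetical names; regex indices resolve to the pattern string, since the matching topics are only known after resolution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory mirror of a StreamsGroupTopologyValue subtopology.
record Subtopology(List<String> sourceTopics,
                   List<String> sourceTopicRegex,
                   List<String> repartitionSourceTopics) {}

// Hypothetical mirror of a CopartitionGroup: indices into the lists above.
record CopartitionGroup(List<Short> sourceTopics,
                        List<Short> sourceTopicRegex,
                        List<Short> repartitionSourceTopics) {}

final class CopartitionResolver {

    // Resolves every index of the group against the owning subtopology.
    // An out-of-range index throws IndexOutOfBoundsException; treating that
    // as an invalid topology is an assumption of this sketch.
    static List<String> resolve(Subtopology sub, CopartitionGroup group) {
        List<String> names = new ArrayList<>();
        for (short i : group.sourceTopics()) {
            names.add(sub.sourceTopics().get(i));
        }
        for (short i : group.sourceTopicRegex()) {
            names.add(sub.sourceTopicRegex().get(i));
        }
        for (short i : group.repartitionSourceTopics()) {
            names.add(sub.repartitionSourceTopics().get(i));
        }
        return names;
    }
}
```

Storing indices keeps the record compact and guarantees that a copartition group can only reference topics that the subtopology actually declares.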

Current Member Assignment

For each member, there is a record that stores the current member assignment. The record is first created when a member joins (with an empty assignment). Every time an assignment is computed for the member during reconciliation, it is compared to the member's previous assignment. If the assignment has changed, a new version of the current member assignment record is appended to the __consumer_offsets topic. When the member leaves the group, a tombstone is written to remove the record.
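A minimal sketch of this write rule, assuming an assignment is modeled as maps from subtopology ID to partitions and that a list stands in for the __consumer_offsets topic (a null entry models a tombstone). TaskAssignment and CurrentAssignmentWriter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of a member assignment: subtopology ID -> partitions.
record TaskAssignment(Map<String, Set<Integer>> active,
                      Map<String, Set<Integer>> standby,
                      Map<String, Set<Integer>> warmup) {

    static TaskAssignment empty() {
        return new TaskAssignment(Map.of(), Map.of(), Map.of());
    }
}

// Hypothetical sketch of when current member assignment records are written.
final class CurrentAssignmentWriter {
    final List<TaskAssignment> appended = new ArrayList<>();
    private TaskAssignment last;

    // A joining member starts with an empty assignment record.
    void onJoin() {
        last = TaskAssignment.empty();
        appended.add(last);
    }

    // Append a new version only if the computed assignment differs.
    void onReconcile(TaskAssignment computed) {
        if (!Objects.equals(last, computed)) {
            appended.add(computed);
            last = computed;
        }
    }

    // Leaving the group removes the record with a tombstone.
    void onLeave() {
        appended.add(null);
        last = null;
    }
}
```

Because record equality compares the maps by content, recomputing an identical assignment (even with partitions in a different order) appends nothing.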

StreamsGroupCurrentMemberAssignmentKey


{
  "type": "data",
  "name": "StreamsGroupCurrentMemberAssignmentKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupCurrentMemberAssignmentValue


{
  "type": "data",
  "name": "StreamsGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See StreamsGroupMember.MemberState for the possible values." },
    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned active tasks for this streams client." },
    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned standby tasks for this streams client." },
    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned warm-up tasks for this streams client." },
    { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds",
      "about": "The active tasks that must be revoked by this member." },
    { "name": "StandbyTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds",
      "about": "The standby tasks that must be revoked by this member." },
    { "name": "WarmupTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds",
      "about": "The warmup tasks that must be revoked by this member." }
  ],
  "commonStructs": [
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}
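The PreviousMemberEpoch field lets the coordinator tolerate a member that missed the last epoch bump. The sketch below is a deliberate simplification, assuming that a heartbeat carrying the previous epoch is accepted and answered with the latest epoch again, and that any other mismatch fences the member. EpochCheck and its Result values are hypothetical names; the complete rules are defined by the heartbeat handling, not by this sketch.

```java
// Hypothetical, simplified check of the epoch sent in a heartbeat against the
// persisted MemberEpoch and PreviousMemberEpoch.
final class EpochCheck {

    enum Result { ACCEPT, ACCEPT_RESEND_LATEST, FENCE }

    static Result check(int receivedEpoch, int memberEpoch, int previousMemberEpoch) {
        if (receivedEpoch == memberEpoch) {
            return Result.ACCEPT;
        }
        // The last epoch bump may not have reached the member: tolerate a
        // retry with the previous epoch and send the current epoch again.
        if (receivedEpoch == previousMemberEpoch) {
            return Result.ACCEPT_RESEND_LATEST;
        }
        // Any other epoch is treated as stale in this sketch.
        return Result.FENCE;
    }
}
```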

Streams Group Target Assignment

The target assignment is stored in N + 1 records, where N is the number of members in the group. When a new target assignment is computed, the group coordinator compares it with the current target assignment and writes only the difference between the two assignments to the __consumer_offsets partition. This also means that if a member has left the group, a corresponding tombstone record is written. The assignment must be updated atomically, so the group coordinator ensures that all the records are written in a single batch, which limits its size to the maximum batch size (default 1MB), as in consumer groups.
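The diff step described above can be sketched as follows, assuming assignments are opaque values keyed by member ID and that a Write with a null assignment represents a tombstone. TargetAssignmentDiff and Write are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: the per-member records to write when a new target
// assignment replaces the old one.
final class TargetAssignmentDiff {

    // assignment == null represents a tombstone for the member's record.
    record Write(String memberId, Object assignment) {}

    static List<Write> diff(Map<String, ?> oldTarget, Map<String, ?> newTarget) {
        List<Write> writes = new ArrayList<>();
        // Members whose target assignment changed or who are new.
        for (Map.Entry<String, ?> e : newTarget.entrySet()) {
            if (!Objects.equals(oldTarget.get(e.getKey()), e.getValue())) {
                writes.add(new Write(e.getKey(), e.getValue()));
            }
        }
        // Members that left the group: tombstone their target assignment.
        for (String memberId : oldTarget.keySet()) {
            if (!newTarget.containsKey(memberId)) {
                writes.add(new Write(memberId, null));
            }
        }
        return writes;
    }
}
```

In this sketch, the returned writes would be combined with the target assignment metadata record carrying the new AssignmentEpoch and appended as one batch, so that readers never observe a partially written target assignment.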

StreamsGroupTargetAssignmentMemberKey


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMemberKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupTargetAssignmentMemberValue


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned active tasks for this streams client." },
    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned standby tasks for this streams client." },
    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned warm-up tasks for this streams client." }
  ],
  "commonStructs": [
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "SubtopologyId", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}

StreamsGroupTargetAssignmentMetadataKey


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupTargetAssignmentMetadataValue


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

Broker Metrics

The existing group metrics are extended to differentiate between streams groups and consumer groups and account for streams group states.

  • Number of groups per protocol type, where the list of protocols is extended with the protocol=streams variant.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic|streams}

  • Number of streams groups based on state

kafka.server:type=group-coordinator-metrics,name=streams-group-count,state={empty|not_ready|assigning|reconciling|stable|dead}
  • Streams group rebalances sensor

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-rate

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-count
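For monitoring, the state placeholder in the streams-group-count name expands into one name per state. The sketch below builds these names with the standard javax.management.ObjectName class, assuming the MBeans are registered exactly as named in this section; StreamsGroupMetricNames is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: one JMX ObjectName per streams group state, following the naming
// pattern listed above.
final class StreamsGroupMetricNames {

    static final List<String> STATES =
        List.of("empty", "not_ready", "assigning", "reconciling", "stable", "dead");

    static List<ObjectName> groupCountByState() {
        List<ObjectName> names = new ArrayList<>();
        for (String state : STATES) {
            try {
                names.add(new ObjectName("kafka.server:type=group-coordinator-metrics,"
                    + "name=streams-group-count,state=" + state));
            } catch (MalformedObjectNameException e) {
                throw new IllegalArgumentException("Invalid metric name for state " + state, e);
            }
        }
        return names;
    }
}
```

Such names can be passed to MBeanServerConnection.queryNames to check which of them a given broker actually exposes.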

Broker Configurations

The new group protocol is guarded by the group.version feature flag: it is only available once a new version of this feature is enabled on the broker side.

Updated properties

Configuration | Description | Values
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. | "streams" is included in the list of protocols to enable streams groups. It will be added to the default value of this configuration property once this feature is complete.

New properties

Name | Type | Default | Doc
group.streams.session.timeout.ms | int | 45s | The timeout to detect client failures when using the streams group protocol.
group.streams.min.session.timeout.ms | int | 45s | The minimum session timeout.
group.streams.max.session.timeout.ms | int | 60s | The maximum session timeout.
group.streams.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
group.streams.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval.
group.streams.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval.
group.streams.max.size | int | Integer.MAX_VALUE | The maximum number of streams clients that a single streams group can accommodate.
group.streams.acceptable.recovery.lag | long | 10000 | The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught up enough to receive an active task assignment.
group.streams.num.warmup.replicas | int | 2 | The maximum number of warmup replicas.
group.streams.max.warmup.replicas | int | 20 | The maximum value allowed for group-level (dynamic) overrides of group.streams.num.warmup.replicas.
group.streams.num.standby.replicas | int | 0 | The number of standby replicas for each task.
group.streams.max.standby.replicas | int | 2 | The maximum value allowed for group-level (dynamic) overrides of group.streams.num.standby.replicas.
group.streams.task.offset.interval.ms | int | 60s | The interval at which the task changelog offsets of a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed.
group.streams.min.task.offset.interval.ms | int | 15s | The minimum value allowed for group-level (dynamic) overrides of group.streams.task.offset.interval.ms. Used to prevent users of the cluster from sending task changelog offsets too often.
group.streams.assignor.name | string | null | The name of the task assignor used for all streams groups. Can be sticky or highly_available in Apache Kafka.
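The acceptable recovery lag relates to the cumulative changelog offsets (TaskOffsets) and end offsets (TaskEndOffsets) reported for each member. A minimal sketch of the caught-up check, assuming the lag of a task is its end offset minus the client's offset, that a client without an offset for a task has to restore it completely, and that tasks are identified by plain strings. RecoveryLag is a hypothetical name.

```java
import java.util.Map;

// Hypothetical sketch of the "caught up" test implied by
// group.streams.acceptable.recovery.lag.
final class RecoveryLag {

    // Number of changelog offsets the client still has to restore for a task.
    static long lag(String taskId, Map<String, Long> taskOffsets, Map<String, Long> taskEndOffsets) {
        long end = taskEndOffsets.getOrDefault(taskId, 0L);
        long offset = taskOffsets.getOrDefault(taskId, 0L);
        return Math.max(0L, end - offset);
    }

    // A client counts as caught up if its lag does not exceed the maximum.
    static boolean caughtUp(String taskId, Map<String, Long> taskOffsets,
                            Map<String, Long> taskEndOffsets, long acceptableRecoveryLag) {
        return lag(taskId, taskOffsets, taskEndOffsets) <= acceptableRecoveryLag;
    }
}
```

With the default of 10000, a client that is 5000 offsets behind on a task is considered caught up, while a client that has never restored a task with 50000 changelog records is not.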

Group Configurations

We will add new configurations for the resource type GROUP in DescribeConfigs and IncrementalAlterConfigs to override the default broker configurations dynamically for specific groups.

Name | Type | Default | Doc
group.streams.session.timeout.ms | int | 45s | The timeout to detect client failures when using the streams group protocol.
group.streams.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
group.streams.acceptable.recovery.lag | long | 10000 | The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught up enough to receive an active task assignment.
group.streams.num.warmup.replicas | int | 2 | The maximum number of warmup replicas.
group.streams.num.standby.replicas | int | 0 | The number of standby replicas for each task.
group.streams.task.offset.interval.ms | int | 60s | The interval at which the task changelog offsets of a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed.
group.streams.assignor.name | string | null | The name of the task assignor used for this streams group. Can be sticky or highly_available in Apache Kafka.
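The broker-level minimum and maximum properties bound these group-level overrides. The sketch below shows one way such a check could look, assuming inclusive bounds, the broker defaults from the table of new properties, and rejection via an exception. GroupConfigBounds and its methods are hypothetical; the actual validation and error reporting are not specified here.

```java
// Hypothetical sketch: validate group-level overrides against broker bounds.
final class GroupConfigBounds {

    static void checkRange(String name, long value, long min, long max) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                name + "=" + value + " is outside the allowed range [" + min + ", " + max + "]");
        }
    }

    // Bounded by group.streams.min/max.session.timeout.ms (defaults 45s and 60s).
    static void validateSessionTimeoutMs(long value) {
        checkRange("group.streams.session.timeout.ms", value, 45_000L, 60_000L);
    }

    // Bounded by group.streams.max.standby.replicas (default 2).
    static void validateNumStandbyReplicas(long value) {
        checkRange("group.streams.num.standby.replicas", value, 0L, 2L);
    }

    // Bounded below by group.streams.min.task.offset.interval.ms (default 15s).
    static void validateTaskOffsetIntervalMs(long value) {
        checkRange("group.streams.task.offset.interval.ms", value, 15_000L, Long.MAX_VALUE);
    }
}
```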

Streams API

In a future major version, when the classic group protocol is deprecated, the interfaces introduced in KIP-924 will be deprecated as well, unless a follow-up KIP proposes integrating them with this KIP.

Streams Configurations

New configurations

Name | Type | Default | Doc
group.protocol | enum | classic | The rebalance protocol used by the Streams client: classic (the existing protocol) or streams (the new protocol).
topology.epoch | int | 0 | The epoch of the topology for the streams group. Ignored if group.protocol=classic.
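A sketch of the client-side properties that opt a Kafka Streams application into the new protocol, using plain java.util.Properties with the configuration names as string literals. The application ID and bootstrap address in the usage are placeholders, and StreamsProtocolConfig is a hypothetical helper.

```java
import java.util.Properties;

// Sketch: Streams client properties selecting the streams group protocol.
final class StreamsProtocolConfig {

    static Properties streamsGroupProperties(String applicationId,
                                             String bootstrapServers,
                                             int topologyEpoch) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Default is "classic"; "streams" selects the new protocol.
        props.setProperty("group.protocol", "streams");
        // Only relevant with group.protocol=streams.
        props.setProperty("topology.epoch", Integer.toString(topologyEpoch));
        return props;
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the topology, for example streamsGroupProperties("my-app", "localhost:9092", 1).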

Deprecations

In a future major version, when the classic group protocol is deprecated, the following configuration options will be deprecated:

  • acceptable.recovery.lag
  • max.warmup.replicas
  • num.standby.replicas
  • probing.rebalance.interval.ms
  • rack.aware.assignment.tags
  • rack.aware.assignment.strategy
  • rack.aware.assignment.traffic_cost
  • rack.aware.assignment.non_overlap_cost
  • task.assignor.class

Note that both the rack-aware task assignor and customizable client-side task assignment may be introduced into the new protocol in follow-up KIPs (see the Future Work section). In that case, the last five configuration options listed above would not be deprecated in the future major version.

Admin API

Add the following methods to the org.apache.kafka.clients.admin.Admin interface, mostly backed by the same implementations as the consumer group API.

Method signature | Description | RPC used
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) | Alter offset information for a streams group. | OffsetCommit
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterStreamsGroupOffsetsOptions options) | Alter offset information for a streams group. | OffsetCommit
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics) | Delete offset information for a set of topics in a streams group. | OffsetDelete
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics, DeleteStreamsGroupOffsetsOptions options) | Delete offset information for a set of topics in a streams group. | OffsetDelete
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) | Delete streams groups from the cluster. | DeleteGroups
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options) | Delete streams groups from the cluster. | DeleteGroups
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) | Describe some streams groups in the cluster. | StreamsGroupDescribe
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) | Describe some streams groups in the cluster. | StreamsGroupDescribe
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) | List the streams group offsets available in the cluster for the specified streams groups. | OffsetFetch
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options) | List the streams group offsets available in the cluster for the specified streams groups. | OffsetFetch

These correspond to existing APIs for consumer groups and some of the code will be shared. The main reason for duplicating them is the naming, as the current API uses "consumerGroup" in the name, despite the RPCs just using "group". There are some differences:

  • describeStreamsGroups uses the StreamsGroupDescribe RPC and returns different information than describeConsumerGroups.
  • A streams group has an extra state, NOT_READY, and none of the legacy states from the classic protocol.
  • removeMembersFromConsumerGroup will not have a corresponding API in this first version, because it uses the LeaveGroup RPC for classic consumer groups, which is not available for KIP-848-style groups.

Admin

   /**
    * Alters offsets for the specified group. In order to succeed, the group must be empty.
    *
    * <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupId The group for which to alter offsets.
    * @param offsets A map of offsets by partition.
    * @return The AlterStreamsGroupOffsetsResult.
    */
   default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
       return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
   }
 
   /**
    * Alters offsets for the specified group. In order to succeed, the group must be empty.
    *
    * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
    *
    * @param groupId The group for which to alter offsets.
    * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
    * @param options The options to use when altering the offsets.
    * @return The AlterStreamsGroupOffsetsResult.
    */
   AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterStreamsGroupOffsetsOptions options);
 
   /**
    * Delete offsets for a set of topics in a Streams group with the default options.
    *
    * <p>This is a convenience method for {@link #deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupId The group for which to delete offsets.
    * @param topics The topics.
    * @return The DeleteStreamsGroupOffsetsResult.
    */
   default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics) {
       return deleteStreamsGroupOffsets(groupId, topics, new DeleteStreamsGroupOffsetsOptions());
   }
 
   /**
    * Delete offsets for a set of topics in a Streams group.
    *
    * @param groupId The group for which to delete offsets.
    * @param topics The topics.
    * @param options The options to use when deleting offsets in a Streams group.
    * @return The DeleteStreamsGroupOffsetsResult.
    */
   DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
       Set<String> topics,
       DeleteStreamsGroupOffsetsOptions options);
 
   /**
    * Delete Streams groups from the cluster with the default options.
    *
    * <p>This is a convenience method for {@link #deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupIds The IDs of the groups to delete.
    * @return The DeleteStreamsGroupsResult.
    */
   default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
       return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
   }
 
   /**
    * Delete Streams groups from the cluster.
    *
    * @param groupIds The IDs of the groups to delete.
    * @param options The options to use when deleting a Streams group.
    * @return The DeleteStreamsGroupsResult.
    */
   DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);
 
   /**
    * Describe some Streams groups in the cluster, with the default options.
    *
    * <p>This is a convenience method for {@link #describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}
    * with default options. See the overload for more details.
    *
    * @param groupIds The IDs of the groups to describe.
    * @return The DescribeStreamsGroupsResult.
    */
   default DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) {
       return describeStreamsGroups(groupIds, new DescribeStreamsGroupsOptions());
   }
 
   /**
    * Describe some Streams groups in the cluster.
    *
    * @param groupIds The IDs of the groups to describe.
    * @param options  The options to use when describing the groups.
    * @return The DescribeStreamsGroupsResult.
    */
   DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
                                                     DescribeStreamsGroupsOptions options);
 
   /**
    * List the Streams group offsets available in the cluster for the specified Streams groups with the default options.
    *
    * <p>This is a convenience method for {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}
    * to list offsets of all partitions for the specified Streams groups with default options.
    *
    * @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
    * @return The ListStreamsGroupOffsetsResult
    */
   default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
       return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
   }
 
   /**
    * List the Streams group offsets available in the cluster for the specified Streams groups.
    *
    * @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
    * @param options The options to use when listing the Streams group offsets.
    * @return The ListStreamsGroupOffsetsResult
    */
   ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);


AlterStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}


AlterStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
}

DeleteStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a future which can be used to check the result for a given topic.
     */
    public KafkaFuture<Void> partitionResult(final String topic) {
    }
}

DeleteStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {
}

DeleteStreamsGroupsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

DeleteStreamsGroupsOptions

package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
}

DescribeStreamsGroupsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeStreamsGroupsResult {
    /**
     * Return a future which yields all StreamsGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, StreamsGroupDescription>> all() {
    }
 
    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<StreamsGroupDescription>> describedGroups() {
    }
}
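The relationship between describedGroups() and all() can be sketched with the JDK's CompletableFuture standing in for KafkaFuture and a plain String standing in for StreamsGroupDescription. This is a minimal illustration under those assumptions; the DescribeAllSketch class and its all helper are hypothetical and not part of the proposed API. The combined future completes with the full map only if every per-group future succeeds, and fails if any of them fails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DescribeAllSketch {

    // Hypothetical helper: combine per-group futures into one future of a map,
    // mirroring how DescribeStreamsGroupsResult.all() relates to describedGroups().
    static <V> CompletableFuture<Map<String, V>> all(Map<String, CompletableFuture<V>> perGroup) {
        CompletableFuture<?>[] futures = perGroup.values().toArray(new CompletableFuture<?>[0]);
        // allOf completes exceptionally if any of the per-group futures fails.
        return CompletableFuture.allOf(futures).thenApply(ignored -> {
            Map<String, V> results = new HashMap<>();
            // Every future is already complete here, so join() does not block.
            perGroup.forEach((groupId, future) -> results.put(groupId, future.join()));
            return results;
        });
    }

    public static void main(String[] args) {
        // Stand-in "descriptions" are just group state names.
        Map<String, CompletableFuture<String>> described = new HashMap<>();
        described.put("app-1", CompletableFuture.completedFuture("Stable"));
        described.put("app-2", CompletableFuture.completedFuture("Empty"));
        // Prints a map containing both groups.
        System.out.println(all(described).join());
    }
}
```

The per-group map lets a caller inspect partial results when only some describes fail, while all() gives a single success-or-failure signal.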

StreamsGroupDescription

/**
 * A detailed description of a single streams group in the cluster.
 */
public class StreamsGroupDescription {

    public StreamsGroupDescription(
        final String groupId,
        final int groupEpoch,
        final int targetAssignmentEpoch,
        final int topologyEpoch,
        final Collection<StreamsGroupSubtopologyDescription> subtopologies,
        final Collection<StreamsGroupMemberDescription> members,
        final GroupState groupState,
        final Node coordinator,
        final Set<AclOperation> authorizedOperations
    );

    /**
     * The id of the streams group.
     */
    public String groupId();
    
    /**
     * The epoch of the streams group.
     */
    public int groupEpoch();
 
    /**
     * The epoch of the target assignment.
     */
    public int targetAssignmentEpoch();

    /**
     * The epoch of the currently used topology.
     */
    public int topologyEpoch();
    
    /**
     * A list of the members of the streams group.
     */
    public Collection<StreamsGroupMemberDescription> members();

    /**
     * A list of the subtopologies in the streams group.
     */
    public Collection<StreamsGroupSubtopologyDescription> subtopologies();

    /**
     * The state of the streams group, or UNKNOWN if the state is too new for us to parse.
     */
    public GroupState groupState();

    /**
     * The group coordinator, or null if the coordinator is not known.
     */
    public Node coordinator();

    /**
     * The authorized operations for this group, or null if that information is not known.
     */
    public Set<AclOperation> authorizedOperations();
}

StreamsGroupMemberDescription

/**
 * A detailed description of a single member in the group.
 */
public class StreamsGroupMemberDescription {

    public StreamsGroupMemberDescription(
        final String memberId,
        final int memberEpoch,
        final Optional<String> instanceId,
        final Optional<String> rackId,
        final String clientId,
        final String clientHost,
        final int topologyEpoch,
        final String processId,
        final Optional<Endpoint> userEndpoint,
        final Map<String, String> clientTags,
        final List<TaskOffset> taskOffsets,
        final List<TaskOffset> taskEndOffsets,
        final StreamsGroupMemberAssignment assignment,
        final StreamsGroupMemberAssignment targetAssignment,
        final boolean isClassic
    );

    /**
     * The id of the group member.
     */
    public String memberId();

    /**
     * The epoch of the group member.
     */
    public int memberEpoch();

    /**
     * The id of the instance, used for static membership, if available.
     */
    public Optional<String> instanceId(); 
    
    /**
     * The rack ID of the group member.
     */
    public Optional<String> rackId();
    
    /**
     * The client id of the group member.
     */
    public String clientId();

    /**
     * The host of the group member.
     */
    public String clientHost();

    /**
     * The epoch of the topology present on the client.
     */
    public int topologyEpoch();
 
    /**
     * Identity of the streams instance that may have multiple clients.
     */
    public String processId(); 

    /**
     * User-defined endpoint for Interactive Queries.
     */
    public Optional<Endpoint> userEndpoint();

    /**
     * Client tags, used by the rack-aware assignment algorithm.
     */
    public Map<String, String> clientTags();

    /**
     * Cumulative offsets for tasks.
     */
    public List<TaskOffset> taskOffsets();

    /**
     * Cumulative task changelog end offsets for tasks.
     */
    public List<TaskOffset> taskEndOffsets();

    /**
     * The current assignment.
     */
    public StreamsGroupMemberAssignment assignment();

    /**
     * The target assignment.
     */
    public StreamsGroupMemberAssignment targetAssignment();     
    
    /**
     * Whether the member is still using the classic rebalance protocol (KIP-1099).
     */
    public boolean isClassic();

    /**
     * The cumulative offset for one task.
     */
    public static class TaskOffset {

        public TaskOffset(final String subtopologyId, final int partition, final long offset);

        /**
         * The subtopology identifier.
         */
        public String subtopologyId();

        /**
         * The partition of the task.
         */
        public int partition();

        /**
         * The cumulative offset (sum of offsets in all input partitions).
         */
        public long offset();
    }
}
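To illustrate how taskOffsets() and taskEndOffsets() might be combined by a tool or operator, the sketch below computes a per-task lag as end offset minus offset. This interpretation of the two lists is an assumption, as are the rule that a task with an end offset but no reported offset is treated as having offset 0 and the clamping of negative values to 0. The TaskOffset record is a simplified stand-in for StreamsGroupMemberDescription.TaskOffset, and TaskKey is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskLagSketch {

    // Simplified stand-in for StreamsGroupMemberDescription.TaskOffset (hypothetical record).
    record TaskOffset(String subtopologyId, int partition, long offset) { }

    // Hypothetical key identifying a task by subtopology and partition.
    record TaskKey(String subtopologyId, int partition) { }

    // Computes lag = end offset - offset for every task that has an end offset.
    // Assumption: a task without a reported offset is treated as offset 0.
    static Map<TaskKey, Long> taskLags(List<TaskOffset> offsets, List<TaskOffset> endOffsets) {
        Map<TaskKey, Long> current = new HashMap<>();
        for (TaskOffset t : offsets) {
            current.put(new TaskKey(t.subtopologyId(), t.partition()), t.offset());
        }
        Map<TaskKey, Long> lags = new HashMap<>();
        for (TaskOffset end : endOffsets) {
            TaskKey key = new TaskKey(end.subtopologyId(), end.partition());
            long lag = end.offset() - current.getOrDefault(key, 0L);
            // Never report a negative lag.
            lags.put(key, Math.max(0L, lag));
        }
        return lags;
    }

    public static void main(String[] args) {
        List<TaskOffset> offsets = List.of(new TaskOffset("0", 0, 90L));
        List<TaskOffset> ends = List.of(new TaskOffset("0", 0, 100L), new TaskOffset("0", 1, 40L));
        // Task 0_0 lags by 10; task 0_1 has no reported offset and lags by 40.
        System.out.println(taskLags(offsets, ends));
    }
}
```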

StreamsGroupMemberAssignment

/**
 * A description of the assignments of a specific streams group member.
 */
public class StreamsGroupMemberAssignment {

    public StreamsGroupMemberAssignment(
        final List<TaskIds> activeTasks,
        final List<TaskIds> standbyTasks,
        final List<TaskIds> warmupTasks
    );

    /**
     * Active tasks for this client.
     */
    public List<TaskIds> activeTasks();

    /**
     * Standby tasks for this client.
     */
    public List<TaskIds> standbyTasks();
    
    /**
     * Warmup tasks for this client.
     */
    public List<TaskIds> warmupTasks();

    /**
     * All tasks for one subtopology of a member.
     */
    public static class TaskIds {

        public TaskIds(final String subtopologyId, final List<Integer> partitions);

        /**
         * The subtopology identifier.
         */
        public String subtopologyId();

        /**
         * The partitions of the input topics processed by this member.
         */
        public List<Integer> partitions();
    }
}

StreamsGroupSubtopologyDescription

/**
 * A detailed description of a single subtopology.
 */
public class StreamsGroupSubtopologyDescription {

    public StreamsGroupSubtopologyDescription(
        final String subtopologyId,
        final List<String> sourceTopics,
        final List<String> repartitionSinkTopics,
        final Map<String, TopicInfo> stateChangelogTopics,
        final Map<String, TopicInfo> repartitionSourceTopics);

    /**
     * String to uniquely identify the subtopology.
     */
    public String subtopologyId();

    /**
     * The topics the subtopology reads from.
     */
    public List<String> sourceTopics();

    /**
     * The repartition topics the subtopology writes to.
     */
    public List<String> repartitionSinkTopics();

    /**
     * The state changelog topics associated with this subtopology, with information about each topic.
     */
    public Map<String, TopicInfo> stateChangelogTopics();

    /**
     * The source topics of this subtopology that are internally created repartition topics, with information about each topic.
     */
    public Map<String, TopicInfo> repartitionSourceTopics();

    /**
     * Information about a topic.
     */
    public static class TopicInfo {

        public TopicInfo(final int partitions, final Map<String, String> topicConfigs);

        /**
         * The number of partitions in the topic.
         */
        public int partitions();

        /**
         * Configurations of the topic.
         */
        public Map<String, String> topicConfigs();
    }

}
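The --delete option of kafka-streams-groups.sh removes the internal topics of a streams group, and --internal-topics must name a subset of them. A minimal sketch of how a tool could derive that set from the subtopology descriptions is shown below: it takes the union of the keys of stateChangelogTopics() and repartitionSourceTopics() across all subtopologies. It assumes the map keys are topic names; the Subtopology record is a simplified, hypothetical stand-in for StreamsGroupSubtopologyDescription (with partition counts in place of TopicInfo), and the real tool's logic may differ.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class InternalTopicsSketch {

    // Simplified stand-in for StreamsGroupSubtopologyDescription (hypothetical).
    // Assumption: map keys are topic names; values are partition counts here.
    record Subtopology(String subtopologyId,
                       Map<String, Integer> stateChangelogTopics,
                       Map<String, Integer> repartitionSourceTopics) { }

    // Union of all changelog and repartition topic names across subtopologies.
    static Set<String> internalTopics(Collection<Subtopology> subtopologies) {
        Set<String> topics = new TreeSet<>();
        for (Subtopology s : subtopologies) {
            topics.addAll(s.stateChangelogTopics().keySet());
            topics.addAll(s.repartitionSourceTopics().keySet());
        }
        return topics;
    }

    // Returns the requested topics that are NOT internal topics of the group.
    static Set<String> invalidRequestedTopics(Collection<Subtopology> subtopologies,
                                              Collection<String> requested) {
        Set<String> invalid = new TreeSet<>(requested);
        invalid.removeAll(internalTopics(subtopologies));
        return invalid;
    }

    public static void main(String[] args) {
        List<Subtopology> subtopologies = List.of(
            new Subtopology("0", Map.of("app-store-changelog", 4), Map.of()),
            new Subtopology("1", Map.of(), Map.of("app-repartition", 4)));
        // prints [app-repartition, app-store-changelog]
        System.out.println(internalTopics(subtopologies));
        // prints [input]
        System.out.println(invalidRequestedTopics(subtopologies, List.of("app-repartition", "input")));
    }
}
```

Because the topology metadata is stored on the broker, a tool can compute this set without contacting any Streams client.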

DescribeStreamsGroupsOptions

package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeStreamsGroupsOptions extends AbstractOptions<DescribeStreamsGroupsOptions> {
    public DescribeStreamsGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);
 
    public boolean includeAuthorizedOperations();
}

ListStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsResult {
    /**
     * Return a future which yields a map from group ID to the offsets of that group's topic partitions, if the requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }
 
    /**
     * Return a future which yields a map of topic partitions to offsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }
}

ListStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsOptions extends AbstractOptions<ListStreamsGroupOffsetsOptions> {
}

ListStreamsGroupOffsetsSpec

package org.apache.kafka.clients.admin;

/**
 * Specification of Streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsSpec {
    public ListStreamsGroupOffsetsSpec();

    /**
     * Set the topic partitions whose offsets are to be listed for a Streams group.
     */
    public ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

    /**
     * Returns the topic partitions whose offsets are to be listed for a Streams group.
     */
    public Collection<TopicPartition> topicPartitions();
}

GroupType

Another case is added to the org.apache.kafka.common.GroupType enum:

Enum constant

Description

STREAMS("Streams")

Streams group

GroupState

Another case is added to the org.apache.kafka.common.GroupState  enum:

Enum constant

NOT_READY

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors package, corresponding to the new error codes in the Kafka protocol.

  • StreamsInvalidTopologyException - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants.
  • StreamsInvalidTopologyEpochException - The client provided an invalid topology epoch with respect to the Streams group state; for example, the topology was changed but the epoch was not bumped.
  • StreamsTopologyFencedException - Returned when a client attempts to join with an outdated topology epoch.

Command-line tools

kafka-groups.sh

We add an option --streams to the above command-line tool, which filters the listed groups to show only streams groups. Furthermore, streams becomes one of the possible values for --group-type.

kafka-streams-groups.sh

A new tool called kafka-streams-groups.sh is added for working with streams groups. It has the following options:

Option

Description

--version

Display Kafka version.

--verbose

Use with --describe --state  to show group epoch and target assignment epoch.

Use with --describe --members to show, for each member, the member epoch, target assignment epoch, current assignment, target assignment, and whether the member is still using the classic rebalance protocol (KIP-1099).

Use with --describe --offsets (or plain --describe, for which --offsets is the default sub-action) to show leader epochs (KIP-320).

--all-input-topics

Use with --reset-offsets or --delete-offsets. If specified, includes all input topics of the streams group, as stored by the topology metadata on the broker.

--input-topics <String: topics>

Use with --reset-offsets or --delete-offsets. Comma-separated list of user input topics. For these topics, the tool by default resets the offsets to the earliest available offset, or deletes the offsets. To reset to a different position, append another reset option, for example: --input-topics foo --shift-by 5.

--internal-topics <String: topics>

Use with --delete. Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics). 

--to-offset <Long: offset>

Reset input topic offsets to a specific offset.

--to-latest

Reset input topic offsets to latest offset.

--to-earliest

Reset input topic offsets to earliest offset.

--by-duration <String: duration>

Reset input topic offsets to the offset at the given duration before the current timestamp. Format: 'PnDTnHnMnS'.

--to-datetime <String: datetime>

Reset input topic offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--from-file

Reset input topic offsets to values defined in CSV file.

--shift-by <Long: n>

Reset input topic offsets shifting current offset by 'n', where 'n' can be positive or negative.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Not Ready', 'Reconciling', 'Assigning', 'Stable' and 'Dead'.

--reset-offsets

Reset input topic offsets of streams group. Supports one streams group at a time, and instances should be inactive.

You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset.

To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'.

Fails if neither '--dry-run' nor '--execute' is specified.

--offsets

Describe the group and list all input topic partitions in the group along with their offset lag. This is the default sub-action of --describe and may be used with the '--describe' option only.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--list

List all streams groups.

--help

Print usage information.

--group <String: streams group ID>

The group ID (application ID) of the streams group to act on.

--execute

Execute operation. Supported operations: reset-offsets.

--dry-run

Only show results without executing changes on streams groups. Supported operations: reset-offsets.

--describe

Describe streams group and list offset lag (number of records not yet processed) related to given group.

--delete-offsets

Delete offsets of streams group. Supports one streams group at a time.

To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'.

--delete

Delete the entire streams group passed in with --group, for example --group g1. Deletes the offsets and internal topics of the streams group, as well as the topology and ownership information on the broker. Instances should be inactive.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.
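The --shift-by and --by-duration reset specifications reduce to simple arithmetic on offsets and timestamps. Below is a minimal sketch, assuming that a shifted offset is clamped to the partition's earliest and latest offsets (similar to kafka-consumer-groups.sh for consumer groups) and that --by-duration subtracts the parsed ISO-8601 duration from the current time, after which the offsets for that timestamp would be looked up on the broker (omitted here). The ResetSpecSketch class and its methods are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class ResetSpecSketch {

    // --shift-by: move the current offset by n, clamped to [earliest, latest].
    // Assumption: out-of-range targets are clamped rather than rejected.
    static long shiftBy(long current, long n, long earliest, long latest) {
        long target = current + n;
        return Math.max(earliest, Math.min(latest, target));
    }

    // --by-duration: parse a duration in the 'PnDTnHnMnS' format (for example "P1DT2H30M")
    // and compute the timestamp (epoch millis) whose offsets the tool would look up.
    static long byDurationTimestamp(String isoDuration, Instant now) {
        Duration duration = Duration.parse(isoDuration);
        return now.minus(duration).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(shiftBy(100, -30, 80, 500)); // prints 80 (clamped to earliest)
        System.out.println(shiftBy(100, 5, 0, 500));    // prints 105
        Instant now = Instant.ofEpochMilli(1_000_000_000L);
        System.out.println(byDurationTimestamp("PT1M", now)); // prints 999940000
    }
}
```

Passing the current time in explicitly keeps the computation deterministic and easy to test.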

Here are some examples. 

To display a list of all streams groups:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --list

To delete the inactive streams group S1, which deletes all offsets, internal topics, assignment and topology metadata on the broker:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete

To delete the offsets from the input topics of the inactive streams group S1, which essentially resets the consumption of these topics in the streams group:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete-offsets --all-input-topics

To set the starting offset for the input topics of the inactive streams group S1 to a specific date and time:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --reset-offsets --all-input-topics --to-datetime 1999-12-31T23:57:00.000 --execute
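The --to-datetime value in the example above follows the 'YYYY-MM-DDTHH:mm:SS.sss' format. A minimal sketch of converting such a value into an epoch-millisecond timestamp with java.time is shown below. It assumes the value is interpreted in UTC; the real tool may apply a different time zone. The ToDatetimeSketch class is hypothetical, and the subsequent lookup of offsets for that timestamp is omitted.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class ToDatetimeSketch {

    // Pattern matching the documented format 'YYYY-MM-DDTHH:mm:SS.sss'.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Converts a --to-datetime argument into epoch milliseconds.
    // Assumption: the value is interpreted in UTC.
    static long toEpochMillis(String datetime) {
        LocalDateTime local = LocalDateTime.parse(datetime, FORMAT);
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // Value from the --to-datetime example.
        System.out.println(toEpochMillis("1999-12-31T23:57:00.000")); // prints 946684620000
    }
}
```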

kafka-streams-application-reset

The streams application reset tool will only work with the classic protocol, and will be deprecated when the classic protocol is deprecated.

Compatibility, Deprecation and Migration Plan

The new broker-side group management will require more information than the current UserData to compute the assignment, so existing Streams versions cannot join a Streams group. 

To enable rolling upgrades from the old protocol to the new protocol, we will have to extend the current UserData with the missing (topology-derived) data. We can then introduce a compatibility layer in the new group coordinator that translates RPCs from the legacy rebalance protocol into requests in the streams rebalance protocol by deserializing the UserData payload. The migration path for the user is then to update to a new version of Streams, before switching from the legacy consumer-based protocol to the streams rebalance protocol in a second step. The first time a consumer sends a StreamsGroupHeartbeat with the group ID of an existing legacy consumer group, the group will be converted into a Streams group, similar to how the migration is handled for legacy consumer groups in KIP-848.

Named topologies will not be supported in the new protocol. A Streams application using named topologies will throw an error on start-up when configured with the new protocol. While noteworthy, this is not strictly a compatibility concern, since the configuration to enable named topologies is not part of the public API.

Test Plan

Our primary method of validation will be the execution of the existing test infrastructure for Kafka Streams on the new protocol, including porting all integration tests and system tests. Furthermore, we will add new integration tests for offline and online migration from the classic protocol to the streams protocol.

Rejected Alternatives

  • KIP-848 originally envisioned that Kafka Streams would continue to use a byte-array metadata protocol on top of regular consumer group RPCs with a client-side assignor, and provided a rough proposal of how to do so. The described approach was never implemented. In this KIP, we propose to use broker-side assignment by default for several reasons (most of them already mentioned in the motivation section):

    • Client-side assignment makes it hard to debug and tune the assignment logic, since a simple parameter change requires redeploying all Streams clients.

    • Using opaque byte-arrays to exchange the required metadata makes it hard to understand the state of a streams group on the protocol level. One has to discover the information from logs spread across all client machines. Central mechanisms of the Kafka protocol, such as versioning of RPCs, need to be reimplemented inside the opaque payload.

    • One could argue that the Kafka protocol is simpler without Kafka Streams specific RPCs, but we think the opposite is true. The KIP-848 consumer group protocol was extended with client-side assignment, byte-array metadata and metadata versioning just for Kafka Streams. Having the concerns of Kafka Streams dealt with in separate RPCs mainly introduces a clearer separation and gives the Kafka community the ability to simplify the upcoming consumer protocol.

  • Instead of using the heartbeat RPC during joining to initialize the topology, we considered using a dedicated group initialization RPC. Using the heartbeat RPC for initialization turned out to be simpler, and we realized that the drawbacks we wanted to avoid with the dedicated initialization RPC were not that significant. For example, we thought that we could reduce the data sent for initialization with a dedicated initialization call. However, sending the topology metadata in the heartbeat when joining does not add much overhead, and we can ensure that all clients use the same topology metadata. Also, with two different calls we would have had the overhead of coordinating the initialization and heartbeat requests.

  • An earlier draft of this KIP proposed introducing optional client-side task assignment, which would allow customization of the task assignment similar to KIP-924. We decided to remove it from this KIP to limit its scope and to avoid introducing features in the protocol whose use we cannot fully envision yet. The RPCs defined in this KIP are, however, designed so that they could easily be extended with client-side assignment in a follow-up KIP, possibly together with a public interface for adding custom task assignors on the client side.

Future Work

The classic streams rebalance protocol will remain fully functional after the introduction of the streams protocol; however, the aim is to eventually deprecate it. Some use cases are not yet covered by this KIP and would have to be added to the protocol in follow-up KIPs if there is enough interest:

  • Customizable client-side task assignment in the style of KIP-924
  • Optimization of the assignment in cases where not every rack has a replica of the partition, as in KIP-925: Rack aware task assignment in Kafka Streams
  • Injection of a customized implementation of a streams rebalance protocol using the KafkaClientSupplier.

There are also several improvements of long-standing Kafka Streams issues that can be built on top of this protocol, in particular:

  • Topology updates

    • Validate topology compatibility broker-side
    • Automatically drain repartition topics on a topology update

  • In addition to the existing shutdown-flag, a global pause/resume functionality could be built on top of the protocol
  • There is an opportunity to improve EOSv2 in the new protocol. One could add the client's current transactional.id to the heartbeat request, and add to the heartbeat response a list of closing transactional.ids that the client should fence. This would improve EOSv2 and avoid the need to rely on a low transaction.timeout.ms config.

