Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note that this is joint work by Lucas Brutschy and Bruno Cadonna.

Motivation

Kafka Streams clients use a protocol to distribute stream processing tasks among Streams clients. Since the inception of Streams, this has worked by piggybacking custom byte payloads on the pre-KIP-848 consumer rebalance protocol. A custom partition assignor on one of the Streams clients would perform task assignment and various other centralized tasks, such as internal topic initialization. While this approach has served its purpose well and the streams partition assignor has achieved impressive flexibility and complexity over the years, it has also had a number of deficiencies:

  • A large fraction of the production incidents in Kafka Streams are caused by bugs in the rebalancing logic or misconfiguration thereof. With the rebalance logic being run on the processing node, the implementer of the business logic is also typically responsible for configuring, upgrading and debugging the rebalance logic. 

    • When rebalance problems occur, the timeline of events and information that caused the rebalance and influenced the assignment is spread across all clients, making it difficult to debug.

    • Since streams versions on the client are notoriously slow to be picked up by teams building the processing logic, bugs in the rebalance logic are prevalent for years after being fixed on trunk.

    • Since assignment logic is controlled by the streams application, tuning the parameters of the assignment logic requires redeployment of the application, which in itself triggers rebalances (and reassignments), creating a vicious cycle.

  • With the rebalance logic being based on the metadata on one of the Streams clients, metadata inconsistencies among clients have caused a variety of issues.

  • Particularly, the use of raw byte arrays in the protocol means that brokers cannot understand or validate the payload that is propagated among Streams clients.

  • In the implementation, piggybacking the task assignment logic on the consumer partition assignment and injecting the required Streams metadata and functionality into the general-purpose KafkaConsumer interface has required a number of workarounds, increasing code complexity and hurting the maintainability of the code. This has affected both the streams code, which had to work around the restrictions of the consumer interface, and the consumer, whose interface had to be padded to account for Kafka Streams requirements.

  • Because the protocol is not Kafka Streams native, advanced features like “smooth scale out” or “application shutdown on error” and others require workarounds like probing rebalances, which are disruptive for the running application as well as often hard to reason about by users.

KIP-848 introduced the next generation of the consumer rebalance protocol, which also requires a redefinition of the streams rebalance protocol. We propose to use this change in the protocol to solve or alleviate the above problems by introducing a dedicated protocol for Kafka Streams, which uses dedicated RPCs for task rebalancing and where centralized logic such as topic initialization and task assignment are performed centrally by the brokers’ group coordinator.

Design Goals

We propose to introduce new RPCs designed to make Streams initialization and task assignment first-class citizens of the Kafka protocol. The proposed RPCs are built on top of the following design goals:

  • We want to inherit all the advantages of the new consumer rebalance protocol introduced in KIP-848, such as greatly simplified clients, task assignment on the broker by default, truly incremental and cooperative rebalancing without global synchronization, and the ability of each instance to initiate a metadata update to the rebalance logic.

  • Move central initialization tasks, such as the creation of internal topics, to the group coordinator.

  • Make all data required for initialization and assignment explicit in the protocol and deserializable by the broker, with corresponding compatibility guarantees.

  • Establish a single source of truth for stream application metadata such as the internal topic configuration, the topology and the assignment configuration to detect inconsistencies early by persisting them on the broker.

  • Make the assignment logic parameters centrally tunable, without the need to redeploy the clients.

  • Make active tasks, standby tasks and warm-up tasks first-class citizens in assignment and reconciliation

Proposed Changes

Overview

We introduce the concept of a Streams group in parallel to a consumer group. Streams clients use a dedicated heartbeat RPC, StreamsGroupHeartbeat, to join a group, leave a group, and update the group coordinator about their currently owned tasks and their client-specific metadata. The group coordinator manages a Streams group similarly to a consumer group, continuously updating the group member metadata via heartbeat responses and running assignment logic when changes are detected.

To assign tasks among Streams clients, the group coordinator uses topology metadata that is initialized when the first member joins the group and that is persisted in the __consumer_offsets topic. To trigger topology initialization and/or detect topology inconsistencies, each heartbeat contains a unique ID of the topology (which can be derived automatically from the topology via hashing).

Streams groups

We will introduce a new group type called streams to the group coordinator. For each Streams group, we define new record key and value types for the group metadata, topology metadata and group member metadata. The records are persisted in the __consumer_offsets topic. A group can either be a streams group, a share group or a consumer group, defined by the first heartbeat request using the corresponding GroupId.

Joining, leaving and staying in a Streams group

During the complete lifetime of a Streams client, its members continuously send StreamsGroupHeartbeatRequests to the group coordinator. The requests contain the following information:

  • All non-assignment metadata as defined in the ConsumerGroupHeartbeatRequest such as the group ID, member ID, member epoch, instance ID, rack ID and rebalance timeout.

  • The currently assigned active, standby and warm-up tasks. Each task is identified by a subtopology ID (which can be an arbitrary string in the protocol for future extensibility, even though subtopologies are currently numbered) and a partition number.

  • The process ID to identify multiple members running in the same process (i.e. the same Streams client).

  • The user-defined endpoint to be used for running interactive queries.

  • A unique ID of the processing topology. The ID is defined by the client, and will be the hash of the topology representation by default.

  • Topology metadata needed for creating internal topics and computing the assignment.
  • For each task, the current sum of task changelog offsets and task changelog end offsets. These are used to determine whether a warm-up task is below acceptable.recovery.lag and can be used to optimize the assignment in the assignor. For more details on these offsets see "Cumulative task changelog offsets and task changelog end offsets" section below

  • A flag to request shutdown of the whole application (only if the member epoch is the group leave epoch).

Group ID, member ID, member epoch, and instance ID (if defined by the client) are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted, unless an error occurred.

Heartbeat handling & response

If the heartbeat is valid and any data in the heartbeat has changed, the group coordinator will update its internal representation of the member correspondingly and persist the updated member metadata in a record on the __consumer_offsets topic. It then potentially updates the group assignment and reconciles member assignments, which is described in the section "Rebalance Process".

The group coordinator responds with a StreamsGroupHeartbeatResponse, which contains 

  • All non-assignment metadata as defined in the ConsumerGroupHeartbeatResponse such as the member ID (when joining as a new member), new member epoch and heartbeat interval ms.

  • The current target assignment for this member, as sets of active, standby and warm-up task IDs.

  • For each client in the group, the user-defined endpoint and the assigned tasks for IQ

Again, fields that don’t change will be omitted from the heartbeat response.

Initializing and updating topology metadata on the broker

Whenever a member joins the Streams group, the first heartbeat request contains metadata of the topology. The metadata describes the topology as a set of subtopologies, each identified by a unique string identifier and containing the following metadata that is relevant for creation of internal topics and assignment:

  • The set of source topics. Here, the source topics can be defined through a Google RE2/J regular expression.

  • The set of repartition sink topics. We do not include non-repartition sink topics or output topics that the topology produces to via dynamic routing.

  • The set of changelog topics, replication factors and other topic-level configurations.

  • The set of repartition topics, the number of partitions for each of them (optional), replication factors and other topic-level configurations.

The metadata sent to the broker via the heartbeat is persisted as a new record type in the __consumer_offsets topic and can be retrieved in subsequent requests. If one or more internal topics do not exist, the group coordinator invokes an asynchronous initialization. Only after the topology metadata is initialized and the internal topics are created does the group coordinator start assigning tasks to the members of the group.

If internal topics exist, but no consistent set of topics can be created without changing the number of partitions of the existing topics, an error is returned. The topic-level configurations are not validated and only used for initialization.
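
To make the consistency check concrete, here is a minimal sketch of the broker-side decision for a single internal topic, assuming the expected partition count from the topology metadata and the partition count of any existing topic are already known; the class and method names are illustrative, not part of the proposal.

import java.util.OptionalInt;

public class InternalTopicCheck {

    enum Decision { CREATE, OK, INCONSISTENT }

    // Decides what to do for one internal topic, given the partition count required by the
    // topology (empty if no specific number is enforced) and the partition count of the
    // existing topic on the broker (empty if the topic does not exist yet).
    static Decision check(OptionalInt expectedPartitions, OptionalInt existingPartitions) {
        if (existingPartitions.isEmpty()) {
            // Topic is missing: the group coordinator has to create it before assigning tasks.
            return Decision.CREATE;
        }
        if (expectedPartitions.isPresent()
                && expectedPartitions.getAsInt() != existingPartitions.getAsInt()) {
            // The topic exists, but a consistent set of topics would require changing its
            // partition count, which is not allowed: an error is returned to the client.
            return Decision.INCONSISTENT;
        }
        // Topic exists with a compatible partition count; topic-level configs are not validated.
        return Decision.OK;
    }

    public static void main(String[] args) {
        System.out.println(check(OptionalInt.of(6), OptionalInt.empty())); // CREATE
        System.out.println(check(OptionalInt.of(6), OptionalInt.of(4)));   // INCONSISTENT
        System.out.println(check(OptionalInt.empty(), OptionalInt.of(4))); // OK
    }
}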

Topology updates

Topology updates can be initiated by any member upon joining. Whenever a Streams member joins the group (whether a new member, or a static member after a restart) with a topology ID that is different from the current topology of the streams group, the group coordinator re-initializes the broker-side topology with the topology metadata contained in the first heartbeat of the Streams member. If the topology ID of an existing member does not correspond to the current topology ID of the group, this will be indicated in the heartbeat response sent to that member (for informational purposes, e.g., logging a warning).

The topology ID of the group and the topology ID of each member are provided to the assignor, which will make sure that members with a topology ID that does not correspond to the group's topology ID will not get any new tasks assigned, avoiding problems caused by topology incompatibilities.

Example

As an example of a rolling bounce to upgrade the topology, let's assume, for illustration, a sticky assignor with stateless tasks, no standby tasks, and one thread per process.

Assume we have 3 clients A, B, and C with a uniform assignment (#1 is the topology ID):

  • A(#1): T1, T2, T3

  • B(#1): T4, T5, T6

  • C(#1): T7, T8, T9

We shut down C to update its topology. Once we shut down C, the assignment is the following:

  • A(#1): T1, T2, T3, T7

  • B(#1): T4, T5, T6, T8, T9

Then, C rejoins the group with topology ID #2. This topology ID does not correspond to the group's topology ID, so C will not get any tasks assigned. A and B get to keep their tasks.

  • A(#1): T1, T2, T3, T7

  • B(#1): T4, T5, T6, T8, T9

  • C(#2): -

The broker will ask C to initialize the new topology. Once the topology is initialized, A and B will no longer get new tasks assigned, but C will, because #2 is now the group's topology ID. The rest of the assignment is unchanged - A and B cannot get any new tasks, but they can retain some of the tasks they have. Since the sticky assignor balances out the assignment, we should end up with something like this:

  • A(#1): T2, T3, T7

  • B(#1): T5, T6, T8

  • C(#2): T1, T4, T9

B is shut down for the topology update. Since, again, A cannot get new tasks but C can, the new assignment is:

  • A(#1): T2, T3, T7

  • C(#2): T1, T4, T9, T5, T6, T8

Client B comes back with the new topology ID, and gets new tasks assigned:

  • A(#1): T2, T3, T7

  • B(#2): T1, T4, T9

  • C(#2): T5, T6, T8

And so on.

It is up to the client implementation how topology IDs are defined. The protocol and the broker only require that two clients with the same topology ID have the same topology. In the initial implementation, the topology ID will be derived from the topology by deterministic hashing. However, it would also be possible to provide an implementation where topology IDs are explicitly defined through a configuration. This would, for example, allow topologies to be manually versioned, or topology IDs to be derived from a git commit hash.
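
As an illustration of the default hash-based derivation, the following sketch computes a topology ID from a canonical, order-independent rendering of the subtopologies. The rendering format and helper names are made up for this example; only the idea of deterministic hashing is taken from this KIP.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopologyIdExample {

    // Derives a topology ID by hashing a canonical description of the subtopologies.
    static String topologyId(Map<String, List<String>> sourceTopicsBySubtopology) {
        // Sort subtopologies and their source topics so that the rendering, and therefore
        // the hash, does not depend on iteration order.
        StringBuilder canonical = new StringBuilder();
        new TreeMap<>(sourceTopicsBySubtopology).forEach((subtopologyId, sourceTopics) -> {
            canonical.append(subtopologyId).append(':');
            sourceTopics.stream().sorted().forEach(t -> canonical.append(t).append(','));
            canonical.append(';');
        });
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(canonical.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Two clients with the same topology derive the same ID.
        String id = topologyId(Map.of(
            "0", List.of("orders"),
            "1", List.of("orders-repartition")));
        System.out.println(id);
    }
}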

A strict mode, in which topology updates are requested explicitly, implicit updates of the topology are rejected, and members with incompatible topologies are fenced out of the group, can be built on top of this protocol, but is postponed to a follow-up KIP.

Describing and listing streams groups

Streams groups will be returned in ListGroupsResponse with GroupType equal to the string streams. The group ID of a streams group can be used in OffsetFetchRequest and OffsetCommitRequest as usual. Sending a ConsumerGroupHeartbeatRequest to a streams group will return a GROUP_ID_NOT_FOUND error. Sending a StreamsGroupHeartbeatRequest to a consumer group will similarly return a GROUP_ID_NOT_FOUND error.

There will be a new RPC DescribeStreamsGroup that returns, given the group ID, all metadata related to the streams group, such as

  • The topology metadata with which the group's topology was initialized, as provided by the first heartbeat containing the new topology.

  • The latest member metadata that each member provided through the StreamsGroupHeartbeat API.

  • The current target assignment generated by the assignor.

We also include extensions of the Admin API and the command-line tools to describe and modify streams groups.

Rebalance Process

As for KIP-848 consumer groups, the rebalance process is entirely driven by the group coordinator and based on the same three epochs: group epoch, assignment epoch, and member epoch. The main difference is the assigned resource - instead of assigning partitions to consumers, we assign tasks consisting of a subtopology ID and a partition number, and each task can be assigned to a client in three roles: as an active task, as a standby task or as a warm-up task.

We only explain here the main differences from the rebalance process of regular consumer groups.

  • The partition metadata for the group, which is tracked by the group coordinator, is the partition metadata of all input topics (i.e., user source topics and internal repartition topics we read from).

  • The group epoch is bumped:

    • When a member joins or leaves the group.
    • When a member is fenced or removed from the group by the group coordinator.
    • When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
    • When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
    • When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
    • When an assignment configuration for the group is updated.

Assignment

Every time the group epoch is bumped, a new task assignment is computed by calling the task assignor, which assigns tasks as active, standby or warm-up tasks to members of the group. The task assignor is configured on the broker side both through a global static configuration that defines the default assignor for all groups, and through a dynamic group-level configuration that sets the assignor for a specific group.

The AK implementation will provide the following assignors:

  • highly_available - Like the current HighAvailabilityTaskAssignor

  • sticky - Like the current StickyTaskAssignor

Which assignors are provided by the group coordinator is not part of the protocol and is specific to each implementation. In this KIP, we do not introduce a pluggable interface for task assignors, but we leave it open to do so in the future.

If no task assignor is configured globally or on the group-level, the broker picks the assignor. The broker is free to make a dynamic decision here - the AK implementation will choose highly_available for stateful topologies and sticky for stateless topologies.

Each task assignor must make sure to fulfill the following invariants:

  • Each warm-up task that has reached acceptable.recovery.lag must be turned into an active task or a standby task, or be unassigned from the member.

  • A stateful task (in the various roles) cannot be assigned to two clients with the same processId.
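
For illustration, the second invariant above could be checked along the following lines; the data structures are simplified placeholders, not an assignor interface (which this KIP does not make pluggable).

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AssignmentInvariantCheck {

    record TaskId(String subtopology, int partition) {}

    // Verifies that no stateful task, in any role (active, standby or warm-up),
    // is assigned to two members that share the same process ID.
    static boolean noTaskTwicePerProcess(Map<String, Set<TaskId>> tasksByMember,
                                         Map<String, String> processIdByMember) {
        // Tracks which (processId, task) pairs have been seen so far.
        Map<String, Set<TaskId>> seenByProcess = new HashMap<>();
        for (Map.Entry<String, Set<TaskId>> entry : tasksByMember.entrySet()) {
            String processId = processIdByMember.get(entry.getKey());
            Set<TaskId> seen = seenByProcess.computeIfAbsent(processId, p -> new HashSet<>());
            for (TaskId task : entry.getValue()) {
                if (!seen.add(task)) {
                    return false; // the same task appears twice within one process
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TaskId task = new TaskId("0", 1);
        // Two members of the same process both own task 0_1: invalid assignment.
        System.out.println(noTaskTwicePerProcess(
            Map.of("memberA", Set.of(task), "memberB", Set.of(task)),
            Map.of("memberA", "process-1", "memberB", "process-1"))); // false
    }
}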

Cumulative task changelog offsets and task changelog end offsets

Whenever a stateful task is added to or removed from a Streams client, whenever a warm-up task reaches acceptable.recovery.lag, and at regular intervals defined by the broker-side configuration task.offset.interval.ms, each client reports two sets of offsets: the sums of task changelog offsets and the sums of task changelog end offsets. These sets can be used by the assignors, in particular the highly_available assignor, to determine which tasks are caught up within acceptable.recovery.lag and to optimize the assignment when multiple clients with a partial copy of the state exist.

Cumulative task changelog offsets

The cumulative changelog offset of a task is the sum of all positions in the changelog topics of the local state stores belonging to the task. A Streams member reports the cumulative changelog offsets for all tasks with local state. That is:

  • For each processing (active) task, the sum of the offsets last fully replicated to the broker in the changelog topic of each local state store. Under at-least-once guarantees, this corresponds to the high watermark; under exactly-once semantics, it is the last stable offset.

  • For each restoring task (active, standby or warm-up), the sum of the positions in the changelog topics, as checkpointed to the local state directory.

  • For each dormant task (a task which is not owned, but which has state locally on disk), the sum of the checkpointed positions in the changelog topics present in its state directory. The client only reports offsets for dormant tasks for which it manages to acquire the lock on the state directory.

Members that run in the same process (and use the same state directory) may report offsets for overlapping sets of dormant tasks. These offsets can conflict (since they are recorded at different points in time), but these conflicts can easily be resolved by taking the most recently received offsets. The current assignors will be updated to take this into account.
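
A minimal sketch of this conflict resolution, keeping the most recently received report per dormant task, could look as follows; the data shapes are illustrative and not the group coordinator's actual state.

import java.util.HashMap;
import java.util.Map;

public class DormantTaskOffsetMerge {

    record TaskId(String subtopology, int partition) {}
    record OffsetReport(long cumulativeOffset, long receivedAtMs) {}

    // Merges a newly received set of dormant-task offsets into the offsets already known for a
    // process, keeping the most recently received report per task.
    static void merge(Map<TaskId, OffsetReport> knownOffsets, Map<TaskId, OffsetReport> newReports) {
        newReports.forEach((task, report) ->
            knownOffsets.merge(task, report,
                (existing, incoming) ->
                    incoming.receivedAtMs() >= existing.receivedAtMs() ? incoming : existing));
    }

    public static void main(String[] args) {
        Map<TaskId, OffsetReport> known = new HashMap<>();
        TaskId task = new TaskId("0", 3);
        merge(known, Map.of(task, new OffsetReport(500L, 1_000L)));
        // A second member of the same process later reports a different offset for the same task.
        merge(known, Map.of(task, new OffsetReport(480L, 2_000L)));
        System.out.println(known.get(task).cumulativeOffset()); // 480, the most recent report wins
    }
}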

Cumulative task changelog end offsets

Similarly, the member reports the sum of the end offsets in all changelog topics for the currently owned tasks, if available. This simplifies the broker-side task assignment, since the broker doesn’t need to fetch the current end-offsets. Specifically:

  • For active processing tasks, the cumulative task changelog end offset is the same as the cumulative task changelog offset

  • For all restoring tasks (active, standby or warm-up), the cumulative end-offset is the sum of the last end offsets cached by the restore consumer. If an end offset of a topic partition is unknown, no end-offset is reported.

  • For dormant standby tasks, no end-offset is reported.

Using cumulative task changelog offsets and cumulative task changelog end offsets

By reporting cumulative task changelog offsets and cumulative task changelog end offsets instead of the cumulative task lag, we can determine most task lags in the group coordinator without the group coordinator or the Streams client having to issue additional requests. This works even if the end offset of a topic partition is not known to a particular member, as long as it is known to another member in the group. However, it may happen that no end offset is known to the group coordinator - for example, for tasks that are dormant on at least one client and that no member currently owns as an active, standby or warm-up task. In these cases, tasks with unknown end offsets are never considered caught up; however, the cumulative task changelog offset can still be used when deciding where to place a new active, standby or warm-up task - typically, by selecting the client with the maximal cumulative task changelog offset that fulfils all other requirements.
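
To make the placement heuristic concrete, the following sketch decides whether any client is caught up for a task and otherwise falls back to the client with the maximal cumulative task changelog offset. It illustrates the rule described above and is not the actual assignor implementation; all names are illustrative.

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

public class TaskPlacementExample {

    // Picks the client on which to place a task: prefer a caught-up client; if the end offset is
    // unknown to the group or no client is caught up, fall back to the client with the largest
    // cumulative task changelog offset.
    static Optional<String> placeTask(Map<String, Long> cumulativeOffsetByClient,
                                      OptionalLong cumulativeEndOffset,
                                      long acceptableRecoveryLag) {
        if (cumulativeEndOffset.isPresent()) {
            Optional<String> caughtUp = cumulativeOffsetByClient.entrySet().stream()
                .filter(e -> cumulativeEndOffset.getAsLong() - e.getValue() <= acceptableRecoveryLag)
                .map(Map.Entry::getKey)
                .findFirst();
            if (caughtUp.isPresent()) {
                return caughtUp;
            }
        }
        // End offset unknown or nobody caught up: place the task on the client that reported
        // the maximal cumulative task changelog offset.
        return cumulativeOffsetByClient.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        // clientA has a lag of 100, which is within an acceptable.recovery.lag of 200.
        System.out.println(placeTask(
            Map.of("clientA", 900L, "clientB", 400L),
            OptionalLong.of(1000L),
            200L)); // Optional[clientA]
    }
}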

Reconciliation of the group

Once a new target assignment is installed, each member will independently reconcile their current assignment with their new target assignment. Ultimately, each member will converge to their target epoch and assignment. The reconciliation process handles active tasks and standby/warm-up tasks differently:

  • Active tasks are reconciled like topic partitions in the KIP-848 consumer group protocol, that is, for reconciling the target assignment, the reconciliation follows three phases:

    • The group coordinator first asks the member to revoke any active tasks that are not assigned to that member in the target assignment any more, by removing those active tasks from the set of tasks sent to the member in the heartbeat response

    • The member must first confirm revocation of these active tasks by removing them from the set of active tasks sent in the heartbeat request

    • Now, active tasks will be incrementally assigned to the member. An active task is assigned as soon as no other client owns it anymore, that is, once the previous owner (if any) has confirmed the revocation of the corresponding active task.

  • Standby and warm-up tasks are reconciled in parallel with active tasks, but following slightly different logic:

    • As for active tasks, standby and warm-up tasks removed from the member's target assignment are removed from the assignment sent in the heartbeat response immediately, that is, with the next heartbeat. The member confirms the revocation of the tasks as soon as the state directory (or any other resource related to the task) is closed.

    • Newly assigned standby and warm-up tasks wait until any member with the same processId that owns that task (as active or standby) confirms revocation of the task by removing it from the corresponding set in a heartbeat request. That is, a task that is not an active task or a standby task on any other member with the same processId can be assigned immediately. Otherwise, it is assigned as soon as the blocking task is revoked. Target assignments that assign a task to two members with the same processId are invalid.

Streams Group States

The possible states of the streams group are EMPTY, ASSIGNING, RECONCILING, STABLE and DEAD, as for consumer groups.

Global & Dynamic Group Configuration

We make core assignment options configurable centrally on the broker, without relying on each client's configuration. This allows tuning a streams group without redeploying the streams application. Three core assignment options will be introduced on the broker side: acceptable.recovery.lag, num.warmup.replicas and num.standby.replicas. They can be configured both globally on the broker, and dynamically for specific streams groups through the IncrementalAlterConfigs and DescribeConfigs RPCs.
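
For example, assuming group-level configurations are exposed through the existing IncrementalAlterConfigs/DescribeConfigs path with a group config resource (ConfigResource.Type.GROUP, as introduced for group-level configurations), tuning a single streams group from the Admin client could look roughly like the sketch below; the config names follow this KIP, but the exact client-side surface may differ.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TuneStreamsGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Group-level resource for the streams group to be tuned
            // (assumes ConfigResource.Type.GROUP is used for group-level configurations).
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-streams-app");

            // Raise the number of standby replicas for this group only, without redeploying clients.
            AlterConfigOp setStandbys = new AlterConfigOp(
                new ConfigEntry("num.standby.replicas", "1"), AlterConfigOp.OpType.SET);

            admin.incrementalAlterConfigs(Map.of(group, List.of(setStandbys))).all().get();

            // Read back the effective group-level configuration.
            System.out.println(admin.describeConfigs(List.of(group)).all().get().get(group));
        }
    }
}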

Online Migration from a Classic Consumer Group to a Streams Group

As in KIP-848, upgrading from a classic consumer group currently used in Kafka Streams to the new Streams group is possible by rolling the Streams clients, assuming the Streams protocol is enabled on the brokers. When the first consumer of a Streams client using the new Streams rebalance protocol joins the group, the group is converted from a classic group to a streams group. When the last consumer of a Streams client using the new Streams rebalance protocol leaves the group, the group is converted back to a classic group. Note that the group epoch starts at the current group generation. During the migration, all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new Streams protocol.

Let's recapitulate how the classic rebalance protocol works in Streams. First, the consumers in a Streams client join or re-join the group with the JoinGroup API. The JoinGroup request contains the subscribed topics, the owned partitions, the generation ID, the Streams-specific subscription info, and some other fields. The Streams-specific subscription info contains the process ID of the Streams client, the owned active and standby tasks, the task offset sums, the user endpoints for Interactive Queries and some more fields. When all the consumers of the Streams clients of the Streams application have joined, the group coordinator picks a leader for the group out of the consumers and sends back the JoinGroup response to all the members of the group (i.e. consumers that joined). The JoinGroup response contains the member ID, the generation ID, the member ID of the leader (to make the leader aware of leadership) as well as all the consumer and Streams-specific metadata about subscribed topics, owned partitions, tasks etc. The leader uses the data in the JoinGroup response for computing the assignment. Second, all the members in the Streams clients collect their assignment - computed by the leader - by using the SyncGroup API. The leader sends the computed assignment with the SyncGroup request to the group coordinator and the group coordinator distributes the assignment to the members through the SyncGroup response. In parallel, the members heartbeat with the Heartbeat API in order to maintain their session. The Heartbeat API is also used by the group coordinator to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumption about the generation ID. It basically uses what it receives from the group coordinator. The classic rebalance protocol used in Streams supports two modes: Eager and Cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. However, it revokes the partitions that it does not own anymore when it receives its new assignment, and rejoins immediately if it had to revoke any partitions.

The Streams rebalance protocol relies on the StreamsGroupHeartbeat API to do all the above. Concretely, the API updates the group state, provides the active, standby, and warm-up tasks owned by the Streams client, gets back the assignment, and updates the session. We can remap those to the classic protocol as follows: the JoinGroup API updates the group state and provides the owned active and standby tasks (warm-ups are standby tasks in the classic protocol), the SyncGroup API gets back the task assignment, and the Heartbeat API updates the session. The main difference here is that the JoinGroup and SyncGroup APIs do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.

The implementation of the new Streams protocol in the group coordinator will handle the JoinGroup and SyncGroup APIs of the classic protocol. The group coordinator will ensure that a rebalance is triggered when the assignment of a Streams client on the classic protocol needs to be updated, by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.

When the first consumer of a Streams client that uses the new Streams rebalance protocol joins a classic group, the classic group in the group coordinator will be transformed to a streams group. The generation ID becomes the group epoch. The consumer of the Streams client initializes the group with the topology metadata. A rebalance is triggered by the group coordinator for all consumers still on the classic protocol by sending the REBALANCE_IN_PROGRESS error in the heartbeat. The consumers on the classic protocol send their owned active and standby tasks in the JoinGroup request to the group coordinator. The consumers on the Streams protocol send their owned active, standby, and warm-up tasks (which should be empty since they just joined) through the StreamsGroupHeartbeat request to the group coordinator. In contrast to the classic protocol, the group coordinator does not pick a leader for computing the assignment but computes the assignment itself. That is the target assignment. The group epoch is increased. From the target assignment, the current member assignments are computed depending on the owned tasks. If members do not need to revoke any tasks, their member epoch is increased to the group epoch. If members need to revoke a task, their member epoch stays the same. The group coordinator sends the new member epoch alongside the current member assignment through the StreamsGroupHeartbeat response to the members on the Streams protocol. The members still on the classic protocol receive the member epoch through the JoinGroup response, but they still need to wait for the SyncGroup response for their current assignment. Basically, the group coordinator translates the JoinGroup and SyncGroup APIs to the Streams protocol internally and communicates with members still on the classic protocol via JoinGroup, SyncGroup and Heartbeat, and with the members on the Streams protocol via the StreamsGroupHeartbeat.

A more detailed description of this process can be found in KIP-848 in the section "Supporting Online Consumer Group Upgrade".

Example

  • classic group (generation=23)
    • A
    • B
  • assignment
    • A - active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
    • B - active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]

C joins using the Streams protocol. The classic group is transformed to a streams group.

  • streams group (group epoch=24)
    • A (classic)
    • B (classic)
    • C
  • target assignment (epoch=24)
    • A - active=[0_0, 0_2, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_2, 0_4], standby=[0_1, 0_3, 0_5]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_3, 0_5], standby=[0_0, 0_2, 0_4]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=24, active=[], standby=[0_0, 0_4], warm-up=[0_2, 0_5]

    C's warm-up task 0_2 is caught up. A is requested to revoke active task 0_2, thus A does not increase its generation ID.

  • streams group (group epoch=25)
    • A (classic)
    • B (classic)
    • C
  • target assignment (epoch=25)
    • A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_2, 0_3], standby=[0_1, 0_5], warm-up=[]
      • JoinGroupResponse: generation ID=24
      • SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=25, active=[], standby=[0_0, 0_4], warm-up=[0_5]

A follow-up rebalance is triggered so that A can report the revoked active task 0_2. Since A does not need to revoke tasks anymore, its generation ID is increased.

  • streams group (group epoch=25)
    • A (classic)
    • B (classic)
    • C
  • target assignment (epoch=25)
    • A - active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
    • B - active=[0_1, 0_4, 0_5], standby=[0_2, 0_3], warm-up=[]
    • C - active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]
  • member assignment
    • A
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_0, 0_3], standby=[0_1, 0_5], warm-up=[]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_0, 0_3], standby=[0_1, 0_5]
    • B
      • Receives REBALANCE_IN_PROGRESS error in heartbeat response
      • JoinGroupRequest: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
      • JoinGroupResponse: generation ID=25
      • SyncGroupResponse: active=[0_1, 0_4, 0_5], standby=[0_2, 0_3]
    • C
      • StreamsGroupHeartbeat: epoch=25, active=[0_2], standby=[0_0, 0_4], warm-up=[0_5]

If C leaves the group now, the group coordinator transforms the group back to a classic group and only uses JoinGroup, SyncGroup, and Heartbeat to communicate with the members. The records of the Streams protocol are deleted from the __consumer_offsets topic.

Public Interfaces

This section lists the changes impacting the public interfaces.

KRPC

New Errors

The conditions in which these errors are returned are stated further down.

  • STREAMS_INVALID_TOPOLOGY - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants, see the "Request Validation" section below.

  • STREAMS_MISSING_SOURCE_TOPICS - There are source topics missing for a topology that is supposed to be initialized. Also returned if the source topic regular expression matched no topics.

  • STREAMS_INCONSISTENT_INTERNAL_TOPICS - There are internal topics present on the broker that are not consistent with the internal topic requirements of the provided topology.

StreamsGroupHeartbeat

The StreamsGroupHeartbeat API is the new core API used by Streams applications to form a group. The API allows members to initialize a topology and advertise their state and their owned tasks. The group coordinator uses it to assign/revoke tasks to/from members. This API is also used as a liveness check.

Request Schema

The member must set all the (top level) fields with the exception of RackId and InstanceId when it joins for the first time or when an error occurs (e.g. request timed out). Otherwise, it is expected to only fill in the fields which have changed since the last heartbeat.


{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." },
    { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },

	{ "name":  "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": null,
      "about": "The topology data of the streams application. 
				Used to initialize the topology of the group and to check if the topology corresponds to the topology initialized for the group.
				Only sent when memberEpoch = 0, must be non-empty. Null otherwise.",
      "fields": [
        { "name": "TopologyId", "type": "string", "versions": "0+",
      	  "about": "The ID of the topology. Used to check if the topology corresponds to the topology initialized on the brokers." },
		{ "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
      	  "about": "The sub-topologies of the streams application.",
	      "fields": [
	        { "name": "SubtopologyId", "type": "string", "versions": "0+",
	          "about": "String to uniquely identify the sub-topology. Deterministically generated from the topology" },
	        { "name": "SourceTopics", "type": "[]string", "versions": "0+",
	          "about": "The topics the topology reads from." },
	        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
	           "about": "The regular expressions identifying topics the sub-topology reads from. null if not provided." },
	        { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
	          "about": "The set of state changelog topics associated with this sub-topology. Created automatically." },
	        { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
	          "about": "The repartition topics the sub-topology writes to." }, 
	        { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
	          "about": "The set of source topics that are internally created repartition topics. Created automatically." }
	      ]
		}
	  ]	
    },
 
    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned active tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned standby tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Currently owned warm-up tasks for this client. Null if unchanged since last heartbeat." },

    { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." },
    { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "User-defined endpoint for Interactive Queries. Null if unchanged since last heartbeat." },
    { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." },

    { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Cumulative changelog offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
    { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Cumulative changelog end-offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." },
    { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false,
      "about": "Whether all Streams clients in the group should shut down." }
  ],

  "commonStructs": [
    { "name": "KeyValue", "versions": "0+", "fields": [
      { "name": "Key", "type": "string", "versions": "0+",
        "about": "key of the config" },
      { "name": "Value", "type": "string", "versions": "0+",
        "about": "value of the config" }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]},
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "port of the endpoint" }
    ]},
    { "name": "TaskOffset", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The sub-topology identifier." },
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition." },
      { "name": "Offset", "type": "int64", "versions": "0+",
        "about": "The offset." }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The sub-topology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}

Required ACL

  • READ on group

  • CREATE on cluster resource, or CREATE on all topics in StateChangelogTopics and RepartitionSourceTopics

    • Note that these ACLs are only required if the group coordinator is to create the internal topics implicitly. If the internal topics are created explicitly, these ACLs are not needed for the Streams group heartbeat.
  • DESCRIBE_CONFIGS on all topics included in the message

Request Validation

INVALID_REQUEST is returned should the request not obey the following invariants:

  • GroupId must be non-empty.

  • Either MemberId is non-empty or MemberEpoch is 0.

  • MemberEpoch must be >= -2.

  • InstanceId, if not null, must be non-empty.

  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.

  • ActiveTasks, StandbyTasks and WarmupTasks have to be disjoint sets

  • Each element of ActiveTasks, StandbyTasks and WarmupTasks has to be a valid task ID in the topology initialized for the group ID.

  • ActiveTasks, StandbyTasks and WarmupTasks have to be non-null and empty when joining (member epoch is 0)

STREAMS_INVALID_TOPOLOGY is returned when the request contains a new topology and should the topology not obey the following invariants:

  • A StateChangelogTopic must not have a defined partition number.

  • A RepartitionSourceTopic cannot be in SourceTopics or StateChangelogTopics of any subtopology.

  • A StateChangelogTopic cannot be in SourceTopics or RepartitionSinkTopic or RepartitionSourceTopics of any subtopology.

  • A RepartitionSourceTopic of one subtopology must be a RepartitionSinkTopic of at least one other subtopology.

STREAMS_MISSING_SOURCE_TOPICS is returned if there are source topics missing during the initialization of the topology. Also returned if the source topic regular expression matched no topics.

STREAMS_INCONSISTENT_INTERNAL_TOPICS is returned if there are internal topics present on the broker that are not consistent with the internal topic requirements of the provided topology.

Request Handling

When the group coordinator handles a StreamsGroupHeartbeat request:

  1. Performs request validation.
  2. If the member joins the group (i.e. member epoch is 0):

    1. If the group is initialized:
      1. Looks up the group
    2. If the group is not initialized:
      1. Creates group and initializes the topology by creating all required internal topics.
      2. Writes the topology, keyed with the GroupId, to the __consumer_offsets topic. Existing records will be overwritten.
    3. GROUP_ID_NOT_FOUND is returned if the group ID is associated with a group type that is not streams or classic (the latter will be allowed for migration).
    4. Creates the member.
  3. If the member is already part of the group (i.e. member epoch is greater than 0):
    1. Looks up the group.
    2. GROUP_ID_NOT_FOUND is returned if the group ID does not exist anymore.
    3. If the member does not exist, returns UNKNOWN_MEMBER_ID
    4. Checks whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
      • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This will be handled as in KIP-848.
  4. Updates information of the member if needed. The group epoch is incremented if there is any change.
  5. Reconciles the member assignments as explained earlier in this document.

Response Schema

The group coordinator will only set the ActiveTasks, StandbyTasks and WarmupTasks fields until the member acknowledges that it has converged to the desired assignment. This is done to ensure that the members converge to the target assignment.


{
  "apiKey": TBD,
  "type": "response",
  "name": "StreamsGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - FENCED_MEMBER_EPOCH (version 0+)
  // - UNRELEASED_INSTANCE_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+) 
  // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
  // - STREAMS_INVALID_TOPOLOGY (version 0+)
  // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
  // - STREAMS_INCONSISTENT_INTERNAL_TOPICS (version 0+) 
  "fields": [
    // Same as consumer group heart beat
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+",
      "about": "The maximal lag a warm-up task can have to be considered caught-up." },
    { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
      "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },

    // Topology updating
    { "name": "GroupTopologyId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": null,
      "about": "The current ID of the topology for the group. Null if unchanged since last heartbeat." },

    { "name": "Status", "type":  "[]Status", "versions":  "0+",  "nullableVersions": "0+", "default": "null", 
      "about": "Indicate zero or more status for the group.  Null if unchanged since last heartbeat." },

    // The streams app knows which partitions to fetch from given this information
    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned active tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned standby tasks for this client. Null if unchanged since last heartbeat." },
    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },

    // IQ-related information
    { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." ,
      "fields": [
        { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
          "about": "User-defined endpoint to connect to the node" },
        { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
          "about": "All partitions available on the node" }
      ]
    }
  ],
  "commonStructs": [
    { "name": "Status", "versions": "0+", "fields": [
      // Possible status codes
      //  0 - INCONSISTENT_TOPOLOGY   - The topology ID supplied is inconsistent with the topology for this streams group.
      //  1 - MISSING_SOURCE_TOPICS   - One or more source topics do not exist. Missing topics are indicated in the StatusDetail.
      //  2 - MISSING_INTERNAL_TOPICS - One or more internal topics do not exist. Missing topics are indicated in the StatusDetail.
      //  3 - SHUTDOWN_APPLICATION    - A client requested the shutdown of the whole application.
      { "name": "StatusCode", "type": "int8", "versions": "0+",
        "about": "A code to indicate that a particular status is active for the group membership" },
      { "name": "StatusDetail", "type": "string", "versions": "0+",
        "about": "A string representation of the status." }
    ]},
    { "name": "TopicPartition", "versions": "0+", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+",
        "about": "topic name" },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "partitions" }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]},
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "port of the endpoint" }
    ]}
  ]
}

StreamsGroupDescribe API

Request Schema


{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Required ACL

  • READ on group

Request Validation

INVALID_REQUEST is returned should the request not obey the schema definition.

INVALID_GROUP_ID is returned should the group ID be empty.

Request Handling

When the group coordinator handles a StreamsGroupDescribeRequest:

  • Checks whether the group IDs exist. If a group ID does not exist, GROUP_ID_NOT_FOUND is returned. Similarly, it is returned if the group ID is associated with a group type that is not streams.

  • Looks up the groups and returns the response.

Response Schema


{
  "apiKey": TBD,
  "type": "response",
  "name": "StreamsGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "TopologyId", "type": "string", "versions": "0+",
          "about": "The ID of the currently initialized topology for this group." }, 
        { "name": "Topology", "type": "[]Subtopology", "versions": "0+",
          "about": "The sub-topologies of the streams application.",
          "fields": [
            { "name": "Subtopology", "type": "string", "versions": "0+",
              "about": "String to uniquely identify the subtopology." },
            { "name": "SourceTopics", "type": "[]string", "versions": "0+",
              "about": "The topics the topology reads from." },
            { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
               "about": "The regular expressions identifying topics the topology reads from. null if not provided." },
            { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
              "about": "The topics the topology writes to." },
            { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
              "about": "The set of state changelog topics associated with this subtopology. Created automatically." },
            { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
              "about": "The set of source topics that are internally created repartition topics. Created automatically." }
          ]
        },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The member epoch." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID for static membership." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The rack ID." },

            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },

            { "name": "TopologyId", "type": "string", "versions": "0+",
              "about": "The ID of the topology on the client." },

            { "name": "ProcessId", "type": "string", "versions": "0+",
              "about": "Identity of the streams instance that may have multiple clients. " },
            { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
              "about": "Used for rack-aware assignment algorithm." },
            { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+",
              "about": "Cumulative changelog offsets for tasks." },
            { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+",
              "about": "Cumulative changelog end offsets for tasks." },

            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." },
            { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
              "about": "The target assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TaskOffset", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition." },
      { "name": "Offset", "type": "int64", "versions": "0+",
        "about": "The offset." }
    ]},
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Active tasks for this client." },
      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Standby tasks for this client." },
      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
        "about": "Warm-up tasks for this client. " }
    ]},
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]},
    { "name": "KeyValue", "versions": "0+", "fields": [
      { "name": "Key", "type": "string", "versions": "0+",
        "about": "key of the config" },
      { "name": "Value", "type": "string", "versions": "0+",
        "about": "value of the config" }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]}
  ]
}

Response Handling

Nothing particular.

Records

This section describes the new record types required for the new protocol. Similar to KIP-848, we define new record types that persist all data related to groups, their members and their assignments in the __consumer_offsets compacted topic. Each record has a dedicated key type, whose version differentiates these records from the records introduced in KIP-848 and from those used to persist committed offsets.

Compared to consumer groups, we introduce one extra record type, which stores information about the topology of the group.

The size of the persisted member metadata in the __consumer_offsets topic has to respect the max message size. Since those records are persisted separately for each member, this should be less of a concern; the same limitation affects plain consumers in KIP-848 for very large consumer groups. Eventually, we may implement a way to split the records into multiple messages, and there are plans to add transaction support to the group coordinator to ensure that the records can be updated atomically. For the first version of the protocol, we will bound those records by the max message size.

Group Metadata

The assignment-related metadata for a group with N members is stored with N+1 records. One metadata record per member and one metadata record for the group.

  • The member metadata record, keyed by the group ID and the member ID, stores all relevant metadata that is sent by a member to the coordinator in a heartbeat request. A notable exception are task changelog offsets and task changelog end offsets, which are not persisted, as they are constantly changing. 
  • The group metadata, keyed by the group ID, contains the group epoch.

The records evolve in the following way:

  • When the first member joins the group, the group metadata record and the member metadata record for that member are created.
  • Every time a new member joins the group, another member metadata record is added.
  • Whenever a member updates its member metadata through the heartbeat, a new version of the member metadata record is appended.
  • When a member leaves the group, a tombstone is written for the member metadata key.

When the group has lost all members (or never had any) and offsets.retention.minutes expires, the group metadata tombstone is written. This can only happen in group state EMPTY.

StreamsGroupMemberMetadataKey


{
  "type": "data",
  "name": "StreamsGroupMemberMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupMemberMetadataValue


{
  "type": "data",
  "name": "StreamsGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) instance ID for static membership." },
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "The rebalance timeout." },

    { "name": "TopologyId", "type": "string", "versions": "0+",
      "about": "The ID of the topology. Must be non-empty." },

    { "name": "ProcessId", "type": "string", "versions": "0+",
      "about": "Identity of the streams instance that may have multiple consumers." },
    { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
      "about": "User-defined endpoint for running interactive queries on this instance." },
    { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
      "about": "Used for rack-aware assignment algorithm." }
  ],
  "commonStructs": [
    { "name": "Endpoint", "versions": "0+", "fields": [
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "host of the endpoint" },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "port of the endpoint" }
    ]},
    { "name": "KeyValue", "versions": "0+",
      "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "key of the config" },
        { "name": "Value", "type": "string", "versions": "0+",
          "about": "value of the config" }
      ]
    }
  ]
}

StreamsGroupMetadataKey


{
  "type": "data",
  "name": "StreamsGroupMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupMetadataValue


{
  "type": "data",
  "name": "StreamsGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." }
  ]
}

Group Topology Metadata

The topology for the group, and the metadata for input topic partitions consumed in the topology, are persisted in two records.

  • The partition metadata record, keyed by the group ID, contains metadata of all topic partitions consumed by the current topology
  • The topology record, keyed by the group ID, stores the topology metadata sent in the heartbeat request when members join.

The records evolve in the following way:

  • When the group coordinator receives valid topology metadata in a heartbeat request, both records are created, using the topology sent in the request and the topic metadata currently known to the group coordinator.
  • When, during the handling of a group heartbeat, the group coordinator detects that the partition metadata for one of the input topics used in the topology has changed (by comparing the previous partition metadata record to the topic metadata currently known to the broker), the streams group partition metadata record is updated and a new assignment is computed (see the sketch below).

When the group has lost all members and offsets.retention.minutes expires, tombstones are written to remove both records.
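
A minimal sketch of this change detection, using hypothetical placeholder types rather than the actual coordinator implementation:

import java.util.Map;
import java.util.Objects;

final class PartitionMetadataCheck {

    // Hypothetical view of the group state kept by the group coordinator.
    interface GroupState {
        Map<String, Integer> persistedTopicMetadata();                    // topic name -> number of partitions
        void writePartitionMetadataRecord(Map<String, Integer> metadata); // appends a new record version
        void bumpGroupEpoch();                                            // triggers a new target assignment
    }

    // Called while handling a streams group heartbeat.
    static void maybeUpdatePartitionMetadata(final GroupState group,
                                             final Map<String, Integer> currentTopicMetadata) {
        if (!Objects.equals(group.persistedTopicMetadata(), currentTopicMetadata)) {
            group.writePartitionMetadataRecord(currentTopicMetadata);
            group.bumpGroupEpoch();
        }
    }
}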

StreamsGroupPartitionMetadataKey


{
  "type": "data",
  "name": "StreamsGroupPartitionMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupPartitionMetadataValue


{
  "type": "data",
  "name": "StreamsGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
          { "name": "Partition", "versions": "0+", "type": "int32",
            "about": "The partition number." },
          { "name": "Racks", "versions": "0+", "type": "[]string",
            "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}

StreamsGroupTopologyKey


{
  "type": "data",
  "name": "StreamsGroupTopologyKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupTopologyValue


{
  "type": "data",
  "name": "StreamsGroupTopologyValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopologyId", "type": "string", "versions": "0+",
      "about": "The ID of the topology. Must be non-empty. " },
    { "name":  "Topology", "type": "[]Subtopology", "versions": "0+",
      "about": "The sub-topologies of the streams application.",
      "fields": [
        { "name": "Subtopology", "type": "string", "versions": "0+",
          "about": "String to uniquely identify the subtopology." },
        { "name": "SourceTopics", "type": "[]string", "versions": "0+",
          "about": "The topics the topology reads from." },
        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The regular expressions identifying topics the topology reads from. null if not provided." },
        { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
          "about": "The set of state changelog topics associated with this subtopology. " },
	    { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
          "about": "The repartition topics the sub-topology writes to." }, 
        { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
          "about": "The set of source topics that are internally created repartition topics. " }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicConfig", "versions": "0+", "fields": [
      { "name": "key", "type": "string", "versions": "0+",
        "about": "The key of the topic-level configuration." },
      { "name": "value", "type": "string", "versions": "0+",
        "about": "The value of the topic-level configuration," }
    ]
    },
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." },
      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "Topic-level configurations as key-value pairs."
      }
    ]}
  ]
}

Current Member Assignment

For each member, there is a record that stores the current member assignment. The record is first created when a member joins (with an empty assignment). Every time an assignment is computed for the member during reconciliation, it is compared to the previous assignment for the member. If the assignment changed, a new version of the current member assignment record is appended to the __consumer_offsets topic. When the member leaves the group, a tombstone is written to remove the record.
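
A minimal sketch of this lifecycle, with placeholder types (TaskIds, Assignment, RecordWriter) standing in for the actual coordinator classes:

import java.util.List;
import java.util.Objects;

final class CurrentAssignmentRecords {

    record TaskIds(String subtopology, List<Integer> partitions) { }
    record Assignment(List<TaskIds> activeTasks, List<TaskIds> standbyTasks, List<TaskIds> warmupTasks) { }

    interface RecordWriter {
        void appendCurrentAssignment(String groupId, String memberId, Assignment assignment);
        void appendTombstone(String groupId, String memberId);
    }

    // A new version of the StreamsGroupCurrentMemberAssignment record is only
    // appended if the reconciled assignment differs from the previous one.
    static void onReconciliation(final RecordWriter writer, final String groupId, final String memberId,
                                 final Assignment previous, final Assignment reconciled) {
        if (!Objects.equals(previous, reconciled)) {
            writer.appendCurrentAssignment(groupId, memberId, reconciled);
        }
    }

    // When the member leaves the group, its record is removed with a tombstone.
    static void onMemberLeave(final RecordWriter writer, final String groupId, final String memberId) {
        writer.appendTombstone(groupId, memberId);
    }
}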

StreamsGroupCurrentMemberAssignmentKey


{
  "type": "data",
  "name": "StreamsGroupCurrentMemberAssignmentKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupCurrentMemberAssignmentValue


{
  "type": "data",
  "name": "StreamsGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See StreamsGroupMember.MemberState for the possible values." },
    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned active tasks for this streams client." },
    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned standby tasks for this streams client." },
    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned warm-up tasks for this streams client." },
    { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds",
      "about": "The active tasks that must be revoked by this member." }
  ],
  "commonStructs": [
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}

Streams Group Target Assignment

The target assignment is stored in N + 1 records, where N is the number of members in the group. When a new target assignment is computed, the group coordinator compares it with the current target assignment and only writes the difference between the two assignments to the __consumer_offsets partition. That also means that if a member has left the group, a corresponding tombstone record is written. The assignment must be applied atomically, so the group coordinator ensures that all the records are written in a single batch, limiting the size to the maximum batch size (default 1MB), as in consumer groups.
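
A minimal sketch of how such a diff could be computed, using a placeholder CoordinatorRecord type rather than the actual coordinator record classes:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class TargetAssignmentDiff {

    // Placeholder for a coordinator record; a null value denotes a tombstone.
    record CoordinatorRecord(String key, Object value) { }

    static List<CoordinatorRecord> diffRecords(final String groupId,
                                               final int newAssignmentEpoch,
                                               final Map<String, Object> previousTarget, // member ID -> assignment
                                               final Map<String, Object> newTarget) {
        final List<CoordinatorRecord> batch = new ArrayList<>();
        // One record per member whose target assignment is new or has changed.
        newTarget.forEach((memberId, assignment) -> {
            if (!Objects.equals(previousTarget.get(memberId), assignment)) {
                batch.add(new CoordinatorRecord(groupId + "/" + memberId, assignment));
            }
        });
        // Tombstones for members that are no longer part of the target assignment.
        previousTarget.keySet().stream()
            .filter(memberId -> !newTarget.containsKey(memberId))
            .forEach(memberId -> batch.add(new CoordinatorRecord(groupId + "/" + memberId, null)));
        // The target assignment metadata record carrying the new assignment epoch.
        batch.add(new CoordinatorRecord(groupId, newAssignmentEpoch));
        return batch; // written as a single batch, bounded by the maximum batch size
    }
}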

StreamsGroupTargetAssignmentMemberKey


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMemberKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": TBD,
      "about": "The member id." }
  ]
}

StreamsGroupTargetAssignmentMemberValue


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned active tasks for this streams client." },
    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned standby tasks for this streams client." },
    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
      "about": "Currently assigned warm-up tasks for this streams client." }
  ],
  "commonStructs": [
    { "name": "TaskIds", "versions": "0+", "fields": [
      { "name": "Subtopology", "type": "string", "versions": "0+",
        "about": "The subtopology identifier." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of the input topics processed by this member." }
    ]}
  ]
}

StreamsGroupTargetAssignmentMetadataKey


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMetadataKey",
  "validVersions": TBD,
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": TBD,
      "about": "The group id." }
  ]
}

StreamsGroupTargetAssignmentMetadataValue


{
  "type": "data",
  "name": "StreamsGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

Broker Metrics

The existing group metrics are extended to differentiate between streams groups and consumer groups and account for streams group states.

  • Number of groups by protocol type, where the list of protocols is extended with the protocol=streams variant.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic|streams}

  • Number of streams groups based on state

kafka.server:type=group-coordinator-metrics,name=streams-group-count,state={empty|initializing|assigning|reconciling|stable|dead}

  • Streams group rebalances sensor

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-rate

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-count

Broker Configurations

The new group protocol is gated behind a new version of the group.version feature flag on the broker side.

Updated properties

  • group.coordinator.rebalance.protocols: The list of enabled rebalance protocols. "streams" is included in the list of protocols to enable streams groups. This will be added to the default value of this configuration property once this feature is complete.

New properties

  • group.streams.session.timeout.ms (int, default: 45s): The timeout to detect client failures when using the streams group protocol.
  • group.streams.min.session.timeout.ms (int, default: 45s): The minimum session timeout.
  • group.streams.max.session.timeout.ms (int, default: 60s): The maximum session timeout.
  • group.streams.heartbeat.interval.ms (int, default: 5s): The heartbeat interval given to the members.
  • group.streams.min.heartbeat.interval.ms (int, default: 5s): The minimum heartbeat interval.
  • group.streams.max.heartbeat.interval.ms (int, default: 15s): The maximum heartbeat interval.
  • group.streams.max.size (int, default: MaxValue): The maximum number of streams clients that a single streams group can accommodate.
  • group.streams.acceptable.recovery.lag (long, default: 10,000): The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment.
  • group.streams.num.warmup.replicas (int, default: 2): The maximum number of warmup replicas.
  • group.streams.max.warmup.replicas (int, default: 20): The maximum value for dynamic configurations of the warmup replica configuration.
  • group.streams.num.standby.replicas (int, default: 0): The number of standby replicas for each task.
  • group.streams.max.standby.replicas (int, default: 2): The maximum value for dynamic configurations of the standby replica configuration.
  • group.streams.task.offset.interval.ms (int, default: 60s): The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed.
  • group.streams.min.task.offset.interval.ms (int, default: 15s): The minimum value for dynamic configurations of the task offset interval, used to prevent users of the cluster from sending task changelog offsets too often.
  • group.streams.assignor.name (string, default: null): The name of the task assignor used for all streams groups. Can be sticky or highly_available in Apache Kafka.

Group Configurations

We will add new configurations for the resource type GROUP in DescribeConfigs and IncrementalAlterConfigs to override the default broker configurations dynamically for specific groups (see the example following the list).

  • group.streams.session.timeout.ms (int, default: 45s): The timeout to detect client failures when using the streams group protocol.
  • group.streams.heartbeat.interval.ms (int, default: 5s): The heartbeat interval given to the members.
  • group.streams.acceptable.recovery.lag (long, default: 10,000): The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment.
  • group.streams.num.warmup.replicas (int, default: 2): The maximum number of warmup replicas.
  • group.streams.num.standby.replicas (int, default: 0): The number of standby replicas for each task.
  • group.streams.task.offset.interval.ms (int, default: 60s): The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed.
  • group.streams.assignor.name (string, default: null): The name of the task assignor used for this streams group. Can be sticky or highly_available in Apache Kafka.
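
For illustration, the following example sketches how such a group-level override could be applied with the Admin client, assuming the GROUP config resource type is available as described above; the group ID and the chosen value are arbitrary:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class StreamsGroupConfigOverride {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Override the number of standby replicas for the group "my-streams-app" only.
            final ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-streams-app");
            final AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("group.streams.num.standby.replicas", "1"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
        }
    }
}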

Streams API

In a future major version, when the classic group protocol is deprecated, the interfaces introduced in KIP-924 will be deprecated as well, unless an integration with this KIP is proposed in a follow-up KIP.

Streams Configurations

New configurations

  • group.protocol (enum, default: classic): A flag that indicates whether the new protocol should be used. Valid values are classic and streams.
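
A minimal sketch of a Streams application opting into the new protocol via this configuration; the topology, topic names and serdes are illustrative only:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsProtocolOptIn {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put("group.protocol", "streams"); // proposed in this KIP; the default remains "classic"

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}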

Deprecations

In a future major version, when the classic group protocol is deprecated, the following configuration options will be deprecated as well:

acceptable.recovery.lag

max.warmup.replicas

num.standby.replicas

probing.rebalance.interval.ms

rack.aware.assignment.tags

rack.aware.assignment.strategy

rack.aware.assignment.traffic_cost

rack.aware.assignment.non_overlap_cost

task.assignor.class

Note that both the rack-aware task assignor and customizable client-side task assignment may be introduced into the new protocol in follow-up KIPs (see the Future Work section). In that case, the last five configuration options would not be deprecated in the future major version.

Admin API

Add the following methods to the org.apache.kafka.clients.admin.Admin interface, mostly backed by the same implementations as the corresponding consumer group APIs.

  • AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets): Alter offset information for a streams group. RPC used: OffsetCommit
  • AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterStreamsGroupOffsetsOptions options): Alter offset information for a streams group. RPC used: OffsetCommit
  • DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics): Delete offset information for a set of topics in a streams group. RPC used: OffsetDelete
  • DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics, DeleteStreamsGroupOffsetsOptions options): Delete offset information for a set of topics in a streams group. RPC used: OffsetDelete
  • DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds): Delete streams groups from the cluster. RPC used: DeleteGroups
  • DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options): Delete streams groups from the cluster. RPC used: DeleteGroups
  • DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds): Describe some streams groups in the cluster. RPC used: StreamsGroupDescribe
  • DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options): Describe some streams groups in the cluster. RPC used: StreamsGroupDescribe
  • ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs): List the streams group offsets available in the cluster for the specified streams groups. RPC used: OffsetFetch
  • ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options): List the streams group offsets available in the cluster for the specified streams groups. RPC used: OffsetFetch
  • ListStreamsGroupsResult listStreamsGroups(): List the streams groups available in the cluster. RPC used: ListGroups
  • ListStreamsGroupsResult listStreamsGroups(ListStreamsGroupsOptions options): List the streams groups available in the cluster. RPC used: ListGroups

These correspond to existing APIs for consumer groups, and some of the code will be shared. The main reason for duplicating them is naming: the current API uses "consumerGroup" in the method names, while the underlying RPCs just use "group". There are some differences:

  • describeStreamsGroups uses the StreamsGroupDescribe RPC and returns different information than its consumer group counterpart.
  • A streams group has an extra state, INITIALIZING, and none of the legacy states from the classic protocol.
  • removeMembersFromConsumerGroup will not have a corresponding API in this first version, as it uses the LeaveGroup RPC of the classic consumer group protocol, which is not available for KIP-848-style groups.

Admin

   /**
    * Alters offsets for the specified group. In order to succeed, the group must be empty.
    *
    * <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupId The group for which to alter offsets.
    * @param offsets A map of offsets by partition.
    * @return The AlterStreamsGroupOffsetsResult.
    */
   default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
       return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
   }
 
   /**
    * Alters offsets for the specified group. In order to succeed, the group must be empty.
    *
    * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
    *
    * @param groupId The group for which to alter offsets.
    * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
    * @param options The options to use when altering the offsets.
    * @return The AlterStreamsGroupOffsetsResult.
    */
   AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterStreamsGroupOffsetsOptions options);
 
   /**
    * Delete offsets for a set of topics in a Streams group with the default options.
    *
    * <p>This is a convenience method for {@link #deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupId The group for which to delete offsets.
    * @param topics The topics.
    * @return The DeleteStreamsGroupOffsetsResult.
    */
   default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<String> topics) {
       return deleteStreamsGroupOffsets(groupId, topics, new DeleteStreamsGroupOffsetsOptions());
   }
 
   /**
    * Delete offsets for a set of topics in a Streams group.
    *
    * @param groupId The group for which to delete offsets.
    * @param topics The topics.
    * @param options The options to use when deleting offsets in a Streams group.
    * @return The DeleteStreamsGroupOffsetsResult.
    */
   DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
       Set<String> topics,
       DeleteStreamsGroupOffsetsOptions options);
 
   /**
    * Delete Streams groups from the cluster with the default options.
    *
    * <p>This is a convenience method for {@link #deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} with default options.
    * See the overload for more details.
    *
    * @param groupIds The IDs of the groups to delete.
    * @return The DeleteStreamsGroupsResult.
    */
   default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
       return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
   }
 
   /**
    * Delete Streams groups from the cluster.
    *
    * @param groupIds The IDs of the groups to delete.
    * @param options The options to use when deleting a Streams group.
    * @return The DeleteStreamsGroupsResult.
    */
   DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);
 
   /**
    * Describe some Streams groups in the cluster, with the default options.
    *
    * <p>This is a convenience method for {@link #describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}
    * with default options. See the overload for more details.
    *
    * @param groupIds The IDs of the groups to describe.
    * @return The DescribeStreamsGroupsResult.
    */
   default DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) {
       return describeStreamsGroups(groupIds, new DescribeStreamsGroupsOptions());
   }
 
   /**
    * Describe some Streams groups in the cluster.
    *
    * @param groupIds The IDs of the groups to describe.
    * @param options  The options to use when describing the groups.
    * @return The DescribeStreamsGroupsResult.
    */
   DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
                                                     DescribeStreamsGroupsOptions options);
 
   /**
    * List the Streams group offsets available in the cluster for the specified Streams groups with the default options.
    *
    * <p>This is a convenience method for {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}
    * to list offsets of all partitions for the specified Streams groups with default options.
    *
    * @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
    * @return The ListStreamsGroupOffsetsResult
    */
   default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
       return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
   }
 
   /**
    * List the Streams group offsets available in the cluster for the specified Streams groups.
    *
    * @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
    * @param options The options to use when listing the Streams group offsets.
    * @return The ListStreamsGroupOffsetsResult
    */
   ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);
 
   /**
    * List the Streams groups available in the cluster with the default options.
    *
    * <p>This is a convenience method for {@link #listStreamsGroups(ListStreamsGroupsOptions)} with default options.
    * See the overload for more details.
    *
    * @return The ListStreamsGroupsResult.
    */
   default ListStreamsGroupsResult listStreamsGroups() {
       return listStreamsGroups(new ListStreamsGroupsOptions());
   }
 
   /**
    * List the Streams groups available in the cluster.
    *
    * @param options The options to use when listing the Streams groups.
    * @return The ListStreamsGroupsResult.
    */
   ListStreamsGroupsResult listStreamsGroups(ListStreamsGroupsOptions options);


AlterStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}


AlterStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;
  
/**
 * Options for the {@link Admin#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
}

DeleteStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set<String>, DeleteStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a future which can be used to check the result for a given topic.
     */
    public KafkaFuture<Void> partitionResult(final String topic) {
    }
}

DeleteStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;
  
/**
 * Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set<String>, DeleteStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {
}

DeleteStreamsGroupsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }
 
    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

DeleteStreamsGroupsOptions

package org.apache.kafka.clients.admin;
  
/**
 * Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
}

DescribeStreamsGroupsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#describeStreamsGroups(Collection<String>, DescribeStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeStreamsGroupsResult {
    /**
     * Return a future which yields all StreamsGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, StreamsGroupDescription>> all() {
    }
 
    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<StreamsGroupDescription>> describedGroups() {
    }
}

StreamsGroupDescription

/**
 * A detailed description of a single streams group in the cluster.
 */
public class StreamsGroupDescription {

    public StreamsGroupDescription(
        final String groupId,
        final String topologyId,
        final Collection<StreamsGroupSubtopologyDescription> subtopologies,
        final Collection<StreamsGroupMemberDescription> members,
        final StreamsGroupState state,
        final Node coordinator,
        final Set<AclOperation> authorizedOperations);

    /**
     * The id of the streams group.
     */
    public String groupId();

    /**
     * The id of the currently used topology, or null if uninitialized.
     */
    public String topologyId();

    /**
     * A list of the members of the streams group.
     */
    public Collection<StreamsGroupMemberDescription> members();

    /**
     * A list of the subtopologies in the streams group.
     */
    public Collection<StreamsGroupSubtopologyDescription> subtopologies();

    /**
     * The streams group state, or UNKNOWN if the state is too new for us to parse.
     */
    public StreamsGroupState state();

    /**
     * The streams group coordinator, or null if the coordinator is not known.
     */
    public Node coordinator();

    /**
     * authorizedOperations for this group, or null if that information is not known.
     */
    public Set<AclOperation> authorizedOperations();
}

StreamsGroupMemberDescription

/**
 * A detailed description of a single member in the group.
 */
public class StreamsGroupMemberDescription {

    public StreamsGroupMemberDescription(
        final String memberId, 
        final Optional<String> instanceId,
        final String clientId,
        final String clientHost, 
        final String topologyId, 
        final String processId,
        final Map<String, String> clientTags,
        final List<TaskOffset> taskOffsets, 
        final List<TaskOffset> taskEndOffsets,
        final StreamsGroupMemberAssignment assignment,
        final Optional<StreamsGroupMemberAssignment> targetAssignment);

    /**
     * The id of the group member.
     */
    public String memberId();

    /**
     * The id of the instance, used for static membership, if available.
     */
    public Optional<String> instanceId();

    /**
     * The client id of the group member.
     */
    public String clientId();

    /**
     * The host of the group member.
     */
    public String clientHost();

    /**
     * The id of the topology present on the client.
     */
    public String topologyId();

    /**
     * Identity of the streams instance that may have multiple clients.
     */
    public String processId();

    /**
     * Used for rack-aware assignment algorithm.
     */
    public Map<String, String> clientTags();

    /**
     * Cumulative offsets for tasks.
     */
    public List<TaskOffset> taskOffsets();

    /**
     * Cumulative task changelog end offsets for tasks.
     */
    public List<TaskOffset> taskEndOffsets();

    /**
     * The current assignment.
     */
    public StreamsGroupMemberAssignment assignment();

    /**
     * The target assignment.
     */
    public Optional<StreamsGroupMemberAssignment> targetAssignment();
    
    /**
     * The cumulative offset for one task.
     */
    public static class TaskOffset {

        public TaskOffset(final String subtopology, final int partition, final long offset);

        /**
         * The subtopology identifier.
         */
        public String subtopology();

        /**
         * The partition of the task.
         */
        public int partition();

        /**
         * The cumulative offset (sum of offsets in all input partitions).
         */
        public long offset();
    }
}

StreamsGroupMemberAssignment

/**
 * A description of the assignments of a specific streams group member.
 */
public class StreamsGroupMemberAssignment {

    public StreamsGroupMemberAssignment(
        final List<TaskIds> activeTasks,
        final List<TaskIds> standbyTasks,
        final List<TaskIds> warmupTasks);

    /**
     * Active tasks for this client.
     */
    public List<TaskIds> activeTasks();

    /**
     * Standby tasks for this client.
     */
    public List<TaskIds> standbyTasks();
    
    /**
     * Warmup tasks for this client.
     */
    public List<TaskIds> warmupTasks();

    /**
     * All tasks for one subtopology of a member.
     */
    public static class TaskIds {

        public TaskIds(final String subtopology, final List<Integer> partitions);

        /**
         * The sub-topology identifier.
         */
        public String subtopology();

        /**
         * The partitions of the input topics processed by this member.
         */
        public List<Integer> partitions();
    }
}

StreamsGroupSubtopologyDescription

/**
 * A detailed description of a single subtopology
 */
public class StreamsGroupSubtopologyDescription {

    public StreamsGroupSubtopologyDescription(
        final String subtopology,
        final List<String> sourceTopics,
        final String sourceTopicRegex,
        final List<String> repartitionSinkTopics,
        final Map<String, TopicInfo> stateChangelogTopics,
        final Map<String, TopicInfo> repartitionSourceTopics);

    /**
     * String to uniquely identify the subtopology.
     */
    public String subtopology();

    /**
     * The topics the topology reads from.
     */
    public List<String> sourceTopics();

    /**
     * The regular expressions identifying topics the topology reads from. null if not provided.
     */
    public String sourceTopicRegex();

    /**
     * The repartition topics the topology writes to.
     */
    public List<String> repartitionSinkTopics();

    /**
     * The set of state changelog topics associated with this sub-topology.
     */
    public Map<String, TopicInfo> stateChangelogTopics();

    /**
     * The set of source topics that are internally created repartition topics.
     */
    public Map<String, TopicInfo> repartitionSourceTopics();

    /**
     * Information about a topic.
     */
    public static class TopicInfo {

        public TopicInfo(final int partitions, final Map<String, String> topicConfigs);

        /**
         * The number of partitions in the topic.
         */
        public int partitions();

        /**
         * Configurations of the topic.
         */
        public Map<String, String> topicConfigs();
    }

}

DescribeStreamsGroupsOptions

package org.apache.kafka.clients.admin;
  
/**
 * Options for {@link Admin#describeStreamsGroups(Collection<String>, DescribeStreamsGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeStreamsGroupsOptions extends AbstractOptions<DescribeStreamsGroupsOptions> {
    public DescribeStreamsGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);
 
    public boolean includeAuthorizedOperations();
}

ListStreamsGroupOffsetsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec>, ListStreamsGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsResult {
    /**
     * Return a future which yields a Map<String, Map<TopicPartition, Long>> from group IDs to partition offsets, if the requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }
 
    /**
     * Return a future which yields a map of topic partitions to offsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }
}

ListStreamsGroupOffsetsOptions

package org.apache.kafka.clients.admin;
  
/**
 * Options for {@link Admin#listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec>, ListStreamsGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsOptions extends AbstractOptions<ListStreamsGroupOffsetsOptions> {
}

ListStreamsGroupOffsetsSpec

package org.apache.kafka.clients.admin;
  
/**
 * Specification of Streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec>, ListStreamsGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupOffsetsSpec {
  public ListStreamsGroupOffsetsSpec();
 
  /**
   * Set the topic partitions whose offsets are to be listed for a Streams group.
   */
  ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);
 
  /**
   * Returns the topic partitions whose offsets are to be listed for a Streams group.
   */
  Collection<TopicPartition> topicPartitions();
}

ListStreamsGroupsResult

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#listStreamsGroups(ListStreamsGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of Streams group listings.
     */
    public KafkaFuture<Collection<StreamsGroupListing>> all() {
    }
 
    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<StreamsGroupListing>> valid() {
    }
  
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

StreamsGroupListing

package org.apache.kafka.clients.admin;
 
import org.apache.kafka.common.StreamsGroupState;
 
/**
 * A listing of a Streams group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class StreamsGroupListing {
  public StreamsGroupListing(String groupId);
  public StreamsGroupListing(String groupId, Optional<StreamsGroupState> state);
 
  /**
   * The id of the Streams group.
   */
  public String groupId();
 
  /**
   * The Streams group state.
   */
  public Optional<StreamsGroupState> state();
}

ListStreamsGroupsOptions

package org.apache.kafka.clients.admin;
 
import org.apache.kafka.common.StreamsGroupState;
 
/**
 * Options for {@link Admin#listStreamsGroups(ListStreamsGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListStreamsGroupsOptions extends AbstractOptions<ListStreamsGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListStreamsGroupsOptions inStates(Set<StreamsGroupState> states);
 
    /**
     * Return the set of states that are requested, or empty if no states have been specified.
     */
    public Set<StreamsGroupState> states();
}
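
For illustration, a short example of how the proposed Admin methods could be used together to list and describe all streams groups in a cluster (hypothetical until the API is implemented):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupListing;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class ListAndDescribeStreamsGroups {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // List all streams groups in the cluster.
            final Collection<StreamsGroupListing> listings = admin.listStreamsGroups().all().get();
            final List<String> groupIds = listings.stream()
                .map(StreamsGroupListing::groupId)
                .collect(Collectors.toList());

            // Describe them, including members and their task assignments.
            final Map<String, StreamsGroupDescription> groups =
                admin.describeStreamsGroups(groupIds).all().get();
            groups.forEach((groupId, description) ->
                System.out.println(groupId + ": state=" + description.state()
                    + " members=" + description.members().size()));
        }
    }
}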


GroupType

Another case is added to the org.apache.kafka.common.GroupType enum:

  • Streams("Streams"): Streams group

StreamsGroupState

A new enum org.apache.kafka.common.StreamsGroupState is added, with the following constants:

  • DEAD
  • EMPTY
  • STABLE
  • ASSIGNING
  • RECONCILING
  • UNKNOWN

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors package, corresponding to the new error codes in the Kafka protocol.

  • StreamsInvalidTopology  - The supplied topology is invalid. Returned if the client sends a topology that does not fulfill the expected invariants.
  • StreamsMissingSourceTopics  - There are source topics missing for a topology that is supposed to be initialized. Also returned if the source topic regular expression matched no topics.
  • StreamsInconsistentInternalTopics  - There are internal topics present on the broker that are not consistent with the internal topic requirements of the provided topology.

StreamsInvalidTopology is a fatal error, while StreamsMissingSourceTopics and StreamsInconsistentInternalTopics are subclasses of RetriableException.

Command-line tools

kafka-streams-groups.sh

A new tool called kafka-streams-groups.sh is added for working with streams groups. It has the following options:

Option

Description

--version

Display Kafka version.

--all-input-topics

Use with --reset-offsets or --delete-offsets. If specified, includes all input topics of the streams group, as stored by the topology metadata on the broker.

--input-topics <String: topics>

Use with --reset-offsets or --delete-offsets. Comma-separated list of user input topics. For these topics, the tool will by default reset the offsets to the earliest available offset, or delete the offsets. Reset to another offset position by appending another reset-offsets option, e.g., --input-topics foo --shift-by 5.

--internal-topics <String: topics>

Use with --delete. Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics). 

--to-offset <Long: offset>

Reset input topic offsets to a specific offset

--to-latest

Reset input topic offsets to latest offset.

--to-earliest

Reset input topic offsets to earliest offset.

--by-duration <String: duration>

Reset input topic offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'

--to-datetime <String: datetime>

Reset input topic offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--from-file

Reset input topic offsets to values defined in CSV file.

--shift-by <Long: n>

Reset input topic offsets shifting current offset by 'n', where 'n' can be positive or negative.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Initializing', 'Reconciling', 'Assigning', 'Stable' and 'Dead'.

--reset-offsets

Reset input topic offsets of a streams group. Supports one streams group at a time, and instances should be inactive.

You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset.

To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'.

Fails if neither '--dry-run' nor '--execute' is specified.

--offsets

Describe the group and list all input topic partitions in the group along with their offset lag. This is the default sub-action of --describe and may be used with the '--describe' option only.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--list

List all streams groups.

--help

Print usage information.

--group <String: streams group ID>

The group ID (application ID) of the streams group we wish to act on.

--execute

Execute operation. Supported operations: reset-offsets.

--dry-run

Only show results without executing changes on streams groups. Supported operations: reset-offsets.

--describe

Describe streams group and list offset lag (number of records not yet processed) related to given group.

--delete-offsets

Delete offsets of a streams group. Supports one streams group at a time.

To define the scope use --all-input-topics or --input-topics. One scope must be specified unless you use '--from-file'.

--delete

Pass in a group to delete the entire streams group. For instance, --group g1. Deletes offsets and internal topics of the streams group, as well as topology and ownership information on the broker. Instances should be inactive.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

Here are some examples. 

To display a list of all streams groups:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --list

To delete the inactive streams group S1 , which deletes all offsets, internal topics, assignment and topology metadata on the broker:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete

To delete the offsets from input topics of inactive streams group S1 , which essentially resets the consumption of this topic in the streams group:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --delete-offsets --all-input-topics

To set the starting offset for the input topics of inactive streams group S1  to a specific date and time:

$ kafka-streams-groups.sh --bootstrap-server localhost:9092 --group S1 --reset-offsets --all-input-topics --to-datetime 1999-12-31T23:57:00.000 --execute

kafka-streams-application-reset.sh

The streams application reset tool will only work with the classic protocol, and will be deprecated when the classic protocol is deprecated.

Compatibility, Deprecation and Migration Plan

The new broker-side group management will require more information than the current UserData to compute the assignment, so existing Streams versions cannot join a Streams group. 

To enable rolling upgrades from the old protocol to the new protocol, we will have to extend the current UserData with the missing (topology-derived) data. We can then introduce a compatibility layer in the new group coordinator that translates RPCs from the legacy rebalance protocol into requests of the streams rebalance protocol by deserializing the UserData payload. The migration path for the user is then to upgrade to a new version of Streams first, and to switch from the legacy consumer-based protocol to the streams rebalance protocol in a second step. The first time a Streams client sends a StreamsGroupHeartbeat with the group ID of an existing legacy consumer group, the group will be converted into a streams group, similarly to how the migration is handled for legacy consumer groups in KIP-848.

Named topologies will not be supported in the new protocol. A Streams application using named topologies will throw an error on start-up when configured with the new protocol. While noteworthy, this is not strictly a compatibility concern, since the configuration to enable named topologies is not part of the public API.

Test Plan

Our primary method of validation will be the execution of the existing test infrastructure for Kafka Streams on the new protocol, including porting all integration tests and system tests. Furthermore, we will add new integration tests for offline & online migration from the classic protocol to the streams protocol.

Rejected Alternatives

  • KIP-848 originally envisioned that Kafka Streams would continue to use a byte-array metadata protocol on top of the regular consumer group RPCs together with a client-side assignor, and provided a rough proposal of how to do so. The described approach was never implemented. In this KIP, we propose to use broker-side assignment by default for several reasons (mostly already mentioned in the motivation section):

    • Client-side assignment makes it hard to debug and tune the assignment logic, where a simple parameter change requires redeployment of all streams clients.

    • Using opaque byte-arrays to exchange the required metadata makes it hard to understand the state of a streams group on the protocol level. One has to discover the information from logs spread across all client machines. Central mechanisms of the Kafka protocol, such as the versioning of RPCs, need to be reimplemented on top of the opaque byte-array payloads.

    • One could argue that the Kafka protocol is simpler without Kafka Streams specific RPCs, but we think the opposite is true. The KIP-848 consumer group protocol was extended with client-side assignment, byte-array metadata and metadata versioning just for Kafka Streams. Handling the concerns of Kafka Streams in separate RPCs introduces a clearer separation of concerns and gives the Kafka community the ability to simplify the upcoming consumer protocol.

  • Instead of using a separate initialize RPC, we considered just using the heartbeat with memberEpoch=0 to initialize the Streams group and initiate the creation of all topics. This would have a number of downsides: every member would need to send the initialization data every time it starts (the initialization data may already be large in the current schema and may grow larger in the future), and the heartbeat would require the ACL for creating topics. Furthermore, since the initialization may take a certain amount of time, we'd likely still have to respond to the initial heartbeat without having fully initialized the group, leading to the same temporary "initializing" state for the group that we also get in this proposal.

  • An earlier draft of this KIP proposed introducing optional client-side task assignment, which would allow customization of the task assignment similar to KIP-924. We decided to remove it from this KIP to limit its scope and to avoid introducing protocol features whose use we cannot fully envision yet. The RPCs defined in this KIP are, however, defined in a way that they could easily be extended with client-side assignment in a follow-up KIP, possibly together with a public interface for adding custom task assignors on the client side.

Future Work

The classic streams rebalance protocol will remain fully functional with the introduction of the streams protocol; however, the aim is to eventually deprecate it. There are some use cases not yet covered by this KIP that would have to be added to the protocol in follow-up KIPs, if there is enough interest.

  • Customizable client-side task assignment in the style of KIP-924
  • Optimization of the assignment in cases where not every rack has a replica of the partition, as in KIP-925: Rack aware task assignment in Kafka Streams
  • Injection of a customized implementation of a streams rebalance protocol using the KafkaClientSupplier.

There are also several improvements of long-standing Kafka Streams issues that can be built on top of this protocol, in particular:

  • Topology updates

    • Validate topology compatibility broker-side
    • Introduce manually bumped topology versioning in addition to automatically managed topology IDs

    • Automatically drain repartition topics on a topology update

  • In addition to the existing shutdown-flag, a global pause/resume functionality could be built on top of the protocol
  • There is an opportunity to improve EOSv2 in the new protocol. The heartbeat request could carry the member's current transactional.id, and the heartbeat response could carry a list of closing transactional.ids that the client should fence, improving EOSv2 and avoiding the need to rely on a low transaction.timeout.ms config.

