Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This KIP aims to provide a mechanism for migrating from ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for rolling back from KafkaConsumer to ZookeeperConsumerConnector.


ZookeeperConsumerConnector is a high-level consumer client that comes with Kafka. ZookeeperConsumerConnector is a high-level consumer in the sense that it also handles group coordination. Group coordination here means coordinating partition ownership among the consumer group in such a way that each partition is owned by exactly one consumer in the group. This implicitly means group coordination must handle group membership changes so that partition ownership can be redistributed upon members joining/leaving the group or partitions being added. ZookeeperConsumerConnector uses zookeeper as the coordination system.

Kafka now comes with a KafkaConsumer consumer client that utilizes a coordination system built into the Kafka brokers themselves. This removes the zookeeper dependency.

While groups using ZookeeperConsumerConnector can partake in zookeeper-based coordination and groups using KafkaConsumer can partake in kafka-based coordination, there is no graceful migration path from ZookeeperConsumerConnector to KafkaConsumer. Such a migration path is critical given that ZookeeperConsumerConnector is slated for deprecation.

A number of suboptimal migration plans have been proposed:

  1. shut down all the ZookeeperConsumerConnectors first and start them back up under the same group as KafkaConsumers, causing downtime.
  2. have a mix of ZookeeperConsumerConnectors and KafkaConsumers in the same group, causing duplicate partition ownership and consumption as each rebalance protocol ignores the other.
  3. form a brand new group for the KafkaConsumers doing the same work as the ZookeeperConsumerConnector group, again causing duplicate partition ownership and consumption across the two groups.

All of these suffer from either downtime or dual partition ownership and consumption.

There is also a bigger assumption behind all of these suboptimal migration plans: they assume users are able to do the full migration immediately. This assumption is not realistic. Some teams may be okay with doing a full migration immediately with the existing migration plans and their earlier stated consequences, but most need to "canary" new code. That is, they want to deploy a single instance of the new code to test for regressions while all the other instances run old code. It is not unreasonable for this deployment strategy to span hours or even days. With this canary release strategy, the above migration plans would place the canary under heavy load, since it is the sole consumer running kafka-based coordination in the group and is therefore guaranteed to own every partition the group is interested in. The canary is then likely to look unhealthy and fall behind.

It is necessary to support a canary-friendly migration mechanism that does not suffer from downtime, dual partition ownership or consumption, and also spreads partition ownership among the canary and the rest of the group.

Public Interfaces

This KIP adds the following configs to kafka.consumer.ConsumerConfig:

coordination.migration.enabled
    A boolean that determines whether the ZookeeperConsumerConnector should participate in coordination migration. The default value is false.
coordination.migration.poll.interval.ms
    The poll interval for the embedded KafkaConsumer participating in coordination migration. The default value is 1000.
coordination.migration.{configs from org.apache.kafka.clients.consumer.ConsumerConfig}
    Expose all configs from org.apache.kafka.clients.consumer.ConsumerConfig for the embedded KafkaConsumer. The user only needs to specify coordination.migration.bootstrap.servers.
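
As an illustration of how these configs might be set, here is a minimal sketch that builds the properties for a migration-enabled ZookeeperConsumerConnector. The class and method names are hypothetical, the `zookeeper.connect` and `group.id` keys are the existing old-consumer settings, and only the migration-specific keys proposed above are shown.

```java
import java.util.Properties;

final class MigrationConfigExample {
    /** Builds old-consumer properties with coordination migration switched on. */
    static Properties migrationEnabledProps(String zkConnect, String groupId, String bootstrapServers) {
        Properties props = new Properties();
        // Existing ZookeeperConsumerConnector settings.
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Configs proposed by this KIP.
        props.setProperty("coordination.migration.enabled", "true");
        props.setProperty("coordination.migration.poll.interval.ms", "1000");
        // Forwarded to the embedded KafkaConsumer through the coordination.migration. prefix.
        props.setProperty("coordination.migration.bootstrap.servers", bootstrapServers);
        return props;
    }
}
```

The resulting Properties object would be passed to kafka.consumer.ConsumerConfig in the usual way.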

This KIP adds two yammer metrics to ZookeeperConsumerConnector:

kafka.consumer:type=ZookeeperConsumerConnector,name=ActiveKafkaCoordinationCount
    This metric has value 0 when zookeeper-based coordination is in use and value 1 when kafka-based coordination is in use.
kafka.consumer:type=ZookeeperConsumerConnector,name=KafkaOwnedPartitionsCount,clientId=<client id>,groupId=<group id>
    This metric reports the number of partitions the consumer owns according to kafka-based coordination.


OZKCC
    An old ZookeeperConsumerConnector from a build prior to this KIP with dual.commit.enabled set to false and offsets.storage set to zookeeper.
MEZKCC
    ZookeeperConsumerConnector with coordination.migration.enabled set to true, dual.commit.enabled set to true, and offsets.storage set to kafka. MEZKCC actively runs an EKC.
MDZKCC
    ZookeeperConsumerConnector with coordination.migration.enabled set to false, dual.commit.enabled set to false, and offsets.storage set to zookeeper. MDZKCC does not instantiate an EKC.
EKC
    KafkaConsumer embedded inside an MEZKCC.
KC
    A pure non-embedded KafkaConsumer.


A canary-friendly means of migration and rollback with:

  1. no downtime
  2. no dual partition ownership or consumption
  3. even partition ownership distribution among the canary and the rest of the group

Proposed Changes

In order to meet the above requirements, we need ZookeeperConsumerConnector and KafkaConsumer to share a coordination system.

This KIP proposes that we adapt ZookeeperConsumerConnector to understand kafka-based coordination. This adapted ZookeeperConsumerConnector would participate in both coordination systems in parallel but would only adopt the partition assignment decision from one of the two coordination systems based on a dynamic toggle.

Kafka-based coordination requires sophisticated logic for group coordinator discovery, rebalancing a group, and heartbeating. Rather than re-implement this in the ZookeeperConsumerConnector, we propose to embed a KafkaConsumer inside it so that it can partake in kafka-based coordination. KafkaConsumer defines membership liveness in terms of processing messages and calling KafkaConsumer.poll, so we would need to add a dedicated thread to ZookeeperConsumerConnector that periodically calls poll on the embedded KafkaConsumer. We provide a custom org.apache.kafka.clients.consumer.ConsumerRebalanceListener to the EKC to receive notifications of partition assignment decisions from kafka-based coordination. We also call KafkaConsumer.pause in this custom org.apache.kafka.clients.consumer.ConsumerRebalanceListener to prevent the EKC from fetching data.
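
The dedicated polling thread could look roughly like the sketch below. It is not the KIP's implementation: `EmbeddedConsumerPoller` and the `Pollable` interface are hypothetical stand-ins so the example compiles without the kafka-clients jar, and in a real MEZKCC the `Pollable` would delegate to the EKC's poll call with the interval taken from coordination.migration.poll.interval.ms. A single-threaded executor is used so that every call to the embedded consumer happens on the same thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class EmbeddedConsumerPoller implements AutoCloseable {
    /** Hypothetical stand-in for the one call the poller needs from the embedded KafkaConsumer. */
    interface Pollable {
        void poll(long timeoutMs);
    }

    // One thread only, so the embedded consumer is never touched concurrently.
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    EmbeddedConsumerPoller(Pollable consumer, long pollIntervalMs) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                consumer.poll(0L);
            } catch (RuntimeException e) {
                // Swallowed in this sketch so one failed poll does not cancel the schedule.
            }
        }, 0L, pollIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops the polling thread. */
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A production version would at least log the swallowed exception and close the embedded consumer on shutdown.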

Zookeeper State

This KIP adds the following znodes to zookeeper:

/consumers/<group id>/migration
    This is just the parent directory for all state needed for coordination migration.
/consumers/<group id>/migration/ids
    MEZKCCs register themselves here in addition to /consumers/<group id>/ids. This exists so that members of the group can tell who is an MEZKCC as opposed to an MDZKCC or OZKCC. This lets us revert back to partition assignment decisions from zookeeper-based coordination if an MDZKCC or OZKCC were to join the group.
/consumers/<group id>/migration/mode
    This is the dynamic toggle used to notify MEZKCCs whether to use zookeeper-based coordination or kafka-based coordination partition assignment decisions. At any given point in time, this znode should either not exist, exist with value kafka, or exist with value zookeeper.

Coordination Mode Toggle

MEZKCCs register a CoordinationModeListener of type IZkDataListener to the /consumers/<group id>/migration/mode path to watch for changes to the coordination mode toggle.

If the toggle path does not exist or if the group contains OZKCCs or MDZKCCs, then MEZKCCs are to treat zookeeper-based coordination as the source of truth for partition assignment.

Else if the toggle path exists and is set to anything other than kafka, then MEZKCCs are to treat zookeeper-based coordination as the source of truth for partition assignment.

Otherwise, MEZKCCs are to treat kafka-based coordination as the source of truth for partition assignment.
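
The three rules above can be read as a single decision function. The sketch below is one possible encoding, not the KIP's code: `CoordinationModeResolver` is a hypothetical name, and it assumes an MEZKCC registers under /consumers/<group id>/migration/ids with the same consumer id it uses under /consumers/<group id>/ids, so that any id missing from the migration directory identifies an OZKCC or MDZKCC.

```java
import java.util.Optional;
import java.util.Set;

final class CoordinationModeResolver {
    enum Mode { ZOOKEEPER, KAFKA }

    /**
     * @param modeZnodeData data of /consumers/<group id>/migration/mode, empty if the znode does not exist
     * @param groupIds      children of /consumers/<group id>/ids (every old-consumer member)
     * @param migrationIds  children of /consumers/<group id>/migration/ids (MEZKCCs only)
     */
    static Mode resolve(Optional<String> modeZnodeData, Set<String> groupIds, Set<String> migrationIds) {
        // Rule 1: no toggle, or at least one member that is not an MEZKCC.
        if (!modeZnodeData.isPresent() || !migrationIds.containsAll(groupIds)) {
            return Mode.ZOOKEEPER;
        }
        // Rule 2: toggle exists but is set to anything other than "kafka".
        if (!"kafka".equals(modeZnodeData.get())) {
            return Mode.ZOOKEEPER;
        }
        // Rule 3: toggle is "kafka" and every member is an MEZKCC.
        return Mode.KAFKA;
    }
}
```

Defaulting to zookeeper-based coordination in every ambiguous case keeps a mixed group on the one coordination system that all of its members understand.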


ZookeeperConsumerConnector exposes a kafka.javaapi.consumer.ConsumerRebalanceListener which gives the user a hook called ConsumerRebalanceListener.beforeReleasingPartitions for logic to be performed before releasing partitions and gives a hook called ConsumerRebalanceListener.beforeStartingFetchers for logic to be performed after owning partitions but before starting the fetcher threads. One complication here is that ConsumerRebalanceListener.beforeStartingFetchers provides the global partition assignment of the group at the consumer thread granularity. In order to preserve existing behavior when the user provides their own ConsumerRebalanceListener when kafka-based group coordination is active for the group, we need to be able to:

  1. obtain the partition assignment decision for every member of the group. This can be obtained by sending out a DescribeGroupsRequest to the GroupCoordinator.
  2. match up the EKC with its parent MEZKCC from the group state returned in DescribeGroupsResponse. Since KafkaConsumer exposes a configurable client id and DescribeGroupsResponse provides the client ids of each member of the group, we can set the EKC's client id to be its parent MEZKCC's consumer id.
  3. consistently subdivide the partitions assigned to each member down to their consumer threads. DescribeGroupsResponse already gives us the partitions assigned to each member of the group. Zookeeper already contains information stating the topics of interest for each consumer thread in the group. These two pieces of information can be used to consistently assign partitions to consumer threads. For convenience, org.apache.kafka.clients.consumer.RoundRobinAssignor is used to provide the consumer thread-level assignment. The assignor expects a mapping of partitions-per-topic as well as topics-per-consumer. We can simply use the topics-per-consumer-thread we already have from zookeeper as the topics-per-consumer. We can count the assigned partitions per topic as the partitions-per-topic. The assignor will return us "relative" partitions starting from zero, so we can convert these into the assigned partitions by using these relative partitions as offsets into the sorted order of the assigned partitions. We now have the global partition assignment of the group at the consumer thread granularity.
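
The last step of item 3, turning the assignor's "relative" partitions into real partition ids, can be sketched as follows. This covers only that conversion for a single topic; the call to RoundRobinAssignor itself is left out, and `ThreadAssignmentMapper` and its parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ThreadAssignmentMapper {
    /**
     * @param assignedPartitions partition ids of one topic assigned to this member by kafka-based coordination
     * @param relativeByThread   zero-based relative partition indices per consumer thread id for the same topic
     * @return the actual partition ids per consumer thread id
     */
    static Map<String, List<Integer>> toActual(List<Integer> assignedPartitions,
                                               Map<String, List<Integer>> relativeByThread) {
        // Every member sorts the same way, which keeps the mapping consistent across the group.
        List<Integer> sorted = new ArrayList<>(assignedPartitions);
        Collections.sort(sorted);

        Map<String, List<Integer>> actual = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : relativeByThread.entrySet()) {
            List<Integer> partitions = new ArrayList<>();
            for (int relative : entry.getValue()) {
                // The relative index is an offset into the sorted assigned partitions.
                partitions.add(sorted.get(relative));
            }
            actual.put(entry.getKey(), partitions);
        }
        return actual;
    }
}
```

For example, if a member was assigned partitions 7, 2 and 5 of a topic, and the assignor gave relative partitions 0 and 2 to one thread and 1 to another, the first thread ends up with partitions 2 and 7 and the second with partition 5.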


A build with this patch should be backwards compatible with previous builds. Users who do not configure their consumer for migration should not see any deviations in behavior.

Migration Plan

The migration plan has been broken out to its own section since it is the core of the KIP.

At any point in time, this KIP supports the following group configurations:

  1. a group with a mix of OZKCCs, MDZKCCs, and MEZKCCs
  2. a group with a mix of MEZKCCs and KCs

How to migrate from OZKCCs and MDZKCCs to MEZKCCs:

  1. Ensure that all members of the group already have dual.commit.enabled set to false and offsets.storage set to zookeeper. If this is not the case, first do a deployment to make it so.
    1. This is necessary because the GroupCoordinator currently rejects offset commits on non-empty groups. OZKCCs and MDZKCCs naively send OffsetCommitRequests over a BlockingChannel with generationId -1 and an empty memberId. With MEZKCCs having an EKC, the group may no longer be empty. If the group has a mix of OZKCCs, MDZKCCs, and MEZKCCs, this would cause commits to kafka from the OZKCCs and MDZKCCs to fail. Current behavior does not attempt dual committing to zookeeper if kafka offset commit fails, meaning the whole offset commit would silently get dropped.
  2. Shut down as many OZKCCs and MDZKCCs as you'd like and replace them with MEZKCCs.

How to toggle between kafka-based coordination and zookeeper-based coordination when all members of the group are MEZKCCs:
In order to transition to kafka-based coordination, update the /consumers/<group id>/migration/mode znode with the znode data set to kafka.
In order to transition to zookeeper-based coordination, update the /consumers/<group id>/migration/mode znode with the znode data set to zookeeper.

How to migrate from MEZKCCs to KCs:

  1. If all consumers of the group have migrated to MEZKCCs and you wish to transition to kafka-based coordination, toggle the coordination in zookeeper by creating the znode /consumers/<group id>/migration/mode with the znode data set to kafka. Your MEZKCCs should now be using the partition assignments from kafka-based group coordination.
  2. You can now canary as many members of your group as you'd like from MEZKCC to KC.

How to rollback from KCs to MEZKCCs:

  1. Shut down as many KCs as you'd like and replace them with MEZKCCs.

How to rollback from MEZKCCs back to MDZKCCs and OZKCCs:

  1. Shut down as many MEZKCCs as you'd like and replace them with MDZKCCs or OZKCCs with dual.commit.enabled set to false and offsets.storage set to zookeeper.

One benefit of the dynamic toggle and the added zookeeper state is that it gives admins control over migrating consumers between zookeeper-based coordination and kafka-based coordination across a large organization. You can imagine a script that scans groups in zookeeper and toggles them to kafka-based group coordination if the group was fully migrated to MEZKCCs and stable for some time. If the switch to kafka-based group coordination proves to stress the kafka cluster, admins can toggle the MEZKCC groups back to zookeeper-based coordination on-the-fly to relieve stress from the kafka cluster.

Example Migration

The following diagrams illustrate a full migration from OZKCCs to KCs, in this order:

  1. [Diagram] Initial state with a group of OZKCCs.
  2. [Diagram] Begin migration from OZKCCs to MEZKCCs.
  3. [Diagram] The group has fully migrated to MEZKCCs while still using zookeeper-based coordination.
  4. [Diagram] The coordination mode toggle is applied so that the group of MEZKCCs uses kafka-based coordination.
  5. [Diagram] Begin migration from MEZKCCs to KCs.
  6. [Diagram] Final state with a group of KCs.

Rejected Alternatives

  1. Adapt KafkaConsumer to understand zookeeper-based coordination. This approach was rejected since it introduces a zookeeper dependency into kafka-clients.
  2. Build a wrapper class comprised of a ZookeeperConsumerConnector and KafkaConsumer. When the coordination mode trigger is fired, toggle consumption to the corresponding consumer. This approach was rejected for several reasons:
    1. It requires a transformation from ZookeeperConsumerConnector's or KafkaConsumer's consumption API into the wrapper class consumption API while properly tracking offsets.
      1. Should it adopt ZookeeperConsumerConnector's API of providing KafkaStreams?
      2. Should it adopt KafkaConsumer's polling API that provides ConsumerRecords?
    2. It introduces yet another consumer client to kafka
    3. Users would need to change their code to use the new client
  3. Embed an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator inside ZookeeperConsumerConnector instead of a KafkaConsumer. This approach was rejected because ConsumerCoordinator is in the "internals" package and is subject to API changes without notice. Since API changes to ConsumerCoordinator might require changes to the KIP's proposed ZookeeperConsumerConnector running kafka-based coordination, this KIP instead opts for embedding the user-facing KafkaConsumer.
  4. Regarding the subtask of providing a global state for ConsumerRebalanceListener to preserve existing behavior, we had considered just instantiating an EKC per consumer thread id so that kafka-based coordination would solve the problem of mapping partitions to consumer threads for us instead of stitching together DescribeGroupsResponse and zookeeper state. We ultimately went against this approach due to the added complexity of managing many EKCs. Another downside of this approach is that a standard partition assignment strategy using kafka-based coordination would give equal weight to a ZookeeperConsumerConnector consumer thread and a KafkaConsumer, causing an uneven partition ownership distribution across the group.
  5. Merge the /consumers/<group id>/ids and /consumers/<group id>/migration/ids directories by simply defining a new znode data version 2 for MEZKCCs. This was rejected to avoid any possibility of breaking clients as they parse the znode.