Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This KIP aims to provide a mechanism for migrating from ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for rolling back from KafkaConsumer to ZookeeperConsumerConnector.
Motivation
ZookeeperConsumerConnector is a high-level consumer client that comes with Kafka. It is high-level in the sense that it also handles group coordination. Group coordination here means coordinating partition ownership among the consumer group in such a way that each partition is owned by exactly one consumer in the group. This implicitly means group coordination must handle group membership changes so that partition ownership can be redistributed upon members joining/leaving the group or partitions being added. ZookeeperConsumerConnector uses zookeeper as the coordination system.
Kafka now comes with a KafkaConsumer consumer client that uses a coordination system built into the Kafka brokers themselves. This removes the zookeeper dependency.
While groups using ZookeeperConsumerConnector are able to successfully partake in zookeeper-based coordination and groups using KafkaConsumer are able to successfully partake in kafka-based coordination, there is no graceful migration path from ZookeeperConsumerConnector to KafkaConsumer. The need for such a migration path is critical given that ZookeeperConsumerConnector is slated for deprecation.
A number of suboptimal migration plans have been proposed:
- Shut down all the ZookeeperConsumerConnectors first and start them back up under the same group as KafkaConsumers, causing downtime.
- Have a mix of ZookeeperConsumerConnectors and KafkaConsumers in the same group, causing duplicate partition ownership and consumption as each rebalance protocol ignores the other.
- Form a brand new group of KafkaConsumers doing the same work as the ZookeeperConsumerConnector group, again causing duplicate partition ownership and consumption across the two groups.
All of these suffer from either downtime or dual partition ownership and consumption.
There is also a bigger assumption behind all of these suboptimal migration plans: they assume users are able to do the full migration immediately. This assumption is not realistic. Some teams may be okay with doing a full migration immediately using the existing plans despite the consequences stated above, but most need to "canary" new code. That is, they want to deploy a single instance of the new code to test for regressions while all the other instances run old code. It is not unreasonable for this deployment strategy to span hours or even days. With this canary release strategy, the above migration plans would place the canary under heavy load: it is the sole consumer running kafka-based coordination in the group, so it is guaranteed to own every partition the group is interested in. As a result, the canary is likely to look unhealthy and may fall behind.
We therefore need a canary-friendly migration mechanism that avoids downtime and dual partition ownership or consumption, and that also spreads partition ownership among the canary and the rest of the group.
Public Interfaces
This KIP adds the following configs to kafka.consumer.ConsumerConfig:
coordination.migration.enabled | A boolean that determines whether the ZookeeperConsumerConnector should participate in coordination migration. The default value is false. |
coordination.migration.poll.interval.ms | The poll interval, in milliseconds, for the embedded KafkaConsumer participating in coordination migration. The default value is 1000. |
coordination.migration.{configs from org.apache.kafka.clients.consumer.ConsumerConfig} | Exposes all configs from org.apache.kafka.clients.consumer.ConsumerConfig for the embedded KafkaConsumer. The user only needs to specify coordination.migration.bootstrap.servers. |
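As a minimal sketch, assuming the embedded KafkaConsumer's settings are derived by stripping the coordination.migration. prefix and skipping the two migration-specific keys above, an MEZKCC's properties could be split as follows. The helper name embedded_consumer_config and all property values are hypothetical.

```python
PREFIX = "coordination.migration."
# Keys that configure the migration itself rather than the embedded KafkaConsumer.
MIGRATION_KEYS = {PREFIX + "enabled", PREFIX + "poll.interval.ms"}


def embedded_consumer_config(zkcc_props):
    """Return the config map for the embedded KafkaConsumer (EKC).

    Hypothetical helper: strips the coordination.migration. prefix from every
    key that carries it, skipping the migration-specific settings.
    """
    return {
        key[len(PREFIX):]: value
        for key, value in zkcc_props.items()
        if key.startswith(PREFIX) and key not in MIGRATION_KEYS
    }


# Example MEZKCC properties (placeholder values).
props = {
    "group.id": "my-group",
    "zookeeper.connect": "zk1:2181",
    "dual.commit.enabled": "true",
    "offsets.storage": "kafka",
    "coordination.migration.enabled": "true",
    "coordination.migration.poll.interval.ms": "1000",
    "coordination.migration.bootstrap.servers": "broker1:9092",
}
```

Here embedded_consumer_config(props) yields only {"bootstrap.servers": "broker1:9092"}. Settings such as the group id are presumably filled in from the parent connector, which would explain why only bootstrap.servers is required.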
This KIP adds two yammer metrics to ZookeeperConsumerConnector:
kafka.consumer:type=ZookeeperConsumerConnector,name=ActiveKafkaCoordinationCount | This metric has value 0 when zookeeper-based coordination is in use and value 1 when kafka-based coordination is in use. |
kafka.consumer:type=ZookeeperConsumerConnector,name=KafkaOwnedPartitionsCount,clientId=<client id>,groupId=<group id> | This metric reports the number of partitions the consumer owns according to kafka-based coordination. |
Terminology
OZKCC | An old ZookeeperConsumerConnector from a build prior to this KIP with dual.commit.enabled set to false and offsets.storage set to zookeeper |
MEZKCC | A ZookeeperConsumerConnector with coordination.migration.enabled set to true, dual.commit.enabled set to true, and offsets.storage set to kafka. An MEZKCC actively runs an EKC. |
MDZKCC | A ZookeeperConsumerConnector with coordination.migration.enabled set to false, dual.commit.enabled set to false, and offsets.storage set to zookeeper. An MDZKCC does not instantiate an EKC. |
EKC | A KafkaConsumer embedded inside an MEZKCC |
KC | A pure non-embedded KafkaConsumer |
Requirements
A canary-friendly means of migration and rollback with:
- no downtime
- no dual partition ownership or consumption
- even partition ownership distribution among the canary and the rest of the group
Proposed Changes
In order to meet the above requirements, we need ZookeeperConsumerConnector and KafkaConsumer to share a coordination system.
This KIP proposes that we adapt ZookeeperConsumerConnector to understand kafka-based coordination. This adapted ZookeeperConsumerConnector would participate in both coordination systems in parallel but would only adopt the partition assignment decision from one of the two coordination systems based on a dynamic toggle.
Kafka-based coordination requires sophisticated logic for group coordinator discovery, rebalancing a group, and heartbeating. Rather than re-implement this in the ZookeeperConsumerConnector, we propose to embed a KafkaConsumer inside it so it can partake in kafka-based coordination. KafkaConsumer defines membership liveness in terms of processing messages and calling KafkaConsumer.poll, so we would need to add a dedicated thread to ZookeeperConsumerConnector that periodically calls poll on the embedded KafkaConsumer. We provide a custom org.apache.kafka.clients.consumer.ConsumerRebalanceListener to the EKC to receive notifications of partition assignment decisions from kafka-based coordination. We also call KafkaConsumer.pause in this custom org.apache.kafka.clients.consumer.ConsumerRebalanceListener to prevent the EKC from fetching data.
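The dedicated polling thread and the pausing listener might look roughly like the following. This is a Python sketch, not the actual Scala/Java code: the consumer object is a stand-in assumed to offer poll(timeout_ms) and pause(partitions) like the EKC, the class names are hypothetical, and registering the listener with the consumer's subscription is left to the caller.

```python
import threading


class MigrationRebalanceListener:
    """Records kafka-based assignment decisions and pauses the EKC.

    Method names mirror ConsumerRebalanceListener's onPartitionsRevoked and
    onPartitionsAssigned callbacks; they are Python stand-ins, not the Java API.
    """

    def __init__(self, consumer):
        self.consumer = consumer
        # Partitions owned according to kafka-based coordination; a gauge
        # like KafkaOwnedPartitionsCount could report len(kafka_owned).
        self.kafka_owned = set()

    def on_partitions_revoked(self, revoked):
        self.kafka_owned -= set(revoked)

    def on_partitions_assigned(self, assigned):
        self.kafka_owned |= set(assigned)
        # Pause everything so the EKC never fetches data; it exists only
        # to take part in kafka-based coordination.
        self.consumer.pause(list(assigned))


class EmbeddedConsumerPoller(threading.Thread):
    """Dedicated thread that keeps the EKC alive in the group by polling."""

    def __init__(self, consumer, poll_interval_ms=1000):
        super().__init__(daemon=True)
        self.consumer = consumer
        self.poll_interval_ms = poll_interval_ms
        self._stopped = threading.Event()

    def run(self):
        while True:
            # Rebalance callbacks fire from inside poll(); with every
            # assigned partition paused, poll() returns no records.
            self.consumer.poll(0)
            # Sleep for the poll interval, waking early on shutdown.
            if self._stopped.wait(self.poll_interval_ms / 1000.0):
                break

    def shutdown(self):
        self._stopped.set()
        self.join()
```

The loop polls at least once before checking for shutdown, and the wait on a threading.Event lets shutdown() return promptly instead of sleeping out a full interval.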
Zookeeper State
This KIP adds the following znodes to zookeeper:
/consumers/<group id>/migration | This is just the parent directory for all state needed for coordination migration. |
/consumers/<group id>/migration/ids | MEZKCCs register themselves here in addition to /consumers/<group id>/ids. This exists so that members of the group can tell who is an MEZKCC as opposed to an MDZKCC or OZKCC. This lets us revert back to partition assignment decisions from zookeeper-based coordination if an MDZKCC or OZKCC were to join the group. |
/consumers/<group id>/migration/mode | This is the dynamic toggle used to notify MEZKCCs whether to use zookeeper-based coordination or kafka-based coordination partition assignment decisions. At any given point in time, this znode should either not exist, exist with value kafka, or exist with value zookeeper. |
Coordination Mode Toggle
MEZKCCs register a CoordinationModeListener of type IZkDataListener on the /consumers/<group id>/migration/mode path to watch for changes to the coordination mode toggle.
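If the toggle path does not exist or if the group contains OZKCCs or MDZKCCs (that is, /consumers/<group id>/ids lists members that are absent from /consumers/<group id>/migration/ids), then MEZKCCs are to treat zookeeper-based coordination as the source of truth for partition assignment.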
Else if the toggle path exists and is set to anything other than kafka, then MEZKCCs are to treat zookeeper-based coordination as the source of truth for partition assignment.
Otherwise, MEZKCCs are to treat kafka-based coordination as the source of truth for partition assignment.
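The three rules reduce to a small pure function. This is a sketch assuming the listener has already read the toggle znode's data as a string (None when the znode is absent) and listed the children of both ids directories; the function name is hypothetical.

```python
def partition_assignment_source(mode_data, member_ids, migration_member_ids):
    """Decide which coordination system an MEZKCC should treat as the truth.

    mode_data: data of /consumers/<group id>/migration/mode, or None if the
        znode does not exist.
    member_ids: children of /consumers/<group id>/ids (all ZookeeperConsumerConnectors).
    migration_member_ids: children of /consumers/<group id>/migration/ids (MEZKCCs only).
    """
    # A member registered under ids but not under migration/ids is an OZKCC
    # or MDZKCC, which only understands zookeeper-based coordination.
    has_non_migrating_members = bool(set(member_ids) - set(migration_member_ids))
    if mode_data is None or has_non_migrating_members:
        return "zookeeper"
    if mode_data != "kafka":
        return "zookeeper"
    return "kafka"
```

Note that the function only ever answers kafka when every registered ZookeeperConsumerConnector is an MEZKCC and the toggle explicitly says kafka; every other combination falls back to zookeeper.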
ConsumerRebalanceListener
ZookeeperConsumerConnector exposes a kafka.javaapi.consumer.ConsumerRebalanceListener, which gives the user a hook called ConsumerRebalanceListener.beforeReleasingPartitions for logic to be performed before releasing partitions and a hook called ConsumerRebalanceListener.beforeStartingFetchers for logic to be performed after owning partitions but before starting the fetcher threads. One complication here is that ConsumerRebalanceListener.beforeStartingFetchers provides the global partition assignment of the group at the consumer thread granularity. In order to preserve existing behavior for a user-provided ConsumerRebalanceListener while kafka-based group coordination is active for the group, we need to be able to:
- obtain the partition assignment decision for every member of the group. This is available by sending a DescribeGroupsRequest to the GroupCoordinator.
- match up the EKC with its parent MEZKCC from the group state returned in DescribeGroupsResponse. Since KafkaConsumer exposes a configurable client id and DescribeGroupsResponse provides the client ids of each member of the group, we can set the EKC's client id to be its parent MEZKCC's consumer id.
- consistently subdivide the partitions assigned to each member down to their consumer threads.

DescribeGroupsResponse already gives us the partitions assigned to each member of the group. Zookeeper already contains information stating the topics of interest for each consumer thread in the group. These two pieces of information can be used to consistently assign partitions to consumer threads. For convenience, org.apache.kafka.clients.consumer.RoundRobinAssignor is used to provide the consumer thread-level assignment. The assignor expects a mapping of partitions-per-topic as well as topics-per-consumer. We can simply use the topics-per-consumer-thread we already have from zookeeper as the topics-per-consumer, and count the assigned partitions per topic as the partitions-per-topic. The assignor returns "relative" partitions numbered from zero within each topic, so we convert these into actual partitions by using each relative partition as an index into the sorted list of partitions assigned to the member for that topic. We now have the global partition assignment of the group at the consumer thread granularity.
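The subdivision can be sketched in Python as below. round_robin_assign re-implements RoundRobinAssignor's ordering (sorted consumers, sorted topic-partitions, skip consumers not subscribed to the topic) rather than calling the Java class. The input shapes, thread names such as "c1-0", and all function names are hypothetical; members are (client id, assigned partitions) pairs as they might be read from a DescribeGroupsResponse.

```python
from itertools import cycle


def round_robin_assign(partitions_per_topic, topics_per_consumer):
    """Walk (topic, partition) pairs in sorted order and hand each one to the
    next consumer, in sorted order, that is subscribed to the topic."""
    assignment = {consumer: [] for consumer in topics_per_consumer}
    consumers = cycle(sorted(topics_per_consumer))
    subscribed = set().union(*topics_per_consumer.values())
    all_partitions = sorted(
        (topic, p)
        for topic, count in partitions_per_topic.items()
        if topic in subscribed
        for p in range(count)
    )
    for topic, p in all_partitions:
        consumer = next(consumers)
        while topic not in topics_per_consumer[consumer]:
            consumer = next(consumers)
        assignment[consumer].append((topic, p))
    return assignment


def assign_to_threads(assigned_partitions, topics_per_thread):
    """Split one member's kafka-assigned partitions among its consumer threads."""
    by_topic = {}
    for topic, partition in assigned_partitions:
        by_topic.setdefault(topic, []).append(partition)
    for partitions in by_topic.values():
        partitions.sort()
    counts = {topic: len(ps) for topic, ps in by_topic.items()}
    relative = round_robin_assign(counts, topics_per_thread)
    # Relative partition i of a topic maps to the i-th smallest assigned partition.
    return {
        thread: [(topic, by_topic[topic][i]) for topic, i in rel]
        for thread, rel in relative.items()
    }


def global_thread_assignment(members, topics_per_thread_by_consumer):
    """members: (client_id, partitions) pairs; an EKC's client id is its
    parent MEZKCC's consumer id, which keys the zookeeper subscriptions."""
    result = {}
    for client_id, partitions in members:
        threads = topics_per_thread_by_consumer.get(client_id)
        if threads is None:
            continue  # no consumer threads (e.g. a KC); skipped in this sketch
        result.update(assign_to_threads(partitions, threads))
    return result
```

For example, a member assigned partitions 1, 4 and 7 of topic t with two threads subscribed to t gives the first thread partitions 1 and 7 and the second thread partition 4.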
Compatibility
A build with this patch should be backwards compatible with previous builds. Users who do not configure their consumer for migration should not see any deviations in behavior.
Migration Plan
The migration plan has been broken out into its own section since it is the core of the KIP.
At any point in time, this KIP supports the following group configurations:
- a group with a mix of OZKCCs, MDZKCCs, and MEZKCCs
- a group with a mix of MEZKCCs and KCs
How to migrate from OZKCCs and MDZKCCs to MEZKCCs:
- Ensure that all members of the group already have dual.commit.enabled set to false and offsets.storage set to zookeeper. If this is not the case, first do a deployment to make it so.
  - This is necessary because the GroupCoordinator currently rejects offset commits on non-empty groups. OZKCCs and MDZKCCs naively send OffsetCommitRequests over a BlockingChannel with generationId -1 and an empty memberId. With MEZKCCs having an EKC, the group may no longer be empty. If the group has a mix of OZKCCs, MDZKCCs, and MEZKCCs, this would cause commits to kafka from the OZKCCs and MDZKCCs to fail. Current behavior does not attempt dual committing to zookeeper if the kafka offset commit fails, meaning the whole offset commit would silently get dropped.
- Shut down as many OZKCCs and MDZKCCs as you'd like and replace them with MEZKCCs.
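Before starting the rollout, the first step can be checked mechanically. This sketch assumes you can collect each member's consumer properties from your own deployment tooling (the input mapping and the function name are hypothetical); it lists the members that still need the preparatory deployment.

```python
# Settings every OZKCC/MDZKCC must have before MEZKCCs join the group.
REQUIRED = {"dual.commit.enabled": "false", "offsets.storage": "zookeeper"}


def members_needing_config_change(member_configs):
    """Hypothetical pre-migration check.

    member_configs maps a consumer id to that member's consumer properties.
    Returns the sorted ids whose settings differ from REQUIRED. A missing key
    counts as a mismatch, so the check errs on the side of redeploying.
    """
    return sorted(
        member_id
        for member_id, props in member_configs.items()
        if any(str(props.get(key)).lower() != value for key, value in REQUIRED.items())
    )
```

Treating an absent key as a mismatch is a deliberately conservative choice: it may flag members that rely on defaults, but it never reports a group as ready when a setting is unknown.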
How to toggle between kafka-based coordination and zookeeper-based coordination when all members of the group are MEZKCCs:
In order to transition to kafka-based coordination, create or update the /consumers/<group id>/migration/mode znode with the znode data set to kafka.
In order to transition to zookeeper-based coordination, create or update the /consumers/<group id>/migration/mode znode with the znode data set to zookeeper.
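An admin could apply the toggle with a small helper like the one below. It assumes zk is a started kazoo KazooClient (whose exists, set, and create methods with makepath=True are used here) or any object with the same methods, and it assumes the znode data is stored as a UTF-8 string. The function name is hypothetical.

```python
VALID_MODES = ("kafka", "zookeeper")


def set_coordination_mode(zk, group_id, mode):
    """Create or update /consumers/<group id>/migration/mode.

    zk is assumed to be a started kazoo.client.KazooClient, or anything with
    the same exists/set/create methods. The data is written as UTF-8.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    path = f"/consumers/{group_id}/migration/mode"
    data = mode.encode("utf-8")
    if zk.exists(path):
        zk.set(path, data)
    else:
        # makepath=True also creates the /consumers/<group id>/migration parent.
        zk.create(path, data, makepath=True)
    return path
```

With kazoo this would be used as KazooClient(hosts="zk1:2181"), then start(), then set_coordination_mode(zk, "my-group", "kafka"). The exists-then-create sequence is not atomic; if two admins toggle the same group concurrently, one create may fail with kazoo's NodeExistsError and can simply be retried.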
How to migrate from MEZKCCs to KCs:
- If all consumers of the group have migrated to MEZKCCs and you wish to transition to kafka-based coordination, toggle the coordination mode in zookeeper by creating the znode /consumers/<group id>/migration/mode with the znode data set to kafka. Your MEZKCCs should now be using the partition assignments from kafka-based group coordination.
- You can now canary as many members of your group as you'd like from MEZKCC to KC.
How to rollback from KCs to MEZKCCs:
- Shut down as many KCs as you'd like and replace them with MEZKCCs.
How to rollback from MEZKCCs back to MDZKCCs and OZKCCs:
- Shut down as many MEZKCCs as you'd like and replace them with MDZKCCs or OZKCCs with dual.commit.enabled set to false and offsets.storage set to zookeeper.
One benefit of the dynamic toggle and the added zookeeper state is that it gives admins control over migrating consumers between zookeeper-based coordination and kafka-based coordination across a large organization. You can imagine a script that scans groups in zookeeper and toggles them to kafka-based group coordination if the group was fully migrated to MEZKCCs and stable for some time. If the switch to kafka-based group coordination proves to stress the kafka cluster, admins can toggle the MEZKCC groups back to zookeeper-based coordination on-the-fly to relieve stress from the kafka cluster.
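The selection logic of such a script might look like this sketch. It assumes the script has already gathered each group's zookeeper state into a snapshot; the GroupSnapshot type, the stable_for_s field (however the script chooses to measure stability), and the one-day default threshold are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GroupSnapshot:
    """Hypothetical view of one group's zookeeper state."""
    group_id: str
    member_ids: set            # children of /consumers/<group id>/ids
    migration_member_ids: set  # children of /consumers/<group id>/migration/ids
    mode: Optional[str]        # data of /consumers/<group id>/migration/mode
    stable_for_s: float        # seconds since membership last changed


def groups_to_toggle(snapshots, min_stable_s=24 * 3600):
    """Return the ids of groups that are fully migrated to MEZKCCs, stable
    for at least min_stable_s, and not yet toggled to kafka."""
    return sorted(
        s.group_id
        for s in snapshots
        if s.member_ids                                # group has live members
        and s.member_ids <= s.migration_member_ids     # every member is an MEZKCC
        and s.mode != "kafka"                          # not already toggled
        and s.stable_for_s >= min_stable_s
    )
```

Each selected group would then have its mode znode set to kafka; reversing the toggle for a group only requires writing zookeeper to the same znode.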
Example Migration
The following diagrams illustrate a full migration from OZKCCs to KCs.
Initial state with a group of OZKCCs:
Begin migration from OZKCCs to MEZKCCs:
The group has fully migrated to MEZKCCs while still using zookeeper-based coordination:
The coordination mode toggle is applied so that the group of MEZKCCs uses kafka-based coordination:
Begin migration from MEZKCCs to KCs:
Final state with a group of KCs:
Rejected Alternatives
- Adapt KafkaConsumer to understand zookeeper-based coordination. This approach was rejected since it introduces a zookeeper dependency into kafka-clients.
- Build a wrapper class composed of a ZookeeperConsumerConnector and a KafkaConsumer. When the coordination mode trigger is fired, toggle consumption to the corresponding consumer. This approach was rejected for several reasons:
  - It requires a transformation from ZookeeperConsumerConnector's or KafkaConsumer's consumption API into the wrapper class consumption API while properly tracking offsets.
    - Should it adopt ZookeeperConsumerConnector's API of providing KafkaStreams?
    - Should it adopt KafkaConsumer's polling API that provides ConsumerRecords?
  - It introduces yet another consumer client to kafka.
  - Users would need to change their code to use the new client.
- Embed an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator inside ZookeeperConsumerConnector instead of a KafkaConsumer. This approach was rejected because ConsumerCoordinator is in the "internals" package and is subject to API changes without notice. Since API changes to ConsumerCoordinator might require changes to the KIP's proposed ZookeeperConsumerConnector running kafka-based coordination, this KIP instead opts for embedding the user-facing KafkaConsumer.
- Regarding the subtask of providing a global state for ConsumerRebalanceListener to preserve existing behavior, we had considered instantiating an EKC per consumer thread id so that kafka-based coordination would solve the problem of mapping partitions to consumer threads for us, instead of stitching together DescribeGroupsResponse and zookeeper state. We ultimately decided against this approach due to the added complexity of managing many EKCs. Another downside of this approach is that a standard partition assignment strategy using kafka-based coordination would give equal weight to a ZookeeperConsumerConnector consumer thread and a KafkaConsumer, causing an uneven partition ownership distribution across the group.
- Merge the /consumers/<group id>/ids and /consumers/<group id>/migration/ids directories by simply defining a new znode data version 2 for MEZKCCs. This was rejected to avoid any possibility of breaking clients as they parse the znode.