Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-9119


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In the proposed architecture, three controller nodes substitute for the three ZooKeeper nodes.  The controller nodes and the broker nodes run in separate JVMs.  The controller nodes elect a single leader for the metadata partition, shown in orange.  Instead of the controller pushing out updates to the brokers, the brokers pull metadata updates from this leader.  That is why the arrows point towards the controller rather than away.

The Controller Quorum

The controller nodes comprise a Raft quorum which manages the metadata log.  This log contains information about each change to the cluster metadata.  Everything that is currently stored in ZooKeeper, such as topics, partitions, ISRs, configurations, and so on, will be stored in this log.
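The metadata log can be thought of as an ordered sequence of records that any replica replays to reconstruct the current cluster state.  A minimal sketch of that idea, using illustrative record shapes rather than Kafka's actual record schemas:

```python
# Sketch: replaying a metadata log to rebuild cluster state in memory.
# The record types and fields here are illustrative, not Kafka's schemas.

def apply_record(state, record):
    """Apply a single metadata record to the in-memory state."""
    kind = record["type"]
    if kind == "TOPIC_CREATED":
        state["topics"][record["name"]] = {"partitions": record["partitions"]}
    elif kind == "ISR_CHANGED":
        state["isr"][(record["topic"], record["partition"])] = record["isr"]
    elif kind == "CONFIG_SET":
        state["configs"][record["key"]] = record["value"]
    return state

def replay(log):
    """Rebuild the full cluster state by replaying every record in order."""
    state = {"topics": {}, "isr": {}, "configs": {}}
    for record in log:
        apply_record(state, record)
    return state

log = [
    {"type": "TOPIC_CREATED", "name": "events", "partitions": 3},
    {"type": "ISR_CHANGED", "topic": "events", "partition": 0, "isr": [1, 2]},
    {"type": "CONFIG_SET", "key": "retention.ms", "value": "604800000"},
]
state = replay(log)
```

Because every change is a record in the log, any node that has replayed the log up to a given offset holds exactly the same state as any other node at that offset.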

Using the Raft algorithm, the controller nodes will elect a leader from amongst themselves, without relying on any external system.  The leader of the metadata log is called the active controller.  The active controller handles all RPCs made from the brokers.  The follower controllers replicate the data which is written to the active controller, and serve as hot standbys if the active controller should fail.  Because the controllers will now all track the latest state, controller failover will not require a lengthy reloading period where we transfer all the state to the new controller.

Just like ZooKeeper, Raft requires a majority of nodes to be running in order to continue running.  Therefore, a three-node controller cluster can survive one failure.  A five-node controller cluster can survive two failures, and so on.
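The failure tolerance follows directly from the majority requirement:

```python
def tolerated_failures(quorum_size):
    # A Raft quorum of n nodes needs a majority (n // 2 + 1) to keep
    # operating, so it can lose at most the remaining (n - 1) // 2 nodes.
    return (quorum_size - 1) // 2

for n in (1, 3, 5, 7):
    print(f"{n}-node quorum tolerates {tolerated_failures(n)} failure(s)")
```

This is also why even-sized quorums are rarely worthwhile: a four-node quorum tolerates only one failure, the same as a three-node quorum.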

Note that although the controller processes are logically separate from the broker processes, they need not be physically separate.  In some cases, it may make sense to deploy some or all of the controller processes on the same node as the broker processes.  This is similar to how ZooKeeper processes may be deployed on the same nodes as Kafka brokers today in smaller clusters.  As per usual, all sorts of deployment options are possible, including running in the same JVM.

Periodically, the controllers will write out a snapshot of the metadata to disk.  While this is conceptually similar to compaction, the code path will be a bit different because we can simply read the state from memory rather than re-reading the log from disk.
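A sketch of why this is cheap: since the controller already holds the full state in memory, a snapshot is just a serialization of that state plus the offset of the last record it reflects.  File layout and names below are illustrative assumptions:

```python
import json
import os
import tempfile

def write_snapshot(state, last_applied_offset, path):
    # No log re-read is needed: the in-memory state is serialized directly,
    # tagged with the offset of the last metadata record it incorporates.
    with open(path, "w") as f:
        json.dump({"offset": last_applied_offset, "state": state}, f)

def load_snapshot(path):
    # On recovery, restore the snapshot and then replay only the log
    # records after the recorded offset.
    with open(path) as f:
        snap = json.load(f)
    return snap["state"], snap["offset"]

path = os.path.join(tempfile.mkdtemp(), "snapshot-250.json")
write_snapshot({"topics": {"events": 3}}, 250, path)
restored, offset = load_snapshot(path)
```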

...

In the current world, a broker which can contact ZooKeeper but which is partitioned from the active controller will continue serving user requests, but will not receive any metadata updates.  This can lead to some confusing and difficult situations.  For example, a producer using acks=1 might continue to produce to a leader that actually was not the leader any more, but which failed to receive the controller's LeaderAndIsrRequest moving the leadership.

...

When a broker is stopping, it is still running, but we are trying to migrate the partition leaders off of the broker.

Eventually, the active controller will ask the broker to finally go offline, by returning a special result code in the MetadataFetchResponse.  Alternately, the broker will shut down if the leaders can't be moved in a predetermined amount of time.
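The stopping broker's behavior can be sketched as a fetch loop; the result-code name, response shape, and timeout value below are assumptions for illustration, not the final protocol:

```python
import time

SHUTDOWN_NOW = "SHUTDOWN_NOW"   # hypothetical result code in MetadataFetchResponse
LEADER_MOVE_TIMEOUT_SECS = 30   # hypothetical bound on leader migration time

def stopping_broker_loop(fetch_metadata, shutdown, clock=time.monotonic):
    """Keep fetching metadata until the controller says to go offline,
    or until leaders could not be moved within the time bound."""
    deadline = clock() + LEADER_MOVE_TIMEOUT_SECS
    while True:
        response = fetch_metadata()
        if response.get("result") == SHUTDOWN_NOW:
            return shutdown("controller approved shutdown")
        if clock() >= deadline:
            return shutdown("timed out waiting for leader migration")
```

The key point is that the broker keeps serving its remaining leaders until either the controller signals completion or the time bound expires.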

...

New versions of the clients should send these operations directly to the active controller.  This is a backwards compatible change: it will work with both old and new clusters.  In order to preserve compatibility with old clients that sent these operations to a random broker, the brokers will forward these requests to the active controller.
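The forwarding behavior amounts to a routing decision on the broker that receives the request.  A sketch, with illustrative names:

```python
def handle_admin_request(request, local_node_id, active_controller_id, send_to):
    # New clients address the active controller directly; old clients may
    # still send to an arbitrary broker, which forwards on their behalf.
    if local_node_id == active_controller_id:
        return {"handled_by": local_node_id, "op": request["op"]}
    # Not the active controller: forward rather than reject, preserving
    # compatibility with clients that picked a random broker.
    return send_to(active_controller_id, request)
```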

...

In some cases, we will need to create a new API to replace an operation that was formerly done via ZooKeeper.  One example of this is that when the leader of a partition wants to modify the in-sync replica set, it currently modifies ZooKeeper directly.  In the post-ZK world, the leader will make an RPC to the active controller instead.
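A hypothetical sketch of what such a controller-side handler could look like; the field names and fencing check are illustrative assumptions, not a specified RPC:

```python
def alter_isr(controller_state, topic, partition, leader_epoch, new_isr):
    """Sketch of a controller handler for a leader-initiated ISR change."""
    key = (topic, partition)
    current_epoch = controller_state["leader_epoch"].get(key, 0)
    if leader_epoch < current_epoch:
        # A stale leader (e.g. one fenced by a newer election) may not
        # shrink or expand the ISR.
        return {"error": "FENCED_LEADER_EPOCH"}
    controller_state["isr"][key] = list(new_isr)
    return {"error": None, "isr": list(new_isr)}
```

Routing the change through the controller lets it validate the request against its view of the partition (for example, rejecting a stale leader) instead of trusting a direct ZooKeeper write.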

Removing Direct ZooKeeper Access from Tools

...

We will preserve compatibility with the existing Kafka clients.  In some cases, the existing clients will take a less efficient code path.  For example, the brokers may need to forward their requests to the active controller.

Bridge Release

The overall plan for compatibility is to create a "bridge release" of Kafka where the ZooKeeper dependency is well-isolated.  We will be able to upgrade from any version of Kafka to this bridge release, and from the bridge release to a post-ZK release.  When upgrading from an earlier release to a post-ZK release, the upgrade must be done in two steps: first, you must upgrade to the bridge release, and then you must upgrade to the post-ZK release.
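The two-step constraint can be expressed as a simple check on deployment sequences.  The version labels below are placeholders standing for "any pre-bridge release", the bridge release, and any post-ZK release:

```python
def valid_upgrade_path(path):
    """True if a sequence of deployed versions respects the rule that
    reaching a post-ZK release requires passing through the bridge
    release first.  Downgrades are treated as out of scope here."""
    order = {"pre-bridge": 0, "bridge": 1, "post-zk": 2}
    for a, b in zip(path, path[1:]):
        if order[b] - order[a] > 1:   # skipped the bridge release
            return False
        if order[b] < order[a]:       # downgrade, not modeled
            return False
    return True
```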

While this release will not remove ZooKeeper, it will eliminate most of the touch points where the rest of the system communicates with it.  As much as possible, we will perform all access to ZooKeeper in the controller, rather than in other brokers, clients, or tools.  Therefore, although ZooKeeper will still be required for the bridge release, it will be a well-isolated dependency.

Rolling Upgrade

The rolling upgrade from the bridge release will take several steps.

Upgrade to the Bridge Release

The cluster must be upgraded to the bridge release, if it isn't already.

...

We do not need to worry about the ZooKeeper state getting concurrently modified during this loading process.  In the bridge release, neither the tools nor the non-controller brokers will modify ZooKeeper.

The new active controller will monitor ZooKeeper for legacy broker node registrations.  During the transition period, it will send the legacy "push" metadata requests to those nodes.
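During the roll, the active controller therefore serves both propagation styles at once.  A sketch of that routing decision, with illustrative names:

```python
def propagate_metadata(brokers, update, push_legacy, note_pending_for_pull):
    # During the rolling upgrade, ZooKeeper-registered (legacy) brokers
    # still receive pushed updates, while upgraded brokers fetch updates
    # from the metadata log themselves.
    for broker in brokers:
        if broker["registered_in_zookeeper"]:
            push_legacy(broker["id"], update)
        else:
            note_pending_for_pull(broker["id"])
```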

...

Once the last broker node has been rolled, there will be no more need for ZooKeeper.  We will remove it from the configuration of the controller quorum nodes, and then roll the controller quorum to fully remove it.

Rejected Alternatives

Combined Controller and Broker Nodes

We could have combined the broker and the controller in the same JVM.  This would have the advantage of minimizing the number of JVMs.

However, there are several advantages to keeping them separate.  One is that the deployment model is more familiar to Kafka administrators.  If they had a certain number of ZooKeeper nodes previously, they can just upgrade to having the same number of controller nodes without rethinking cluster sizing or topology.

Another reason is to avoid an unbalanced load.  As the amount of metadata managed by the controller grows, the nodes which must serve this metadata will experience a correspondingly heavier load.  This makes it less realistic to treat the controller nodes exactly the same as all other nodes when performing rebalancing or partition assignment.  Using separate nodes reduces the chance that the current controller will be disrupted by heavy load on a particular broker.  For clusters where the load is small enough that this is not an issue, the system administrator can simply choose to co-locate the controller and broker JVMs.

Pluggable Consensus

Rather than managing metadata ourselves, we could make the metadata storage layer pluggable so that it could work with systems other than ZooKeeper.  For example, we could make it possible to store metadata in etcd, Consul, or similar systems.

Unfortunately, this strategy would not address either of the two main goals of ZooKeeper removal.  Because they have ZooKeeper-like APIs and design goals, these external systems would not let us treat metadata as an event log.  Because they are still external systems that are not integrated with the project, deployment and configuration would still remain more complex than they needed to be.

Supporting multiple metadata storage options would inevitably decrease the amount of testing we could give to each configuration.  Our system tests would have to either run with every possible storage mechanism, which would greatly increase the resources needed, or test only a subset, leaving some users with less well-tested configurations.  Increasing the size of the test matrix in this fashion would really hurt the project.

Additionally, if we supported multiple metadata storage options, we would have to use "least common denominator" APIs.  In other words, we could not use any API unless all possible metadata storage options supported it.  In practice, this would make it difficult to optimize the system.

Follow-on Work

This KIP expresses a vision of how we would like to evolve Kafka in the future.  We will create follow-on KIPs to hash out the concrete details of each change.

References

The Raft consensus algorithm

Handling Metadata via Write-Ahead Logging