|Table of Contents|
Current state: Adopted
Discussion thread: https://lists.apache.org/thread.html/rf8947e0d6aad51023b378305acc285c69030988abc7bda9b9c429b8a%40%3Cdev.kafka.apache.org%3E
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
In order to facilitate rolling upgrades of Kafka in KRaft mode, we need the ability to upgrade controllers and brokers while holding back the use of new RPCs and record formats until the whole cluster has been upgraded. We would also like to perform this upgrade without the need for the infamous “double roll”.
Additionally, we want the ability to perform downgrades in a controlled and predictable manner.
Kafka clusters currently use the IBP (inter.broker.protocol.version) configuration to gate new features and RPC changes. Since this is a static configuration defined for each broker in its properties file, a broker must be restarted in order to update this value. This leads to the necessity of two rolling restarts for cluster upgrades:
Support for downgrading the IBP is also poorly defined. For versions prior to Kafka 3.0, downgrading the IBP is explicitly not supported (based on the documentation). There have been cases where a persisted format was gated by the IBP and downgrades are impossible. However, in practice, we see that for many versions it can often be done without any problems.
Brokers and Controllers advertise their API capabilities using ApiVersions RPC. In some cases, the advertised versions of a node's RPCs are influenced by broker’s configured record version or IBP (inter.broker.protocol.version). However, in most cases if an RPC version is defined in the binary of a particular node, it will be advertised through ApiVersions.
- ApiKeys and the min/max version supported by the broker
- Supported features and the min/max versions for the broker
- Finalized (cluster-wide) features and the min/max version for the cluster
Feature Flags (KIP-584)
Supported feature flags depend on the version of the code and represent the capabilities of the current binary. A broker defines a minimum and maximum supported version for each feature flag. Finalized feature flags are dynamic and set by an operator using the “kafka-features.sh” script. The operator defines a maximum finalized version for the given feature flag which is used to convey the “in-use” version of the feature within the cluster. To date, no features in Kafka are utilizing these feature flags.
When a KRaft client joins the quorum and begins fetching, the leader may determine that it needs to load a snapshot to catch up more quickly. This snapshot includes the entire metadata state at a given point in time and will effectively force a broker or controller to re-build it's internal data structures which derive from the metadata log. For example, on the broker, processing a snapshot will result in a new MetadataImage which is the backing data structure for MetadataCache.
Define new KIP-584 feature flag "metadata.version". This feature flag is only examined or initialized in KRaft mode. In ZK mode, this feature flag is ignored.
All sub-commands require the
--bootstrap-server argument and may take the
--command-config argument. All operations that might mutate the state of a feature flag also include a
The describe command shows the current features active in the cluster.
usage: kafka-features --bootstrap-server BOOTSTRAP_SERVER describe [-h] optional arguments: -h, --help show this help message and exit
The ugprade command upgrades one or more features in the cluster.
% ./bin/kafka-features.sh upgrade --help usage: kafka-features --bootstrap-server BOOTSTRAP_SERVER upgrade [-h] [--metadata METADATA] [--feature FEATURE] [--dry-run] optional arguments: -h, --help show this help message and exit --metadata METADATA The level to which we should upgrade the metadata. For example, 3.3-IV3. --feature FEATURE A feature upgrade we should perform, in key=value format. For example metadata.version=3.3-IV3. --dry-run Validate this downgrade, but do not perform it.
The downgrade command downgrades one or more features in the cluster.
% ./bin/kafka-features.sh downgrade --help usage: kafka-features --bootstrap-server BOOTSTRAP_SERVER downgrade [-h] [--metadata METADATA] [--feature FEATURE] [--unsafe] [--dry-run] optional arguments: -h, --help show this help message and exit --metadata METADATA The level to which we should downgrade the metadata. For example, 3.3-IV0. --feature FEATURE A feature downgrade we should perform, in key=value format. For example metadata.version=3.3-IV0. --unsafe Perform this downgrade even if it may irreversibly destroy metadata. --dry-run Validate this downgrade, but do not perform it.
The disable command downgrades one or more features in the cluster to level 0 (disabled).
% ./bin/kafka-features.sh disable --help usage: kafka-features --bootstrap-server BOOTSTRAP_SERVER disable [-h] [--feature FEATURE] [--unsafe] [--dry-run] optional arguments: -h, --help show this help message and exit --feature FEATURE A feature flag to disable. --unsafe Disable this feature flag even if it may irreversibly destroy metadata. --dry-run Perform a dry-run of this disable operation.
KIP-584 Finalized Version Addendum
Remove the concept of a minimum finalized version from feature flags. The supported version will continue to be a min/max range, but the finalized version will simply be the version specified by the operator using kafka-features.sh. This simplifies the KRaft upgrade and downgrade process by eliminating confusion about the meaning of a minimum finalized metadata.version.
New kafka-storage argument
--metadata-version option to "format" sub-command of kafka-storage.sh. The default value of this initial metadata.version is statically defined by the binary used to run the tool.
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--metadata-version VERSION] [--ignore-formatted] optional arguments: -h, --help show this help message and exit --config CONFIG, -c CONFIG The Kafka configuration file to use. --cluster-id CLUSTER_ID, -t CLUSTER_ID The cluster ID to use. --metadata-version VERSION The initial value for metadata.version feature flag. --ignore-formatted, -g
To support the three possible downgrade types, we will add an enum and a new constructor to org.apache.kafka.clients.admin.FeatureUpdate
We will also add a DryRun boolean to UpdateFeaturesOptions with a default no-arg constructor setting the boolean to false.
The sections below go into more detail, but the overall workflow of an upgrade is:
- Operator decreases metadata.version feature flag using
- UpdateFeaturesRequest is sent to the active controller
- The controller validates that the cluster can be safely downgraded to this version (override with --unsafe)
- FeatureLevelRecord is written to the metadata log
- Controller generates new snapshot and components reload their state with it (this snapshot may be lossy!)
- Broker replicates FeatureLevelRecord for downgrade
- Broker generates new snapshot and components reload their state with it (this snapshot may be lossy!)
- Operator performs rolling restart of cluster with downgraded software version
New Feature Flag
We will introduce a new feature flag named metadata.version which takes over (and expands on) the role of inter.broker.protocol.version. This new feature flag will track changes to the metadata record format and RPCs. Whenever a new record or RPC is introduced, or an incompatible change is made to an existing record or RPC, we will increase this version. The metadata.version is free to increase many times between Kafka releases. This is similar to the IV (inter-version) versions of the IBP.
In the absence of an operator defined value for metadata.version, we cannot safely assume anything about which metadata.version to use. If we simply assumed the highest supported value, it could lead to unintended downgrades in the event that a broker with a lower supported version joined the cluster. To avoid this, and other upgrade complications, we will need to bootstrap metadata.version with some initial version.
When the quorum leader (i.e., active controller) is starting up for the first time after this feature flag has been introduced, it will need a way to initialize metadata.version. After the controller finishes loading its state from disk, if has not encountered a FeatureLevelRecord, it will read an initial value for this feature from its local meta.properties file and generate a FeatureLevelRecord. We will extend the
format sub-command of
kafka-storage.sh to allow operators to specify which version is initialized. If no value has been specified by the operator, the tool will select the default value that has been defined for that version of the software.
One special case is when we are upgrading from an existing "preview" KRaft cluster. In this case, the meta.properties fill will already exist and will not have an initial metadata.version specified. For this case, the controller will automatically initialize metadata.version as 1. In order to maintain compatibility with preview KRaft releases, we must ensure that metadata.version 1 is backwards compatible to KRaft at Apache Kafka 3.0. This allows for a straightforward downgrade path back to an earlier (preview) KRaft software version.
It is possible that brokers and controllers attempt to join the cluster or quorum, but cannot support the current metadata.version. For brokers, this is already handled by the controller during registration. If a broker attempts to register with the controller, but the controller determines that the broker cannot support the current set of finalized features (which includes metadata.version), the controller will reject the registration request and the broker will remain fenced. For controllers, it is more complicated since we need to allow the quorum to be established in order to allow records to be exchanged and learn about the new metadata.version. A controller running old software will join the quorum and begin replicating the metadata log. If this inactive controller encounters a FeatureLevelRecord for metadata.version that it cannot support, it should terminate.
If a broker encounters an unsupported metadata.version, it should unregister itself and terminate.
KRaft upgrades are done in two steps with only a single rolling restart of the cluster required. After all the nodes of the cluster are running the new software version, they will continue using the previous version of RPCs and record formats. Only after increasing the metadata.version will these new RPCs and records be used. Since a software upgrade may span across multiple metadata.version versions, it should be possible to perform many online upgrades without restarting any nodes. This provides a mechanism for incrementally increasing metadata.version to try out new features introduced between the initial software version and the upgraded software version.
One major difference with the static IBP-based upgrade is that the metadata.version may be changed arbitrarily at runtime. This means broker and controller components which depend on this version will need to dynamically adjust their state and behavior as the version changes.
Now that the RPCs in-use by a broker or controller can change at runtime (due to changing metadata.version), we will need a way to inform a node's remote clients that new RPCs are available. One example of this would be holding back a new RPC that has corresponding new metadata records. We do not want brokers to start using this RPC until the controller can actually persist the new record type.
Since clients have no visibility to changes in metadata.version, the only mechanism we have for updating the negotiated ApiVersions is connection establishment. By closing the broker side of the connection, clients would be forced to reconnect and receive an updated set of ApiVersions. We may want to investigate alternative approaches to this in a future KIP.
One of the goals of this design is to provide a clear path for metadata.version downgrades and software downgrades. Since metadata.version can introduce backwards incompatible formats of persisted data, we can classify downgrades into lossless and lossy. If the target downgrade version is fully compatible with the starting version, the downgrade can be executed without any loss of metadata. However, if a new metadata record has been introduced, or an incompatible change was made to a record, a downgrade is only possible if some metadata is removed from the log.
Once the downgrade snapshot has been loaded by all nodes, a software downgrade is now possible. In both lossy and lossless downgrade scenarios, there may be tagged fields present in the metadata records from previous newer version, but these are transparently skipped over during record deserialization.
Compatibility, Deprecation, and Migration Plan
Starting with the release that includes this KIP, clusters running self-managed mode will ignore inter.broker.protocol.version. Instead, components will begin using metadata.version as the gatekeeper for new features, RPCs, and metadata records. The new version will be managed using Kafka’s feature flag capabilities.
This design assumes a Kafka cluster in self-managed mode as a starting point. A future KIP will detail the procedure for migrating a ZooKeeper managed Kafka to a self-managed Kafka.
Transitional feature level
This idea utilizes the fact that KIP-584 feature flags can have a minimum and maximum finalized version. We would consider the cluster to be in a "transitional" version if these two were not equal. For example, min finalized version of 1 and max finalized version of 3. While in this transitional state, the cluster would enable new features introduced (up to the maximum version), but keep the metadata log backwards compatible to the minimum version.
Ultimately, this was determined to be too complex and placed a lot of burden on implementers to figure out various aspects of compatibility. It also increased the burden of knowledge on the operator to understand the implications of this transitional state.
Similar to the transitional approach above, but in this case the min and max versions of a record would be written to the log together within a generic composite record. For the upgrade scenario, components would only process the newer of the two. For downgrade scenarios, the older (backwards compatible) version would be read. This would allow a downgrade to occur without rewriting the metadata log.
The main downside of this approach is that there is an undetermined amount of time when both versions of a record would be needed in the metadata log. This could lead to increased space requirements for an extended period of time.
Initially, this design included details about a final IBP version that would be used by KRaft clusters to signal the transition from IBP to metadata.version. Instead of this, we decided to simply require that KRaft requires a metadata.version to be set starting with the release that this KIP will be included in. An additional constraint of making version 1 backwards compatible was added to deal with the KRaft preview upgrade case. Overall, this will simplify the code quite a bit