Status
Current state: Voting
Vote thread: https://lists.apache.org/thread/95941nto2kxfqclb1pn00pmdtlgq2zjz
Discussion thread: https://lists.apache.org/thread/vnzmqvcbfxo7hhyj9gzpgmdq59w3n7dy
JIRA:
KAFKA-20174
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP removes the requirement that nodes run kafka-storage format before starting Kafka. When running kafka-storage format, nodes are required to supply a --cluster-id argument, which represents the cluster ID to which the node belongs, and this is persisted to the node's meta.properties file. Below are the other data that each node persists to disk upon invoking kafka-storage format:
For brokers, the other data in meta.properties, node.id and directory.id, are obtained from the node's static config and randomly generated, respectively. Persisting this data does not have to be done during storage formatting and can be done later during startup.
For controllers, the directory.id in meta.properties may come from --initial-controllers or --standalone, but otherwise controllers are the same as brokers with respect to the above data.
The main purpose of cluster id is to prevent nodes from contacting other Kafka clusters (ref KIP-78). This KIP seeks to preserve this behavior around cluster id, while removing the necessity of formatting brokers and observer controllers.
Currently, Kafka relies on the operator running kafka-storage format on every intended node of a cluster to establish agreement on the value of the cluster id at format time. KRaft guarantees that upon forming a given quorum X, X's members will all have the same cluster id (which could be null) contained in meta.properties. Additionally, when a KRaft node sends a request with a non-null cluster id, this request is only handled if the request's cluster id matches the handler's cluster id.
Background on cluster.id from ZooKeeper Kafka
Cluster id was a znode, /cluster/id , that was initially empty. During the startup of a cluster, brokers would race to write a random UUID in ZK to this znode, which would never change after being set, via getOrGenerateClusterId() .
Public Interfaces
meta.properties
Introduce meta.properties v2 with optional cluster id. However, node.id is guaranteed to exist. directory.id is written to meta.properties V1 during startup if it does not exist.
meta.properties V2 example:
version=2 (required)
node.id=9990 (required)
directory.id=UUID (optional, but always gets written)
cluster.id=UUID (now optional)
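As a rough illustration of the V2 rules above, the following sketch parses a meta.properties file and applies the required/optional split. The names MetaProperties and parse_meta_properties are hypothetical and not part of Kafka. The sketch assumes that a missing cluster.id key and an empty cluster.id value are treated the same way, and it only models V1 and V2.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MetaProperties:
    version: int
    node_id: int
    directory_id: Optional[str]  # optional in the file, but written at startup
    cluster_id: Optional[str]    # optional as of V2


def parse_meta_properties(text: str) -> MetaProperties:
    """Parse key=value lines, ignoring blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"malformed line: {raw!r}")
        props[key.strip()] = value.strip()

    # version and node.id are required in both V1 and V2.
    for required in ("version", "node.id"):
        if required not in props:
            raise ValueError(f"missing required key: {required}")

    version = int(props["version"])
    # Assumption: an empty value is equivalent to an absent key.
    cluster_id = props.get("cluster.id") or None
    if version == 1 and cluster_id is None:
        raise ValueError("cluster.id is required in V1")

    return MetaProperties(
        version=version,
        node_id=int(props["node.id"]),
        directory_id=props.get("directory.id") or None,
        cluster_id=cluster_id,
    )
```

With this sketch, a V2 file without cluster.id parses successfully with cluster_id set to None, while the same content marked version=1 is rejected.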
ClusterIdRecord + MetadataVersion
Introduce a new metadata record to store cluster id and a new MetadataVersion (MV) that supports encoding/decoding this record. This means that during formatting, the bootstrap ClusterIdRecord is only written if the node is formatted with a MV that supports this feature. When a node runs kafka-storage format, some value for MV is always resolved and written to disk alongside meta.properties . The value either comes from --release-version or --feature , or it defaults to the latest production MV.
If this feature is supported by the cluster's MV, the first elected KRaft leader will write the ClusterIdRecord upon becoming the active controller alongside the other bootstrap metadata records.
One invariant of this feature is that there is at most one unique ClusterId in the metadata log. When the MV does not support ClusterIdRecord, there is no ClusterId in the metadata log, and instead the cluster id is local to each node's meta.properties. When the MV does support ClusterIdRecord, there must be exactly one unique ClusterId in the metadata log. The active controller must therefore write the ClusterIdRecord in two cases: when it writes the bootstrap metadata records under an MV that supports this feature, and when it upgrades the MV to one that supports this feature. This invariant is enforceable when replaying the log and checking the image and deltas.
{
  "apiKey": 29,
  "type": "metadata",
  "name": "ClusterIdRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The unique ID of this cluster" }
  ]
}
ClusterId in this record is a String, rather than a UUID as in TopicRecord, for the following reasons:
Currently, you can format a cluster with a non-UUID cluster ID string, and Kafka considers this "well-formatted" (i.e. formatting code accepts String, server startup works, and clusterId is a String in-memory etc.). Kafka documentation references formatting with a UUID cluster id generated via kafka-storage random-uuid, but this is not a requirement in the code.
If this record were to use a UUID like TopicRecord, it is not clear what the MV upgrade path would be for existing clusters that formatted meta.properties with a non-UUID String. The active controller would need to write a new UUID cluster id, which violates the invariant that the cluster id cannot change over the lifetime of a cluster.
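The ClusterId invariant described in this section can be sketched as a check over a fully replayed log. This is a simplified model, not Kafka's loader: records are represented as hypothetical (type name, payload) tuples, the function name replay_cluster_id is made up for illustration, and the MV is passed in as a single boolean for the final state rather than tracked record by record.

```python
from typing import Iterable, Optional, Tuple

# Simplified stand-in for a metadata record: (record type name, payload).
Record = Tuple[str, str]


def replay_cluster_id(
    records: Iterable[Record],
    mv_supports_cluster_id_record: bool,
) -> Optional[str]:
    """Return the single cluster id in the log, or raise if the invariant is broken."""
    seen: Optional[str] = None
    for record_type, payload in records:
        if record_type != "ClusterIdRecord":
            continue
        if not mv_supports_cluster_id_record:
            raise ValueError("ClusterIdRecord found, but the MV does not support it")
        # Repeats of the same value are tolerated; a different value is not.
        if seen is not None and payload != seen:
            raise ValueError(f"conflicting cluster ids: {seen!r} and {payload!r}")
        seen = payload

    if mv_supports_cluster_id_record and seen is None:
        raise ValueError("MV supports ClusterIdRecord, but none was found")
    return seen
```

When the MV does not support the record, the function returns None, which corresponds to the cluster id living only in each node's meta.properties.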
Storage Tool + Formatting requirements
Prior to this KIP, all nodes running Kafka are expected to have invoked the kafka-storage format command and persisted a V1 meta.properties file containing a cluster.id before starting Kafka. We enforce this by trying to read meta.properties during startup, and crashing if that file does not contain all the data we expect.
After this KIP, it is no longer required for nodes to run kafka-storage format in order to start kafka. Additionally, the --cluster-id argument for kafka-storage format will now be optional, rather than required. However, operators still have the option to format nodes to set the MV, feature versions, scram credentials, or to properly provision a kraft.version=1 cluster. If an operator decides to format a node, they will still need to specify --cluster-id if the MV which is resolved by the formatter does not support the ClusterIdRecord feature.
A consequence of the above relaxation is that the initial KRaft leader can be elected without a cluster.id in its local meta.properties. In this case, the initial KRaft leader (active controller) will randomly generate a cluster id as part of the bootstrap metadata write (this either writes the bootstrap records as a transaction if the MV supports it, or as an atomic batch). This is similar to how cluster.id is generated in ZK-based Kafka.
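A minimal sketch of that choice, assuming the leader prefers a cluster id already present in its meta.properties and generates one only when none exists. The function name resolve_bootstrap_cluster_id is hypothetical, and the generated value only imitates the shape of a Kafka UUID string (URL-safe base64 of 16 random bytes); it does not reproduce any reserved-value checks Kafka may apply.

```python
import base64
import uuid
from typing import Optional


def resolve_bootstrap_cluster_id(local_cluster_id: Optional[str]) -> str:
    """Pick the cluster id the initial active controller would write at bootstrap."""
    if local_cluster_id:
        # The node was formatted with --cluster-id, so that value is kept.
        return local_cluster_id
    # No local value: generate a random id (16 random bytes, base64 without padding).
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii")
```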
Proposed Changes
Remove the requirement of nodes to format before starting kafka
After KIP-1262, Kafka operators no longer need to invoke kafka-storage format to start Kafka. Clusters that skip formatting essentially have a bootstrapping "state" of the latest MetadataVersion and a ClusterIdRecord that is generated by the initial active controller.
Clusters that skip formatting will also have a static KRaft quorum, which is explained in the section below, so controller.quorum.voters must be defined. Operators can still end up with a dynamic quorum without formatting because static to dynamic quorum upgrades are supported.
Operators can still format clusters the same way as before this KIP, except that the --cluster-id flag becomes optional. When --cluster-id is specified, the formatter writes meta.properties V2 with it. When --cluster-id is not specified and the formatter's resolved MV supports ClusterIdRecord, the formatter writes meta.properties V2 with an empty cluster.id. When --cluster-id is not specified and the resolved MV does not support ClusterIdRecord, formatting fails.
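The formatting rules for --cluster-id can be summarized in a small decision function. This is a sketch under assumptions: plan_format is a hypothetical name, the result is a plain dict rather than Kafka's own types, other keys such as node.id are left out, and an "empty" cluster.id is modeled by omitting the key.

```python
from typing import Dict, Optional


def plan_format(
    cluster_id: Optional[str],
    mv_supports_cluster_id_record: bool,
) -> Dict[str, str]:
    """Return the cluster-id-related meta.properties V2 entries, or raise if formatting must fail."""
    if cluster_id is None and not mv_supports_cluster_id_record:
        # Without the record, meta.properties is the only place the id can live.
        raise ValueError(
            "--cluster-id is required when the resolved MV does not support ClusterIdRecord"
        )
    props = {"version": "2"}
    if cluster_id is not None:
        props["cluster.id"] = cluster_id
    # Otherwise cluster.id is left unset and is discovered after startup.
    return props
```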
When is formatting still required before starting kafka
There are a few situations where operators still need to format nodes before starting Kafka, but this only applies to bootstrap controllers. Bootstrap controllers are those that participate in the initial KRaft leader election. These are the nodes specified by controller.quorum.voters in a static quorum, and the nodes formatted with --standalone/--initial-controllers in a dynamic quorum.
Formatting of bootstrap controllers is still required when operators want to:
- Specify bootstrap SCRAM credentials
- Specify bootstrap non-default feature levels or metadata version
- Bootstrap a dynamic quorum (kraft.version=1) cluster.
meta.properties will be written during kafka broker/controller startup if it doesn't exist already (from formatting)
During startup of the KafkaRaftServer, we attempt to read the meta.properties file:
- If meta.properties does not exist and the node is a broker/observer controller, write meta.properties V2 with node.id and directory.id
  - This means either this node skipped formatting, or the file/disk was lost
- If meta.properties exists and is V1, do the same validations as we do today in Kafka
- If meta.properties exists without a cluster.id and is V2, it will be discovered later (described below)
  - One correctness invariant of this feature is that updating the cluster.id in-memory must occur after persisting it to meta.properties.
- If meta.properties exists with a cluster.id, Kafka behaves as it does today:
  - The node assumes it to be correct and passes it to KRaft
  - If this ID doesn't match the KRaft leader's ID, the leader will reject requests from the node
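The startup cases above can be sketched for a broker or observer controller as follows. The names load_or_init and random_id are hypothetical. The V1 branch uses a single cluster.id presence check as a stand-in for the existing validations, and the generated directory.id only imitates the shape of a Kafka UUID string.

```python
import base64
import uuid
from typing import Dict, Optional, Tuple


def random_id() -> str:
    # Assumption: URL-safe base64 of 16 random bytes, without padding.
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii")


def load_or_init(
    existing: Optional[Dict[str, str]],
    node_id: int,
) -> Tuple[Dict[str, str], Optional[str]]:
    """Return (properties to persist, cluster id to pass to KRaft or None)."""
    if existing is None:
        # Formatting was skipped, or the file/disk was lost.
        props = {"version": "2", "node.id": str(node_id), "directory.id": random_id()}
        return props, None

    props = dict(existing)
    if props.get("version") == "1" and not props.get("cluster.id"):
        # Stand-in for the validations Kafka performs today on V1 files.
        raise ValueError("V1 meta.properties must contain cluster.id")

    # directory.id is written during startup if it does not exist yet.
    props.setdefault("directory.id", random_id())
    # None means the cluster id will be discovered later from the metadata log.
    return props, props.get("cluster.id") or None
```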
If a broker/observer controller has already written a cluster id to its meta.properties, either from formatting or discovering it from the cluster metadata, it is impossible for it to learn of another cluster id via Fetch/FetchSnapshot. For the broker (with clusterid = X) to receive a non-error FetchResponse with metadata records (which would be the only way to learn of a different ClusterIdRecord), the KRaft leader (clusterid = Y) must either receive a request without clusterid, or a request whose clusterid is Y. The broker fulfills neither of these conditions. This case could happen when bootstrap endpoints point to the wrong cluster during restart of a node. KRaft's own clusterid checks would mean startup of this node would time out and shut down the node because it is not able to contact the quorum of another cluster.
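The argument above rests on a simple acceptance rule, which can be written out as a sketch. The function name handler_accepts is hypothetical; the rule itself is the one stated in this KIP: a request without a cluster id is handled, and a request with a cluster id is handled only if it matches the handler's.

```python
from typing import Optional


def handler_accepts(
    request_cluster_id: Optional[str],
    handler_cluster_id: Optional[str],
) -> bool:
    """Decide whether a KRaft request passes the cluster id check."""
    if request_cluster_id is None:
        # The sender does not know its cluster id yet, so nothing can be compared.
        return True
    return request_cluster_id == handler_cluster_id


# A broker that already persisted cluster id "X" cannot fetch from a leader of cluster "Y".
broker_can_fetch_from_other_cluster = handler_accepts("X", "Y")
```

In that last line the result is False, which is why a node pointed at the wrong bootstrap endpoints never receives metadata records containing a different ClusterIdRecord.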
Introduce a new MetadataVersion and the ClusterIdRecord
One invariant of this feature is that if the persisted MV supports this feature, a ClusterIdRecord must also be persisted. This is enforceable along the write-path for MV, which occurs at the following points: formatting a node (specifically controllers who can become leader), the bootstrap metadata write of the initial active controller, and upgrading the MV using kafka-features upgrade .
- During formatting, nodes must resolve a MV with which to format. This comes from the --release-version/--feature flag and defaults to the latest production MV. If the MV at format time supports this feature, a ClusterIdRecord must be written as part of the bootstrap metadata checkpoint (0-0.checkpoint post KIP-1170)
- The first active controller will write the ClusterIdRecord + MV as part of the bootstrap metadata records write if the MV supports this feature
- During MV upgrades, successfully upgrading the MV to one that supports this feature requires successfully committing a ClusterIdRecord alongside the new MetadataVersion feature record.
There is already a precedent for this kind of invariant being enforced along the write path: kraft.version and the VotersRecord.
When nodes discover cluster.id from the metadata publishing pipeline, they persist it to meta.properties + update KRaft
Nodes whose meta.properties do not contain cluster.id will discover this value via the metadata publishing pipeline. This can be implemented as a MetadataPublisher that registers to the MetadataLoader.
- It is the responsibility of this MetadataPublisher to persist a discovered value of cluster.id to all meta.properties files before making the cluster.id available to in-memory data structures.
- Because of JBOD, nodes can have many log.dirs that each have a meta.properties file (or we need to write each instance of meta.properties V2 during startup without cluster.id if we skipped formatting).
- The MetadataPublisher is the single writer of cluster.id (unless it is already set by formatting, in which case there are no writers to cluster.id), both in persisting the value to meta.properties and writing the in-memory representation of cluster.id
- There are many readers of this value, but most of them can block their initialization until this value is discovered for the sake of simplicity.
- The KafkaRaftClient and QuorumController are the readers who need to handle the concept of an empty/not-set cluster id.
Pros:
- Fetch replication automatically handles persistence of the cluster id for each local node
- Raft module remains independent from metadata module in that KRaft is only responsible for consensus. ClusterID is simply another piece of metadata on which Kraft achieves consensus
Cons:
- Currently, KRaft client also needs to be aware of the cluster ID for its own RPC handling, but the raft module does not decode metadata records
- Having a mechanism for “pushing-down” cluster ID from metadata to raft may be complicated.
- We could duplicate data and have a raft level control record for cluster ID.
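The persist-before-publish ordering described in this section can be sketched as follows. This is not Kafka's MetadataPublisher interface: the class ClusterIdPublisher and its methods are hypothetical, each log dir is assumed to already contain a meta.properties file written at startup, and an atomic rename is used as one possible way to make each write durable.

```python
import os
import tempfile
from typing import Iterable, Optional


class ClusterIdPublisher:
    """Single writer of cluster.id: persists to every log dir, then updates memory."""

    def __init__(self, log_dirs: Iterable[str]) -> None:
        self.log_dirs = list(log_dirs)
        self.cluster_id: Optional[str] = None  # in-memory value seen by readers

    def on_cluster_id(self, discovered: str) -> None:
        if self.cluster_id is not None:
            if discovered != self.cluster_id:
                raise ValueError("cluster id cannot change once set")
            return
        # With JBOD there may be many log dirs; persist to all of them first.
        for log_dir in self.log_dirs:
            self._persist(log_dir, discovered)
        # Only after every file is durable is the value made visible in memory.
        self.cluster_id = discovered

    @staticmethod
    def _persist(log_dir: str, cluster_id: str) -> None:
        path = os.path.join(log_dir, "meta.properties")
        with open(path, "r", encoding="utf-8") as f:
            lines = [ln.rstrip("\n") for ln in f if not ln.startswith("cluster.id=")]
        lines.append(f"cluster.id={cluster_id}")
        # Write to a temp file in the same directory, fsync, then rename over the original.
        fd, tmp_path = tempfile.mkstemp(dir=log_dir)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
```

If persisting to any directory fails, the exception propagates before the in-memory field is assigned, so readers never observe a cluster id that is not yet on disk.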
Compatibility, Deprecation, and Migration Plan
Since this feature is associated with a new metadata record and MetadataVersion, broker bootstrapping with cluster ID is required on all MVs < X where X is the first MV that supports this feature. Because some MetadataVersion is resolved during each node's formatting, we can determine at format time if a ClusterIdRecord is needed as part of a controller's 0-0/bootstrap.checkpoint.
Test Plan
- Unit tests
- Integration tests
- System tests to verify cross-software-version compatibility
Rejected Alternatives
Continue to persist cluster id in meta.properties but have KRaft discover it + persist it via FetchResponse
- Rough design:
- Node can complete a future to allow this value to be discovered by readers outside of kraft layer who need it during startup
- Raft layer is brought up early during startup, so it is fine to wait until this future completes to proceed with initializing the server
- Brokers/observers can start Kafka with no cluster id, and rely on the fetch/another RPC response to discover it in-memory
- If discovered, node persists cluster id to meta.properties during the startup process before in-memory readers of cluster id
- Pros:
- Backwards compatibility is straightforward, since new nodes on old clusters keep using meta.properties for persisting cluster id
- This functionality is not tied to a MetadataVersion, meaning that any Kafka broker/observer with a software version that supports this KIP can use it, rather than the whole cluster needing to be on some MV >= X.
- Kraft can easily do its own cluster ID validation for its RPCs, since nodes receive cluster ID via the fetch response if they do not know it and can update that state in-memory + persist it
- Cons:
- Since each local node's meta.properties is its source of truth, if this file is deleted or is changed, it means the broker/observer controller can join another cluster.
  - This is no worse than what exists currently.
Remove the requirement for formatting for brokers and observer controllers, but still require it for bootstrap controllers
The main reason we can relax this to say no nodes require formatting is that KRaft can elect a leader even when no node has cluster.id defined.