Status
Current state: Voting
Vote thread: https://lists.apache.org/thread/95941nto2kxfqclb1pn00pmdtlgq2zjz
Discussion thread: https://lists.apache.org/thread/vnzmqvcbfxo7hhyj9gzpgmdq59w3n7dy
JIRA: KAFKA-20174
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The reason for this KIP is to remove the requirement that brokers run kafka-storage format before starting Kafka. When running kafka-storage format, nodes are required to supply a --cluster-id argument, which represents the cluster ID to which the node belongs; this ID is persisted to the node's meta.properties file. Below is the other data that each node persists to disk when invoking kafka-storage format:
For brokers, the other data in meta.properties (node.id and directory.id) are obtained from the node's static config and randomly generated, respectively. Persisting this data does not have to happen during storage formatting and can be done later during startup.
For controllers, the directory.id in meta.properties may come from --initial-controllers or --standalone, but otherwise controllers are the same as brokers with respect to the above data.
The main purpose of cluster id is to prevent nodes from contacting other Kafka clusters (ref KIP-78). This KIP seeks to preserve this behavior around cluster id, while removing the necessity of formatting brokers and observer controllers.
Currently, Kafka relies on the operator/caller of kafka-storage format on all intended nodes in a cluster to generate consensus on the actual value of the cluster ID at format time. KRaft guarantees that upon forming a given quorum X, X's members will all have the same cluster id (which could be null) in meta.properties. Additionally, when a KRaft node sends a request with a non-null cluster id, this request is only handled if the request's cluster ID matches the handler's cluster ID.
Background on cluster.id from ZooKeeper Kafka
Cluster id was a znode, /cluster/id, that was initially empty. During the startup of a cluster, brokers would race to write a random UUID to this znode via getOrGenerateClusterId(); once set, the value would never change.
Public Interfaces
meta.properties
Introduce meta.properties v2, in which cluster.id is optional but node.id is guaranteed to exist. directory.id is written to an existing meta.properties v1 during startup if it does not already exist.
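To make the version-dependent validation concrete, here is a minimal, hypothetical sketch (not Kafka's actual parser; the function name and error handling are assumptions) of how a reader could treat cluster.id as optional in v2 while still requiring node.id:

```python
def parse_meta_properties(text):
    """Parse a Java-properties-style meta.properties string and validate it.

    Illustrative only: the real Kafka code uses java.util.Properties and
    richer validation. Property names mirror the real file.
    """
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    version = int(props.get("version", "0"))
    if "node.id" not in props:
        raise ValueError("node.id is required in every meta.properties")
    if version < 2 and "cluster.id" not in props:
        # Before v2, cluster.id was mandatory (written by kafka-storage format).
        raise ValueError("cluster.id is required before meta.properties v2")
    return props

v2 = parse_meta_properties("version=2\nnode.id=1\ndirectory.id=abc")
assert "cluster.id" not in v2  # legal in v2: discovered later from metadata
```

The key point is that the same file with version=1 and no cluster.id would be rejected, which matches today's crash-on-startup behavior.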
ClusterIdRecord + MetadataVersion
Introduce a new metadata record to store cluster id and a new MetadataVersion (MV) that supports encoding/decoding this record. This means that during formatting, the bootstrap ClusterIdRecord is only written if the node is formatted with a MV that supports this feature. When a node runs kafka-storage format, some value for MV is always resolved and written to disk alongside meta.properties. The value either comes from --release-version or --feature, or it defaults to the latest production MV.
If this feature is supported by the cluster's MV, the first elected KRaft leader will write the ClusterIdRecord upon becoming the active controller alongside the other bootstrap metadata records.
One invariant of this feature is that there is at most one ClusterIdRecord in the metadata log. When the MV does not support ClusterIdRecord, there is no ClusterIdRecord in the metadata log, and instead the cluster id is local to each node's meta.properties. When the MV does support ClusterIdRecord, there must be exactly one ClusterIdRecord in the metadata log. When the active controller writes the bootstrap metadata records under an MV that supports this feature, or when it upgrades the MV to one that does, it must write the ClusterIdRecord. This invariant is enforceable when replaying the log and checking the image and deltas.
{
"apiKey": 29,
"type": "metadata",
"name": "ClusterIdRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+",
"about": "The unique ID of this cluster" }
]
}
The type of ClusterId in this record is String, rather than UUID as in TopicRecord, for the following reasons:
Currently, you can format a cluster with a non-UUID cluster ID string, and Kafka considers this "well-formatted" (i.e. the formatting code accepts a String, server startup works, and clusterId is a String in-memory, etc.). Kafka documentation references formatting with a UUID cluster id generated via kafka-storage random-uuid, but this is not a requirement in the code.
If this record used UUID like TopicRecord, it is not clear what the MV upgrade path would be for existing clusters that formatted meta.properties with a non-UUID String. The active controller would need to write a new UUID cluster id, violating the invariant that the cluster id cannot change over the lifetime of a cluster.
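The at-most-one-ClusterIdRecord invariant described above can be checked while replaying the metadata log. A minimal sketch, with record shapes and class names that are assumptions rather than Kafka internals:

```python
class ClusterIdReplayCheck:
    """Illustrative replay-time enforcement of the ClusterIdRecord invariant.

    Records are modeled as plain dicts; the real implementation would hook
    into the metadata image/delta replay path.
    """

    def __init__(self):
        self.cluster_id = None  # None until a ClusterIdRecord is replayed

    def replay(self, record):
        if record.get("type") != "ClusterIdRecord":
            return
        if self.cluster_id is not None:
            # The cluster id must never change over the lifetime of a cluster,
            # so a second ClusterIdRecord indicates a corrupted metadata log.
            raise RuntimeError("duplicate ClusterIdRecord in metadata log")
        self.cluster_id = record["clusterId"]

check = ClusterIdReplayCheck()
check.replay({"type": "ClusterIdRecord", "clusterId": "abc"})
assert check.cluster_id == "abc"
```

A second replay of a ClusterIdRecord would raise, which is the enforcement point mentioned in the invariant.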
Storage Tool + Formatting requirements
Prior to this KIP, all nodes running Kafka are expected to have invoked the kafka-storage format command and persisted a meta.properties file containing a cluster.id before starting Kafka. We enforce this by trying to read meta.properties during startup and crashing if the file does not contain all the data we expect.
Observer controllers are controllers who are either not part of the static voter set in kraft.version=0, or controllers who are not part of the bootstrap voter set defined by --initial-controllers or --standalone in kraft.version=1 (i.e. controllers who format today with --no-initial-controllers). Non-observer/"bootstrap controllers" are therefore either part of the static voter set in kraft.version=0, or controllers who format with --initial-controllers or --standalone in a dynamic quorum setup. This distinction is important because observer controllers are not responsible for the initial KRaft leader election, but bootstrap controllers are.
We can still enforce that bootstrap controllers must have formatted (and therefore persisted a cluster id to meta.properties) prior to starting kafka. The validation of meta.properties during startup described above can be done whenever the node is a bootstrap controller (i.e. part of the static voters config, or if a 0-0.checkpoint exists with a VotersRecord). We still need to do this validation mainly for kraft.version=0 clusters with newer software versions but an older MV (kraft.version=1 clusters require formatting of at least one node to elect a leader). Otherwise, these clusters will have no way to persist a cluster id to meta.properties if the operator skips formatting on all nodes (which would be possible without this requirement). A consequence of this requirement is that when KRaft elects a leader, it is guaranteed to have a non-null cluster id.
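The startup-time decision above, that only bootstrap controllers must already be formatted, can be sketched as follows. The inputs (static voter set, presence of a bootstrap checkpoint containing a VotersRecord) are simplified stand-ins for the real configs and on-disk state, and the function name is an assumption:

```python
def must_be_formatted(node_id, is_controller, static_voters, has_voters_checkpoint):
    """Return True if an unformatted node should crash on startup.

    Illustrative sketch: brokers and observer controllers may skip
    formatting; bootstrap controllers may not.
    """
    if not is_controller:
        return False  # brokers may always skip formatting under this KIP
    if node_id in static_voters:
        return True   # kraft.version=0: part of controller.quorum.voters
    # kraft.version=1: formatted with --initial-controllers/--standalone,
    # which leaves a bootstrap checkpoint containing a VotersRecord.
    return has_voters_checkpoint

assert must_be_formatted(1, True, {1, 2, 3}, False) is True   # static voter
assert must_be_formatted(9, True, set(), False) is False      # observer controller
```

This mirrors the text: validation runs only when the node can be identified as a bootstrap controller, so brokers and observers no longer fail startup when unformatted.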
Proposed Changes
Remove the requirement of brokers and observer controllers to format before starting kafka
- After KIP-1262, Kafka operators no longer need to format all nodes. They only need to format the "bootstrap" controller nodes before starting Kafka.
  - "Bootstrap controllers" are the nodes listed in the controller.quorum.voters static config when using a static quorum, or the controllers who format with --initial-controllers/--standalone in a dynamic quorum setup.
- Formatting brokers and observer controllers is now optional. Failing to run kafka-storage format on these nodes before starting Kafka will no longer crash startup.
- Failing to run kafka-storage format on bootstrap controllers will cause Kafka to crash during startup. This behavior is identical to how Kafka behaves today with respect to bootstrap controllers not being formatted properly.
meta.properties will be written during kafka broker/controller startup if it doesn't exist already (from formatting)
During startup of the KafkaRaftServer, we attempt to read the meta.properties file
- If meta.properties does not exist, write meta.properties v2 with node.id and directory.id
  - This means either this node skipped formatting, or the file/disk was lost
- If meta.properties exists without a cluster.id and is v2, the cluster.id will be discovered later (described below)
  - One correctness invariant of this feature is that updating the cluster.id in-memory must occur after persisting it to meta.properties.
- If meta.properties exists with a cluster.id, Kafka behaves as it does today:
  - The node assumes it to be correct and passes it to KRaft
  - If this ID doesn't match the KRaft leader's, the leader will reject requests from the node
- If a broker/observer controller has already written a cluster id to its meta.properties, either from formatting or from discovering it in the cluster metadata, it is impossible for it to learn of another cluster id via Fetch/FetchSnapshot.
  - For the broker (with clusterId = X) to receive a non-error FetchResponse with metadata records (which would be the only way to learn of a different ClusterIdRecord), the KRaft leader (clusterId = Y) must either receive a request without a clusterId, or a request whose clusterId is Y. The broker fulfills neither of these conditions.
  - This case could happen when bootstrap endpoints point to the wrong cluster during restart of a node. KRaft's own clusterId checks would cause startup of this node to time out and shut the node down, because it is not able to contact the quorum of another cluster.
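The startup handling of meta.properties, including the persist-before-in-memory-update invariant, can be sketched as below. All function and field names are illustrative assumptions; a dict stands in for the on-disk file:

```python
import uuid

def startup_meta_properties(store, node_id):
    """If meta.properties (modeled as a dict) is missing, write v2 with
    node.id and a freshly generated directory.id. cluster.id is NOT
    written here; it is discovered later from the metadata log."""
    if not store:
        store.update({
            "version": "2",
            "node.id": str(node_id),
            "directory.id": str(uuid.uuid4()),
        })
    return store

def adopt_cluster_id(store, in_memory, cluster_id):
    """Correctness invariant: persist to meta.properties first, only then
    update in-memory state, so a crash in between never leaves the node
    acting on a cluster id it has not durably recorded."""
    store["cluster.id"] = cluster_id       # 1) persist to disk
    in_memory["cluster.id"] = cluster_id   # 2) then update in-memory readers

disk = {}
startup_meta_properties(disk, 7)
assert "cluster.id" not in disk  # discovered later from the metadata pipeline
```

If meta.properties already exists, `startup_meta_properties` leaves it untouched, matching the "behaves as it does today" branch above.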
Introduce a metadata record for cluster id
- Introduce a new MetadataVersion for this feature alongside a new metadata record called ClusterIdRecord
- One invariant of this feature is that if the persisted MV supports this feature, a ClusterIdRecord must also be persisted
  - This is enforceable along the write path for MV, which occurs at two points: formatting a node (specifically controllers who can become leader), and upgrading the MV using kafka-features upgrade
    - During formatting, nodes must resolve a MV with which to format. This comes from the --release-version/--feature flag and defaults to the latest production MV. If the MV at format time supports this feature, a ClusterIdRecord must be written as part of the bootstrap metadata checkpoint.
      - The first active controller will write the ClusterIdRecord + MV as part of the bootstrap metadata records write if the MV supports this feature
    - During MV upgrades, successfully upgrading the MV to one that supports this feature requires successfully committing a ClusterIdRecord alongside the new MetadataVersion feature record.
  - There is already precedent for this kind of write-path-enforced invariant with kraft.version and the VotersRecord
When nodes discover cluster.id from the metadata publishing pipeline, they persist it to meta.properties + update KRaft
- Nodes whose meta.properties does not contain cluster.id will discover this value via the metadata publishing pipeline
- The point at which nodes can discover this value in-memory is after both:
  - Learning of a HWM from the leader, which the leader allows because it will send valid fetch responses back to nodes that do not have a cluster id
  - The MetadataLoader is registered as a listener to the raft layer
- Besides the raft client, readers of cluster id initialized during startup can block until both of the above conditions are met before being initialized.
- The raft client's clusterId will be updated after discovering this value from the metadata pipeline. One detail here is that observer controllers with auto-join must wait until they have a cluster id before trying to add or remove themselves.
  - This can be implemented as a MetadataPublisher that registers to the raft client alongside the MetadataLoader.
Pros:
- Fetch replication automatically handles persistence of the cluster id for each local node
- Raft module remains independent from the metadata module in that KRaft is only responsible for consensus; cluster ID is simply another piece of metadata on which KRaft achieves consensus
Cons:
- Currently, KRaft client also needs to be aware of the cluster ID for its own RPC handling, but the raft module does not decode metadata records
- Having a mechanism for “pushing-down” cluster ID from metadata to raft may be complicated.
- We could duplicate data and have a raft level control record for cluster ID.
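The MetadataPublisher approach discussed above can be sketched as a listener on the metadata pipeline that, on first seeing a cluster id, persists it and then pushes it down to the raft client. All class and method names here are illustrative assumptions, not Kafka internals:

```python
class FakeRaftClient:
    """Stand-in for the raft client, which needs the cluster id to
    validate the clusterId field on incoming/outgoing RPCs."""
    def __init__(self):
        self.cluster_id = None

    def set_cluster_id(self, cluster_id):
        self.cluster_id = cluster_id

class ClusterIdPublisher:
    """Illustrative publisher registered alongside the MetadataLoader.

    On the first metadata update carrying a cluster id, persist it to
    meta.properties (modeled as a dict) BEFORE updating the raft client,
    per the correctness invariant in this KIP.
    """
    def __init__(self, meta_properties, raft_client):
        self.meta_properties = meta_properties
        self.raft_client = raft_client

    def on_metadata_update(self, delta):
        cluster_id = delta.get("clusterId")
        if cluster_id is None or "cluster.id" in self.meta_properties:
            return  # nothing to learn, or already persisted (immutable)
        self.meta_properties["cluster.id"] = cluster_id  # persist first
        self.raft_client.set_cluster_id(cluster_id)      # then push down

meta, raft = {}, FakeRaftClient()
ClusterIdPublisher(meta, raft).on_metadata_update({"clusterId": "abc"})
assert raft.cluster_id == "abc" and meta["cluster.id"] == "abc"
```

This also illustrates the "pushing-down" con: the raft module must expose a mutation hook like `set_cluster_id` even though it does not itself decode metadata records.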
Compatibility, Deprecation, and Migration Plan
Since this feature is associated with a new metadata record and MetadataVersion, formatting nodes with a cluster ID is still required on all MVs < X, where X is the first MV that supports this feature. Because some MetadataVersion is resolved during each node's formatting, we can determine at format time whether a ClusterIdRecord is needed as part of a controller's 0-0/bootstrap.checkpoint.
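The format-time decision can be sketched as below. The numeric threshold is a placeholder since the KIP refers to the first supporting MV only as "X", and the record shapes are simplified:

```python
FIRST_MV_WITH_CLUSTER_ID_RECORD = 100  # placeholder for "X"; not a real MV level

def bootstrap_records(resolved_mv, cluster_id):
    """Illustrative sketch: build the bootstrap metadata records written at
    format time, including a ClusterIdRecord only when the resolved MV
    supports this feature."""
    records = [{"type": "FeatureLevelRecord",
                "name": "metadata.version",
                "featureLevel": resolved_mv}]
    if resolved_mv >= FIRST_MV_WITH_CLUSTER_ID_RECORD:
        records.append({"type": "ClusterIdRecord", "clusterId": cluster_id})
    return records

old = bootstrap_records(FIRST_MV_WITH_CLUSTER_ID_RECORD - 1, "abc")
new = bootstrap_records(FIRST_MV_WITH_CLUSTER_ID_RECORD, "abc")
assert not any(r["type"] == "ClusterIdRecord" for r in old)
assert any(r["type"] == "ClusterIdRecord" for r in new)
```

On older MVs the cluster id remains purely local to meta.properties, which is why formatting with --cluster-id stays mandatory there.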
Test Plan
- Unit tests
- Integration tests
- System tests to verify cross-software-version compatibility
Rejected Alternatives
Continue to persist cluster id in meta.properties but have KRaft discover it + persist it via FetchResponse
- Rough design:
- Node can complete a future to allow this value to be discovered by readers outside of kraft layer who need it during startup
- Raft layer is brought up early during startup, so it is fine to wait until this future completes to proceed with initializing the server
- Brokers/observers can start Kafka with no cluster id, and rely on the fetch/another RPC response to discover it in-memory
- If discovered, node persists cluster id to meta.properties during the startup process before in-memory readers of cluster id
- Pros:
- Backwards compatibility is straightforward, since new nodes on old clusters keep using meta.properties for persisting cluster id
- This functionality is not tied to a MetadataVersion, meaning that any Kafka broker/observer with a software version that supports this KIP can use it, rather than the whole cluster needing to be on some MV >= X.
- KRaft can easily do its own cluster ID validation for its RPCs, since nodes receive the cluster ID via the fetch response if they do not know it, and can update that state in-memory and persist it
- Cons:
  - Since each local node's meta.properties is its source of truth, if this file is deleted or changed, the broker/observer controller can join another cluster.
    - This is no worse than what exists currently.