Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In some deployment environments such as Kubernetes, brokers may be assigned to worker nodes from an available pool. When a cluster rolls, it is possible that a broker changes its advertised hostname after a client has performed its initial bootstrap. As a result, a client may unwittingly use stale information to connect to a particular broker, only to connect to a different broker without realising. It is only later that the stale information becomes evident when things go badly wrong. Today, the only way to recover from this situation is to restart the client.
While it could reasonably be argued that this scenario only arises when the cluster is not being orchestrated properly, the Kafka protocol does not carry any information from the client during connection establishment that would let a broker spot this kind of inconsistency and tell the client to rebootstrap. Carrying such information would also help with diagnosing mistakes in the networking configuration of a Kafka cluster.
Proposed Changes
The Kafka protocol doesn't quite have sessions like many other protocols, but it does have an initial RPC, ApiVersions, which a client uses when it connects to a broker. By adding optional ClusterId and NodeId information to the ApiVersions request, the receiving broker would be able to tell the client when it is attempting to make a misrouted connection.
If the client has disabled rebootstrapping by setting metadata.recovery.strategy=NONE, this checking is also disabled because the client would not be able to rebootstrap when instructed.
Public Interfaces
Kafka protocol
ApiVersions
This KIP introduces version 5 which adds ClusterId and NodeId to the request.
If the client is bootstrapping, it does not supply ClusterId or NodeId. After bootstrapping, during which it learns the information from its initial Metadata response, it supplies both. When rebootstrapping begins, the client discards the cluster ID and node information that it learnt from its earlier Metadata response so that it can bootstrap from scratch.
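The client-side lifecycle described above can be sketched roughly as follows. This is only an illustration: the class and method names are hypothetical and are not taken from the Kafka client, and the sketch assumes that a null ClusterId and a NodeId of -1 (the schema defaults) are how "not supplied" is represented.

```java
/**
 * Hypothetical holder for what the client has learnt about the cluster.
 * Names are illustrative only; this is not Kafka client code.
 */
final class LearntClusterIdentity {
    /** Assumed sentinel for "NodeId not supplied", matching the schema default. */
    static final int UNSPECIFIED_NODE_ID = -1;

    /** Null while bootstrapping, set once a Metadata response has been seen. */
    private String clusterId;

    /** Record the cluster ID learnt from a Metadata response. */
    void onMetadataResponse(String clusterIdFromMetadata) {
        this.clusterId = clusterIdFromMetadata;
    }

    /** Discard what was learnt so that the client bootstraps from scratch. */
    void onRebootstrap() {
        this.clusterId = null;
    }

    boolean isBootstrapping() {
        return clusterId == null;
    }

    /** ClusterId to put in the ApiVersions request, or null while bootstrapping. */
    String requestClusterId() {
        return clusterId;
    }

    /** NodeId to put in the ApiVersions request for a connection to targetNodeId. */
    int requestNodeId(int targetNodeId) {
        return isBootstrapping() ? UNSPECIFIED_NODE_ID : targetNodeId;
    }
}
```

With this shape, a connection opened during bootstrap carries neither field, and every connection opened after the first Metadata response carries both, so the "just one specified" case should never be produced by a well-behaved client.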
The validation of ClusterId and NodeId is as follows:
- If neither ClusterId nor NodeId is specified, the request proceeds as normal.
- If just one of ClusterId or NodeId is specified, the request fails and the error code INVALID_REQUEST is returned.
- If both ClusterId and NodeId are specified and match the receiving broker, the request proceeds as normal.
- If the ClusterId is incorrect, the request fails and the error code REBOOTSTRAP_REQUIRED is returned. This is a non-fatal error for the client, which should rebootstrap.
- If the ClusterId is correct but the NodeId is incorrect, the request fails and the error code REBOOTSTRAP_REQUIRED is returned. This is a non-fatal error for the client, which should rebootstrap.
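The validation rules above can be expressed as a small decision function. The following is a minimal sketch, not the broker implementation: the class, enum, and method names are hypothetical, and it assumes that "not specified" means a null ClusterId or a NodeId of -1, in line with the defaults in the request schema.

```java
/**
 * Hypothetical sketch of the broker-side check on an ApiVersions request.
 * Not the actual Kafka implementation.
 */
final class MisroutedConnectionCheck {

    /** Outcomes, named after the error codes referred to in this KIP. */
    enum Result { NONE, INVALID_REQUEST, REBOOTSTRAP_REQUIRED }

    /** Assumed sentinel for "NodeId not supplied", matching the schema default. */
    static final int UNSPECIFIED_NODE_ID = -1;

    static Result validate(String requestClusterId, int requestNodeId,
                           String brokerClusterId, int brokerNodeId) {
        boolean hasClusterId = requestClusterId != null;
        boolean hasNodeId = requestNodeId != UNSPECIFIED_NODE_ID;

        // Neither field supplied: the client is bootstrapping, proceed as normal.
        if (!hasClusterId && !hasNodeId) {
            return Result.NONE;
        }
        // Exactly one field supplied: the request is malformed.
        if (hasClusterId != hasNodeId) {
            return Result.INVALID_REQUEST;
        }
        // Both supplied: a mismatch on either one means the connection is misrouted.
        if (!requestClusterId.equals(brokerClusterId)) {
            return Result.REBOOTSTRAP_REQUIRED;
        }
        if (requestNodeId != brokerNodeId) {
            return Result.REBOOTSTRAP_REQUIRED;
        }
        return Result.NONE;
    }
}
```

For example, under these assumptions `validate("cluster-a", 2, "cluster-a", 1)` yields REBOOTSTRAP_REQUIRED, while `validate(null, -1, "cluster-a", 1)` yields NONE. The two mismatch cases are kept as separate branches to mirror the list, even though they return the same error code.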
Request schema
{
  "apiKey": 18,
  "type": "request",
  "listeners": ["broker", "controller"],
  "name": "ApiVersionsRequest",
  // Versions 0 through 2 of ApiVersionsRequest are the same.
  //
  // Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
  //
  // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
  //
  // Version 5 introduces ClusterId and NodeId (KIP-1242).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ClientSoftwareName", "type": "string", "versions": "3+",
      "ignorable": true, "about": "The name of the client." },
    { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
      "ignorable": true, "about": "The version of the client." },
    { "name": "ClusterId", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true, "default": "null",
      "about": "The cluster ID the client intends to connect to, if known." },
    { "name": "NodeId", "type": "int32", "versions": "5+", "default": -1, "ignorable": true,
      "about": "The broker ID the client intends to connect to, if known." }
  ]
}
Response schema
The response schema is unchanged. ApiVersions is now able to return an additional existing error code: REBOOTSTRAP_REQUIRED.
Configuration
Common client configuration
A low importance configuration is added just in case an unforeseen situation arises where the new checks introduced by this KIP cause problems with existing deployments. Kafka clients have always supported cluster ID change without requiring restart. By clearing cluster metadata when rebootstrapping begins, Kafka clients should be able to switch cluster ID safely which could be useful in disaster recovery scenarios.
| Configuration | Description | Values |
|---|---|---|
| metadata.cluster.check.enable | Whether the client should send cluster and node information when connecting to a broker to enable it to check for a misrouted connection. This configuration is ignored if rebootstrapping is disabled by setting the configuration metadata.recovery.strategy=NONE. If the client is connecting to a broker older than Apache Kafka <insert version here>, no checking is performed and this configuration has no effect. | Boolean. true (default), or false |
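As an illustration of how the two configurations interact, the sketch below builds a set of client properties and works out whether the check would be in effect. The class and helper names are hypothetical, the bootstrap address is a placeholder, and the fallback of `rebootstrap` for an unset metadata.recovery.strategy is an assumption rather than something stated in this KIP.

```java
import java.util.Properties;

/** Hypothetical example; not part of the Kafka client API. */
final class ClusterCheckConfigExample {

    /** Client properties with the check left enabled. */
    static Properties clientProperties() {
        Properties props = new Properties();
        // Placeholder address, for illustration only.
        props.setProperty("bootstrap.servers", "broker-0.example.internal:9092");
        props.setProperty("metadata.recovery.strategy", "rebootstrap");
        // Configuration added by this KIP; true is the stated default.
        props.setProperty("metadata.cluster.check.enable", "true");
        return props;
    }

    /** The check only applies when it is enabled and rebootstrapping is not disabled. */
    static boolean checkingActive(Properties props) {
        boolean enabled = Boolean.parseBoolean(
                props.getProperty("metadata.cluster.check.enable", "true"));
        // Assumption: an unset strategy is treated as "rebootstrap".
        String strategy = props.getProperty("metadata.recovery.strategy", "rebootstrap");
        return enabled && !"none".equalsIgnoreCase(strategy);
    }
}
```

Setting either `metadata.cluster.check.enable=false` or `metadata.recovery.strategy=NONE` makes `checkingActive` return false in this sketch, which mirrors the description in the table. The broker-version condition is not modelled here.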
Compatibility, Deprecation, and Migration Plan
There should be no negative impacts on existing users. Undiagnosed misrouted connections should be eliminated.
Test Plan
The KIP will be tested extensively using unit, integration and system tests. In particular, we should cause misrouted connections in a controlled way in order to check that the clients can rebootstrap automatically.
Rejected Alternatives
An alternative would be to introduce a new Connect RPC into the Kafka protocol, rather than extending ApiVersions. This would naturally extend to support new features such as multi-tenancy and namespaces in the future. We probably would not want to add additional context such as a namespace onto ApiVersions, so maybe we should bite the bullet now and add a new RPC. Part of the motivation of this KIP is to start the discussion in the community about the relative merits.
It would also be possible to carry node information in every RPC. That would allow individually misrouted packets to be diagnosed, but this KIP is not trying to handle situations in which the packets being sent to a broker are routed in different directions by a proxy or similar.