Authors: Greg Harris, Ivan Yurchenko, Jorge Quilcate, Giuseppe Lillo, Anatolii Popov, Juha Mynttinen, Josep Prat, Filip Yonov
Status
Current state: Discarded (incorporated into KIP-1163: Diskless Core)
Discussion thread: here
JIRA: KAFKA-19257
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-1150: Diskless Topics introduces the concept of diskless topics. It allows clients to interact with the closest broker instead of being restricted to designated leaders for this new topic type. To leverage this capability and reduce the cost of cross-rack traffic (e.g. in cloud environments), we propose here to allow brokers to choose a rack-local broker as the leader of diskless topic partitions, based on rack information provided by the client.
Proposed Changes
Metadata rack awareness
Diskless topics do not rely on leaders to serve Produce and Fetch requests; any broker can serve them.
Diskless piggybacks on broker and client rack awareness so that brokers within the same rack as the client serve its requests. This is achieved by returning the selected broker in MetadataResponse and DescribeTopicPartitionsResponse. Clients specify their rack via the "client.rack" configuration, and broker metadata identifies brokers within a rack using the "broker.rack" configuration (if available). The algorithm is the following:
- For classic topics, their real replicas, leader, and ISR will be returned.
- For diskless topics:
- Based on "client.rack", a set of candidate brokers is selected. If the client rack is specified, it is the set of alive brokers in that rack. If the client rack is not specified or there are no alive brokers in that rack, it is the set of all alive brokers in the cluster.
- A broker is picked from this set in a round-robin fashion.
- All diskless partitions in the metadata returned in this response will have the selected broker as the sole replica, the sole ISR member, and the leader.
When this or another client sends a subsequent metadata request, it will likely receive a different leader for the diskless topics. This serves as a load-balancing mechanism.
By following this approach, requests will be fully processed within the same rack, avoiding cross-rack costs and additional latency. Remote storage ensures batches are replicated and available to the other racks.
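The selection steps above can be sketched as follows. This is a minimal illustrative sketch, not actual Kafka broker code: the class, record, and method names are hypothetical, and the round-robin counter stands in for whatever per-broker state the real implementation would use.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical sketch of the broker-selection algorithm described above.
public class DisklessLeaderSelector {
    public record Broker(int id, String rack) {}

    private final AtomicInteger counter = new AtomicInteger();

    /** Pick one broker to report as leader/replica/ISR for all diskless partitions. */
    public Broker select(List<Broker> aliveBrokers, String clientRack) {
        List<Broker> candidates = aliveBrokers;
        if (clientRack != null && !clientRack.isEmpty()) {
            List<Broker> sameRack = aliveBrokers.stream()
                    .filter(b -> clientRack.equals(b.rack()))
                    .collect(Collectors.toList());
            // Fall back to all alive brokers when the client's rack has none.
            if (!sameRack.isEmpty()) {
                candidates = sameRack;
            }
        }
        // Round-robin across successive metadata requests spreads load.
        int idx = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(idx);
    }
}
```

Successive calls rotate through the candidate set, so repeated metadata requests naturally distribute diskless traffic across the rack's brokers.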
Public Interfaces
Binary Protocol
In order to implement metadata rack awareness for topic-partitions, Kafka clients will need to expose their "client.rack" configuration to the serving broker. This could be done by providing the "RackId" in the MetadataRequest and DescribeTopicPartitionsRequest, and having corresponding responses provide tailored metadata for the client.
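On the client side, no new configuration is introduced: the existing "client.rack" setting supplies the value that would populate the new RackId field. A minimal sketch of a producer configuration carrying it (the bootstrap address, rack id, and helper class name here are illustrative):

```java
import java.util.Properties;

// Illustrative helper: builds producer properties that carry "client.rack",
// which the client would propagate to the broker via the RackId request field.
public class RackAwareClientConfig {
    public static Properties producerProps(String bootstrapServers, String rack) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Existing client setting; must match a broker.rack value for
        // same-rack routing to apply.
        props.put("client.rack", rack);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```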
Metadata API
Request schema
{
  "apiKey": 3,
  "type": "request",
  "listeners": ["broker"],
  "name": "MetadataRequest",
  "validVersions": "0-14",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
      "about": "The topics to fetch metadata for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "10+",
        "about": "The topic name." }
    ]},
    { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
      "about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
      "about": "Whether to include cluster authorized operations." },
    { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
      "about": "Whether to include topic authorized operations." },
    { "name": "RackId", "type": "string", "versions": "14+", "default": "", "ignorable": true,
      "about": "The rack ID of the client making this request." }
  ]
}
Response schema
Version 14 is the same as version 13.
DescribeTopicPartitions API
Request schema
{
  "apiKey": 75,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeTopicPartitionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
      "about": "The topics to fetch details for.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." }
    ]},
    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
      "about": "The maximum number of partitions included in the response." },
    { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The first topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name for the first topic to process." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with." }
    ]},
    { "name": "RackId", "type": "string", "versions": "1+", "default": "", "ignorable": true,
      "about": "The rack ID of the client making this request." }
  ]
}
Response schema
Version 1 is the same as version 0.
Compatibility, Deprecation, and Migration Plan
This change is backwards compatible with previous versions and is applicable to diskless topics only. For classic topics the behavior does not change.
Test Plan
Unit tests will be added to ensure the logic is correct.
Additionally, system tests in a multi-rack cluster environment could be included, for example covering the cases below:
- Deploy a Kafka cluster across multiple racks (e.g., 2-3 racks simulated using different broker.rack configurations on nodes)
- Create a classic topic and write to it → Check that produce requests are sent to the partition leader regardless of rack configuration
- Create a diskless topic and configure "client.rack" on the producer side matching one of the broker racks → Check that produce requests are sent to a broker in the corresponding rack
- Create a diskless topic and configure "client.rack" on the producer side not matching any of the broker racks → Check that produce requests are sent to one of the brokers
- While a rack-aware producer is running, stop the brokers in its preferred rack. Verify that the producer receives updated metadata and successfully fails over to sending produce requests to brokers in other racks.
- Run an older producer against a cluster with diskless support and verify that it is able to produce correctly to both types of topics.
Documentation Plan
Documentation needs to be adjusted to clarify what "leader" means for diskless topics.
- Clarify the leader definition for diskless topics, since they are effectively leaderless but the concept of a leader is still used, at least in MetadataResponse and DescribeTopicPartitionsResponse.
- Document how the broker chooses leaders for diskless topic partitions.
Rejected Alternatives
Client-side broker selection
The client could choose the broker in its own rack based on metadata and its "client.rack" configuration. This increases complexity because the client would need to manage liveness and availability checks for brokers and deal with inconsistent or stale cluster views. Centralizing the broker selection on the broker side simplifies this and gives the additional benefit of being able to modify the broker selection logic without modifying the clients.
Separate rack-aware proxy
Brings significant additional overhead and adds unnecessary complexity from an operational perspective.
Adding rack information to ProduceRequest
The client could send rack information in ProduceRequest, and the broker could respond with NOT_LEADER_OR_FOLLOWER, returning the preferred broker information to the client.
This brings additional complexity and overhead in the form of extra round trips to discover a preferred broker. The client sends a MetadataRequest before the first ProduceRequest anyway, so it can learn the preferred broker in advance from the MetadataResponse.