This proposal introduces a new data structure called the chunk.
Status
Current state: "Voting"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal is to resolve a problem in Kafka's design.
The replicas in Kafka serve two major functions: accepting writes and serving reads. But only the end of the replica serves writes and most of the reads. Kafka has complicated logic to handle consistency for read and write operations at the end of the replica. The majority of the replica is immutable, and most of the time it is merely a data store.
The replica is managed as a single unit in Kafka. When a replica needs to be moved between brokers, all of its data needs to be moved. This can be costly and inefficient, since most of the time we are only interested in the data at the end of the replica.
This KIP proposes a new ability to separate the concerns of the 'active' data, where most reads and writes happen, from the 'inactive' data in a replica. The 'active' data will still follow the existing replica management methodology. 'Inactive' data can be managed separately and in a simplified manner.
To achieve this, I propose to break the partition data into chunks. Chunks can be assigned to different brokers independently. The chunk at the end of the partition is the active chunk. The remaining chunks are inactive chunks.
There are some benefits to this approach:
Horizontal scalability: the data is distributed more evenly across the brokers in the cluster, which also enables more flexible resource allocation.
Better load balancing: reads of partition data, especially early data, can be distributed to nodes other than just the leader.
Increased fault tolerance: failure of the leader node will not impact reads of older data.
Compared to tiered storage (KIP-405: Kafka Tiered Storage)
Tiered storage uses an external system to store partition data. It trades architectural complexity for long-term storage and a small local data footprint, while this proposal solves the problem within Kafka.
This proposal can also achieve long-term data storage by simply adding brokers and moving the active chunk to the new broker. Also, with tiered storage the local data can be small but cannot be zero; that data still needs to be copied during partition reassignment, whereas with this proposal the chunk creation can be done with zero data copy.
The two KIPs are designed from different perspectives. They both address the problem that, over time, the partition data becomes too big and hard to manage.
The tiered storage design principle is to make the data somebody else's problem. Data is provided over an interface; as long as the interface is implemented properly, the external data can be accessed seamlessly.
The chunk design principle is to extend the current Kafka data plane's capability to handle large partition data, fully inside Kafka. No other systems are needed.
The main benefit is managing the data fully inside the Kafka data plane in a uniform way, so that not much logic needs to be added and the data can be scaled linearly, at the cost of metadata complexity (thanks to the KRaft work for making this a lot easier).
Arguably, we could implement a default RemoteStorageManager inside Kafka using a separate disk system. But 1) as the name indicates, it is meant to be an external/remote system; 2) if it runs inside the same JVM as Kafka, certain resources (throughput specifically) need to be coordinated with the broker, which increases complexity; and 3) would the data be managed the same way as partition data (e.g. with segments and indices)? If the same, why go through the hassle of adding the extra RemoteStorageManager layer? If not, why is the same partition data managed differently inside the same broker?
There is no conflict between this proposal and KIP-405. A partition with chunks can still have data moved to tiered storage.
Compared to KIP-392: Allow consumers to fetch from closest replica
KIP-392 avoids cross-DC data reads. This proposal tries to balance reads across all the inactive chunk replicas. Implementing balanced reads on inactive chunks is easier because they are all identical copies of immutable data.
There is no conflict between this proposal and KIP-392. Active chunks can still be read from co-located DCs. Reads of inactive chunks could also consider DC locations (future work).
Proposed Changes
Add a new data structure called a chunk to the partition.
Chunk Definition
A chunk is a sub-structure under a partition that holds a contiguous section of messages. A partition can contain one or more chunks. The latest chunk is the active chunk; only the active chunk can accept message appends. The other chunks are called inactive chunks, and data in inactive chunks is immutable.
Each chunk has its own replicas defined across multiple brokers. The number of replicas must match the partition's replication factor. Below is an example of a partition with 3 replicas:
| Offset | Broker 0 | Broker 1 | Broker 2 | Broker 3 | Broker 4 |
|---|---|---|---|---|---|
| Partition started with replicas=[0, 1, 2] | | | | | |
| 0 | m0 | m0 | m0 | | |
| 1 | m1 | m1 | m1 | | |
| 2 | m2 | m2 | m2 | | |
| 3 | m3 | m3 | m3 | | |
| New chunk created with replicas=[1, 3, 4] | | | | | |
| 4 | | m4 | | m4 | m4 |
| 5 | | m5 | | m5 | m5 |
| 6 | | m6 | | m6 | m6 |
| 7 | | m7 | | m7 | m7 |
| New chunk created with replicas=[0, 2, 4] | | | | | |
| 8 | m8 | | m8 | | m8 |
| 9 | m9 | | m9 | | m9 |
| 10 | m10 | | m10 | | m10 |
| 11 | m11 | | m11 | | m11 |
Each chunk has a startOffset, stopOffset, and endOffset. The startOffset is the smallest offset in the chunk; it also serves as the key that identifies a unique inactive chunk within a partition. The stopOffset is the last offset a consumer can read. The endOffset is the biggest offset in the chunk.
Messages in an inactive chunk are immutable and all of its replicas hold identical data. There is no leader for an inactive chunk; all of its replicas can serve read requests.
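The chunk lookup implied by the startOffset key can be sketched as follows. This is a hypothetical illustration, not part of the proposal: the class and method names are invented, and the idea is simply that inactive chunks live in an ordered map keyed by startOffset, so a floor lookup resolves which chunk serves a given offset.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: resolving which chunk serves a given fetch offset.
public class ChunkLookup {
    record Chunk(long startOffset, long stopOffset, int[] replicas) {}

    // Inactive chunks keyed by startOffset; the active chunk is tracked separately.
    private final NavigableMap<Long, Chunk> inactiveChunks = new TreeMap<>();
    private final Chunk activeChunk;

    ChunkLookup(Chunk activeChunk) { this.activeChunk = activeChunk; }

    void addInactive(Chunk c) { inactiveChunks.put(c.startOffset(), c); }

    // Returns the chunk whose offset range contains the given offset,
    // preferring the active chunk once the offset reaches its start.
    Chunk chunkFor(long offset) {
        if (offset >= activeChunk.startOffset()) return activeChunk;
        Map.Entry<Long, Chunk> e = inactiveChunks.floorEntry(offset);
        if (e == null || offset > e.getValue().stopOffset())
            throw new IllegalArgumentException("Offset " + offset + " out of range");
        return e.getValue();
    }

    public static void main(String[] args) {
        // Mirrors the 3-chunk example table above.
        ChunkLookup lookup = new ChunkLookup(new Chunk(8, Long.MAX_VALUE, new int[]{0, 2, 4}));
        lookup.addInactive(new Chunk(0, 3, new int[]{0, 1, 2}));
        lookup.addInactive(new Chunk(4, 7, new int[]{1, 3, 4}));
        System.out.println(lookup.chunkFor(5).startOffset()); // 4
        System.out.println(lookup.chunkFor(9).startOffset()); // 8
    }
}
```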
Create A Chunk
A chunk can only be created from the end of the partition. The creation process is similar to partition reassignment, except that the new replicas only replicate data starting from the chunk's start offset.
The chunk becoming inactive is finished by setting its stop and end offsets. The stop offset is the lastStableOffset of the partition. The end offset is the high watermark offset of the partition.
The start offset of the new active chunk is the partition's lastStableOffset + 1. The new chunk should have a new set of replicas.
The new chunk and the previous chunk may or may not have overlapping data, depending on whether the lastStableOffset equals the high watermark offset.
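The boundary arithmetic described above can be sketched as follows; the helper and type names are illustrative only. Per this section, the finished chunk's stop offset is the lastStableOffset (LSO), its end offset is the high watermark (HW), and the new active chunk starts at LSO + 1, so the two chunks overlap exactly when the HW is ahead of the LSO.

```java
// Hypothetical sketch of the chunk-creation boundary arithmetic.
public class ChunkBoundaries {
    record Boundaries(long prevStop, long prevEnd, long newStart) {}

    static Boundaries onChunkCreate(long lastStableOffset, long highWatermark) {
        long prevStop = lastStableOffset;      // last offset consumers may read from the old chunk
        long prevEnd = highWatermark;          // last offset the old chunk's replicas must hold
        long newStart = lastStableOffset + 1;  // first offset of the new active chunk
        return new Boundaries(prevStop, prevEnd, newStart);
    }

    public static void main(String[] args) {
        // No in-flight transactions: LSO == HW, so the chunks do not overlap.
        System.out.println(onChunkCreate(100, 100));
        // Open transaction: LSO < HW, so offsets 101..105 exist in both chunks.
        System.out.println(onChunkCreate(100, 105));
    }
}
```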
Chunk Representation in Record
This KIP introduces two new records to manage inactive chunks and their transitions:
- ChunkRecord: Represents an inactive chunk.
- ChunkChangeRecord: Represents movements of inactive chunk replicas.
The chunk's start, stop, and end offsets are immutable and cannot be modified.
The existing PartitionRecord now represents the active chunk. Two new fields will be added to this record and to the PartitionChangeRecord:
- startOffset: Specifies the starting offset of the active chunk.
- startTimestamp: Indicates the timestamp corresponding to the chunk's start offset.
For more information, refer to the Public Interfaces section.
Chunk Representation in Metadata
The metadata response will have a new field, chunks, that includes all the inactive chunks. The original partition fields represent the active chunk.
Client Side Changes
Producer
No changes are needed on the producer side. From the producer's perspective, chunk creation looks identical to a partition reassignment.
Consumer
All inactive and active chunks have a start offset and start timestamp field. The consumer uses this information to look up the right chunk and find its in-sync replica list. For inactive chunks, the consumer selects a replica pseudo-randomly by hash. For active chunks, the consumer still reads from the leader node unless configured otherwise.
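The replica-selection rule above can be sketched as follows. This is a hypothetical illustration: the exact hash inputs (client id, partition, chunk start offset) are an assumption of this sketch, not something the proposal specifies.

```java
import java.util.Objects;

// Hypothetical sketch: pick one ISR member of an inactive chunk by hashing,
// so reads spread across replicas while each client sticks to one replica.
public class ReplicaSelector {
    static int selectInactiveReplica(int[] isr, String clientId, int partition, long chunkStartOffset) {
        int h = Objects.hash(clientId, partition, chunkStartOffset);
        return isr[Math.floorMod(h, isr.length)];
    }

    public static void main(String[] args) {
        int[] isr = {1, 3, 4};
        // Deterministic: the same client always maps to the same ISR member.
        System.out.println(selectInactiveReplica(isr, "client-A", 0, 4L));
    }
}
```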
Server Side Changes
Handle CreateChunksRequest
Brokers and the controller will handle the CreateChunksRequest. Only the leader broker of the given active chunk should process this request.
The leader broker should process the request like this:
- Provide the last stable offset as the stop offset for the chunk that is becoming inactive
- Provide the high watermark offset + 1 as the start offset for the new active chunk
- Forward the new request to the controller
The controller should process the request like this:
- Create a ChunkRecord that carries the existing active chunk's data, plus the stop offset as provided in the request and an end offset equal to the request's start offset - 1.
- Create a PartitionChangeRecord with the start offset from the request.
- Send those records to Raft in a single atomic batch
Handle AlterChunkAssignmentsRequest
The controller should process the request like this:
- Based on the new replica set in the request, calculate the total, added, and removed replicas
- Create a ChunkChangeRecord based on the above calculation and send it to Raft
ReplicaManager processing of chunk changes
The ReplicaManager on each broker will detect and process chunk-related changes, including:
- chunkStops: the current active chunk becomes inactive. Processed like a partition stop, but without deleting the partition data.
- chunkCreates: the current broker needs to create a new chunk replica. Create a new partition if not present, then start replicating data from a random ISR replica (selected by hash).
- chunkDeletes: the current broker needs to delete a chunk replica. Stop any replica fetch for the given chunk, then delete the segments and indices that only hold data from the given chunk.
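The chunkDeletes rule above can be sketched as follows: a log segment is safe to delete only if every offset it holds falls inside the chunk being removed; a segment that spills into a neighboring chunk must be kept. The types here are illustrative, not actual Kafka classes.

```java
// Hypothetical sketch of the segment-deletion eligibility check for chunkDeletes.
public class SegmentCleanup {
    record Segment(long baseOffset, long lastOffset) {}

    // A segment may be deleted only if its entire offset range lies
    // within the chunk's [chunkStart, chunkEnd] range.
    static boolean deletable(Segment s, long chunkStart, long chunkEnd) {
        return s.baseOffset() >= chunkStart && s.lastOffset() <= chunkEnd;
    }

    public static void main(String[] args) {
        // Chunk [0, 99]: a segment ending at offset 99 is deletable,
        // one spilling into offset 120 is shared with the next chunk and kept.
        System.out.println(deletable(new Segment(0, 99), 0, 99));   // true
        System.out.println(deletable(new Segment(80, 120), 0, 99)); // false
    }
}
```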
Automatic chunk deletion
The ReplicaManager should detect the case where a chunk's full data has been deleted due to TTL, partition size limits, or movement to tiered storage, and send an AlterChunkAssignmentsRequest to the controller to remove the current broker from the replica list.
The controller should detect that all replicas of a given chunk have been removed, and then remove the chunk from the topic images.
In addition, the ReplicaManager should also detect deletion of the whole partition, including all active and inactive chunks, and delete the partition data accordingly.
Command line change
The output of the describe topic command:
./kafka-topics.sh --describe --topic ...
will also include the chunks under each partition. The fields include start offset, stop offset, end offset, replica set, and ISR set.
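A hypothetical example of the extended output (the layout is illustrative only; the values mirror the chunk example table above):

```
Topic: topic1   Partition: 0    Leader: 4   Replicas: 0,2,4   Isr: 0,2,4
    Chunk: startOffset=0   stopOffset=3   endOffset=3   Replicas: 0,1,2   Isr: 0,1,2
    Chunk: startOffset=4   stopOffset=7   endOffset=7   Replicas: 1,3,4   Isr: 1,3,4
```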
Public Interfaces
Changes to PartitionRecord
{
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
// Version 1 adds Directories for KIP-858
// Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
// Version 3 adds StartOffset and StartTimestamp for KIP-xxxx
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The partition id." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique ID of this topic." },
...
{ "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." },
{ "name": "StartOffset", "type": "int64", "versions": "3+", "default": "-1", "taggedVersions": "3+", "tag": 3,
"about": "The start offset of the active chunk of this partition. Only valid when the partition has chunks." },
{ "name": "StartTimestamp", "type": "int64", "versions": "3+", "default": "-1", "taggedVersions": "3+", "tag": 4,
"about": "The start timestamp of the active chunk of this partition. Only valid when the partition has chunks." }
]
}
Changes to PartitionChangeRecord
{
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
// Version 1 adds Directories for KIP-858.
// Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
// Version 3 adds StartOffset and StartTimestamp for KIP-1114.
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The partition id." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique ID of this topic." },
...
{ "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 7,
"about": "null if the LastKnownElr didn't change; the last known eligible leader replicas otherwise." },
{ "name": "StartOffset", "type": "int64", "versions": "3+", "taggedVersions": "3+", "tag": 9, "default": "-1",
"about": "The start offset of the partition. Only valid when the partition has chunks." },
{ "name": "StartTimestamp", "type": "int64", "versions": "3+", "taggedVersions": "3+", "tag": 10, "default": "-1",
"about": "The start timestamp of the partition. Only valid when the partition has chunks." }
]
}
Changes to MetadataResponse
{
"apiKey": 3,
"type": "response",
"name": "MetadataResponse",
// Version 1 adds fields for the rack of each broker, the controller id, and
// whether or not the topic is internal.
// Version 13 adds the Chunks field to the partition metadata (KIP-1114).
"validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
"about": "A list of brokers present in the cluster.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
"about": "The broker ID." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The broker hostname." },
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The broker port." },
{ "name": "Rack", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, "default": "null",
"about": "The rack of the broker, or null if it has not been assigned to a rack." }
]},
{ "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null",
"about": "The cluster ID that responding broker belongs to." },
{ "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "entityType": "brokerId",
"about": "The ID of the controller broker." },
{ "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
"about": "Each topic in the response.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The topic error, or 0 if there was no error." },
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "12+",
"about": "The topic name. Null for non-existing topics queried by ID. This is never null when ErrorCode is zero. One of Name and TopicId is always populated." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic id. Zero for non-existing topics queried by name. This is never zero when ErrorCode is zero. One of Name and TopicId is always populated." },
{ "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
"about": "True if the topic is internal." },
{ "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
"about": "Each partition in the topic.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition error, or 0 if there was no error." },
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the leader broker." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
"about": "The leader epoch of this partition." },
{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The set of all nodes that host this partition." },
{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The set of nodes that are in sync with the leader for this partition." },
{ "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, "entityType": "brokerId",
"about": "The set of offline replicas of this partition." },
{ "name": "StartOffset", "type": "int64", "versions": "13+", "default": "-1",
"about": "The start offset of the active chunk." },
{ "name": "Chunks", "type": "[]MetadataResponseChunk", "versions": "13+", "ignorable": true,
"about": "The set of chunks of this partition.", "fields": [
{ "name": "StartOffset", "type": "int64", "versions": "13+", "default": "-1",
"about": "The start offset of the chunk." },
{ "name": "StartTimestamp", "type": "int64", "versions": "13+", "default": "-1",
"about": "The timestamp of the start offset." },
{ "name": "StopOffset", "type": "int64", "versions": "13+", "default": "-1",
"about": "The stop offset of the chunk." },
{ "name": "EndOffset", "type": "int64", "versions": "13+", "default": "-1",
"about": "The end offset of the chunk." },
{ "name": "ReplicaNodes", "type": "[]int32", "versions": "13+", "entityType": "brokerId",
"about": "The set of all nodes that host this chunk." },
{ "name": "IsrNodes", "type": "[]int32", "versions": "13+", "entityType": "brokerId",
"about": "The set of nodes that holds a full copy of the chunk." }
]}
]},
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this topic." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this cluster." }
]
}
NEW: ChunkRecord
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 28,
"type": "metadata",
"name": "ChunkRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The partition id." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique ID of this topic." },
{ "name": "StartOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The offset that the read can start from (inclusive)." },
{ "name": "StartTimestamp", "type": "int64", "versions": "0+", "default": "-1",
"about": "The timestamp of the start offset (inclusive)." },
{ "name": "StopOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The offset that the read should stop (inclusive)." },
{ "name": "EndOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The offset that the replicate should stop (inclusive)." },
{ "name": "ChunkEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
"about": "The epoch of this chunk." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The replicas of this partition, sorted by preferred order." },
{ "name": "Directories", "type": "[]uuid", "versions": "0+",
"about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."},
{ "name": "Isr", "type": "[]int32", "versions": "0+",
"about": "The in-sync replicas of this partition" },
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
      "about": "The replicas that we are in the process of adding." }
]
}
NEW: ChunkChangeRecord
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 29,
"type": "metadata",
"name": "ChunkChangeRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The partition id." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique ID of this topic." },
{ "name": "StartOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The offset that the read can start from (inclusive)." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId", "taggedVersions": "0+", "tag": 0,
"about": "The replicas of this partition, sorted by preferred order." },
{ "name": "Directories", "type": "[]uuid", "default": "null", "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 1,
"about": "null if the log dirs didn't change; the new log directory for each replica otherwise."},
{ "name": "Isr", "type": "[]int32", "versions": "0+", "taggedVersions": "0+", "tag": 2,
"about": "The in-sync replicas of this partition" },
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId", "taggedVersions": "0+", "tag": 3,
"about": "The replicas that we are in the process of removing." },
{ "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId", "taggedVersions": "0+", "tag": 4,
"about": "The replicas that we are in the process of adding." },
    { "name": "ChunkEpoch", "type": "int32", "versions": "0+", "taggedVersions": "0+", "tag": 5, "default": "-1",
      "about": "The epoch of the chunk." }
]
}
NEW: CreateChunksRequest
{
"apiKey": 76,
"type": "request",
"listeners": ["broker", "controller"],
"name": "CreateChunksRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TopicPartitions", "type": "[]CreateChunksTopicPartition", "versions": "0+",
"about": "Each topic partition that we want to create new chunks.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition that will create chunk on." },
{ "name": "BrokerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The new replica assigned broker IDs." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The start offset of the new chunk where the chunk replication should start from." },
{ "name": "StartTimestamp", "type": "int64", "versions": "0+",
"about": "The timestamp of the start offset." },
{ "name": "PrevStopOffset", "type": "int64", "versions": "0+",
"about": "The stop offset of the previous chunk where the chunk replication should stop." }
]}
]
}
NEW: CreateChunksResponse
{
"apiKey": 76,
"type": "response",
"name": "CreateChunksResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Results", "type": "[]CreateChunksTopicPartitionResult", "versions": "0+",
"about": "The chunk creation results for each topic partition.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition index."},
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The result error, or zero if there was no error."},
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"default": "null", "about": "The result message, or null if there was no error."}
]}
]
}
NEW: AlterChunkAssignmentsRequest
{
"apiKey": 77,
"type": "request",
"listeners": ["controller"],
"name": "AlterChunkAssignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Chunks", "type": "[]AlterChunkAssignment", "versions": "0+",
"about": "Each chunk that we want to alter assignment.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition of the chunk to be reassigned." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The start offset of the chunk to be reassigned." },
{ "name": "ChunkEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
"about": "The epoch of this chunk." },
{ "name": "BrokerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The new replica assigned broker IDs." }
]}
]
}
NEW: AlterChunkAssignmentsResponse
{
"apiKey": 77,
"type": "response",
"name": "AlterChunkAssignmentsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Results", "type": "[]AlterChunkAssignmentResult", "versions": "0+",
    "about": "The chunk reassignment results for each topic partition.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition index."},
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The start offset of the chunk." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The result error, or zero if there was no error."},
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"default": "null", "about": "The result message, or null if there was no error."}
]}
]
}
Both KafkaApis and ControllerApis need a new handler for the CreateChunksRequest.
ControllerApis needs a new handler for the AlterChunkAssignmentsRequest.
A ChunkCommand will be created under org.apache.kafka.tools to send the CreateChunksRequest and AlterChunkAssignmentsRequest to the Kafka server.
./kafka-chunks.sh [create|alter] --bootstrap-server <bootstrap-server> [--create-chunk-json-file <path to file> | --alter-chunk-json-file <path to file>]
Example create chunk json file:
{
"partitions": [{
"topic": "topic1",
"partition": 0,
"replicas": [1, 2, 3]
}, {
"topic": "topic2",
"partition": 2,
"replicas": [2, 9, 12]
}]
}
With this file, we will create a new chunk on brokers 1, 2, and 3 for topic topic1, partition 0; and a new chunk on brokers 2, 9, and 12 for topic topic2, partition 2.
Example alter chunk assignment json file:
{
"chunks": [{
"topic": "topic1",
"partition": 0,
"startOffset": 289818,
"replicas": [4, 5, 6]
}, {
"topic": "topic1",
"partition": 0,
"startOffset": 27784,
"replicas": [1, 2, 3]
}]
}
With this file, we will modify the replicas of the chunk with start offset 289818 in topic topic1, partition 0; the new replica set is 4, 5, 6. For the chunk with start offset 27784 in topic1, partition 0, we will modify the replica set to 1, 2, 3.
The consumer will be updated to read the chunks field in the MetadataResponse.
The consumer seek function will be updated to seek into inactive chunks by offset or timestamp.
Compatibility, Deprecation, and Migration Plan
The chunk is an addition to the existing data plane. If no chunk is created, Kafka works the same way as before.
If a chunk is created on a new-version Kafka server, older Kafka clients may or may not work as expected:
Old producer versions: will work as if the server did a partition reassignment.
Old consumer versions:
- If the consumer has no lag and is reading the most up-to-date data: it will continue reading the data in the newly created inactive chunk, then move on to the active chunk.
- If the consumer tries to read from an earlier offset: it will miss the data in the inactive chunks and can only read the data in the active chunk.
Test Plan
Reinforce the existing TopicDelta tests to make sure existing behavior is unchanged
A CreateChunkTest extending BaseRequestTest
Test with non-transactional producer and consumer
Setup:
- Create a 3-broker cluster.
- Create a producer and consumer (auto.offset.reset=earliest).
- Create a topic with 1 partition on broker 0 only.
- Produce 10 messages.
- Create a chunk on broker 1.
- Produce another 10 messages.
Expected:
- Consumer polls once and reads the first 10 messages from broker 0.
- Consumer polls again and reads another 10 messages from broker 1.
Test with transactional producer and consumer
Setup:
- Create a 3-broker cluster.
- Create a normal producer, a transactional producer, and a consumer (auto.offset.reset=earliest, isolation.level=read_committed).
- Create a topic with 1 partition on broker 0 only.
- Produce 10 messages from the normal producer.
- Begin a transaction and produce 10 messages from the transactional producer, without committing.
- Create a chunk on broker 1.
- Commit the transaction.
Expected:
- Consumer polls once and reads the first 10 messages from broker 0.
- Consumer polls again and reads another 10 messages from broker 1.
An AlterChunkAssignmentsTest extending BaseRequestTest
Setup:
- Create a 3-broker cluster.
- Create a topic with 1 partition on broker 0 only.
- Create a producer.
- Produce 10 messages to the topic.
- Create a new chunk on broker 1.
- Produce another 10 messages.
- Submit an AlterChunkAssignmentsRequest that moves the first chunk to broker 2.
- Create a consumer.
Expected:
- Consumer polls once and reads 10 messages from broker 2.
- Consumer polls again and reads the next 10 messages from broker 1.
Rejected Alternatives
Chunk as part of partition record
A chunk could be part of the partition record. However, this would make the partition record too big. Also, any change to a chunk would cause a change to the partition record, which is unnecessary because changes to inactive chunks have no impact on the active chunk.
Create chunk in the middle of the partition
Technically this is possible, but it provides little benefit and is much harder to implement.
Manual chunk deletion
It is possible to reverse the chunk creation process, but that is a complicated process with little benefit.
Promote current segment data offsets into metadata
It is possible to simply promote the current segment data info into metadata. However, this could put too much data in metadata. Also, segment data is an implementation detail of the storage domain and shouldn't leak into the metadata domain.
Rename the partition record and partition change record to active chunk records
Since the behavior of the active chunk is almost identical to the current partition, no rename is done in this KIP. We could consider renaming them, or merging them into the chunk record, once the chunk feature matures.
Remarks
This KIP focuses on providing core chunk functionality only. More chunk-related features can be provided in future KIPs.