Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, safely resetting a Kafka consumer group's offsets requires the group to be completely inactive. In a production environment, this means shutting down the entire service or disabling consumer instances, which often requires a code change. For critical applications, taking the system offline is not an option. Yet offset resets are a necessary tool for recovering from client-side failures such as data loss. Because of this downtime requirement, teams managing large Kafka deployments are forced to build complex, custom control planes to reset offsets safely.
Proposed Changes
With the new consumer group rebalance protocol, group assignment is now managed on the broker side. This gives us an opportunity to significantly improve the offset reset experience.
We can achieve this by adding a new API to the Admin client to "pause" partitions for a specific consumer group. Upon receiving this request, the Group Coordinator will unassign the specified partitions from the group and set them aside. By enabling this broker-managed "pausing" functionality, we eliminate the need for the entire consumer group to be inactive before performing an offset reset.
Finally, with this mechanism in place, we can enhance the kafka-consumer-groups --reset-offsets CLI tool to recognise paused partitions, allowing users to safely reset offsets for those specific topic-partitions at runtime.
Pausing should be supported for both the CONSUMER ("consumer") and STREAMS ("streams") values of GroupType.
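The pause mechanism described above can be pictured as set arithmetic on the coordinator side: partitions that are paused are set aside and dropped from what may be assigned to members. The following is a minimal sketch under that reading, not the Group Coordinator's actual implementation. The class `PausedGroupSketch`, its method names, and the "topic-partition" string encoding are all hypothetical and exist only for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy model of one group's partitions; this is not Kafka's Group Coordinator. */
final class PausedGroupSketch {
    // Partitions are written as "topic-partition" strings to keep the sketch short.
    private final Set<String> subscribed = new TreeSet<>();
    private final Set<String> paused = new TreeSet<>();

    PausedGroupSketch(Set<String> subscribedPartitions) {
        subscribed.addAll(subscribedPartitions);
    }

    /** Sets the given partitions aside; partitions the group does not know are ignored here. */
    void pause(Set<String> partitions) {
        for (String partition : partitions) {
            if (subscribed.contains(partition)) {
                paused.add(partition);
            }
        }
    }

    /** Returns the given partitions to the assignable pool. */
    void resume(Set<String> partitions) {
        paused.removeAll(partitions);
    }

    /** Partitions that may still be handed out to group members. */
    Set<String> assignable() {
        Set<String> result = new TreeSet<>(subscribed);
        result.removeAll(paused);
        return result;
    }

    /** Defensive copy of the paused set. */
    Set<String> paused() {
        return new TreeSet<>(paused);
    }

    public static void main(String[] args) {
        PausedGroupSketch group =
            new PausedGroupSketch(Set.of("orders-0", "orders-1", "orders-2"));
        group.pause(Set.of("orders-1"));
        System.out.println("assignable: " + group.assignable());
        System.out.println("paused: " + group.paused());
        group.resume(Set.of("orders-1"));
        System.out.println("assignable after resume: " + group.assignable());
    }
}
```

In this model the rest of the group keeps consuming from the partitions that remain assignable, which is the property the proposal relies on to avoid stopping the whole group.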
Public Interfaces
KRPC
PausePartitions
Request schema
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker"],
"name": "PausePartitionsRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
{ "name": "Topics", "type": "[]PausePartitionsRequestTopic", "versions": "0+",
"about": "The topics to pause.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PausePartitionsRequestPartition", "versions": "0+",
"about": "Each partition to pause.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." }
]
}
]
}
]
}
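The request nests partition indexes under their topic, whereas the proposed Admin methods take a flat set of topic-partitions. A client would therefore need to group the set by topic before building the request. The sketch below shows one way to do that grouping with the standard library only. `TopicPartition` here is a local stand-in record rather than the Kafka class, and `PauseRequestShape` and `groupByTopic` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PauseRequestShape {
    /** Local stand-in for a topic-partition pair (not the Kafka client class). */
    record TopicPartition(String topic, int partition) {}

    /** Groups a flat set into topic name -> sorted partition indexes, mirroring Topics/Partitions. */
    static Map<String, List<Integer>> groupByTopic(Set<TopicPartition> partitions) {
        Map<String, List<Integer>> byTopic = new TreeMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        for (List<Integer> indexes : byTopic.values()) {
            Collections.sort(indexes);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        Set<TopicPartition> input = Set.of(
            new TopicPartition("example_topic", 2),
            new TopicPartition("example_topic", 0),
            new TopicPartition("other_topic", 1));
        System.out.println(groupByTopic(input));
    }
}
```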
Response schema
{
"apiKey": TBD,
"type": "response",
"name": "PausePartitionsResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]PausePartitionsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]PausePartitionsResponsePartition", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}
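The response carries a top-level error code plus one error code per partition, with 0 meaning no error at either level. A caller might handle this as sketched below. The sketch assumes that a non-zero top-level code means the whole request failed, which the schema does not state explicitly. `PauseResponseSketch`, `PartitionResult`, and `failures` are hypothetical names, and the error code value used in `main` is a placeholder rather than a real Kafka error code.

```java
import java.util.ArrayList;
import java.util.List;

final class PauseResponseSketch {
    /** Local stand-in for one per-partition entry of the response. */
    record PartitionResult(String topic, int partition, short errorCode, String errorMessage) {}

    /** Returns "topic-partition: message" for every partition whose error code is non-zero. */
    static List<String> failures(short topLevelErrorCode, String topLevelMessage,
                                 List<PartitionResult> results) {
        if (topLevelErrorCode != 0) {
            // Assumption: a top-level error invalidates the whole request.
            throw new IllegalStateException("request failed: " + topLevelMessage);
        }
        List<String> failed = new ArrayList<>();
        for (PartitionResult result : results) {
            if (result.errorCode() != 0) {
                failed.add(result.topic() + "-" + result.partition() + ": " + result.errorMessage());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<PartitionResult> results = List.of(
            new PartitionResult("example_topic", 0, (short) 0, null),
            new PartitionResult("example_topic", 1, (short) 1, "placeholder error"));
        System.out.println(failures((short) 0, null, results));
    }
}
```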
ResumePartitions
Request schema
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker"],
"name": "ResumePartitionsRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
{ "name": "Topics", "type": "[]ResumePartitionsRequestTopic", "versions": "0+",
"about": "The topics to resume.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]ResumePartitionsRequestPartition", "versions": "0+",
"about": "Each partition to resume.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." }
]
}
]
}
]
}
Response schema
{
"apiKey": TBD,
"type": "response",
"name": "ResumePartitionsResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]ResumePartitionsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]ResumePartitionsResponsePartition", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}
ListPausedPartitions
Request schema
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker"],
"name": "ListPausedPartitionsRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." }
]
}
Response schema
{
"apiKey": TBD,
"type": "response",
"name": "ListPausedPartitionsResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]ListPausedPartitionsTopicResponse", "versions": "0+",
"about": "Each topic in the response.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]ListPausedPartitionsPartitionResponse", "versions": "0+",
"about": "Each partition in the response.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "Offset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": false,
"about": "The returned offset." }
]}
]}
]
}
Records
This section describes the new record types required for pausing. They persist each group's paused partitions in the compacted __consumer_offsets topic, so that the paused state is "remembered" even after broker restarts.
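Because the topic is compacted and the key is the group ID, state can be rebuilt on restart by replaying the log and keeping the latest value per key. The sketch below illustrates that replay with plain collections. It assumes that a record with a null value (a tombstone, as in any compacted Kafka topic) clears a group's paused state. Whether the proposal uses tombstones this way is not stated, and `PausedStateReplay` and `LogRecord` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class PausedStateReplay {
    /** Stand-in for one key/value pair; a null value models a tombstone. */
    record LogRecord(String groupId, Map<UUID, Set<Integer>> pausedByTopicId) {}

    /** Replays records in log order; the last record per group ID wins. */
    static Map<String, Map<UUID, Set<Integer>>> replay(List<LogRecord> log) {
        Map<String, Map<UUID, Set<Integer>>> state = new HashMap<>();
        for (LogRecord entry : log) {
            if (entry.pausedByTopicId() == null) {
                state.remove(entry.groupId()); // assumption: tombstone clears the group's state
            } else {
                state.put(entry.groupId(), entry.pausedByTopicId());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        UUID topicId = new UUID(0L, 1L);
        List<LogRecord> log = List.of(
            new LogRecord("example-cg", Map.of(topicId, Set.of(0))),
            new LogRecord("example-cg", Map.of(topicId, Set.of(0, 1))));
        System.out.println(replay(log));
    }
}
```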
PausedTopicPartitionsKey
{
"apiKey": TBD,
"type": "coordinator-key",
"name": "PausedTopicPartitionsKey",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0",
"about": "The group ID." }
]
}
PausedTopicPartitionsValue
{
"apiKey": TBD,
"type": "coordinator-value",
"name": "PausedTopicPartitionsValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "PausedTopicPartitions", "type": "[]PausedTopicPartitions", "versions": "0+",
"about": "The set of paused partitions." }
],
"commonStructs": [
{ "name": "PausedTopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic Id." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partition Ids." }
]}
]
}
Admin API
As part of the proposal we will add the following methods on the Admin interface.
| Method Signature | Description | RPC Used |
|---|---|---|
| PausePartitionsResult pausePartitions(String groupId, Set<TopicPartition> partitions) | Pause partitions for the consumer group. | PausePartitions |
| PausePartitionsResult pausePartitions(String groupId, Set<TopicPartition> partitions, | Pause partitions for the consumer group. | PausePartitions |
| ResumePartitionsResult resumePartitions(String groupId, Set<TopicPartition> partitions) | Resume partitions for the consumer group. | ResumePartitions |
| ResumePartitionsResult resumePartitions(String groupId, Set<TopicPartition> partitions, | Resume partitions for the consumer group. | ResumePartitions |
| ListPausedPartitionsResult listPausedPartitions(String groupId) | List paused partitions for the consumer group. | ListPausedPartitions |
| ListPausedPartitionsResult listPausedPartitions(String groupId, | List paused partitions for the consumer group. | ListPausedPartitions |
Admin
PausePartitionsResult
PausePartitionsOptions
ResumePartitionsResult
ResumePartitionsOptions
ListPausedPartitionsResult
ListPausedPartitionsOptions
Command-line tools
kafka-consumer-groups.sh
We will add the following options to the kafka-consumer-groups.sh CLI tool.
| Option | Description |
|---|---|
| --pause | Pause partitions for the consumer group. |
| --resume | Resume partitions for the consumer group. |
| --paused-partitions | List paused partitions for the consumer group. |
Examples:
Pause all the partitions for the given consumer group and topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic" --pause
Pause specific partitions for the given consumer group and topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic:0,1,2" --pause
Resume all the partitions for the given consumer group and topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic" --resume
Resume specific partitions for the given consumer group and topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic:0,1,2" --resume
List paused topic-partitions for the given consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --paused-partitions
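The examples above pass either a bare topic name (all partitions) or a `topic:0,1,2` value (specific partitions) to --topic. The sketch below shows one way such a value could be split into a topic name and partition indexes. It is an illustration only and does not reproduce how the tool itself parses its arguments. `TopicSpecParser` and `TopicSpec` are hypothetical names, and treating an empty partition list as "all partitions" is a convention chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class TopicSpecParser {
    /** Parsed form of a --topic value; an empty partition list means "all partitions" here. */
    record TopicSpec(String topic, List<Integer> partitions) {}

    static TopicSpec parse(String spec) {
        int colon = spec.indexOf(':');
        if (colon < 0) {
            return new TopicSpec(spec, List.of()); // bare topic name
        }
        List<Integer> partitions = new ArrayList<>();
        for (String index : spec.substring(colon + 1).split(",")) {
            partitions.add(Integer.parseInt(index.trim()));
        }
        return new TopicSpec(spec.substring(0, colon), partitions);
    }

    public static void main(String[] args) {
        System.out.println(parse("example_topic"));
        System.out.println(parse("example_topic:0,1,2"));
    }
}
```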
Compatibility, Deprecation, and Migration Plan
The change is backwards compatible with previous versions, and existing functionality is unchanged. The new offset reset behaviour applies only to partitions in a "paused" state, and partitions can enter that state only through the new broker APIs presented in this KIP.
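The compatibility rule can be read as a simple guard: a reset stays allowed for an inactive group exactly as today, and for an active group it is allowed only when every targeted partition is paused. The sketch below encodes that reading. It is an interpretation of the text above rather than the tool's actual logic, and `ResetGuardSketch` and `canReset` are hypothetical names.

```java
import java.util.Set;

final class ResetGuardSketch {
    /**
     * Decides whether an offset reset may proceed.
     * Partitions are written as "topic-partition" strings for brevity.
     */
    static boolean canReset(boolean groupHasActiveMembers, Set<String> paused, Set<String> targets) {
        if (!groupHasActiveMembers) {
            return true; // existing behaviour: an inactive group can always be reset
        }
        // Proposed behaviour: an active group can be reset only on paused partitions.
        return !targets.isEmpty() && paused.containsAll(targets);
    }

    public static void main(String[] args) {
        Set<String> paused = Set.of("example_topic-0", "example_topic-1");
        System.out.println(canReset(true, paused, Set.of("example_topic-0")));
        System.out.println(canReset(true, paused, Set.of("example_topic-2")));
        System.out.println(canReset(false, Set.of(), Set.of("example_topic-2")));
    }
}
```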
Test Plan
- Unit tests
- Integration tests with end to end flow
Rejected Alternatives