Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, safely resetting a Kafka consumer group's offsets requires the group to be completely inactive. In production, this means shutting down the entire service or disabling its consumer instances, which often requires a code change. For critical applications, taking the system offline is not an option. Yet offset resets are a necessary tool for recovering from client-side failures such as data loss. Because of this downtime requirement, teams managing large Kafka deployments are forced to build complex, custom control planes to reset offsets safely.

Proposed Changes

With the new consumer group rebalance protocol, group assignment is now managed on the broker side. This gives us an opportunity to significantly improve the offset reset experience.

We can achieve this by adding a new API to the Admin client to "pause" partitions for a specific consumer group. Upon receiving this request, the Group Coordinator will unassign the specified partitions from the group and set them aside. By enabling this broker-managed "pausing" functionality, we eliminate the need for the entire consumer group to be inactive before performing an offset reset.
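The pausing behaviour described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the class PausedPartitionsModel, its method names, and the use of plain "topic-partition" strings are hypothetical and are not part of the Group Coordinator or of this KIP. It shows the one property that matters: paused partitions are set aside and excluded from what the group can be assigned.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory model of per-group paused partitions (not coordinator code). */
public class PausedPartitionsModel {
    // groupId -> paused partitions, written as "topic-partition" strings for brevity
    private final Map<String, Set<String>> pausedByGroup = new TreeMap<>();

    /** Marks the partitions as paused for the group; returns true if the state changed. */
    public boolean pause(String groupId, Set<String> partitions) {
        return pausedByGroup.computeIfAbsent(groupId, g -> new TreeSet<>()).addAll(partitions);
    }

    /** Clears the paused flag for the partitions; returns true if the state changed. */
    public boolean resume(String groupId, Set<String> partitions) {
        Set<String> paused = pausedByGroup.get(groupId);
        if (paused == null) {
            return false;
        }
        boolean changed = paused.removeAll(partitions);
        if (paused.isEmpty()) {
            pausedByGroup.remove(groupId);
        }
        return changed;
    }

    /** Returns a copy of the paused partitions of the group. */
    public Set<String> paused(String groupId) {
        return new TreeSet<>(pausedByGroup.getOrDefault(groupId, Set.of()));
    }

    /** Partitions that may still be assigned to members: subscribed minus paused. */
    public Set<String> assignable(String groupId, Set<String> subscribed) {
        Set<String> result = new TreeSet<>(subscribed);
        result.removeAll(pausedByGroup.getOrDefault(groupId, Set.of()));
        return result;
    }
}
```

In this model, resuming a partition simply makes it assignable again; how and when the real coordinator triggers the corresponding rebalance is outside the scope of the sketch.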

Finally, with this mechanism in place, we can enhance the kafka-consumer-groups --reset-offsets CLI tool to recognise paused partitions, allowing users to safely reset offsets for those specific topic-partitions at runtime.
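One way the tool could decide whether a runtime reset is safe is to check the requested partitions against the paused set. The following is a minimal sketch, assuming the tool already holds both sets; the class ResetOffsetsGuard and the method notPaused are hypothetical names, and the actual validation may well live on the broker instead.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical pre-check for resetting offsets while the group is active. */
public class ResetOffsetsGuard {
    /**
     * Returns the requested partitions that are not paused. An empty result
     * means every requested partition is paused and the reset can proceed.
     */
    public static Set<String> notPaused(Set<String> requested, Set<String> paused) {
        Set<String> rejected = new TreeSet<>(requested);
        rejected.removeAll(paused);
        return rejected;
    }
}
```

A caller would refuse the reset, or report the returned partitions to the user, whenever the result is non-empty.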

Pausing should be supported for groups whose GroupType is CONSUMER("consumer") or STREAMS("streams").


Public Interfaces

KRPC

PausePartitions

Request schema

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "PausePartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The unique group identifier." },
    { "name": "Topics", "type": "[]PausePartitionsRequestTopic", "versions": "0+",
      "about": "The topics to pause.", "fields": [
      { "name": "Name",  "type": "string",  "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PausePartitionsRequestPartition", "versions": "0+",
        "about": "Each partition to pause.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }
    ]
    }
  ]
}
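The request nests partitions under their topic, whereas the Admin API proposed in this KIP takes a flat set of topic-partitions. The conversion between the two shapes can be sketched as follows. The nested TopicPartition class is a local stand-in for org.apache.kafka.common.TopicPartition so that the example is self-contained, and PauseRequestGrouping and byTopic are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that groups a flat partition set into the Topics -> Partitions nesting. */
public class PauseRequestGrouping {
    /** Local stand-in for org.apache.kafka.common.TopicPartition. */
    public static final class TopicPartition {
        final String topic;
        final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Maps each topic name to the sorted list of its partition indexes. */
    public static Map<String, List<Integer>> byTopic(Collection<TopicPartition> partitions) {
        Map<String, List<Integer>> topics = new TreeMap<>();
        for (TopicPartition tp : partitions) {
            topics.computeIfAbsent(tp.topic, t -> new ArrayList<>()).add(tp.partition);
        }
        // Sort for a deterministic request layout; the schema itself does not require it.
        topics.values().forEach(Collections::sort);
        return topics;
    }
}
```

Each map entry corresponds to one element of the Topics array, and each integer to one PartitionIndex.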

Response schema

{
  "apiKey": TBD,
  "type": "response",
  "name": "PausePartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "ThrottleTimeMs",  "type": "int32",  "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]PausePartitionsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PausePartitionsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ResumePartitions

Request schema

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ResumePartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The unique group identifier." },
    { "name": "Topics", "type": "[]ResumePartitionsRequestTopic", "versions": "0+",
      "about": "The topics to resume.", "fields": [
      { "name": "Name",  "type": "string",  "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ResumePartitionsRequestPartition", "versions": "0+",
        "about": "Each partition to resume.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }
    ]
    }
  ]
}

Response schema

{
  "apiKey": TBD,
  "type": "response",
  "name": "ResumePartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "ThrottleTimeMs",  "type": "int32",  "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]ResumePartitionsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]ResumePartitionsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ListPausedPartitions

Request schema

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ListPausedPartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The unique group identifier." }
  ]
}

Response schema

{
  "apiKey": TBD,
  "type": "response",
  "name": "ListPausedPartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]ListPausedPartitionsTopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ListPausedPartitionsPartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Offset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": false,
          "about": "The returned offset." }
      ]}
    ]}
  ]
}

Records

This section describes the new record types required for the pausing functionality. They persist each group's paused partitions in the compacted __consumer_offsets topic, so that the paused state is retained across broker restarts.

PausedTopicPartitionsKey

{
  "apiKey": TBD,
  "type": "coordinator-key",
  "name": "PausedTopicPartitionsKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group ID." }
  ]
}

PausedTopicPartitionsValue

{
  "apiKey": TBD,
  "type": "coordinator-value",
  "name": "PausedTopicPartitionsValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PausedTopicPartitions", "type": "[]PausedTopicPartitions", "versions": "0+",
      "about": "The set of paused partitions." }
  ],
  "commonStructs": [
    { "name": "PausedTopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic Id." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition Ids." }
    ]}
  ]
}
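Because the record is keyed by group ID and stored in a compacted topic, the paused state can be rebuilt by replaying records in order and keeping the latest value per key. The sketch below illustrates that replay. It assumes, as is usual for compacted topics, that a record with a null value (a tombstone) clears the group's entry; this KIP does not state how an empty paused set is represented, so treat that as an assumption. PausedStateReplay and Rec are hypothetical names, and partitions are written as plain strings instead of topic IDs and partition arrays.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical replay of paused-partition records into in-memory state. */
public class PausedStateReplay {
    /** One record: the key is the group ID; the value is the full paused set, or null for a tombstone. */
    public static final class Rec {
        final String groupId;
        final Set<String> paused;

        public Rec(String groupId, Set<String> paused) {
            this.groupId = groupId;
            this.paused = paused;
        }
    }

    /** Applies records in log order; the last record for each group wins. */
    public static Map<String, Set<String>> replay(List<Rec> log) {
        Map<String, Set<String>> state = new TreeMap<>();
        for (Rec r : log) {
            if (r.paused == null) {
                state.remove(r.groupId); // assumed tombstone semantics
            } else {
                state.put(r.groupId, new TreeSet<>(r.paused));
            }
        }
        return state;
    }
}
```

Writing the full paused set in every record, rather than a delta, is what makes "last record wins" sufficient after compaction.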

Admin API

As part of the proposal we will add the following methods on the Admin interface.

Method Signature | Description | RPC Used
PausePartitionsResult pausePartitions(String groupId, Set<TopicPartition> partitions) | Pause partitions for the consumer group. | PausePartitions
PausePartitionsResult pausePartitions(String groupId, Set<TopicPartition> partitions, PausePartitionsOptions options) | Pause partitions for the consumer group. | PausePartitions
ResumePartitionsResult resumePartitions(String groupId, Set<TopicPartition> partitions) | Resume partitions for the consumer group. | ResumePartitions
ResumePartitionsResult resumePartitions(String groupId, Set<TopicPartition> partitions, ResumePartitionsOptions options) | Resume partitions for the consumer group. | ResumePartitions
ListPausedPartitionsResult listPausedPartitions(String groupId) | List paused partitions for the consumer group. | ListPausedPartitions
ListPausedPartitionsResult listPausedPartitions(String groupId, ListPausedPartitionsOptions options) | List paused partitions for the consumer group. | ListPausedPartitions


Admin

    /**
     * Pause specific partitions for a consumer group in the cluster, with the default options.
     * <p>
     * This is a convenience method for {@link #pausePartitions(String, Set, PausePartitionsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupId    The ID of the consumer group.
     * @param partitions The partitions to pause.
     * @return The PausePartitionsResult.
     */
    default PausePartitionsResult pausePartitions(String groupId, Set<TopicPartition> partitions) {
        return pausePartitions(groupId, partitions, new PausePartitionsOptions());
    }

    /**
     * Pause specific partitions for a consumer group in the cluster.
     *
     * @param groupId    The ID of the consumer group.
     * @param partitions The partitions to pause.
     * @param options    The options to use when pausing the partitions.
     * @return The PausePartitionsResult.
     */
    PausePartitionsResult pausePartitions(String groupId,
                                          Set<TopicPartition> partitions,
                                          PausePartitionsOptions options);

    /**
     * Resume specific paused partitions for a consumer group in the cluster, with the default options.
     * <p>
     * This is a convenience method for {@link #resumePartitions(String, Set, ResumePartitionsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupId    The ID of the consumer group.
     * @param partitions The partitions to resume.
     * @return The ResumePartitionsResult.
     */
    default ResumePartitionsResult resumePartitions(String groupId, Set<TopicPartition> partitions) {
        return resumePartitions(groupId, partitions, new ResumePartitionsOptions());
    }

    /**
     * Resume specific paused partitions for a consumer group in the cluster.
     *
     * @param groupId    The ID of the consumer group.
     * @param partitions The partitions to resume.
     * @param options    The options to use when resuming the partitions.
     * @return The ResumePartitionsResult.
     */
    ResumePartitionsResult resumePartitions(String groupId,
                                            Set<TopicPartition> partitions,
                                            ResumePartitionsOptions options);

    /**
     * List the paused partitions for a consumer group in the cluster, with the default options.
     * <p>
     * This is a convenience method for {@link #listPausedPartitions(String, ListPausedPartitionsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupId The ID of the consumer group.
     * @return The ListPausedPartitionsResult.
     */
    default ListPausedPartitionsResult listPausedPartitions(String groupId) {
        return listPausedPartitions(groupId, new ListPausedPartitionsOptions());
    }

    /**
     * List the paused partitions for a consumer group in the cluster.
     *
     * @param groupId The ID of the consumer group.
     * @param options The options to use when listing the paused partitions.
     * @return The ListPausedPartitionsResult.
     */
    ListPausedPartitionsResult listPausedPartitions(String groupId, ListPausedPartitionsOptions options);

PausePartitionsResult

/**
 * The result of {@link Admin#pausePartitions(String, Set, PausePartitionsOptions)}.
 */
public class PausePartitionsResult {

    private final KafkaFuture<Map<TopicPartition, Errors>> future;

    PausePartitionsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
        this.future = future;
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

        this.future.whenComplete((topicPartitions, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else if (!topicPartitions.containsKey(partition)) {
                result.completeExceptionally(new IllegalArgumentException(
                    "Pausing of partition \"" + partition + "\" was not attempted"));
            } else {
                final Errors error = topicPartitions.get(partition);
                if (error == Errors.NONE) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(error.exception());
                }
            }
        });

        return result;
    }

    /**
     * Return a future which succeeds only if pausing all the partitions succeeds.
     */
    public KafkaFuture<Void> all() {
        return this.future.thenApply(topicPartitionErrorsMap ->  {
            List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
                                                                           .stream()
                                                                           .filter(e -> e.getValue() != Errors.NONE)
                                                                           .map(Map.Entry::getKey)
                                                                           .collect(Collectors.toList());
            for (Errors error : topicPartitionErrorsMap.values()) {
                if (error != Errors.NONE) {
                    throw error.exception(
                        "Failed pausing the following partitions: " + partitionsFailed);
                }
            }
            return null;
        });
    }
}
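From a caller's point of view, all() turns a map of per-partition outcomes into a single success or failure. The following sketch reproduces that aggregation with java.util.concurrent.CompletableFuture standing in for KafkaFuture, and with plain strings standing in for TopicPartition and Errors ("NONE" meaning success). PartitionResultSketch is a hypothetical name; the sketch illustrates the pattern and is not the proposed class.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in showing how per-partition errors collapse into one future. */
public class PartitionResultSketch {
    /**
     * Completes normally when every partition reports "NONE"; otherwise
     * completes exceptionally with a message naming the failed partitions.
     */
    public static CompletableFuture<Void> all(CompletableFuture<Map<String, String>> future) {
        return future.thenApply(errors -> {
            Map<String, String> failed = new TreeMap<>();
            errors.forEach((partition, error) -> {
                if (!"NONE".equals(error)) {
                    failed.put(partition, error);
                }
            });
            if (!failed.isEmpty()) {
                throw new IllegalStateException(
                    "Failed pausing the following partitions: " + failed.keySet());
            }
            return null;
        });
    }
}
```

A caller that needs to know which partitions failed, rather than only whether any did, would use the per-partition future instead.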


PausePartitionsOptions

/**
 * Options for {@link Admin#pausePartitions(String, Set, PausePartitionsOptions)}.
 */
public class PausePartitionsOptions extends AbstractOptions<PausePartitionsOptions> {
}


ResumePartitionsResult

/**
 * The result of {@link Admin#resumePartitions(String, Set, ResumePartitionsOptions)}.
 */
public class ResumePartitionsResult {

    private final KafkaFuture<Map<TopicPartition, Errors>> future;

    ResumePartitionsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
        this.future = future;
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

        this.future.whenComplete((topicPartitions, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else if (!topicPartitions.containsKey(partition)) {
                result.completeExceptionally(new IllegalArgumentException(
                    "Resuming of partition \"" + partition + "\" was not attempted"));
            } else {
                final Errors error = topicPartitions.get(partition);
                if (error == Errors.NONE) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(error.exception());
                }
            }
        });

        return result;
    }

    /**
     * Return a future which succeeds only if resuming all the partitions succeeds.
     */
    public KafkaFuture<Void> all() {
        return this.future.thenApply(topicPartitionErrorsMap ->  {
            List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
                                                                           .stream()
                                                                           .filter(e -> e.getValue() != Errors.NONE)
                                                                           .map(Map.Entry::getKey)
                                                                           .collect(Collectors.toList());
            for (Errors error : topicPartitionErrorsMap.values()) {
                if (error != Errors.NONE) {
                    throw error.exception(
                        "Failed resuming the following partitions: " + partitionsFailed);
                }
            }
            return null;
        });
    }
}


ResumePartitionsOptions

/**
 * Options for {@link Admin#resumePartitions(String, Set, ResumePartitionsOptions)}.
 */
public class ResumePartitionsOptions extends AbstractOptions<ResumePartitionsOptions> {
}


ListPausedPartitionsResult

/**
 * The result of {@link Admin#listPausedPartitions(String, ListPausedPartitionsOptions)}.
 */
public class ListPausedPartitionsResult {

    private final KafkaFuture<Map<TopicPartition, Long>> future;

    ListPausedPartitionsResult(KafkaFuture<Map<TopicPartition, Long>> future) {
        this.future = future;
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Long> partitionResult(final TopicPartition partition) {
        final KafkaFutureImpl<Long> result = new KafkaFutureImpl<>();

        this.future.whenComplete((topicPartitions, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else if (!topicPartitions.containsKey(partition)) {
                result.completeExceptionally(new IllegalArgumentException(
                    "Partition \"" + partition + "\" is not paused for this group"));
            } else {
                result.complete(topicPartitions.get(partition));
            }
        });

        return result;
    }

    /**
     * Return a future which yields a map from each paused partition to the offset returned for it.
     */
    public KafkaFuture<Map<TopicPartition, Long>> all() {
        return future;
    }
}


ListPausedPartitionsOptions

/**
 * Options for {@link Admin#listPausedPartitions(String, ListPausedPartitionsOptions)}.
 */
public class ListPausedPartitionsOptions extends AbstractOptions<ListPausedPartitionsOptions> {
}

Command-line tools

kafka-consumer-groups.sh

We will add the following options to the kafka-consumer-groups.sh CLI tool.

Option | Description
--pause | Pause partitions for the consumer group.
--resume | Resume partitions for the consumer group.
--paused-partitions | List paused partitions for the consumer group.

Examples:

Pause all the partitions for the given consumer group and topic

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic" --pause

Pause specific partitions for the given consumer group and topic

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic:0,1,2" --pause

Resume all the partitions for the given consumer group and topic

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic" --resume

Resume specific partitions for the given consumer group and topic

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --topic "example_topic:0,1,2" --resume

List paused topic-partitions for the given consumer group

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "example-cg" --paused-partitions
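The examples above pass either a bare topic name or a topic followed by a colon and a comma-separated partition list. A minimal sketch of parsing that argument form is shown below. TopicArgParser and its methods are hypothetical names, and the sketch does not reflect how the tool actually parses its options; an empty partition list is used here to mean "all partitions of the topic".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for arguments such as "example_topic" or "example_topic:0,1,2". */
public class TopicArgParser {
    /** Returns the topic name, i.e. everything before the first colon. */
    public static String topic(String arg) {
        int colon = arg.indexOf(':');
        return colon < 0 ? arg : arg.substring(0, colon);
    }

    /**
     * Returns the explicit partition indexes, or an empty list when the
     * argument names only a topic (interpreted as "all partitions").
     * Throws NumberFormatException if a partition is not an integer.
     */
    public static List<Integer> partitions(String arg) {
        List<Integer> result = new ArrayList<>();
        int colon = arg.indexOf(':');
        if (colon < 0) {
            return result;
        }
        for (String part : arg.substring(colon + 1).split(",")) {
            result.add(Integer.parseInt(part.trim()));
        }
        return result;
    }
}
```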


Compatibility, Deprecation, and Migration Plan

This change is backwards compatible: existing functionality is unchanged. The new offset reset behaviour applies only to partitions in the "paused" state, which can be reached only by invoking the new broker APIs presented in this KIP.

Test Plan

  • Unit tests
  • Integration tests covering the end-to-end flow

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
