Status
Current state: Accepted
Discussion thread: here
Voting thread: here
JIRA: KAFKA-16758
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When a user shuts down a Kafka Streams application and tries to leave the associated consumer group using the `leaveGroup` option, this option has no effect.
The expected behavior is that the group member would shut down and leave the group, immediately triggering a consumer group rebalance. However, in practice, the rebalance occurs only after the configured timeout period has expired.
There are two mechanisms that cause this unexpected behavior:
- Generally, a consumer can leave a group by calling `consumer#close()`. However, in Kafka Streams, the internal configuration (`internal.leave.group.on.close`) prevents the consumer from sending the leave group request when calling `consumer#close()`. Additionally, because this internal config is immutable, it cannot be dynamically adjusted, further limiting its flexibility.
- The static group membership feature was introduced with a new Admin API that allows removing a static member from a group. This API is specifically designed to trigger a rebalance for static groups, which often have long session timeouts, when a member has crashed and is not expected to return.
These two independent features do not work well together. Kafka Streams does provide the `KafkaStreams#close(CloseOptions)` method to control this, but because of the way the two APIs were designed, it currently works correctly only for static group members.
Public Interfaces
Our goal is to provide a more flexible mechanism for controlling whether a consumer leaves the group during shutdown, eliminating the reliance on the `internal.leave.group.on.close` configuration, which is currently used by Kafka Streams as a workaround. To achieve this, we will introduce a new public API in the `Consumer` interface, allowing callers to specify whether the consumer should leave the group when `close` is called.
The following public interface will be modified:
- org.apache.kafka.clients.consumer.Consumer
We propose adding a new method and an inner class to the Consumer interface:
- Consumer#close(CloseOption options): This method allows the caller to specify shutdown behavior using the `CloseOption` class.
- Consumer.CloseOption class: This inner class contains the options that control the shutdown process.
The `CloseOption` class will have two fields:
- groupMembershipOperation (GroupMembershipOperation): This field is an enum that specifies whether the consumer should send a `LeaveGroup` request upon shutdown. The available options in the enum are:
  - LEAVE_GROUP: The consumer will send a `LeaveGroup` request and trigger a rebalance.
  - REMAIN_IN_GROUP: The consumer will remain in the group and not trigger a rebalance.
  - DEFAULT: If no value is set, the default behavior will be applied, which may differ depending on whether the consumer is a static or dynamic member.
- timeout (Duration): Specifies the maximum amount of time to wait for the shutdown process to complete. This allows users to define a custom timeout for gracefully stopping the consumer. If no value is set, the default timeout `ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS` (30s) will be applied.
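As a rough illustration of the timeout fallback described above, the following sketch resolves an optional caller-supplied timeout against the 30s default. The class `CloseTimeoutSketch` and the helper `effectiveTimeout` are hypothetical names introduced only for this example; the constant mirrors the value stated for `ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS`, and the actual implementation may resolve the value differently.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in; not part of the Kafka code base.
final class CloseTimeoutSketch {
    // Mirrors the 30s default that the text attributes to ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS.
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30_000L;

    // Returns the caller's timeout if one was set, otherwise the 30s default.
    static Duration effectiveTimeout(Optional<Duration> requested) {
        return requested.orElse(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
    }
}
```

With this shape, `CloseOption.timeout(null)` and an unset timeout would both fall back to the default, because the option stores the value through `Optional.ofNullable`.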
The `CloseOption` class will provide two static factory methods to create instances:
- CloseOption.groupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the desired group membership behavior during shutdown.
- CloseOption.timeout(Duration timeout): This method allows users to set a custom timeout for the shutdown process.
Additionally, the class will support fluent API methods for more flexibility:
- CloseOption.withTimeout(Duration timeout): This method allows users to set the timeout using a fluent API.
- CloseOption.withGroupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the group membership operation using a fluent API.
This new API will provide a more flexible and explicit way to control consumer group membership and shutdown behavior, benefiting both static and non-static consumers.
    public interface Consumer<K, V> extends Closeable {
        // omit the existing class members

        /**
         * Deprecated since Kafka 4.1; use {@link Consumer#close(CloseOption)} instead.
         *
         * @see KafkaConsumer#close(Duration)
         */
        @Deprecated
        void close(Duration timeout);

        /**
         * This method allows the caller to specify shutdown behavior using the {@link CloseOption} class.
         * If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOption} instance.
         *
         * @param option see {@link CloseOption}
         */
        void close(final CloseOption option);

        class CloseOption {

            /**
             * Enum to specify the group membership operation upon shutdown.
             * {@code LEAVE_GROUP} means the consumer will leave the group.
             * {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
             * {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.
             */
            public enum GroupMembershipOperation {
                LEAVE_GROUP,
                REMAIN_IN_GROUP,
                DEFAULT
            }

            /**
             * Specifies the group membership operation upon shutdown.
             * By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
             */
            protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;

            /**
             * Specifies the maximum amount of time to wait for the close process to complete.
             * This allows users to define a custom timeout for gracefully stopping the consumer.
             * If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
             */
            protected Optional<Duration> timeout = Optional.empty();

            private CloseOption() {
            }

            protected CloseOption(final CloseOption option) {
                this.operation = option.operation;
                this.timeout = option.timeout;
            }

            /**
             * Static method to create a {@code CloseOption} with a custom timeout.
             *
             * @param timeout the maximum time to wait for the consumer to close.
             * @return a new {@code CloseOption} instance with the specified timeout.
             */
            public static CloseOption timeout(final Duration timeout) {
                CloseOption option = new CloseOption();
                option.timeout = Optional.ofNullable(timeout);
                return option;
            }

            /**
             * Static method to create a {@code CloseOption} with a specified group membership operation.
             *
             * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
             * @return a new {@code CloseOption} instance with the specified group membership operation.
             */
            public static CloseOption groupMembershipOperation(final GroupMembershipOperation operation) {
                CloseOption option = new CloseOption();
                option.operation = operation;
                return option;
            }

            /**
             * Fluent method to set the timeout for the close process.
             *
             * @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
             * @return this {@code CloseOption} instance.
             */
            public CloseOption withTimeout(final Duration timeout) {
                this.timeout = Optional.ofNullable(timeout);
                return this;
            }

            /**
             * Fluent method to set the group membership operation upon shutdown.
             *
             * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
             * @return this {@code CloseOption} instance.
             */
            public CloseOption withGroupMembershipOperation(final GroupMembershipOperation operation) {
                this.operation = operation;
                return this;
            }
        }
    }
    /**
     * This class represents an internal version of {@link CloseOption}, used for internal processing of consumer shutdown options.
     *
     * <p>While {@link CloseOption} is the user-facing class, {@code CloseOptionInternal} is intended for accessing internal fields
     * and performing operations within the Kafka codebase. This class should not be used directly by users.
     * It extends {@link CloseOption} and provides getters for the {@link GroupMembershipOperation} and timeout values.</p>
     */
    public class CloseOptionInternal extends CloseOption {

        CloseOptionInternal(final CloseOption option) {
            super(option);
        }

        public GroupMembershipOperation groupMembershipOperation() {
            return operation;
        }

        public Optional<Duration> timeout() {
            return timeout;
        }
    }
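The interface above leaves open how `DEFAULT` is turned into a concrete decision. The sketch below shows one way the three enum values could map to "send `LeaveGroup` or not", following the behavior this KIP describes for static and dynamic members. It is an illustration under assumptions, not the actual implementation: `LeaveOnCloseSketch` and `shouldSendLeaveGroup` are hypothetical names, the enum is redeclared locally so the example compiles on its own, and static membership is assumed to be detected by the presence of a `group.instance.id` value.

```java
import java.util.Optional;

// Hypothetical stand-in; the enum is redeclared so the sketch is self-contained.
final class LeaveOnCloseSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Decides whether a LeaveGroup request should be sent on close.
    // groupInstanceId is assumed to hold the group.instance.id config value, if any.
    static boolean shouldSendLeaveGroup(GroupMembershipOperation operation,
                                        Optional<String> groupInstanceId) {
        switch (operation) {
            case LEAVE_GROUP:
                return true;   // explicit request to leave, regardless of membership type
            case REMAIN_IN_GROUP:
                return false;  // explicit request to stay, regardless of membership type
            default:
                // DEFAULT: dynamic members leave, static members remain.
                return !groupInstanceId.isPresent();
        }
    }
}
```

Under these assumptions, a static member closed with `LEAVE_GROUP` leaves immediately, which is the case that previously required the Admin API.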
Proposed Changes
We propose adding a new public API to the `Consumer` interface that allows the caller to explicitly opt in or out of sending a `LeaveGroup` request when `close()` is called. This new API will be similar to the `KafkaStreams#close(CloseOptions)` method and will allow more dynamic control over group membership management.
The key advantages of this proposed API are:
- It will provide plain consumer applications with the ability to opt out of leaving the group when closed, which is currently not possible through any public API.
- The new API allows the caller to dynamically select the appropriate behavior based on the context of the shutdown. For example, a consumer may not need to leave the group during a simple restart, but should leave the group during a permanent shutdown or node scaling down. This dynamic behavior is currently not possible because internal configurations are immutable.
- This change will also make it possible to adjust the behavior of the `org.apache.kafka.streams.KafkaStreams#close(CloseOptions)` API so that the user's choice to leave the group during close is respected for non-static members as well as static ones.
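To make the "dynamic selection" advantage concrete, the following sketch maps a shutdown context to a group membership operation at the moment `close` is called. The `ShutdownReason` enum and the `operationFor` helper are hypothetical and exist only for this example; the enum of operations is redeclared locally so the snippet compiles without the Kafka client on the classpath.

```java
// Hypothetical stand-in; not part of the proposed API.
final class ShutdownPolicySketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Illustrative shutdown contexts taken from the examples in the text.
    enum ShutdownReason { SIMPLE_RESTART, PERMANENT_SHUTDOWN, SCALE_DOWN, UNKNOWN }

    // Picks the operation per call, which an immutable config cannot do.
    static GroupMembershipOperation operationFor(ShutdownReason reason) {
        switch (reason) {
            case SIMPLE_RESTART:
                // The member is expected back shortly, so avoid a rebalance.
                return GroupMembershipOperation.REMAIN_IN_GROUP;
            case PERMANENT_SHUTDOWN:
            case SCALE_DOWN:
                // The member will not return, so release its partitions right away.
                return GroupMembershipOperation.LEAVE_GROUP;
            default:
                // No specific knowledge: defer to the consumer's default behavior.
                return GroupMembershipOperation.DEFAULT;
        }
    }
}
```

An application could then call `consumer.close(CloseOption.groupMembershipOperation(operationFor(reason)))`, deciding per shutdown rather than per deployment.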
Compatibility, Deprecation, and Migration Plan
The introduction of the `Consumer#close(CloseOption options)` method provides a more flexible and configurable way for users to specify shutdown behavior. This method not only allows users to define a custom `Duration` for the shutdown process but also introduces the `GroupMembershipOperation` parameter, giving greater control over consumer group membership management during shutdown.
In light of this enhancement, the `Consumer#close(Duration timeout)` method will be deprecated starting with Kafka 4.1. The deprecation aims to guide users towards the more versatile `Consumer#close(CloseOption options)` method.
The deprecation of `Consumer#close(Duration timeout)` will be documented in the API and source code, and users will be strongly encouraged to use the CloseOption-based method for the following reasons:
- Flexibility: The new method not only allows setting a shutdown timeout but also provides control over whether the consumer should leave the group.
- Future-proof: The `CloseOption` class can be extended with additional fields in the future, without introducing new methods, thus offering a more scalable solution.
Migration Plan
To avoid disruption, the deprecated `Consumer#close(Duration timeout)` method will remain available for a period of time. Users will be advised to migrate to the new method by:
- Replacing calls to `Consumer#close(Duration timeout)` with `Consumer#close(CloseOption options)`.
- Adopting the new `CloseOption` class to take advantage of the group membership operation options.
    // Old usage
    consumer.close(Duration.ofSeconds(30));

    // New usage
    consumer.close(CloseOption.timeout(Duration.ofSeconds(30))
        .withGroupMembershipOperation(GroupMembershipOperation.DEFAULT));

    // New usage (shorter version)
    consumer.close(CloseOption.timeout(Duration.ofSeconds(30)));
    // Note: The shorter version is equivalent to the full version since
    // GroupMembershipOperation.DEFAULT is applied by default if not specified.
Removal of Deprecated Method
The deprecated `Consumer#close(Duration timeout)` method will be scheduled for removal in a future major release, potentially in Kafka 5.0. Users will be informed of this through deprecation warnings and documentation updates, ensuring they have sufficient time to migrate their codebases.
Test Plan
The proposed changes to the `Consumer#close(CloseOption)` API will be verified through the following tests:
- Functional Testing:
  - groupMembershipOperation(LEAVE_GROUP)
    - Verify that the consumer successfully leaves the group when `close()` is called with `groupMembershipOperation(LEAVE_GROUP)`, and a rebalance is triggered.
    - Confirm that the consumer leaves the group without waiting for the timeout to expire, and the rebalance process completes as expected.
  - groupMembershipOperation(REMAIN_IN_GROUP)
    - Verify that the consumer remains in the group when `close()` is called with `groupMembershipOperation(REMAIN_IN_GROUP)`, and no rebalance is triggered.
    - Ensure that the consumer is not removed from the group until the session timeout expires and that no unnecessary task movements occur.
  - groupMembershipOperation(DEFAULT)
    - For dynamic members: Verify that the consumer leaves the group upon close, triggering a rebalance.
    - For static members: Verify that the consumer remains in the group, and the group remains stable.
- Timeout Testing:
  - Custom Timeout
    - Test that when a custom `timeout` is provided via `CloseOption.timeout()` or `withTimeout()`, `close()` waits up to the specified duration for the consumer to shut down gracefully.
    - Ensure that when a shorter timeout is specified, the shutdown process respects this limit and reaches the correct state based on whether the shutdown completes within the specified timeout.
  - Default Timeout
    - Confirm that when no `timeout` is explicitly set, the default value (30s) is applied, giving the consumer up to 30 seconds to shut down gracefully.
    - Verify that when the default timeout is applied, `close()` does not return before shutdown has finished or the 30s limit has been reached.
- System Testing:
  - Test how the consumer behaves during a rolling restart of the Kafka cluster. Verify that the group membership status reflects the options specified in `CloseOption`, and the behavior aligns with the expected outcome for both static and non-static group members.
  - Ensure that the `groupMembershipOperation` and `timeout` options work correctly with multiple consumers.
- Compatibility Testing:
  - Verify that the new `Consumer#close(CloseOption)` API works seamlessly with both static and non-static group members, ensuring proper group membership handling across different configurations.
  - Test compatibility across different versions of the Kafka Consumer to ensure no backward compatibility issues arise from the introduction of the new API.
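For the timeout checks in this plan, a test needs some way to observe whether a close call finished within a bound. The helper below is one possible shape for such a probe, written against a plain `Runnable` so it runs without a broker. `CloseTimingProbe` and `completesWithin` are hypothetical names, and a real test would pass something like `() -> consumer.close(option)` as the action.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper; not part of the Kafka test utilities.
final class CloseTimingProbe {

    // Runs the action on a separate thread and reports whether it finished within the limit.
    static boolean completesWithin(Runnable closeAction, Duration limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = executor.submit(closeAction);
            try {
                result.get(limit.toMillis(), TimeUnit.MILLISECONDS);
                return true;                       // finished before the limit
            } catch (TimeoutException e) {
                return false;                      // still running when the limit expired
            } catch (ExecutionException e) {
                throw new IllegalStateException("close action failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        } finally {
            executor.shutdownNow();                // interrupt the action if it is still running
        }
    }
}
```

A custom-timeout test could assert that the probe returns `true` for a limit slightly above the configured `CloseOption` timeout, since `close()` is expected to return once that timeout elapses.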
Rejected Alternatives
- Introducing an Internal API Instead of a Public API:
- One alternative was to introduce an internal API that mimics the behavior of the existing internal configuration `internal.leave.group.on.close`, and use it specifically for Kafka Streams without exposing it as a public API. This would involve adding an internal overload to the `close()` method or creating an internal API like `leaveGroupOnClose()` that could be toggled on and off by Kafka Streams.
- This approach was rejected because it would limit the flexibility and transparency of the solution. While it could avoid the complexity of introducing a public API through a KIP, it would continue to rely on internal configurations that are not intended for general use by consumer applications. Moreover, keeping this functionality internal would restrict its benefits to Kafka Streams only, rather than making it available to all consumer applications. A public API, though more involved to implement, provides a cleaner and more scalable solution that can be used across all Kafka clients, including but not limited to Kafka Streams.
- Waiting for Mutable Configurations
- Another alternative discussed was to wait for a future enhancement to Kafka that would introduce mutable configurations, allowing the `internal.leave.group.on.close` setting to be dynamically adjusted.
- This idea was rejected because such an enhancement is not expected to be implemented soon, and it would not solve the immediate need for more flexible control over consumer group membership during shutdown. The goal of this KIP is to provide a near-term solution that addresses current limitations, whereas waiting for mutable configurations would leave the problem unresolved for an indeterminate period of time.