DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
...
These two independent features do not work well together. Additionally, while Kafka Streams provides the `KafkaStreams#close(CloseOptionCloseOptions)` method to manage this, it currently only functions correctly for static group members due to the way the APIs were designed.
...
We propose adding a new method and an inner class to the Consumer interface:
- Consumer#close(CloseOption CloseOptions options): This method allows the caller to specify shutdown behavior using the `CloseOption` `CloseOptions` class.
- Consumer.CloseOption CloseOptions class: This inner class contains the options that control the shutdown process.
The `CloseOption` `CloseOptions` class will have two fields:
- groupMembershipOperation (GroupMembershipOperation): This field is an enum that specifies whether the consumer should send a `LeaveGroup` request upon shutdown. The available options in the enum are:
- LEAVE_GROUP: The consumer will send a leaveGroup request and trigger a rebalance.
- REMAIN_IN_GROUP: The consumer will remain in the group and not trigger a rebalance.
- DEFAULT: If no value is set, the default behavior will be applied, which may differ depending on whether the consumer is a static or dynamic member.
- timeout (Duration): Specifies the maximum amount of time to wait for the shutdown process to complete. This allows users to define a custom timeout for gracefully stopping the consumer. If no value is set, the default timeout
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS(30s) will be applied.
The `CloseOption` `CloseOptions` class will provide two static factory methods to create instances:
- CloseOptionCloseOptions.groupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the desired group membership behavior during shutdown.
- CloseOptionCloseOptions.timeout(Duration timeout): This method allows users to set a custom timeout for the shutdown process.
Additionally, the class will support fluent API methods for more flexibility:
- CloseOptionCloseOptions.withTimeout(Duration timeout): This method allows users to set the timeout using a fluent API.
- CloseOptionCloseOptions.withGroupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the group membership operation using a fluent API.
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
public interface Consumer<K, V> extends Closeable {
// omit the exisiting class members
/**
* This method has been deprecated since Kafka 4.0 and should use {@link Consumer#close(CloseOption)} instead.
*
* @see KafkaConsumer#close(Duration)
*/
@Deprecated
void close(Duration timeout);
/**
* This method allows the caller to specify shutdown behavior using the {@link CloseOptionCloseOptions} class.
* If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOptionCloseOptions} instance.
*
* @param options see {@link CloseOptionCloseOptions}
*/
void close(final CloseOptionCloseOptions option);
class CloseOptionCloseOptions {
/**
* Enum to specify the group membership operation upon shutdown.
* {@code LEAVE_GROUP} means the consumer will leave the group.
* {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
* {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP,
DEFAULT
}
/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the consumer.
* If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
*/
protected Optional<Duration> timeout = Optional.empty();
private CloseOptionCloseOptions() {
}
protected CloseOptionCloseOptions(final CloseOptionCloseOptions option) {
this.operation = option.operation;
this.timeout = option.timeout;
}
/**
* Static method to create a {@code CloseOptionCloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the consumer to close.
* @return a new {@code CloseOptionCloseOptions} instance with the specified timeout.
*/
public static CloseOptionCloseOptions timeout(final Duration timeout) {
CloseOptionCloseOptions option = new CloseOptionCloseOptions();
option.timeout = Optional.ofNullable(timeout);
return option;
}
/**
* Static method to create a {@code CloseOptionCloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
* @return a new {@code CloseOptionCloseOptions} instance with the specified group membership operation.
*/
public static CloseOptionCloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
CloseOptionCloseOptions option = new CloseOptionCloseOptions();
option.operation = operation;
return option;
}
/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptionCloseOptions} instance.
*/
public CloseOptionCloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}
/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
* @return this {@code CloseOptionCloseOptions} instance.
*/
public CloseOptionCloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
this.operation = operation;
return this;
}
}
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
/**
* This class represents an internal version of {@link CloseOption}, used for internal processing of consumer shutdown options.
*
* <p>While {@link CloseOption} is the user-facing class, {@code CloseOptionInternal} is intended for accessing internal fields
* and performing operations within the Kafka codebase. This class should not be used directly by users.
* It extends {@link CloseOption} and provides getters for the {@link GroupMembershipOperation} and timeout values.</p>
*/
public class CloseOptionInternal extends CloseOption {
CloseOptionInternal(final CloseOption option) {
super(option);
}
public GroupMembershipOperation groupMembershipOperation() {
return operation;
}
public Optional<Duration> timeout() {
return timeout;
}
} |
Proposed Proposed Changes
We propose adding a new public API to the `Consumer` interface that allows the caller to explicitly opt in or out of sending a `leaveGroup` request when `close()` is called. This new API will be similar to the `KafkaStreams#close(CloseOptionCloseOptions)` method and will allow more dynamic control over group membership management.
...
- It will provide plain consumer applications with the ability to opt out of leaving the group when closed, which is currently not possible through any public API.
- The new API allows the caller to dynamically select the appropriate behavior based on the context of the shutdown. For example, a consumer may not need to leave the group during a simple restart, but should leave the group during a permanent shutdown or node scaling down. This dynamic behavior is currently not possible because internal configurations are immutable.
- This change will also make it possible to adjust the behavior of the `org.apache.kafka.streams.KafkaStreams#close(CloseOptionCloseOptions)` API so that the user's choice to leave the group during close will be respected for non-static members, extending the current functionality to be more flexible.
...
The introduction of the `Consumer#close(CloseOption CloseOptions options)` method provides a more flexible and configurable way for users to specify shutdown behavior. This method not only allows users to define a custom `Duration` the shutdown process but also introduces the `GroupMembershipOperation` parameter, giving greater control over consumer group membership management during shutdown.
In light of this enhancement, the `Consumer#close(Duration timeout)` method will be deprecated starting with Kafka 4.1. The deprecation aims to guide users towards the more versatile `Consumer#close(CloseOption CloseOptions options)` method.
The deprecation of `Consumer#close(Duration timeout)` will be documented in the API and source code, and users will be strongly encouraged to use the CloseOptionCloseOptions-based method for the following reasons:
- Flexibility: The new method not only allows setting a shutdown timeout but also provides control over whether the consumer should leave the group.
- Future-proof: The `CloseOption` `CloseOptions` class can be extended with additional fields in the future, without introducing new methods, thus offering a more scalable solution.
...
- Replacing calls to `Consumer#close(Duration timeout)` with ` with `Consumer#close(CloseOption CloseOptions options)`.
- Adopting the new `CloseOption` `CloseOptions` class to take advantage of the group membership operation options.
...
| Code Block | ||||
|---|---|---|---|---|
| ||||
// Old usage consumer.close(Duration.ofSeconds(30)); // New usage consumer.close(CloseOptionCloseOptions.timeout(Duration.ofSeconds(30)) .withGroupMembershipOperation(GroupMembershipOperation.DEFAULT)); // New usage (shorter version) consumer.close(CloseOptionCloseOptions.timeout(Duration.ofSeconds(30))); // Note: The shorter version is equivalent to the full version since GroupMembershipOperation.DEFAULT is applied by default if not specified. |
...
The proposed changes to the `Consumer#close(CloseOptionCloseOptions)` API will be verified through the following tests:
- Functional Testing:
- leaveGroup(LEAVE_GROUP)
- Verify that the consumer successfully leaves the group when `close()` is called with `leaveGroup(LEAVE_GROUP)`, and a rebalance is triggered.
- Confirm that the consumer leaves the group without waiting for the timeout to expire, and the rebalance process completes as expected.
- leaveGroup(REMAIN_IN_GROUP)
- Verify that the consumer remains in the group when `close()` is called with `leaveGroup(REMAIN_IN_GROUP)`, and no rebalance is triggered.
- Ensure that the consumer does not leave the group until timeout and that no unnecessary task movements occur.
- leaveGroup(DEFAULT)
- For dynamic members: Verify that the consumer leaves the group upon close, triggering a rebalance.
- For static members: Verify that the consumer remains in the group, and the group remains stable.
- leaveGroup(LEAVE_GROUP)
- Timeout Testing
- Custom Timeout
- Test that when a custom `timeout` is provided via `CloseOption`CloseOptions.timeout()` or `withTimeout()`, the application waits for the specified duration for all consumer threads to shut down gracefully.
- Ensure that when a shorter timeout is specified, the shutdown process respects this limit and reaches the correct state based on whether the shutdown completes within the specified timeout.
- Default Timeout
- Confirm that when no `timeout` is explicitly set, the default value (30s) is applied, allowing the system to take an indefinite amount of time to shut down gracefully.
- Verify that when the default timeout is applied, `close()` waits for all threads to stop without prematurely returning due to a timeout.
- Custom Timeout
- System Testing:
- Test how the consumer behaves during a rolling restart of the Kafka cluster. Verify that the group membership status reflects the options specified in `CloseOption``CloseOptions`, and the behavior aligns with the expected outcome for both static and non-static group members.
- Ensure that the `leaveGroup` and `timeout` options work correctly with multiple consumers.
- Compatibility Testing:
- Verify that the new `Consumer#close(CloseOptionCloseOptions)` API works seamlessly with both static and non-static group members, ensuring proper group membership handling across different configurations.
- Test compatibility across different versions of the Kafka Consumer to ensure no backward compatibility issues arise from the introduction of the new API.
...