...

These two independent features do not work well together. Additionally, while Kafka Streams provides the `KafkaStreams#close(CloseOptions)` method to manage this, it currently works correctly only for static group members because of the way the APIs were designed.

...

We propose adding a new method and an inner class to the Consumer interface:

  • Consumer#close(CloseOptions options): This method allows the caller to specify shutdown behavior using the `CloseOptions` class.
  • Consumer.CloseOptions class: This inner class contains the options that control the shutdown process.

The `CloseOptions` class will have two fields:

  • groupMembershipOperation (GroupMembershipOperation): An enum that specifies whether the consumer should send a `LeaveGroup` request upon shutdown. The available values are:
    • LEAVE_GROUP: The consumer will send a `LeaveGroup` request and trigger a rebalance.
    • REMAIN_IN_GROUP: The consumer will remain in the group and not trigger a rebalance.
    • DEFAULT: Used when no value is set. The behavior depends on the member type: a dynamic member leaves the group, while a static member remains in it.
  • timeout (Duration): Specifies the maximum amount of time to wait for the shutdown process to complete. This allows users to define a custom timeout for gracefully stopping the consumer. If no value is set, the default timeout ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS (30s) will be applied.
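The two fields can be read as a small resolution step performed at close time. The sketch below is a hypothetical illustration, not Kafka's implementation: `CloseResolution`, `shouldSendLeaveGroup`, `effectiveTimeout`, and `DEFAULT_CLOSE_TIMEOUT` are invented names, the nested enum is a local stand-in for the proposed `GroupMembershipOperation`, and the `DEFAULT` mapping (dynamic members leave, static members stay) follows the expectations stated in this proposal's test plan.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch: resolving CloseOptions fields into concrete shutdown decisions.
// None of these names are Kafka APIs; the enum is a local stand-in for the proposal's.
final class CloseResolution {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Stand-in for ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS (30 seconds).
    static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);

    private CloseResolution() {
    }

    // Decides whether close() should send a LeaveGroup request.
    // DEFAULT: dynamic members leave; static members (group.instance.id set) stay.
    static boolean shouldSendLeaveGroup(GroupMembershipOperation operation, boolean isStaticMember) {
        switch (operation) {
            case LEAVE_GROUP:
                return true;
            case REMAIN_IN_GROUP:
                return false;
            default: // DEFAULT
                return !isStaticMember;
        }
    }

    // An unset timeout falls back to the 30-second default.
    static Duration effectiveTimeout(Optional<Duration> timeout) {
        return timeout.orElse(DEFAULT_CLOSE_TIMEOUT);
    }
}
```

For example, `shouldSendLeaveGroup(GroupMembershipOperation.DEFAULT, true)` returns `false`, so a static member closed with default options keeps its membership.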

The `CloseOptions` class will provide two static factory methods to create instances:

  • CloseOptions.groupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the desired group membership behavior during shutdown.
  • CloseOptions.timeout(Duration timeout): This method allows users to set a custom timeout for the shutdown process.

Additionally, the class will support fluent API methods for more flexibility:

  • CloseOptions#withTimeout(Duration timeout): This method allows users to set the timeout using a fluent API.
  • CloseOptions#withGroupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the group membership operation using a fluent API.
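The factories and fluent setters are meant to compose in either order. The sketch below illustrates that with a minimal stand-in class; `CloseOptionsSketch` is a hypothetical name, and the `equals`/`hashCode` overrides exist only to make the comparison visible (the proposal does not define them).

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Minimal stand-in for the proposed Consumer.CloseOptions, used only to show how the
// static factories and fluent setters compose. Not a Kafka class.
final class CloseOptionsSketch {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    private GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
    private Optional<Duration> timeout = Optional.empty();

    private CloseOptionsSketch() {
    }

    // Static factory: start from defaults and set only the timeout.
    static CloseOptionsSketch timeout(Duration timeout) {
        return new CloseOptionsSketch().withTimeout(timeout);
    }

    // Static factory: start from defaults and set only the membership operation.
    static CloseOptionsSketch groupMembershipOperation(GroupMembershipOperation operation) {
        return new CloseOptionsSketch().withGroupMembershipOperation(operation);
    }

    // Fluent setter: mutates this instance and returns it for chaining. null means "use the default".
    CloseOptionsSketch withTimeout(Duration timeout) {
        this.timeout = Optional.ofNullable(timeout);
        return this;
    }

    // Fluent setter: mutates this instance and returns it for chaining.
    CloseOptionsSketch withGroupMembershipOperation(GroupMembershipOperation operation) {
        this.operation = operation;
        return this;
    }

    GroupMembershipOperation operation() {
        return operation;
    }

    Optional<Duration> timeout() {
        return timeout;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CloseOptionsSketch)) {
            return false;
        }
        CloseOptionsSketch other = (CloseOptionsSketch) o;
        return operation == other.operation && timeout.equals(other.timeout);
    }

    @Override
    public int hashCode() {
        return Objects.hash(operation, timeout);
    }
}
```

Because each setter only touches its own field, `timeout(d).withGroupMembershipOperation(op)` and `groupMembershipOperation(op).withTimeout(d)` end up in the same state, and any field left unset keeps its default.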

...

Consumer
```java
import java.io.Closeable;
import java.time.Duration;
import java.util.Optional;

public interface Consumer<K, V> extends Closeable {
    // existing members omitted

    /**
     * This method has been deprecated since Kafka 4.1; use {@link Consumer#close(CloseOptions)} instead.
     *
     * @see KafkaConsumer#close(Duration)
     */
    @Deprecated
    void close(Duration timeout);

    /**
     * This method allows the caller to specify shutdown behavior using the {@link CloseOptions} class.
     * If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOptions} instance.
     *
     * @param options see {@link CloseOptions}
     */
    void close(final CloseOptions options);

    class CloseOptions {
        /**
         * Enum to specify the group membership operation upon shutdown.
         * {@code LEAVE_GROUP} means the consumer will leave the group.
         * {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
         * {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.
         */
        public enum GroupMembershipOperation {
            LEAVE_GROUP,
            REMAIN_IN_GROUP,
            DEFAULT
        }

        /**
         * Specifies the group membership operation upon shutdown.
         * By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
         */
        protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;

        /**
         * Specifies the maximum amount of time to wait for the close process to complete.
         * This allows users to define a custom timeout for gracefully stopping the consumer.
         * If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
         */
        protected Optional<Duration> timeout = Optional.empty();

        private CloseOptions() {
        }

        protected CloseOptions(final CloseOptions option) {
            this.operation = option.operation;
            this.timeout = option.timeout;
        }

        /**
         * Static method to create a {@code CloseOptions} with a custom timeout.
         *
         * @param timeout the maximum time to wait for the consumer to close.
         * @return a new {@code CloseOptions} instance with the specified timeout.
         */
        public static CloseOptions timeout(final Duration timeout) {
            CloseOptions option = new CloseOptions();
            option.timeout = Optional.ofNullable(timeout);
            return option;
        }

        /**
         * Static method to create a {@code CloseOptions} with a specified group membership operation.
         *
         * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
         * @return a new {@code CloseOptions} instance with the specified group membership operation.
         */
        public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
            CloseOptions option = new CloseOptions();
            option.operation = operation;
            return option;
        }

        /**
         * Fluent method to set the timeout for the close process.
         *
         * @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
         * @return this {@code CloseOptions} instance.
         */
        public CloseOptions withTimeout(final Duration timeout) {
            this.timeout = Optional.ofNullable(timeout);
            return this;
        }

        /**
         * Fluent method to set the group membership operation upon shutdown.
         *
         * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
         * @return this {@code CloseOptions} instance.
         */
        public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
            this.operation = operation;
            return this;
        }
    }
}
```
CloseOptionInternal
```java
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer.CloseOptions.GroupMembershipOperation;

/**
 * This class represents an internal version of {@link CloseOptions}, used for internal processing of consumer shutdown options.
 *
 * <p>While {@link CloseOptions} is the user-facing class, {@code CloseOptionInternal} is intended for accessing internal fields
 * and performing operations within the Kafka codebase. This class should not be used directly by users.
 * It extends {@link CloseOptions} and provides getters for the {@link GroupMembershipOperation} and timeout values.</p>
 */
public class CloseOptionInternal extends CloseOptions {
    CloseOptionInternal(final CloseOptions option) {
        // Copies the operation and timeout fields from the user-supplied options.
        super(option);
    }

    public GroupMembershipOperation groupMembershipOperation() {
        return operation;
    }

    public Optional<Duration> timeout() {
        return timeout;
    }
}
```
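One way the consumer could use the internal view is sketched below under stated assumptions: `UserCloseOptions` and `InternalCloseOptions` are hypothetical stand-ins for `CloseOptions` and `CloseOptionInternal`, `toInternal` is an invented helper, and the stand-in's no-argument constructor is package-private (the proposal makes it private and exposes static factories instead). It shows two points from this proposal: a `null` argument behaves like a fresh default instance, and because the copy constructor copies the fields, later fluent calls on the caller's instance do not change the copy the consumer already took.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for the user-facing CloseOptions; not a Kafka class.
class UserCloseOptions {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
    protected Optional<Duration> timeout = Optional.empty();

    UserCloseOptions() {
    }

    // Copy constructor, mirroring the proposal: copies both fields.
    protected UserCloseOptions(UserCloseOptions other) {
        this.operation = other.operation;
        this.timeout = other.timeout;
    }

    UserCloseOptions withTimeout(Duration timeout) {
        this.timeout = Optional.ofNullable(timeout);
        return this;
    }
}

// Hypothetical stand-in for CloseOptionInternal: read-only getters over a copied snapshot.
final class InternalCloseOptions extends UserCloseOptions {
    InternalCloseOptions(UserCloseOptions options) {
        super(options);
    }

    GroupMembershipOperation groupMembershipOperation() {
        return operation;
    }

    Optional<Duration> timeout() {
        return timeout;
    }

    // A null argument is treated like a freshly constructed default instance.
    static InternalCloseOptions toInternal(UserCloseOptions options) {
        return new InternalCloseOptions(options == null ? new UserCloseOptions() : options);
    }
}
```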



Proposed Changes

We propose adding a new public API to the `Consumer` interface that allows the caller to explicitly opt in to or out of sending a `LeaveGroup` request when `close()` is called. This new API is modeled on the `KafkaStreams#close(CloseOptions)` method and gives callers control over group membership at shutdown time.

...

  • It will provide plain consumer applications with the ability to opt out of leaving the group when closed, which is currently not possible through any public API.
  • The new API allows the caller to select the appropriate behavior based on the context of the shutdown. For example, a consumer may not need to leave the group during a simple restart, but should leave the group during a permanent shutdown or when scaling down. This is currently not possible because the relevant internal configuration is immutable once the consumer is created.
  • This change will also make it possible to adjust the behavior of the `org.apache.kafka.streams.KafkaStreams#close(CloseOptions)` API so that the user's choice to leave the group during close is respected for non-static members as well, extending the current functionality.
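The restart-versus-shutdown choice described above could look like the following in application code. This is a hypothetical sketch: `ShutdownReason`, `CloseOptionChooser`, and `chooseOperation` are invented names, and the nested enum is a local stand-in for the proposed `GroupMembershipOperation`.

```java
// Hypothetical sketch: picking the group membership operation from the shutdown context.
// None of these names are Kafka APIs.
final class CloseOptionChooser {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    enum ShutdownReason { RESTART, PERMANENT_SHUTDOWN, SCALE_DOWN }

    private CloseOptionChooser() {
    }

    static GroupMembershipOperation chooseOperation(ShutdownReason reason) {
        switch (reason) {
            case RESTART:
                // The instance will come back shortly: stay in the group and avoid a rebalance.
                return GroupMembershipOperation.REMAIN_IN_GROUP;
            case PERMANENT_SHUTDOWN:
            case SCALE_DOWN:
            default:
                // The instance is not coming back: leave so a rebalance reassigns its partitions.
                return GroupMembershipOperation.LEAVE_GROUP;
        }
    }
}
```

With the proposed API, the chosen value would then be passed as `consumer.close(CloseOptions.groupMembershipOperation(op))`.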

...

The introduction of the `Consumer#close(CloseOptions options)` method provides a more flexible and configurable way for users to specify shutdown behavior. This method not only allows users to define a custom `Duration` for the shutdown process but also introduces the `GroupMembershipOperation` option, giving greater control over consumer group membership management during shutdown.

In light of this enhancement, the `Consumer#close(Duration timeout)` method will be deprecated starting with Kafka 4.1. The deprecation aims to guide users towards the more versatile `Consumer#close(CloseOptions options)` method.

The deprecation of `Consumer#close(Duration timeout)` will be documented in the API and source code, and users will be strongly encouraged to use the `CloseOptions`-based method for the following reasons:

  1. Flexibility: The new method not only allows setting a shutdown timeout but also provides control over whether the consumer should leave the group.
  2. Future-proof: The `CloseOptions` class can be extended with additional fields in the future without introducing new methods, offering a more scalable solution.

...

  1. Replacing calls to `Consumer#close(Duration timeout)` with `Consumer#close(CloseOptions options)`.
  2. Adopting the new `CloseOptions` class to take advantage of the group membership operation options.

...

Example
```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer.CloseOptions.GroupMembershipOperation;

// `consumer` is an existing Consumer<K, V> instance.

// Old usage (deprecated)
consumer.close(Duration.ofSeconds(30));

// New usage
consumer.close(CloseOptions.timeout(Duration.ofSeconds(30))
    .withGroupMembershipOperation(GroupMembershipOperation.DEFAULT));

// New usage (shorter version)
consumer.close(CloseOptions.timeout(Duration.ofSeconds(30)));

// Note: the shorter version is equivalent to the full version, because
// GroupMembershipOperation.DEFAULT is applied when no operation is specified.
```

...

The proposed changes to the `Consumer#close(CloseOptions)` API will be verified through the following tests:

  • Functional Testing:
    • groupMembershipOperation(LEAVE_GROUP)
      • Verify that the consumer successfully leaves the group when `close()` is called with `LEAVE_GROUP`, and that a rebalance is triggered.
      • Confirm that the consumer leaves the group without waiting for the timeout to expire, and that the rebalance completes as expected.
    • groupMembershipOperation(REMAIN_IN_GROUP)
      • Verify that the consumer remains in the group when `close()` is called with `REMAIN_IN_GROUP`, and that no rebalance is triggered.
      • Ensure that the consumer is not removed from the group until its session times out, and that no unnecessary task movements occur.
    • groupMembershipOperation(DEFAULT)
      • For dynamic members: Verify that the consumer leaves the group upon close, triggering a rebalance.
      • For static members: Verify that the consumer remains in the group, and the group remains stable.
  • Timeout Testing
    • Custom Timeout
      • Test that when a custom `timeout` is provided via `CloseOptions.timeout()` or `withTimeout()`, `close()` waits at most the specified duration for the consumer to shut down gracefully.
      • Ensure that when a shorter timeout is specified, the shutdown process respects this limit and reaches the correct state depending on whether the shutdown completes within the specified timeout.
    • Default Timeout
      • Confirm that when no `timeout` is explicitly set, the default value (30s) is applied and bounds how long `close()` may take.
      • Verify that when the default timeout is applied, `close()` waits until shutdown completes or the 30-second default elapses, and does not return prematurely.
  • System Testing:
    • Test how the consumer behaves during a rolling restart of the Kafka cluster. Verify that the group membership status reflects the options specified in `CloseOptions`, and that the behavior matches the expected outcome for both static and non-static group members.
    • Ensure that the group membership operation and `timeout` options work correctly with multiple consumers.
  • Compatibility Testing:
    • Verify that the new `Consumer#close(CloseOptions)` API works seamlessly with both static and non-static group members, ensuring proper group membership handling across different configurations.
    • Test compatibility across different versions of the Kafka Consumer to ensure no backward compatibility issues arise from the introduction of the new API.

...