Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-18193
Motivation
In Kafka Streams, configuration classes typically follow a fluent API pattern to ensure a consistent and intuitive developer experience. However, the current implementation of `org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this convention by exposing a public constructor, breaking the uniformity expected across the API.
To address this inconsistency, we propose introducing a new `CloseOptions` class that adheres to the fluent API style, replacing the existing implementation. The new class will retain the existing `timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce fluent instantiation and configuration. Given the design shift, we will not maintain backward compatibility with the current class.
This change aligns with the goal of standardizing configuration objects across Kafka Streams, offering developers a more cohesive and predictable API.
Additionally, with the introduction of a more flexible `CloseOptions` mechanism for the consumer in KIP-1092, this update brings `KafkaStreams$CloseOptions` in line with that proposal, enhancing shutdown semantics in a consistent manner across the ecosystem.
Public Interfaces
Two classes are affected:
- `org.apache.kafka.streams.KafkaStreams$CloseOptions`
- `org.apache.kafka.streams.CloseOptions`
The existing `KafkaStreams$CloseOptions` class will be deprecated.
@Deprecated
public static class CloseOptions {
    private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
    private boolean leaveGroup = false;

    @Deprecated
    public CloseOptions timeout(final Duration timeout) {
        this.timeout = timeout;
        return this;
    }

    @Deprecated
    public CloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }
}
The new `CloseOptions` class will have two fields:
- operation (GroupMembershipOperation): An enum that specifies whether KafkaStreams should send a `LeaveGroup` request upon shutdown. The available values are:
  - LEAVE_GROUP: KafkaStreams will send a `LeaveGroup` request and trigger a rebalance.
  - REMAIN_IN_GROUP: KafkaStreams will remain in the group and not trigger a rebalance.
- timeout (Duration): Specifies the maximum amount of time to wait for the shutdown process to complete. This allows users to define a custom timeout for gracefully stopping KafkaStreams. If no value is set, the default timeout of `Long.MAX_VALUE` milliseconds will be applied.
The `CloseOptions` class will provide two static factory methods to create instances:
- CloseOptions.groupMembershipOperation(GroupMembershipOperation operation): This method allows users to set the desired group membership behavior during shutdown.
- CloseOptions.timeout(Duration timeout): This method allows users to set a custom timeout for the shutdown process.
Additionally, the class will provide fluent instance methods that can be chained on an existing instance:
- CloseOptions#withTimeout(Duration timeout): Sets the timeout and returns the same instance. Passing `null` restores the default timeout.
- CloseOptions#withGroupMembershipOperation(GroupMembershipOperation operation): Sets the group membership operation and returns the same instance. Passing `null` throws a `NullPointerException`.
This new API will provide a more flexible and explicit way to control consumer group membership and shutdown behavior.
package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

public class CloseOptions {
    /**
     * Enum to specify the group membership operation upon leaving group.
     *
     * <ul>
     *   <li><b>{@code LEAVE_GROUP}</b>: the consumer will leave the group.</li>
     *   <li><b>{@code REMAIN_IN_GROUP}</b>: the consumer will remain in the group.</li>
     * </ul>
     */
    public enum GroupMembershipOperation {
        LEAVE_GROUP,
        REMAIN_IN_GROUP
    }

    /**
     * Specifies the group membership operation upon shutdown.
     * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
     */
    protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;

    /**
     * Specifies the maximum amount of time to wait for the close process to complete.
     * This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
     */
    protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));

    private CloseOptions() {
    }

    // Copy constructor, used by subclasses to wrap an existing instance.
    protected CloseOptions(final CloseOptions options) {
        this.operation = options.operation;
        this.timeout = options.timeout;
    }

    /**
     * Static method to create a {@code CloseOptions} with a custom timeout.
     *
     * @param timeout the maximum time to wait for the KafkaStreams to close.
     * @return a new {@code CloseOptions} instance with the specified timeout.
     */
    public static CloseOptions timeout(final Duration timeout) {
        return new CloseOptions().withTimeout(timeout);
    }

    /**
     * Static method to create a {@code CloseOptions} with a specified group membership operation.
     *
     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
     * @return a new {@code CloseOptions} instance with the specified group membership operation.
     */
    public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
        return new CloseOptions().withGroupMembershipOperation(operation);
    }

    /**
     * Fluent method to set the timeout for the close process.
     *
     * @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
     * @return this {@code CloseOptions} instance.
     */
    public CloseOptions withTimeout(final Duration timeout) {
        if (timeout == null) {
            this.timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
        } else {
            this.timeout = Optional.of(timeout);
        }
        return this;
    }

    /**
     * Fluent method to set the group membership operation upon shutdown.
     *
     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
     * @return this {@code CloseOptions} instance.
     */
    public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
        this.operation = Objects.requireNonNull(operation, "operation should not be null");
        return this;
    }
}
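As a usage illustration, the sketch below chains the factory and fluent methods defined above. So that it compiles without a Kafka dependency, it uses a condensed stand-in named `CloseOptions` in the default package that mirrors the proposed class. The `CloseOptionsUsage` class and its method names are hypothetical and not part of this proposal. In an application, the resulting object would be passed to `KafkaStreams#close(CloseOptions)`.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Condensed stand-in for the proposed org.apache.kafka.streams.CloseOptions,
// reproduced only so this example compiles without Kafka on the classpath.
class CloseOptions {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

    protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
    protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));

    private CloseOptions() { }

    static CloseOptions timeout(final Duration timeout) {
        return new CloseOptions().withTimeout(timeout);
    }

    static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
        return new CloseOptions().withGroupMembershipOperation(operation);
    }

    CloseOptions withTimeout(final Duration timeout) {
        // A null timeout falls back to the default, as in the proposal.
        this.timeout = Optional.of(timeout == null ? Duration.ofMillis(Long.MAX_VALUE) : timeout);
        return this;
    }

    CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
        this.operation = Objects.requireNonNull(operation, "operation should not be null");
        return this;
    }
}

// Hypothetical caller showing the two entry points into the fluent chain.
class CloseOptionsUsage {
    // Start from the timeout factory, then opt in to leaving the group.
    static CloseOptions boundedCloseLeavingGroup() {
        return CloseOptions.timeout(Duration.ofSeconds(30))
            .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
    }

    // Start from the membership factory and keep the default timeout.
    static CloseOptions unboundedCloseRemainingInGroup() {
        return CloseOptions.groupMembershipOperation(
            CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
    }
}
```

Either factory can start the chain, so callers only name the settings they want to change; anything left unset keeps the defaults (`REMAIN_IN_GROUP` and the `Long.MAX_VALUE` millisecond timeout).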
Proposed Changes
We propose introducing a new public API method, `KafkaStreams#close(CloseOptions)`, to replace the existing `KafkaStreams#close(KafkaStreams$CloseOptions)` method, which will be deprecated.
The key advantages of the new API are:
- Improved readability: Replacing boolean flags with enum values makes the API more expressive and easier to understand.
- Consistent design: The new CloseOptions class will follow the fluent API pattern, aligning with the design of other configuration classes in Kafka Streams.
// existing public API, now deprecated
@Deprecated
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
    Objects.requireNonNull(options, "options cannot be null");
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
    final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
    if (timeoutMs < 0) {
        throw new IllegalArgumentException("Timeout can't be negative.");
    }
    return close(Optional.of(timeoutMs), options.leaveGroup);
}

// new public API
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
    // skip...
}
We also propose introducing a new internal class, `org.apache.kafka.streams.internals.CloseOptionsInternal`, which extends the public `CloseOptions` class. While `CloseOptions` will adopt a fluent-style API without traditional getters, the internal subclass will expose getter methods so that internal Kafka components can read the configured values.
public class CloseOptionsInternal extends CloseOptions {
    public CloseOptionsInternal(CloseOptions options) {
        super(options);
    }

    public GroupMembershipOperation operation() {
        return operation;
    }

    public Optional<Duration> timeout() {
        return timeout;
    }
}
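To illustrate the wrapper pattern, the sketch below shows how internal code might read the configured values by copying a public `CloseOptions` into `CloseOptionsInternal`. It is not the body of `KafkaStreams#close(CloseOptions)`, which this proposal leaves unspecified. Both option classes are condensed stand-ins in the default package, and `CloseOptionsReader` with its two methods is a hypothetical helper introduced only for this example.

```java
import java.time.Duration;
import java.util.Optional;

// Condensed stand-in for the proposed public class: fluent setters, no getters.
class CloseOptions {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

    protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
    protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));

    private CloseOptions() { }

    // Copy constructor that lets a subclass wrap an existing instance.
    protected CloseOptions(final CloseOptions options) {
        this.operation = options.operation;
        this.timeout = options.timeout;
    }

    static CloseOptions timeout(final Duration timeout) {
        final CloseOptions options = new CloseOptions();
        options.timeout = Optional.of(timeout);
        return options;
    }
}

// Stand-in for the proposed internal subclass that exposes getters.
class CloseOptionsInternal extends CloseOptions {
    CloseOptionsInternal(final CloseOptions options) {
        super(options);
    }

    GroupMembershipOperation operation() {
        return operation;
    }

    Optional<Duration> timeout() {
        return timeout;
    }
}

// Hypothetical internal caller (an assumption, not part of the proposal).
class CloseOptionsReader {
    // True when the caller asked to leave the group on shutdown.
    static boolean shouldLeaveGroup(final CloseOptions options) {
        return new CloseOptionsInternal(options).operation()
            == CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
    }

    // Configured timeout in milliseconds; the stand-in always holds a value.
    static long timeoutMs(final CloseOptions options) {
        return new CloseOptionsInternal(options).timeout().get().toMillis();
    }
}
```

Keeping the getters on an internal subclass means the public class exposes only the fluent methods, while internal components still have typed access to the values.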
Compatibility, Deprecation, and Migration Plan
The introduction of the `KafkaStreams#close(CloseOptions)` method offers a more readable and fluent approach for managing application shutdown.
As part of this improvement, the existing `KafkaStreams#close(KafkaStreams$CloseOptions)` method will be deprecated starting with Kafka 4.1. This deprecation encourages users to adopt the new, more versatile `KafkaStreams#close(CloseOptions)` API.
The deprecated `KafkaStreams#close(KafkaStreams$CloseOptions)` method, along with the `KafkaStreams$CloseOptions` class, is planned for removal in a future major release - Kafka 5.0. Deprecation warnings and updated documentation will inform users of this change, ensuring ample time for migration.
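For migration, the main change at call sites is replacing the boolean `leaveGroup` flag with a `GroupMembershipOperation` value. The sketch below shows that mapping together with before/after call forms in comments. The enum is a stand-in for the proposed nested enum, and `CloseOptionsMigration.fromLeaveGroupFlag` is a hypothetical helper, not something this proposal adds.

```java
// Stand-in for the proposed CloseOptions.GroupMembershipOperation enum.
enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

// Hypothetical migration helper (an assumption for illustration only).
class CloseOptionsMigration {
    // Translate the deprecated boolean flag into the corresponding enum value.
    static GroupMembershipOperation fromLeaveGroupFlag(final boolean leaveGroup) {
        return leaveGroup
            ? GroupMembershipOperation.LEAVE_GROUP
            : GroupMembershipOperation.REMAIN_IN_GROUP;
    }
}

// Before (deprecated API):
//   streams.close(new KafkaStreams.CloseOptions()
//       .timeout(Duration.ofSeconds(30))
//       .leaveGroup(true));
//
// After (proposed API):
//   streams.close(CloseOptions.timeout(Duration.ofSeconds(30))
//       .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
```

Both classes default to remaining in the group, so a call site that never set `leaveGroup(true)` can omit the membership operation after migrating.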
Test Plan
Since the underlying logic of the close operation will remain unchanged, we will ensure that the new `KafkaStreams#close(CloseOptions)` method behaves identically to the deprecated `KafkaStreams#close(KafkaStreams$CloseOptions)` method. This includes writing tests that verify functional parity between the two APIs to guarantee consistent behavior during shutdown.
Rejected Alternatives
Reuse org.apache.kafka.clients.consumer.CloseOptions
Since Kafka Streams and the consumer may follow different leave group semantics, sharing a common CloseOptions class would introduce complexity and reduce flexibility for future enhancements. As a result, we will define a separate CloseOptions class for Kafka Streams rather than reusing the one from the consumer.