Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/t4h273md03vzcbzk4r3bn86km8rnym66
Vote thread: https://lists.apache.org/thread/j0ol8tk2fdyk0bk6x859yw5dtjh8h1xc
JIRA: KAFKA-20167
Motivation
KafkaStreams currently allows users to control shutdown behavior via CloseOptions, with two existing values: LEAVE_GROUP and REMAIN_IN_GROUP. When no options are provided, KafkaStreams.close() implicitly uses REMAIN_IN_GROUP.
With the introduction of the Streams Protocol (KIP-1071), this implicit default has become semantically inconsistent:
- Classic Protocol: REMAIN_IN_GROUP behaves as expected. The member stays in the consumer group, enabling a faster rebalance upon restart.
- Streams Protocol (KIP-1071), dynamic member: The member unconditionally leaves the group regardless of this setting, since remaining in the group provides no benefit for non-static members under the new protocol.
- Streams Protocol (KIP-1071), static member: The member should remain in the group by default. Static membership is explicitly configured to reduce rebalance disruption on restart, and leaving the group on close would significantly reduce the benefit of static membership. Notably, the underlying consumer already differentiates close semantics based on membership type, making this behavior both natural and consistent.
To resolve this ambiguity and provide a cleaner API for the long term, we should introduce a new enum value: CloseOptions.DEFAULT. This allows the "default" behavior to be adaptive based on the underlying protocol, while strictly reserving REMAIN_IN_GROUP for when the user explicitly requests that intent regardless of protocol or membership type.
Public Interfaces
The following public API is affected:
- org.apache.kafka.streams.CloseOptions: new enum value DEFAULT will be added.
- KafkaStreams.close(): default behavior will be updated to use CloseOptions.DEFAULT.
Proposed Changes
1. Add CloseOptions.DEFAULT
A new enum constant DEFAULT is added to org.apache.kafka.streams.CloseOptions. This value signals that the runtime should choose the most appropriate closing behavior based on the active protocol.
2. Update KafkaStreams.close() Behavior
Currently, KafkaStreams.close() internally delegates to close(CloseOptions.REMAIN_IN_GROUP). This will be changed to delegate to close(CloseOptions.DEFAULT), making the no-arg form truly protocol-agnostic.
3. Define Logic for CloseOptions.DEFAULT
When CloseOptions.DEFAULT is used, the behavior adapts based on the active protocol and, under the Streams Protocol, on the membership type:
- Classic Protocol → maps to Consumer CloseOptions.REMAIN_IN_GROUP (preserves existing behavior; no regression for current users).
- Streams Protocol (KIP-1071), dynamic member → DEFAULT maps to consumer DEFAULT, causing the member to leave the group, since remaining provides no benefit.
- Streams Protocol (KIP-1071), static member → maps to Consumer REMAIN_IN_GROUP, preserving static membership and avoiding unnecessary rebalances on restart. This is consistent with the consumer, which already differentiates close semantics based on membership type.
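The routing above can be sketched as a small pure function. This is a simplified model, not the actual Kafka Streams implementation: the class `DefaultCloseRouting`, the enums `Protocol` and `Outcome`, and the method `resolveDefault` are hypothetical names introduced only for illustration. The sketch models the effective outcome (remain or leave) rather than the consumer-level option that produces it.

```java
// Simplified model of the DEFAULT routing. All types here are illustrative
// stand-ins, not the real Kafka Streams or consumer classes.
final class DefaultCloseRouting {

    enum Protocol { CLASSIC, STREAMS }

    // Effective result of the close call from the group's point of view.
    enum Outcome { REMAINS_IN_GROUP, LEAVES_GROUP }

    // Resolves what CloseOptions.DEFAULT means for a given protocol and membership type.
    static Outcome resolveDefault(Protocol protocol, boolean staticMember) {
        if (protocol == Protocol.CLASSIC) {
            // Classic Protocol: preserve the existing behavior for all members.
            return Outcome.REMAINS_IN_GROUP;
        }
        // Streams Protocol: static members remain, dynamic members leave.
        return staticMember ? Outcome.REMAINS_IN_GROUP : Outcome.LEAVES_GROUP;
    }

    public static void main(String[] args) {
        for (Protocol protocol : Protocol.values()) {
            for (boolean staticMember : new boolean[] {false, true}) {
                System.out.printf("%s, static=%b -> %s%n",
                        protocol, staticMember, resolveDefault(protocol, staticMember));
            }
        }
    }
}
```

Keeping the decision in one function with no side effects makes each protocol and membership combination straightforward to cover in a unit test.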
4. Redefine CloseOptions.REMAIN_IN_GROUP
REMAIN_IN_GROUP transitions from being the implicit default to an explicit user intent. When specified, it will always map to Consumer CloseOptions.REMAIN_IN_GROUP, regardless of the active protocol. Users who rely on this specific behavior under the Streams Protocol must now opt in explicitly.
5. Implement Consumer CloseOptions.REMAIN_IN_GROUP support in KIP-1071
Currently, the Streams Protocol implementation in StreamsGroupHeartbeatManager does not honor REMAIN_IN_GROUP. This gap must be addressed as part of this KIP to ensure the option functions correctly end-to-end under KIP-1071 (see the "Compatibility, Deprecation, and Migration Plan" section of KIP-1071: Streams Rebalance Protocol).
Behavior Matrix
| Streams CloseOption | Classic Protocol (Dynamic Member) | Classic Protocol (Static Member) | Streams Protocol (Dynamic Member) | Streams Protocol (Static Member) |
|---|---|---|---|---|
| close() (current) [1] | Consumer REMAIN | Consumer REMAIN | Consumer LEAVE | N/A |
| close() (new) [2] | Consumer REMAIN | Consumer REMAIN | Consumer DEFAULT (LEAVE) | Consumer DEFAULT (REMAIN) |
| close(REMAIN_IN_GROUP) (current) | Consumer REMAIN | Consumer REMAIN | Consumer LEAVE | N/A |
| close(REMAIN_IN_GROUP) (new) | Consumer REMAIN | Consumer REMAIN | Consumer REMAIN | Consumer REMAIN |
| close(LEAVE_GROUP) (current) | Consumer LEAVE | Consumer LEAVE | Consumer LEAVE | N/A |
| close(LEAVE_GROUP) (new) | Consumer LEAVE | Consumer LEAVE | Consumer LEAVE | Consumer LEAVE |
Legend:
[1] close() (current) internally delegates to close(REMAIN_IN_GROUP).
[2] close() (new) internally delegates to close(DEFAULT).
Compatibility, Deprecation, and Migration Plan
- Classic Protocol users: No behavioral change. KafkaStreams.close() continues to result in the member remaining in the group, as before.
- Streams Protocol users (dynamic member): The outcome of the no-arg close() is unchanged (the member leaves the group), but it now follows from DEFAULT rather than from REMAIN_IN_GROUP being ignored, which is consistent with the protocol's design intent.
- Streams Protocol users (static member): The behavior of the no-arg close() now correctly honors static membership semantics: the member remains in the group by default.
- Streams Protocol users (explicit REMAIN_IN_GROUP): This is a behavioral change worth calling out explicitly. Prior to this KIP, REMAIN_IN_GROUP was silently ignored under the Streams Protocol, and the member would always leave the group. After this KIP, the option will be correctly honored. We consider this a bug fix rather than an intentional design change and expect very few users to be affected in practice.
- No existing enum values are removed or deprecated. The new DEFAULT value is purely additive.
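As a rough cross-check of the migration notes, the Behavior Matrix can be encoded and the cells whose outcome differs between "current" and "new" listed mechanically. The sketch below is illustrative only: `CloseBehaviorDiff`, its enums, and its methods are hypothetical names, and the two functions simply transcribe the matrix rows rather than call any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Transcribes the Behavior Matrix and lists the cells that differ between
// "current" and "new". Illustrative model only; no real Kafka classes are used.
final class CloseBehaviorDiff {

    enum Option { NO_ARG, REMAIN_IN_GROUP, LEAVE_GROUP }
    enum Column { CLASSIC_DYNAMIC, CLASSIC_STATIC, STREAMS_DYNAMIC, STREAMS_STATIC }
    enum Outcome { REMAIN, LEAVE, NOT_APPLICABLE }

    // "current" rows: Streams static is N/A, Streams dynamic always leaves.
    static Outcome current(Option option, Column column) {
        if (column == Column.STREAMS_STATIC) return Outcome.NOT_APPLICABLE;
        if (column == Column.STREAMS_DYNAMIC) return Outcome.LEAVE;
        return option == Option.LEAVE_GROUP ? Outcome.LEAVE : Outcome.REMAIN;
    }

    // "new" rows: explicit options are always honored; no-arg close() uses DEFAULT.
    static Outcome proposed(Option option, Column column) {
        switch (option) {
            case LEAVE_GROUP:
                return Outcome.LEAVE;
            case REMAIN_IN_GROUP:
                return Outcome.REMAIN;
            default:
                return column == Column.STREAMS_DYNAMIC ? Outcome.LEAVE : Outcome.REMAIN;
        }
    }

    // Collects a description of every cell whose outcome changes.
    static List<String> changedCells() {
        List<String> changed = new ArrayList<>();
        for (Option option : Option.values()) {
            for (Column column : Column.values()) {
                Outcome before = current(option, column);
                Outcome after = proposed(option, column);
                if (before != after) {
                    changed.add(option + "/" + column + ": " + before + " -> " + after);
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        changedCells().forEach(System.out::println);
    }
}
```

In this model, every Classic Protocol cell is unchanged, and the only previously defined cell whose outcome flips is explicit REMAIN_IN_GROUP for a Streams Protocol dynamic member; the remaining differences are the Streams static-member cells moving from N/A to a defined outcome.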
Test Plan
- All existing regression tests must continue to pass without modification.
- New unit tests should cover:
  - The routing logic of CloseOptions.DEFAULT for both protocol variants.
  - REMAIN_IN_GROUP behavior under the Streams Protocol via StreamsGroupHeartbeatManager.
- New integration tests should cover end-to-end close behavior for all three CloseOptions values under both the Classic and Streams protocols.
Rejected Alternatives
n/a