Status

Current stateAccepted

Discussion thread: https://lists.apache.org/thread/t4h273md03vzcbzk4r3bn86km8rnym66

Vote thread: https://lists.apache.org/thread/j0ol8tk2fdyk0bk6x859yw5dtjh8h1xc

JIRA: KAFKA-20167 - Getting issue details... STATUS

Motivation

KafkaStreams currently allows users to control shutdown behavior via CloseOptions, with two existing values: LEAVE_GROUP and REMAIN_IN_GROUP. When no options are provided, KafkaStreams.close() implicitly uses REMAIN_IN_GROUP.

With the introduction of the Streams Protocol (KIP-1071), this implicit default has become semantically inconsistent:

  • Classic Protocol: REMAIN_IN_GROUP behaves as expected, the member stays in the consumer group, enabling a faster rebalance upon restart.
  • Streams Protocol (KIP-1071), dynamic member: The member unconditionally leaves the group regardless of this setting, since remaining in the group provides no benefit for non-static members under the new protocol.
  • Streams Protocol (KIP-1071), static member: The member should remain in the group by default. Static membership is explicitly configured to reduce rebalance disruption on restart, and leaving the group on close would significantly reduce the benefit of static membership.. Notably, the underlying consumer already differentiates close semantics based on membership type, making this behavior both natural and consistent.

To resolve this ambiguity and provide a cleaner API for the long term, we should introduce a new enum value: CloseOptions.DEFAULT. This allows the "default" behavior to be adaptive based on the underlying protocol, while strictly reserving REMAIN_IN_GROUP for when the user explicitly requests that intent regardless of protocol or membership type.

Public Interfaces

The following public API is affected:

  • org.apache.kafka.streams.CloseOptions — new enum value DEFAULT will be added.
  • KafkaStreams.close() — default behavior will be updated to use CloseOptions.DEFAULT.

Proposed Changes

1. Add CloseOptions.DEFAULT

A new enum constant DEFAULT is added to org.apache.kafka.streams.CloseOptions. This value signals that the runtime should choose the most appropriate closing behavior based on the active protocol. 

2. Update KafkaStreams.close() Behavior

Currently, KafkaStreams.close() internally delegates to close(CloseOptions.REMAIN_IN_GROUP). This will be changed to delegate to close(CloseOptions.DEFAULT), making the no-arg form truly protocol-agnostic.

3. Define Logic for CloseOptions.DEFAULT

When CloseOptions.DEFAULT is used, the behavior adapts based on the active protocol:

  • Classic Protocol → maps to Consumer CloseOptions.REMAIN_IN_GROUP (preserves existing behavior; no regression for current users).
  • Streams Protocol (KIP-1071), dynamic member → DEFAULT maps to consumer DEFAULT, causing the member to leave the group, since remaining provides no benefit.

  • Streams Protocol (KIP-1071), static member → maps to Consumer REMAIN_IN_GROUP, preserving static membership and avoiding unnecessary rebalances on restart. This is consistent with the consumer, which already differentiates close semantics based on membership type.

4. Redefine CloseOptions.REMAIN_IN_GROUP

REMAIN_IN_GROUP transitions from being the implicit default to an explicit user intent. When specified, it will always map to Consumer CloseOptions.REMAIN_IN_GROUP, regardless of the active protocol. Users who rely on this specific behavior under the Streams Protocol must now opt in explicitly.

5. Implement Consumer CloseOptions.REMAIN_IN_GROUP support in KIP-1071

Currently, the Streams Protocol implementation in StreamsGroupHeartbeatManager does not honor REMAIN_IN_GROUP. This gap must be addressed as part of this KIP to ensure the option functions correctly end-to-end under KIP-1071: Streams Rebalance Protocol#Compatibility,DeprecationandMigrationPlan.

Behavior Matrix

Streams CloseOptionClassic Protocol (Dynamic Member)Classic Protocol (Static Member)Streams Protocol (Dynamic Member)Streams Protocol (Static Member)
close() (current) [1]Consumer REMAINConsumer REMAINConsumer LEAVEN/A
close() (new) [2]Consumer REMAINConsumer REMAINConsumer DEFAULT (LEAVE)Consumer DEFAULT (REMAIN)
close(REMAIN_IN_GROUP) (current)Consumer REMAINConsumer REMAINConsumer LEAVEN/A
close(REMAIN_IN_GROUP) (new)Consumer REMAINConsumer REMAINConsumer REMAINConsumer REMAIN
close(LEAVE_GROUP) (current)Consumer LEAVEConsumer LEAVEConsumer LEAVEN/A
close(LEAVE_GROUP) (new)Consumer LEAVEConsumer LEAVEConsumer LEAVEConsumer LEAVE

Legend:

  1. close() (current) internally delegates to close(REMAIN_IN_GROUP) 
  2. close() (new) internally delegates to close(DEFAULT) 

Compatibility, Deprecation, and Migration Plan

  • Classic Protocol users: No behavioral change. KafkaStreams.close() continues to result in the member remaining in the group, as before.
  • Streams Protocol users (dynamic member): The behavior of the no-arg close() becomes cleaner and consistent with the protocol's design intent.
  • Streams Protocol users (static member): The behavior of the no-arg close() now correctly honors static membership semantics — the member remains in the group by default. 
  • Streams Protocol users (explicit REMAIN_IN_GROUP): This is a behavioral change worth calling out explicitly. Prior to this KIP, REMAIN_IN_GROUP was silently ignored under the Streams Protocol, and the member would always leave the group. After this KIP, the option will be correctly honored. We consider this a bug fix rather than an intentional design change and expect very few users to be affected in practice.
  • No existing enum values are removed or deprecated. The new DEFAULT value is purely additive.

Test Plan

  • All existing regression tests must continue to pass without modification.
  • New unit tests should cover:
    • The routing logic of CloseOptions.DEFAULT for both protocol variants.
    • REMAIN_IN_GROUP behavior under the Streams Protocol via StreamsGroupHeartbeatManager.
  • New integration tests should cover end-to-end close behavior for all three CloseOptions values under both the Classic and Streams protocols.

Rejected Alternatives

n/a

  • No labels