Status

Current state: "Under Discussion"

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The new Consumer rebalance protocol introduced with KIP-848 has been a major improvement to the group consumption experience. It offers a powerful new protocol for faster rebalances that significantly improves stability and performance of consumer groups, while allowing for simpler client implementations. This new rebalance protocol was tailored to consumer needs, and its client-side implementation on the KafkaConsumer also brings an improved consumer internal architecture (only available when using the new Consumer protocol). This has been available since Apache Kafka 3.7, and has been production-ready since 4.0, receiving significant improvements through version 4.2.

Nevertheless, the KafkaConsumer still uses the legacy Classic protocol by default. This directly slows adoption of the new protocol and, with it, the overall evolution of Kafka consumer group applications.

This KIP proposes a phased plan to drive a smooth evolution of Java client applications towards the next generation of the Consumer rebalance protocol: 

  • Phase 1 (AK 4.3): Start logging a message whenever the KafkaConsumer is used with the Classic protocol, suggesting upgrade to the improved Consumer protocol, and informing that support for the Classic protocol in the KafkaConsumer is going to be deprecated in the next major release and eventually removed.
  • Phase 2 (AK 5.0): Change the default protocol to Consumer in Apache Kafka 5.0 and deprecate support for the Classic protocol in the KafkaConsumer.
  • Phase 3 (AK 6.0): Remove support for the Classic protocol in the KafkaConsumer in Apache Kafka 6.0.

Public Interfaces

These are the changes to the public API in each of the 3 phases proposed: 

Phase 1 (AK 4.3): 

  • No changes to existing public interfaces. 
  • New info message logged on KafkaConsumer startup if the Classic protocol is used, describing the improved Consumer protocol and recommending an upgrade.

Phase 2 (AK 5.0): 

  • Change default rebalance protocol, from `classic` to `consumer`, in KafkaConsumer. 
  • Deprecate support for Classic protocol in KafkaConsumer, informing it will be removed in the next major release.
    • Deprecate the public client config group.protocol and the classic configurations (partition.assignment.strategy, session.timeout.ms, heartbeat.interval.ms)
    • Deprecate KafkaConsumer#enforceRebalance and KafkaConsumer#enforceRebalance(String)
    • Deprecate KafkaConsumer#subscribe(java.util.regex.Pattern) and KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)
    • Deprecate client-side assignors interface org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 

Phase 3 (AK 6.0): 

  • Remove support for the Classic protocol in KafkaConsumer
    • Remove public client configuration group.protocol (defined in the public ConsumerConfig  class)
    • Remove classic-related configurations, methods and assignors interface that were deprecated on Phase 2
  • Throw org.apache.kafka.common.config.ConfigException  when creating a KafkaConsumer if explicitly setting group.protocol=classic (e.g., providing string properties when creating a KafkaConsumer)

Proposed Changes


Phase | Version | Default  | Supported         | Key Change
1     | 4.3     | classic  | classic, consumer | info message recommending Consumer protocol
2     | 5.0     | consumer | classic, consumer | change default to Consumer & deprecate Classic
3     | 6.0     | consumer | consumer          | remove support for Classic in KafkaConsumer


Phase 1: Recommend Consumer rebalance protocol in applications not using it

  • New info message logged when creating instances of the KafkaConsumer using the Classic protocol, to recommend the improved Consumer protocol.
  • The message is logged if:
    • group.protocol is explicitly set to “classic” → Inform that the Classic protocol is going to be deprecated in the next major release to be eventually removed. Suggest upgrading to the new improved Consumer protocol.
    • group.protocol is not explicitly defined in the client configs → Inform that the default behaviour will change in the next major release, where Consumer will become the default protocol.
  • No changes to the default value of the group.protocol client configuration (default continues to be “classic”)
  • No deprecation or warnings for Classic protocol usage.
  • Target release: Apache Kafka 4.3
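
The two logging conditions above can be sketched as a small decision function. This is only an illustration under stated assumptions, not the actual KafkaConsumer code: the class Phase1ProtocolNotice, its method, and the message wording are hypothetical, and matching the value case-insensitively is an assumption. Only the group.protocol key and its classic/consumer values come from this proposal.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper illustrating the Phase 1 logging rules; not part of Kafka.
final class Phase1ProtocolNotice {
    static final String GROUP_PROTOCOL = "group.protocol";

    // Returns the info message to log, or empty when no message is needed.
    static Optional<String> infoMessage(Properties props) {
        String protocol = props.getProperty(GROUP_PROTOCOL);
        if (protocol == null) {
            // Not configured: Classic is still the default in this phase.
            return Optional.of("group.protocol is not set; the default will change "
                    + "from classic to consumer in the next major release.");
        }
        if (protocol.equalsIgnoreCase("classic")) {
            // Explicitly Classic: announce the upcoming deprecation and removal.
            return Optional.of("The Classic protocol will be deprecated in the next "
                    + "major release and eventually removed; consider upgrading to "
                    + "the Consumer protocol.");
        }
        // Any other value (for example consumer): nothing to recommend.
        return Optional.empty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        infoMessage(props).ifPresent(System.out::println);

        props.setProperty(GROUP_PROTOCOL, "classic");
        infoMessage(props).ifPresent(System.out::println);
    }
}
```

Returning an Optional keeps the decision separate from the logging call, so the same rule could be exercised in a unit test without a logger.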

Phase 2: Change the default in KafkaConsumer to use the Consumer protocol and deprecate Classic

  • Change KafkaConsumer to internally select Consumer as the default rebalance protocol if no group.protocol config is defined.  
  • Deprecate support for the Classic protocol in the KafkaConsumer
    • deprecate group.protocol  config in the public API ConsumerConfig  
    • deprecate classic protocol configs in the public API ConsumerConfig: partition.assignment.strategy, session.timeout.ms, heartbeat.interval.ms (these properties already fail with ConfigException if used with the Consumer protocol)
    • deprecate KafkaConsumer#enforceRebalance methods
    • deprecate KafkaConsumer#subscribe methods that take java.util.regex.Pattern. Suggest upgrading to KafkaConsumer#subscribe equivalents that take org.apache.kafka.clients.consumer.SubscriptionPattern
    • deprecate the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor  interface
  • If group.protocol is explicitly set to classic: log a warning about using a deprecated protocol that will be removed in the next major release. Suggest removing the property (upgrading to the default Consumer protocol).
  • The “classic” protocol remains supported in this phase (showing deprecation log). 
  • All usages of the KafkaConsumer within the code base following the default behaviour will be using the Consumer protocol, e.g. Connect usages of KafkaConsumer will use the new protocol.
  • Target: Apache Kafka 5.0
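
As a rough sketch of the Phase 2 behaviour described above, the protocol selection could look like the following. The class Phase2ProtocolResolver, the warning text, and the lower-casing of the configured value are assumptions made for illustration; the real KafkaConsumer logic may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper illustrating the Phase 2 default and deprecation rules.
final class Phase2ProtocolResolver {

    // Returns the protocol that would be used; appends deprecation warnings, if any.
    static String resolve(Properties props, List<String> warnings) {
        String configured = props.getProperty("group.protocol");
        if (configured == null) {
            // New default in Phase 2: Consumer when nothing is configured.
            return "consumer";
        }
        String protocol = configured.toLowerCase(Locale.ROOT);
        if (protocol.equals("classic")) {
            // Classic still works in this phase, but its use is flagged.
            warnings.add("group.protocol=classic is deprecated and will be removed "
                    + "in the next major release; remove the property to use the "
                    + "default Consumer protocol.");
        }
        return protocol;
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        Properties props = new Properties();
        System.out.println(resolve(props, warnings));

        props.setProperty("group.protocol", "classic");
        System.out.println(resolve(props, warnings));
        warnings.forEach(System.out::println);
    }
}
```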

Phase 3: Remove support for the Classic protocol in the KafkaConsumer

  • KafkaConsumer will use Consumer as its rebalance protocol; it is both the default and the only supported protocol.
  • group.protocol removed from the public API, along with the other 3 classic configs (partition.assignment.strategy, session.timeout.ms, heartbeat.interval.ms):
    • remove them from the public class ConsumerConfig  
    • ignore if group.protocol is explicitly set to consumer: log warning about an unused property 
    • fail with ConfigException (unsupported protocol) if group.protocol is explicitly set to classic or if any of the classic properties is used.
      • this aligns with how the classic protocol properties are handled since the new protocol was introduced (classic properties fail with ConfigException  if used with the consumer protocol). We would treat all the classic-related properties consistently at this point: if defined, they fail with ConfigException. 
  • Remove the classic-related methods and classes that were deprecated on Phase 2
  • Note that at this point we may need to keep internal support for the classic protocol, to be used by Kafka Streams if it hasn't completed a full transition to KIP-1071. See KafkaStreams section below.
  • Target: Apache Kafka 6.0
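
The Phase 3 validation rules above can be sketched as follows. This is a hedged illustration only: Phase3ConfigValidation is a hypothetical class, IllegalArgumentException stands in for org.apache.kafka.common.config.ConfigException so that the example has no Kafka dependency, and the messages are invented.

```java
import java.util.List;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper illustrating the Phase 3 validation rules; not part of Kafka.
final class Phase3ConfigValidation {
    // The three classic-only client configs named in this proposal.
    static final List<String> CLASSIC_ONLY = List.of(
            "partition.assignment.strategy",
            "session.timeout.ms",
            "heartbeat.interval.ms");

    // Throws if a classic setting is present; returns a warning for an unused group.protocol.
    static Optional<String> validate(Properties props) {
        String protocol = props.getProperty("group.protocol");
        if (protocol != null && protocol.equalsIgnoreCase("classic")) {
            // Stand-in for ConfigException: the Classic protocol is no longer supported.
            throw new IllegalArgumentException("Unsupported protocol: classic");
        }
        for (String key : CLASSIC_ONLY) {
            if (props.containsKey(key)) {
                throw new IllegalArgumentException(key + " is not supported");
            }
        }
        if (protocol != null && protocol.equalsIgnoreCase("consumer")) {
            // Harmless but unused: report it so the caller can log a warning.
            return Optional.of("group.protocol is no longer used and can be removed");
        }
        // Other group.protocol values are not covered by the proposal and are ignored here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.protocol", "consumer");
        validate(props).ifPresent(System.out::println);

        props.setProperty("session.timeout.ms", "45000");
        try {
            validate(props);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```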

 

Note that the proposal is about deprecating and eventually removing support for the Classic protocol on the KafkaConsumer only. The Classic protocol remains supported on the broker side, as it is used as a more general-purpose group protocol by other components like Connect, which is expected to follow its own evolution path regarding its usage of the classic protocol.

Kafka Streams

Kafka Streams uses the Classic protocol via the KafkaConsumer, and has already introduced support for the new Consumer protocol, with the Streams rebalance protocol KIP-1071 (EA in 4.1, GA in 4.2).

There is no clear timeline yet for KIP-1071 to become the only supported Streams protocol, so we need to take this into account when we get to phase 3 (AK 6.0, remove support for the Classic protocol in KafkaConsumer). If KafkaStreams still supports pre-KIP-1071 behaviour at that point, we can remove public support for the classic protocol as planned, while keeping internal configs for KafkaStreams to use until it completes the transition to KIP-1071.

internal.group.protocol config supported by the KafkaConsumer (same for the other 3 classic properties as needed: internal.partition.assignment.strategy, internal.session.timeout.ms, internal.heartbeat.interval.ms):

  • internal.group.protocol will support Classic as value
  • explicitly mentioning that this is an internal config, not intended for user applications usage, that can be modified/removed anytime.
  • same approach as with other internal configs used in KafkaStreams before, like the internal.leave.group.on.close
  • once Kafka Streams completes the transition to KIP-1071 we can remove the internal configs and components we may have left for Kafka Streams usage. 

If by phase 3 (AK 6.0) KafkaStreams has already completed the transition and supports KIP-1071 only, we can remove classic support as planned, without the need to keep anything internally. 

Connect

Connect uses the classic protocol in 2 different ways:

  1. it uses KafkaConsumer to read data from topics
  2. it uses a custom implementation of the Classic protocol to assign connect tasks. This is a Connect-specific implementation of the Classic protocol, different from the one used by the KafkaConsumer

As part of this KIP, we want to align on point 1 above: all usages of the KafkaConsumer in Connect will be migrated to use the new Consumer protocol, and that should happen as part of Phase 2 (changing the default)

Point 2 remains outside the scope of this KIP: an evolution path for Connect's own usage of the Classic protocol still needs to be decided separately.

Compatibility, Deprecation, and Migration Plan

Migration

Client applications can safely migrate to the new Consumer rebalance protocol following the proposed upgrade paths: offline upgrade, or online upgrade. Both are fully supported and documented since Apache Kafka 4.0. See https://kafka.apache.org/41/operations/consumer-rebalance-protocol/#upgrade--downgrade 

These are the minimum and recommended versions to upgrade to, for both client and broker:

  • Minimum version: Apache Kafka 4.0 (Consumer protocol GA)
  • Recommended version: Apache Kafka 4.2+ (latest 4.x for latest improvements)

Compatibility

The main compatibility consideration when moving Consumer applications to the new Consumer protocol relates to client-side assignors.

Client-side assignors are not supported under the new Consumer rebalance protocol; they are replaced by pluggable broker-side assignors.

How to migrate applications using client-side assignors?

The migration proposal for applications using client-side assignors is to convert the client-side assignor to a broker-side assignor. 

  • Implement broker-side assignor interface org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor 
  • Enable the new assignor on the broker, adding its class name to the broker config `group.consumer.assignors` 
  • Configure client applications to use the new broker-side assignor, by setting the assignor name in the client config `group.remote.assignor`
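
The configuration side of these steps can be sketched with plain java.util.Properties. The assignor class name com.example.MyBrokerSideAssignor and the assignor name my-assignor are hypothetical placeholders, as is the helper class itself; only the config keys group.consumer.assignors, group.protocol and group.remote.assignor come from the text above. Whether the built-in assignors also need to be listed in the broker config is not covered here.

```java
import java.util.Properties;

// Hypothetical sketch of the broker and client settings for a custom broker-side assignor.
final class RemoteAssignorConfigSketch {

    // Broker side: enable the custom assignor by adding its class name.
    static Properties brokerOverrides(String assignorClassName) {
        Properties broker = new Properties();
        broker.setProperty("group.consumer.assignors", assignorClassName);
        return broker;
    }

    // Client side: use the Consumer protocol and select the assignor by name.
    static Properties consumerProps(String assignorName) {
        Properties client = new Properties();
        client.setProperty("group.protocol", "consumer");
        client.setProperty("group.remote.assignor", assignorName);
        return client;
    }

    public static void main(String[] args) {
        // Both values below are placeholders, not real Kafka classes or assignor names.
        System.out.println(brokerOverrides("com.example.MyBrokerSideAssignor"));
        System.out.println(consumerProps("my-assignor"));
    }
}
```

In a real application the client properties would be merged with the usual connection and deserializer settings before creating the KafkaConsumer.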


Note that 2 broker-side assignors have been supported by default since Apache Kafka 4.0:

  • Uniform
  • Range

These are readily available, with Uniform used by default. Also note that rack-awareness is being added to these, to be delivered as part of KAFKA-19387.

Test Plan

All applicable consumer tests (unit, integration and system tests) are already running for both protocols, Classic and Consumer.

Phase 2 Testing (Change Default)

  • Add tests to cover the new default selection.
  • Make test updates as needed to remove unnecessary protocol selection.
  • Run all tests to validate the behavior under the new default consumer config. This would include Connect tests to validate behaviour of its KafkaConsumers under the new protocol (integration and system test)

Phase 3 Testing (Remove Classic Support)

  • Make test updates as needed to remove all `classic` combinations.
  • Add test to cover the validations around the removed property group.protocol 
  • Run all tests to validate the behavior.
  • If we need to keep internal support for classic at this point, to be used by KafkaStreams, we should include tests for the internal mechanism/config.

Rejected Alternatives

  • Supporting client-side assignors with the new Consumer protocol was initially considered as part of KIP-848. That was driven mainly by the need to support client-side assignors for Kafka Streams, but with the implementation of the new Streams Rebalance protocol (KIP-1071), Kafka Streams moved away from client-side assignors too.
  • Deprecate the usage of "classic" in the KafkaConsumer group.protocol config before 5.0. It was initially considered to deprecate in 4.3, in preparation for the default change in 5.0 and as a way to drive adoption. But given the strong message a deprecation sends, and the fact that removal of Classic support in KafkaConsumer is not expected until AK 6.0, it seems sensible to deprecate for removal in 5.0, along with the default change.

