Status

Current state: Accepted

Discussion thread: link

Voting thread: link link

JIRA: KAFKA-20385

Motivation

The ConsumerRebalanceListener Javadoc recommends performing consumer operations during rebalance callbacks:

"It is common for the revocation callback to be used to commit offsets for the partitions being revoked." "The assigned callback could be used to set the initial position of the assigned partitions."

But the callback signatures provide no access to the consumer:


void onPartitionsAssigned(Collection<TopicPartition> partitions);
void onPartitionsRevoked(Collection<TopicPartition> partitions);


The standard workaround is constructor injection: consumer.subscribe(topics, new MyListener(consumer)). This gives the listener an unrestricted Consumer reference, and nothing prevents it from calling poll(), close(), or subscribe() inside a rebalance callback.

Every major framework has independently built the same fix:

  • Spring Kafka (since 2.0, 2017): ConsumerAwareRebalanceListener passes Consumer<?, ?> to every callback. Also splits onPartitionsRevoked into before-commit and after-commit phases.
  • SmallRye Reactive Messaging (Quarkus): KafkaConsumerRebalanceListener passes Consumer<?, ?>. Their docs: "Unlike the ConsumerRebalanceListener from Apache Kafka, the methods pass the Kafka Consumer."
  • Micronaut Kafka: ConsumerAware interface injects the consumer via setter, used alongside ConsumerRebalanceListener.

All three pass the full Consumer. This KIP does the same but through a restricted view that omits operations dangerous during a rebalance (poll(), close(), subscribe(), wakeup()), making misuse a compile error.

Use Cases

Offset Commit on Revocation

MyRebalanceListener
// Today
class MyListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    MyListener(Consumer<?, ?> consumer) { this.consumer = consumer; }
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // unrestricted reference
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
}

// With this KIP
consumer.setRebalanceListener(new RebalanceListener() {
    // rest of the methods
    @Override
    public void onPartitionsRevoked(
            Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(
            Collection<TopicPartition> partitions, RebalanceConsumer consumer) {}
});
consumer.subscribe(topics);

Seek to External Offsets on Assignment

consumer.position(), consumer.seek(), consumer.commitSync() are all available through RebalanceConsumer.
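
A minimal sketch of this use case, assuming an in-memory map stands in for the external offset store. TopicPartition, RebalanceConsumer, and RebalanceListener below are simplified stand-ins declared only so the example compiles without the Kafka client; ExternalOffsetSeeker and RecordingConsumer are hypothetical names that are not part of this KIP.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the types proposed in this KIP (not the real Kafka classes).
record TopicPartition(String topic, int partition) {}

interface RebalanceConsumer {
    void seek(TopicPartition partition, long offset);
}

interface RebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions, RebalanceConsumer consumer);
    void onPartitionsRevoked(Collection<TopicPartition> partitions, RebalanceConsumer consumer);
}

// Hypothetical listener: restores positions from an external store on assignment.
class ExternalOffsetSeeker implements RebalanceListener {
    private final Map<TopicPartition, Long> externalOffsets;

    ExternalOffsetSeeker(Map<TopicPartition, Long> externalOffsets) {
        this.externalOffsets = externalOffsets;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        for (TopicPartition tp : partitions) {
            Long stored = externalOffsets.get(tp);
            if (stored != null) {
                // Partitions without a stored offset are left at their default position.
                consumer.seek(tp, stored);
            }
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        // Nothing to do in this sketch; offsets are assumed to be written to the store elsewhere.
    }
}

// Hypothetical test double that records every seek call it receives.
class RecordingConsumer implements RebalanceConsumer {
    final List<String> seeks = new ArrayList<>();

    @Override
    public void seek(TopicPartition partition, long offset) {
        seeks.add(partition.topic() + "-" + partition.partition() + "@" + offset);
    }
}
```

The listener never holds a consumer field; the only consumer it can reach is the restricted view handed to the callback.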

Partition Pause/Resume on Assignment

Pause newly assigned partitions until caches are warm, then resume outside the callback. In this case, the paused state set during onPartitionsAssigned persists after the callback returns and the consumer does not automatically resume partitions when the callback exits. When partitions are revoked, any paused states on those partitions are cleared by the consumer.
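
The pause-then-resume flow could look roughly like the sketch below. It assumes partitions are represented as plain strings and that the application resumes partitions through its regular Consumer reference from the poll loop. WarmupListener, drainWarm, and FakeConsumer are hypothetical names, and the RebalanceConsumer interface is a one-method stand-in for the proposed type.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the proposed RebalanceConsumer (partitions are plain strings here).
interface RebalanceConsumer {
    void pause(Collection<String> partitions);
}

// Hypothetical listener: pauses new partitions and remembers which ones still need a warm cache.
// Its methods mirror the shape of the proposed two-arg callbacks.
class WarmupListener {
    private final Set<String> warming = new HashSet<>();

    public void onPartitionsAssigned(Collection<String> partitions, RebalanceConsumer consumer) {
        consumer.pause(partitions);    // paused state persists after the callback returns
        warming.addAll(partitions);
    }

    public void onPartitionsRevoked(Collection<String> partitions, RebalanceConsumer consumer) {
        warming.removeAll(partitions); // the consumer itself clears paused state on revocation
    }

    // Called from the poll loop, outside any callback. Returns the partitions whose caches
    // are now warm so the caller can resume them through the regular Consumer reference.
    public Set<String> drainWarm(Set<String> warmCaches) {
        Set<String> ready = new HashSet<>(warming);
        ready.retainAll(warmCaches);
        warming.removeAll(ready);
        return ready;
    }
}

// Hypothetical test double that tracks the paused set.
class FakeConsumer implements RebalanceConsumer {
    final Set<String> paused = new HashSet<>();

    @Override
    public void pause(Collection<String> partitions) { paused.addAll(partitions); }

    void resume(Collection<String> partitions) { paused.removeAll(partitions); }
}
```

In the poll loop, the application would call something like consumer.resume(listener.drainWarm(warmCaches)) on the full consumer, since the view passed to the callback is no longer valid at that point.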

Public Interfaces

New interface RebalanceListener

RebalanceListener.java
package org.apache.kafka.clients.consumer;

/**
 * Reuse the Javadoc from ConsumerRebalanceListener for the class-level and method-level Javadoc.
 */
public interface RebalanceListener {
    /** 
     * +inherit Javadoc from ConsumerRebalanceListener. 
     * 
     * Consumer-aware variant of {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
     *
     * <p>This method receives a {@link RebalanceConsumer}, a restricted view of the {@link Consumer}
     * that exposes only operations safe during a rebalance (e.g. seeking to externally stored offsets,
     * pausing partitions until caches warm up). The {@code RebalanceConsumer} is valid only for the
     * duration of this callback; storing a reference and using it later will throw
     * {@link IllegalStateException}.
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions, RebalanceConsumer rebalanceConsumer);

    /**
     * +inherit Javadoc from ConsumerRebalanceListener. 
     * 
     * Consumer-aware variant of {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
     *
     * <p>This method receives a {@link RebalanceConsumer}, a restricted view of the {@link Consumer}
     * that exposes only operations safe during a rebalance (e.g. committing offsets for the
     * partitions being revoked). The {@code RebalanceConsumer} is valid only for the
     * duration of this callback; storing a reference and using it later will throw
     * {@link IllegalStateException}.
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions, RebalanceConsumer rebalanceConsumer);

    /**
     * Consumer-aware variant of {@link ConsumerRebalanceListener#onPartitionsLost(Collection)}.
     *
     * <p>This method receives a {@link RebalanceConsumer}, a restricted view of the {@link Consumer}
     * that exposes only operations safe during a rebalance. The {@code RebalanceConsumer} is valid
     * only for the duration of this callback; storing a reference and using it later will throw
     * {@link IllegalStateException}.
     *
     * <h4>Important difference from {@code onPartitionsRevoked}</h4>
     *
     * <p>Unlike {@link #onPartitionsRevoked(Collection, RebalanceConsumer)}, where the affected
     * partitions are still part of the consumer's assignment until the callback completes, the
     * partitions passed to this method have <em>already been removed</em> from the assignment
     * before this callback fires. This means:
     *
     * <ul>
     *   <li>{@link RebalanceConsumer#commitSync() commitSync()}/{@link RebalanceConsumer#commitAsync() commitAsync()}
     *       will not commit offsets for the lost partitions, since the consumer's assignment no longer includes them.</li>
     *   <li>{@link RebalanceConsumer#commitSync(java.util.Map) commitSync(offsets)}
     *       /{@link RebalanceConsumer#commitAsync(java.util.Map, OffsetCommitCallback) commitAsync(offsets)}
     *       with the lost partitions explicitly will be rejected by the broker since the consumer
     *       no longer owns them.</li>
     *   <li>{@link RebalanceConsumer#position(org.apache.kafka.common.TopicPartition) position()} and
     *       {@link RebalanceConsumer#seek(org.apache.kafka.common.TopicPartition, long) seek()} will
     *       fail for the lost partitions since they are not in the assignment.</li>
     * </ul>
     *
     * <p>The {@link RebalanceConsumer} does not add guardrails against these operations on lost
     * partitions. Users should be aware that offset management for lost partitions is not
     * possible in this callback.
     *
     * @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
     *                   to other consumers. With both the Classic and Consumer protocols, this will always include
     *                   all partitions that were previously assigned to the consumer.
     * @param rebalanceConsumer A restricted view of the {@link Consumer} that is only valid for the duration of
     *                   this callback. See {@link RebalanceConsumer} for the set of permitted operations.
     * @throws org.apache.kafka.common.errors.WakeupException    If raised from a nested call to {@link RebalanceConsumer}
     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link RebalanceConsumer}
     *
     * @see ConsumerRebalanceListener#onPartitionsLost(Collection)
     * @see RebalanceConsumer
     */
    default void onPartitionsLost(Collection<TopicPartition> partitions, RebalanceConsumer rebalanceConsumer) {
        onPartitionsRevoked(partitions, rebalanceConsumer);
    }
}


New default methods on ConsumerRebalanceListener; deprecate the interface and have it extend RebalanceListener

ConsumerRebalanceListener
package org.apache.kafka.clients.consumer;

/**
 * {@inheritDoc}
 * @deprecated
 * @see RebalanceListener
 */
@Deprecated(since = "4.4", forRemoval = true)
public interface ConsumerRebalanceListener extends RebalanceListener {
   
    // Other methods stay unchanged.

    /**
     * Called after partitions have been assigned to this consumer.
     *
     * <p>The default implementation delegates to
     * {@link #onPartitionsAssigned(Collection)}. Override this method
     * instead of the one-arg variant when the callback needs to interact
     * with the consumer (commit offsets, seek, pause/resume).
     *
     * <p>When this method is overridden, the one-arg
     * {@code onPartitionsAssigned(Collection)} is not called unless the
     * override explicitly invokes it.
     *
     * @param partitions the newly assigned partitions
     * @param consumer   a restricted view of the consumer; supports seek,
     *                   commit, pause/resume, and read-only queries
     */
    @Override
    default void onPartitionsAssigned(Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        onPartitionsAssigned(partitions);
    }

    /**
     * Called before partitions are revoked from this consumer during a
     * cooperative rebalance, or before all partitions are revoked during
     * an eager rebalance.
     *
     * <p>The default implementation delegates to
     * {@link #onPartitionsRevoked(Collection)}.
     *
     * @param partitions the partitions being revoked
     * @param consumer   a restricted view of the consumer
     */
    @Override
    default void onPartitionsRevoked(Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        onPartitionsRevoked(partitions);
    }

    /**
     * Called when partitions have been lost. Unlike revocation, the consumer
     * no longer owns these partitions and commits will fail.
     *
     * <p>The default implementation delegates to {@link #onPartitionsLost(Collection)}.
     *
     * @param partitions the partitions that were lost
     * @param consumer   a restricted view of the consumer
     */
    @Override
    default void onPartitionsLost(Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        onPartitionsLost(partitions);
    }
}

RebalanceConsumer

A restricted projection of Consumer<?, ?> exposing only operations safe during a rebalance:

RebalanceConsumer
package org.apache.kafka.clients.consumer;

/**
 * A restricted view of a {@link Consumer} passed to {@link RebalanceListener} methods.
 * This object is only valid for the duration of the rebalance callback. Any use outside of this scope
 * will result in an {@link IllegalStateException}.
 *
 * <p>This interface exposes only operations that are safe and meaningful
 * during partition rebalancing:
 *
 * <ul>
 *   <li><b>Offset management:</b> {@code commitSync}, {@code commitAsync},
 *       {@code committed}, {@code position}</li>
 *   <li><b>Seek:</b> {@code seek}, {@code seekToBeginning},
 *       {@code seekToEnd}</li>
 *   <li><b>Partition state:</b> {@code assignment}, {@code paused},
 *       {@code pause}, {@code resume}</li>
 *   <li><b>Read-only queries:</b> {@code partitionsFor}, {@code listTopics},
 *       {@code offsetsForTimes}, {@code beginningOffsets},
 *       {@code endOffsets}, {@code metrics}, {@code clientInstanceId}, {@code currentLag}, {@code groupMetadata}</li>
 * </ul>
 *
 * <p>Operations that are dangerous or inapplicable during a rebalance are not present
 * on this interface:
 *
 * <ul>
 *   <li>{@code poll()} - would cause re-entrant polling</li>
 *   <li>{@code close()} - would terminate the consumer mid-rebalance</li>
 *   <li>{@code subscribe()}, {@code unsubscribe()} - would corrupt
 *       subscription state during rebalance</li>
 *   <li>{@code registerMetricForSubscription}, {@code unregisterMetricForSubscription} - metric
 *       registration does not belong during rebalance events</li>
 *   <li>{@code assign()} - conflicts with group-managed assignment</li>
 *   <li>{@code wakeup()} - would interrupt the rebalance</li>
 *   <li>{@code enforceRebalance()} - would trigger re-entrant rebalance</li>
 * </ul>
 */
public interface RebalanceConsumer {

    // --- Offset management ---

    void commitSync();
    void commitSync(Duration timeout);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout);
    void commitAsync();
    void commitAsync(OffsetCommitCallback callback);
    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
                     OffsetCommitCallback callback);

    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions,
                                                      Duration timeout);
    long position(TopicPartition partition);
    long position(TopicPartition partition, Duration timeout);

    // --- Seek ---

    void seek(TopicPartition partition, long offset);
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
    void seekToBeginning(Collection<TopicPartition> partitions);
    void seekToEnd(Collection<TopicPartition> partitions);

    // --- Partition state ---

    Set<TopicPartition> assignment();
    void pause(Collection<TopicPartition> partitions);
    void resume(Collection<TopicPartition> partitions);
    Set<TopicPartition> paused();

    // --- Read-only queries ---

    Uuid clientInstanceId(Duration timeout);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
                                                Duration timeout);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
                                         Duration timeout);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
    List<PartitionInfo> partitionsFor(String topic);
    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
    Map<String, List<PartitionInfo>> listTopics();
    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
    OptionalLong currentLag(TopicPartition topicPartition);
    ConsumerGroupMetadata groupMetadata();
}


Included: commitSync, commitAsync, committed, position, seek, seekToBeginning, seekToEnd, assignment, pause, resume, paused, beginningOffsets, endOffsets, offsetsForTimes, partitionsFor, listTopics, metrics, clientInstanceId, groupMetadata, currentLag

Omitted (not on the interface): poll, close, subscribe*, unsubscribe, assign, wakeup, enforceRebalance, (un)registerMetricForSubscription

Consumer

Consumer
/** Update class javadoc with setRebalanceListener pattern. */

/** Also add a reference to {@link #setRebalanceListener} on existing subscribe() methods, to help with 5.0 migration. */

/**
 * Register a {@link RebalanceListener} to be invoked when the consumer's partition assignment
 * changes during a rebalance. This is the preferred way to register a rebalance listener, replacing
 * the previous pattern of passing a listener to the {@code subscribe()} methods (which are now deprecated).
 *
 * <p>
 * The listener can be updated at any time. The change takes effect on the next {@link #poll(Duration)}
 * invocation: any rebalance triggered during that poll (or subsequent polls) will use the most recently
 * set listener.
 *
 * <p>
 * Passing {@code null} removes the current listener, so that no callbacks are invoked on future rebalances.
 *
 * <p>
 * If a listener was previously registered via one of the deprecated {@code subscribe(topics, listener)}
 * variants, calling this method will override it. 
 * 
 * <p>
 * The same is true for multiple registrations via this method. The final value that was set before the next
 * {@code poll()} will receive callback invocations during rebalance events.
 *
 * @param callback The listener to invoke on partition assignment changes, or {@code null} to remove
 *                 the current listener.
 *
 * @see RebalanceListener
 * @see RebalanceConsumer
 */
void setRebalanceListener(RebalanceListener callback);

/** Use {@link #setRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

/** Use {@link #setRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

/** Use {@link #setRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

Proposed Changes

New setRebalanceListener method on Consumer, relation between RebalanceListener and the legacy ConsumerRebalanceListener

ConsumerRebalanceListener now inherits the 2-arg methods and gives each a default implementation that delegates to the corresponding legacy 1-arg method. With the new setRebalanceListener method, new adopters can implement RebalanceListener directly, while existing implementations of the sub-interface ConsumerRebalanceListener remain supported for compatibility.

Callback reference in Consumer implementations

Both setRebalanceListener and subscribe(..., ConsumerRebalanceListener) will coexist. In both AsyncKafkaConsumer and ClassicKafkaConsumer, each of them ultimately overwrites the rebalanceListener reference held in SubscriptionState.

To ease the transition, the behaviour of the existing subscribe() methods that take a callback argument does not change. When users upgrade to the next minor release, the Javadoc for those subscribe() methods will point them to setRebalanceListener().

A future KIP could add addRebalanceListener() to the Consumer API to support multiple listeners, similar to Producer/ConsumerInterceptors, since users currently rely on wrapping their internal listeners. Alternatively, it could provide a simple composite rebalance listener class for this scenario.

Delegation Mechanics

The consumer's internal rebalance method invoker in SubscriptionState switches from calling the one-arg methods to calling the two-arg methods. Default implementations of the two-arg methods delegate to the one-arg methods, preserving existing behavior for all current implementations. The same pattern was accepted when the onPartitionsLost method was introduced.

It is worth going over the mechanics for overriding the various methods:

Call chain for existing implementations (user overrides only the 1-arg methods)

The internal invoker calls the 2-arg method, whose default implementation delegates to the user's 1-arg method.

Call chain for existing implementations that also override the 2-arg methods (the 1-arg methods must still be implemented, since only the 2-arg methods have defaults)

The internal invoker calls the user's 2-arg override directly; the 1-arg method runs only if the override invokes it.

onPartitionsLost delegation behaviour

  • When implementing the new RebalanceListener
    • When only onPartitionsRevoked and onPartitionsAssigned 2-arg methods overridden
      • Impl -> onPartitionsLost(partitions, view) -> onPartitionsRevoked(partitions, view).
    • No override for onPartitionsRevoked 2-arg
      • Impl -> onPartitionsLost(partitions, view) -> onPartitionsLost(partitions) -> onPartitionsRevoked(partitions) which is the current behaviour.
  • When implementing legacy ConsumerRebalanceListener:
    • When only onPartitionsRevoked and onPartitionsAssigned 2-arg methods overridden
      • Impl -> onPartitionsLost(partitions, view) -> onPartitionsLost(partitions) -> onPartitionsRevoked(partitions)
    • No override for onPartitionsRevoked 2-arg
      • Impl -> onPartitionsLost(partitions, view) -> onPartitionsLost(partitions) -> onPartitionsRevoked(partitions) which is the current behaviour.
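
The chains above can be checked with a small model of the two interfaces. This is a sketch under simplifying assumptions: the partition collection is reduced to a String and the RebalanceConsumer view to an Object, and the listener classes NewStyle, Legacy, and LegacyWithTwoArgOverrides are hypothetical. Only the default-method wiring follows the interfaces shown in Public Interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the proposed base interface.
interface RebalanceListener {
    void onPartitionsAssigned(String partitions, Object view);
    void onPartitionsRevoked(String partitions, Object view);
    default void onPartitionsLost(String partitions, Object view) {
        onPartitionsRevoked(partitions, view);
    }
}

// Simplified model of the legacy interface after this KIP.
interface ConsumerRebalanceListener extends RebalanceListener {
    void onPartitionsAssigned(String partitions);
    void onPartitionsRevoked(String partitions);
    default void onPartitionsLost(String partitions) {
        onPartitionsRevoked(partitions);
    }

    @Override
    default void onPartitionsAssigned(String partitions, Object view) { onPartitionsAssigned(partitions); }

    @Override
    default void onPartitionsRevoked(String partitions, Object view) { onPartitionsRevoked(partitions); }

    @Override
    default void onPartitionsLost(String partitions, Object view) { onPartitionsLost(partitions); }
}

// Hypothetical listeners that record which method actually ran ("/1" = 1-arg, "/2" = 2-arg).
class NewStyle implements RebalanceListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(String p, Object view) { calls.add("assigned/2"); }

    @Override
    public void onPartitionsRevoked(String p, Object view) { calls.add("revoked/2"); }
}

class Legacy implements ConsumerRebalanceListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(String p) { calls.add("assigned/1"); }

    @Override
    public void onPartitionsRevoked(String p) { calls.add("revoked/1"); }
}

class LegacyWithTwoArgOverrides extends Legacy {
    @Override
    public void onPartitionsAssigned(String p, Object view) { calls.add("assigned/2"); }

    @Override
    public void onPartitionsRevoked(String p, Object view) { calls.add("revoked/2"); }
}
```

In this model, calling onPartitionsLost(partitions, view) ends in the 2-arg onPartitionsRevoked for NewStyle, and in the 1-arg onPartitionsRevoked for both legacy variants, even when the legacy listener overrides the 2-arg onPartitionsRevoked.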

Internal Implementation

The consumer creates a RebalanceConsumer that wraps itself; the view is created at each rebalance and discarded afterwards. This works with both the classic and the new consumer group protocol. The internal rebalance machinery changes from:

// Before 
listener.onPartitionsAssigned(partitions); 
listener.onPartitionsRevoked(partitions); 
listener.onPartitionsLost(partitions);

to:

// After (the view is AutoCloseable, so it expires when the try block exits)
try (var view = new DelegatingRebalanceConsumer(this.consumer)) {
    listener.onPartitionsAssigned(partitions, view);
}

Any call to a closed RebalanceConsumer throws IllegalStateException.

Threading is identical to the existing model, where callbacks fire on the consumer's poll thread.

A Note on the consumer view.

View lifetime: The view is valid only for the duration of the callback invocation. The internal implementation sets a validity flag to false when the callback returns. Any method call on a stale view throws IllegalStateException. This prevents the common misuse of storing the view reference for later use:

expired-view
// This will throw IllegalStateException when someApplicationMethod() is called
private RebalanceConsumer storedView;

void onPartitionsAssigned(Collection<TopicPartition> partitions,
                          RebalanceConsumer consumer) {
    this.storedView = consumer; // Stores reference
}

void someApplicationMethod() {
    storedView.commitSync(); // throws IllegalStateException: view expired
}
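
One way the validity flag could work is sketched below. The KIP does not specify the internals of the view, so the flag handling is an assumption; ExpiringView and ViewLifetimeDemo are hypothetical names, and RebalanceConsumer is reduced to a single method so the example compiles on its own.

```java
// Simplified stand-in for the proposed interface: only commitSync() is modelled.
interface RebalanceConsumer {
    void commitSync();
}

// Hypothetical view: delegates while valid, rejects every call once closed.
class ExpiringView implements RebalanceConsumer, AutoCloseable {
    private final Runnable commitDelegate;
    // volatile so that a stale reference used from another thread sees the expiry
    private volatile boolean valid = true;

    ExpiringView(Runnable commitDelegate) {
        this.commitDelegate = commitDelegate;
    }

    @Override
    public void commitSync() {
        ensureValid();
        commitDelegate.run();
    }

    @Override
    public void close() {
        valid = false; // runs when the callback scope exits
    }

    private void ensureValid() {
        if (!valid) {
            throw new IllegalStateException("RebalanceConsumer used outside of a rebalance callback");
        }
    }
}

class ViewLifetimeDemo {
    static RebalanceConsumer leaked;
    static int commits = 0;

    // Models the consumer invoking one callback inside a try-with-resources scope.
    static void invokeCallback() {
        try (ExpiringView view = new ExpiringView(() -> commits++)) {
            view.commitSync(); // allowed: still inside the callback scope
            leaked = view;     // misuse: the reference escapes the callback
        }
    }
}
```

After invokeCallback() returns, calling ViewLifetimeDemo.leaked.commitSync() throws IllegalStateException and the delegate is not invoked again.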

Exception Handling

If a callback throws an exception, the behavior matches ConsumerRebalanceListener:

  • The exception propagates to the poll() caller.
  • The rebalance completes regardless, and the consumer joins the group with its new assignment.
  • Other callbacks are not affected (e.g., if onPartitionsRevoked throws, onPartitionsAssigned is still called for the new assignment).
  • The consumer remains in a usable state after the exception.

Compatibility, Deprecation, and Migration Plan

ConsumerRebalanceListener is deprecated and marked for eventual removal. From AK5.0 onwards, the new RebalanceListener interface will be the standard interface for rebalance callbacks.

  • ConsumerRebalanceListener for callbacks that don't need consumer access (logging, metrics, notification)
  • onPartitions*(partitions, view) for callbacks that need to commit, seek, pause/resume, or query consumer state

Remove all subscribe() method variants that take a rebalance listener in AK5.0; both the new and the deprecated methods are supported in 4.4+.

We plan to use the RebalanceConsumer interface for future KIPs that can provide metadata about the rebalance event itself such as cause, reason string etc.

Test Plan


  • Unit tests for RebalanceConsumer delegation of all permitted operations
  • Unit tests for RebalanceConsumer throwing IllegalStateException after callback returns
  • Unit tests for new RebalanceListener lifecycle (assign, revoke, lost)
  • Unit tests for consumer state visibility during each callback (assignment includes/excludes correct partitions)
  • Unit tests for pause state persistence after onPartitionsAssigned returns
  • Unit tests for exception propagation from callbacks
  • Integration tests with cooperative and eager assignors
  • Integration tests for seek and commit from within callbacks
  • Integration tests under incremental cooperative rebalancing

Rejected Alternatives

Default methods in ConsumerRebalanceListener. The previously accepted version of this KIP supported both the 1-arg and 2-arg variants of the callback methods on a single interface, giving every method a default empty body with a delegating/fallback implementation to minimise the migration burden. However, this was a breaking change for Scala clients that create a listener object, since each method then had to be marked with override def. The current proposal is a cleaner break that preserves backwards compatibility for both Java and Scala clients.

Provide a RebalanceContext as parameter for callback. Providing a Context that captures a Consumer usable at rebalance time can be verbose, even though it would allow extending such an interface with other metadata about the rebalance. A flat structure

New interface with new subscribe() overloads. A separate ConsumerRebalanceCallback / RebalanceHandler interface with new subscribe() overloads on Consumer. This requires new methods on Consumer, KafkaConsumer, AsyncKafkaConsumer, and MockConsumer. Every future KIP touching rebalance callbacks must update both interfaces. Default methods on the existing interface avoid the parallel hierarchy and leave the Consumer interface untouched. The new callback must also be propagated through SubscriptionState for the classic consumer and appEventHandler for the async consumer, which adds unnecessary complexity.

Pass the full Consumer<K, V>. Relies on documentation to prevent misuse. A restricted view makes dangerous operations a compile error.

Runtime enforcement via UnsupportedOperationException on full Consumer. Compile-time enforcement is safer and self-documenting since IDE shows exactly what's available.
