Status

Current state: Accepted

Discussion thread: link

Voting thread: link

JIRA: KAFKA-20385

Motivation

The ConsumerRebalanceListener Javadoc recommends performing consumer operations during rebalance callbacks:

"It is common for the revocation callback to be used to commit offsets for the partitions being revoked." "The assigned callback could be used to set the initial position of the assigned partitions."

But the callback signatures provide no access to the consumer:


void onPartitionsAssigned(Collection<TopicPartition> partitions);
void onPartitionsRevoked(Collection<TopicPartition> partitions);


The standard workaround is constructor injection: consumer.subscribe(topics, new MyListener(consumer)). This gives the listener an unrestricted Consumer reference, and nothing prevents it from calling poll(), close(), or subscribe() inside a rebalance callback.

Several major frameworks have independently built the same fix:

  • Spring Kafka (since 2.0, 2017): ConsumerAwareRebalanceListener passes Consumer<?, ?> to every callback. It also splits onPartitionsRevoked into before-commit and after-commit phases.
  • SmallRye Reactive Messaging (Quarkus): KafkaConsumerRebalanceListener passes Consumer<?, ?>. Their docs: "Unlike the ConsumerRebalanceListener from Apache Kafka, the methods pass the Kafka Consumer."
  • Micronaut Kafka: ConsumerAware interface injects the consumer via setter, used alongside ConsumerRebalanceListener.

All three pass the full Consumer. This KIP does the same but through a restricted view that omits operations dangerous during a rebalance (poll(), close(), subscribe(), wakeup()), making misuse a compile error.

Use Cases

1. Offset Commit on Revocation

// Today
class MyListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    MyListener(Consumer<?, ?> consumer) { this.consumer = consumer; }
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // unrestricted reference
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
}

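// With this KIP
consumer.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    // The one-arg methods remain abstract, so they must still be implemented
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}

    @Override
    public void onPartitionsRevoked(
            Collection<TopicPartition> partitions, RebalanceConsumer consumer) {
        consumer.commitSync(); // restricted view, valid only during this callback
    }
});
consumer.subscribe(topics);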

2. Seek to External Offsets on Assignment

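Seek newly assigned partitions to offsets held in an external store. position(), seek(), and commitSync() are all available through RebalanceConsumer, so the assigned callback can do this without an unrestricted Consumer reference.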
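A minimal sketch of this use case follows. It does not use the Kafka client: `RebalanceView` is a hypothetical stand-in for the seek subset of RebalanceConsumer, `OffsetStore` is a hypothetical in-memory substitute for an external store, and partitions are plain strings such as `orders-0` rather than TopicPartition. `SeekOnAssignListener` mirrors the proposed two-arg onPartitionsAssigned.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the seek subset of RebalanceConsumer;
// partitions are plain strings such as "orders-0" instead of TopicPartition.
interface RebalanceView {
    void seek(String partition, long offset);
}

// Hypothetical external offset store (e.g. a database table), modeled as a map.
class OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    void save(String partition, long offset) { offsets.put(partition, offset); }

    Long load(String partition) { return offsets.get(partition); }
}

// Mirrors the proposed two-arg onPartitionsAssigned(partitions, consumer).
class SeekOnAssignListener {
    private final OffsetStore store;

    SeekOnAssignListener(OffsetStore store) { this.store = store; }

    void onPartitionsAssigned(Collection<String> partitions, RebalanceView consumer) {
        for (String partition : partitions) {
            Long stored = store.load(partition);
            if (stored != null) {
                // Resume from the externally stored position.
                consumer.seek(partition, stored);
            }
            // No stored offset: the position falls back to the committed
            // offset or auto.offset.reset, as it would without a listener.
        }
    }
}

// Test double that records every seek as "partition@offset".
class RecordingView implements RebalanceView {
    final List<String> seeks = new ArrayList<>();

    @Override
    public void seek(String partition, long offset) { seeks.add(partition + "@" + offset); }
}
```

With a stored offset of 42 for `orders-0` and none for `orders-1`, assigning both partitions records a single seek, `orders-0@42`.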

3. Partition Pause/Resume on Assignment

Pause newly assigned partitions until caches are warm, then resume them outside the callback. The paused state set during onPartitionsAssigned persists after the callback returns; the consumer does not automatically resume partitions when the callback exits. When partitions are revoked, the consumer clears any paused state on them.
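The pause-then-resume flow can be sketched as below, assuming hypothetical stand-ins: `PauseControl` represents the pause()/resume() subset shared by RebalanceConsumer (inside the callback) and Consumer (in the poll loop), partitions are strings, and `markWarm()` is a hypothetical hook the application calls when a cache finishes loading. All methods are assumed to run on the poll thread, so no synchronization is shown.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for pause()/resume(), available both on the
// RebalanceConsumer view and on the full Consumer.
interface PauseControl {
    void pause(Collection<String> partitions);
    void resume(Collection<String> partitions);
}

// Mirrors the proposed two-arg callbacks; all methods run on the poll thread.
final class WarmupListener {
    // Paused on assignment, cache not yet warm.
    private final Set<String> warming = new LinkedHashSet<>();
    // Subset of warming whose cache has finished loading.
    private final Set<String> warmed = new LinkedHashSet<>();

    void onPartitionsAssigned(Collection<String> partitions, PauseControl consumer) {
        consumer.pause(partitions); // stays paused after the callback returns
        warming.addAll(partitions);
        // Hypothetical: start loading caches for these partitions here.
    }

    void onPartitionsRevoked(Collection<String> partitions, PauseControl consumer) {
        // The consumer clears paused state for revoked partitions itself,
        // so only local bookkeeping is dropped here.
        warming.removeAll(partitions);
        warmed.removeAll(partitions);
    }

    // Hypothetical hook called when a partition's cache is warm.
    void markWarm(String partition) {
        if (warming.contains(partition)) {
            warmed.add(partition);
        }
    }

    // Called from the poll loop, outside any callback, with the full consumer.
    void resumeWarmed(PauseControl consumer) {
        if (warmed.isEmpty()) {
            return;
        }
        List<String> toResume = new ArrayList<>(warmed);
        consumer.resume(toResume);
        warming.removeAll(toResume);
        warmed.clear();
    }
}

// Test double that tracks which partitions are currently paused.
final class RecordingControl implements PauseControl {
    final Set<String> paused = new HashSet<>();

    @Override
    public void pause(Collection<String> partitions) { paused.addAll(partitions); }

    @Override
    public void resume(Collection<String> partitions) { paused.removeAll(partitions); }
}
```

resumeWarmed() takes the full consumer from the poll loop rather than a stored view, since a view kept past the callback is no longer valid.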

Public Interfaces

New default methods on ConsumerRebalanceListener

package org.apache.kafka.clients.consumer;

/**
 * {@inheritDoc}
 */
public interface ConsumerRebalanceListener {
    // -- rest of the methods --

    /**
     * Called after partitions have been assigned to this consumer.
     *
     * <p>The default implementation delegates to
     * {@link #onPartitionsAssigned(Collection)}. Override this method
     * instead of the one-arg variant when the callback needs to interact
     * with the consumer (commit offsets, seek, pause/resume).
     *
     * <p>When this method is overridden, the one-arg
     * {@code onPartitionsAssigned(Collection)} is not called unless the
     * override explicitly invokes it.
     *
     * @param partitions the newly assigned partitions
     * @param consumer   a restricted view of the consumer; supports seek,
     *                   commit, pause/resume, and read-only queries
     */
    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                       RebalanceConsumer consumer) {
        onPartitionsAssigned(partitions);
    }

    /**
     * Called before partitions are revoked from this consumer during a
     * cooperative rebalance, or before all partitions are revoked during
     * an eager rebalance.
     *
     * <p>The default implementation delegates to
     * {@link #onPartitionsRevoked(Collection)}.
     *
     * @param partitions the partitions being revoked
     * @param consumer   a restricted view of the consumer
     */
    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                      RebalanceConsumer consumer) {
        onPartitionsRevoked(partitions);
    }

    /**
     * Called when partitions have been lost. Unlike revocation, the consumer
     * no longer owns these partitions and commits will fail.
     *
     * <p>The default implementation delegates to
     * {@link #onPartitionsRevoked(Collection, RebalanceConsumer)},
     * preserving the existing delegation chain from KIP-429.
     *
     * @param partitions the partitions that were lost
     * @param consumer   a restricted view of the consumer
     */
    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                   RebalanceConsumer consumer) {
        onPartitionsRevoked(partitions, consumer);
    }
}

RebalanceConsumer

A restricted projection of Consumer<?, ?> exposing only operations safe during a rebalance:

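package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;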

/**
 * A restricted view of a {@link Consumer} passed to {@link ConsumerRebalanceListener} methods.
 * This object is only valid for the duration of the rebalance callback. Any use outside of this
 * scope results in an {@link IllegalStateException}.
 *
 * <p>This interface exposes only operations that are safe and meaningful
 * during partition rebalancing:
 *
 * <ul>
 *   <li><b>Offset management:</b> {@code commitSync}, {@code commitAsync},
 *       {@code committed}, {@code position}</li>
 *   <li><b>Seek:</b> {@code seek}, {@code seekToBeginning},
 *       {@code seekToEnd}</li>
 *   <li><b>Partition state:</b> {@code assignment}, {@code paused},
 *       {@code pause}, {@code resume}</li>
 *   <li><b>Read-only queries:</b> {@code partitionsFor}, {@code listTopics},
 *       {@code offsetsForTimes}, {@code beginningOffsets},
 *       {@code endOffsets}, {@code metrics}, {@code clientInstanceId}, {@code currentLag}, {@code groupMetadata}</li>
 * </ul>
 *
 * <p>Operations that are dangerous or inapplicable during a rebalance are not present
 * on this interface:
 *
 * <ul>
 *   <li>{@code poll()} - would cause re-entrant polling</li>
 *   <li>{@code close()} - would terminate the consumer mid-rebalance</li>
 *   <li>{@code subscribe()}, {@code unsubscribe()} - would corrupt
 *       subscription state during rebalance</li>
 *   <li>{@code registerMetricForSubscription}, {@code unregisterMetricForSubscription} - metric
 *       registration does not belong during rebalance events</li>
 *   <li>{@code assign()} - conflicts with group-managed assignment</li>
 *   <li>{@code wakeup()} - would interrupt the rebalance</li>
 *   <li>{@code enforceRebalance()} - would trigger re-entrant rebalance</li>
 * </ul>
 */
public interface RebalanceConsumer extends Closeable {

    // --- Offset management ---

    void commitSync();
    void commitSync(Duration timeout);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout);
    void commitAsync();
    void commitAsync(OffsetCommitCallback callback);
    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
                     OffsetCommitCallback callback);

    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions,
                                                      Duration timeout);
    long position(TopicPartition partition);
    long position(TopicPartition partition, Duration timeout);

    // --- Seek ---

    void seek(TopicPartition partition, long offset);
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
    void seekToBeginning(Collection<TopicPartition> partitions);
    void seekToEnd(Collection<TopicPartition> partitions);

    // --- Partition state ---

    Set<TopicPartition> assignment();
    void pause(Collection<TopicPartition> partitions);
    void resume(Collection<TopicPartition> partitions);
    Set<TopicPartition> paused();

    // --- Read-only queries ---

    Uuid clientInstanceId(Duration timeout);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
                                                Duration timeout);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
                                         Duration timeout);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
    List<PartitionInfo> partitionsFor(String topic);
    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
    Map<String, List<PartitionInfo>> listTopics();
    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
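    OptionalLong currentLag(TopicPartition topicPartition);
    ConsumerGroupMetadata groupMetadata();
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Invalidates this view; called by the consumer when the callback returns.
     * Does not close the underlying consumer.
     */
    @Override
    void close();
}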


Included: commitSync, commitAsync, committed, position, seek, seekToBeginning, seekToEnd, assignment, pause, resume, paused, beginningOffsets, endOffsets, offsetsForTimes, partitionsFor, listTopics, metrics, clientInstanceId, groupMetadata, currentLag

Omitted (not on the interface): poll, close of the underlying consumer (the view's own close() only invalidates the view), subscribe*, unsubscribe, assign, wakeup, enforceRebalance, (un)registerMetricForSubscription


Consumer

/** Update class javadoc with setConsumerRebalanceListener pattern. */

/** Also add a reference to {@link #setConsumerRebalanceListener} on existing subscribe() methods, to help with 5.0 migration. */

/** 
 * Sets the listener that receives all rebalance events for this consumer.
 * A listener set here takes effect from the next poll() invocation.
 * Passing {@code null} removes the currently set listener.
 */
void setConsumerRebalanceListener(ConsumerRebalanceListener callback);

/** Use {@link #setConsumerRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

/** Use {@link #setConsumerRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

/** Use {@link #setConsumerRebalanceListener} instead. */
@Deprecated(since = "4.4", forRemoval = true)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

Proposed Changes

Callback reference in Consumer implementations

Both setConsumerRebalanceListener and subscribe(..., ConsumerRebalanceListener) will coexist until the subscribe() variants are removed. In both AsyncKafkaConsumer and ClassicKafkaConsumer, each path ends up overriding the same rebalanceListener reference within SubscriptionState.

Once the subscribe(..., ConsumerRebalanceListener) methods are removed, setConsumerRebalanceListener will be the only way to register a listener.

To ease the transition, the existing subscribe() methods that take a callback argument keep their current behaviour. Their Javadoc points users to setConsumerRebalanceListener, so that users upgrading to the next major release know which method replaces them.

A future KIP could add addConsumerRebalanceListener() to the Consumer API to support multiple listeners, similar to Producer/ConsumerInterceptors; today users have to wrap their internal listeners in a single listener.
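Until such an API exists, the wrapping approach can be written against the proposed two-arg methods as a composite listener. This is a sketch under stated assumptions: `TwoArgListener`, `ViewStub`, and `CompositeRebalanceListener` are hypothetical names, and partitions are plain strings so the example compiles without the Kafka client.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for RebalanceConsumer; the composite only passes it along.
interface ViewStub {}

// Hypothetical stand-in for the two-arg ConsumerRebalanceListener methods.
interface TwoArgListener {
    void onPartitionsAssigned(Collection<String> partitions, ViewStub consumer);
    void onPartitionsRevoked(Collection<String> partitions, ViewStub consumer);
}

// Registered as the single listener; forwards every callback to its delegates
// in registration order, each receiving the same short-lived view.
final class CompositeRebalanceListener implements TwoArgListener {
    private final List<TwoArgListener> delegates;

    CompositeRebalanceListener(List<? extends TwoArgListener> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions, ViewStub consumer) {
        for (TwoArgListener delegate : delegates) {
            delegate.onPartitionsAssigned(partitions, consumer);
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions, ViewStub consumer) {
        for (TwoArgListener delegate : delegates) {
            delegate.onPartitionsRevoked(partitions, consumer);
        }
    }
}
```

This sketch lets an exception from one delegate propagate immediately, so later delegates are skipped for that event; a production wrapper would need to choose its own policy.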

Delegation Mechanics

The consumer's internal rebalance machinery switches from calling the one-arg methods to calling the two-arg methods. The default implementations of the two-arg methods delegate to the one-arg methods, preserving existing behavior for all current implementations. This is the same pattern that was accepted when onPartitionsLost was introduced.

It is worth going over the mechanics for overriding the various methods:

Call chain for existing implementations (user overrides only the one-arg methods)

The internal implementation invokes the 2-arg method, whose default delegates to the 1-arg method.

Call chain for new implementations (user overrides the 2-arg methods and must still implement the 1-arg onPartitionsAssigned and onPartitionsRevoked, which remain abstract)

The internal implementation invokes the user's 2-arg override directly; the 1-arg method is not called unless the override invokes it.

onPartitionsLost delegation behaviour

When only the 2-arg onPartitionsRevoked and onPartitionsAssigned methods are overridden

Impl → onPartitionsLost(partitions, view) → onPartitionsRevoked(partitions, view).

No 2-arg override of onPartitionsRevoked

Impl → onPartitionsLost(partitions, view) → onPartitionsRevoked(partitions, view) → onPartitionsRevoked(partitions), which is the current behaviour.
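The chains above can be checked with a small self-contained model. This is a sketch, not the Kafka interface: `Listener` is a hypothetical stand-in that copies the proposed default methods, with `String` partitions and an empty `View` marker in place of TopicPartition and RebalanceConsumer. The test calls only the two-arg methods, as the consumer internals would.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for RebalanceConsumer; delegation does not depend on its methods.
interface View {}

// Mirrors the proposed ConsumerRebalanceListener: one-arg assigned/revoked are
// abstract, one-arg onPartitionsLost is the existing default, and the three
// two-arg methods are the new defaults.
interface Listener {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);

    default void onPartitionsLost(Collection<String> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsAssigned(Collection<String> partitions, View consumer) {
        onPartitionsAssigned(partitions);
    }

    default void onPartitionsRevoked(Collection<String> partitions, View consumer) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<String> partitions, View consumer) {
        onPartitionsRevoked(partitions, consumer);
    }
}

// Existing-style listener: overrides only the one-arg methods.
class LegacyListener implements Listener {
    final List<String> calls = new ArrayList<>();

    public void onPartitionsAssigned(Collection<String> p) { calls.add("assigned(1)"); }
    public void onPartitionsRevoked(Collection<String> p) { calls.add("revoked(1)"); }
}

// New-style listener: implements the abstract one-arg methods and overrides
// the two-arg assigned/revoked methods.
class ViewAwareListener implements Listener {
    final List<String> calls = new ArrayList<>();

    public void onPartitionsAssigned(Collection<String> p) { calls.add("assigned(1)"); }
    public void onPartitionsRevoked(Collection<String> p) { calls.add("revoked(1)"); }

    @Override
    public void onPartitionsAssigned(Collection<String> p, View v) { calls.add("assigned(2)"); }

    @Override
    public void onPartitionsRevoked(Collection<String> p, View v) { calls.add("revoked(2)"); }
}
```

Driving both listeners through onPartitionsAssigned(partitions, view) and onPartitionsLost(partitions, view), the existing-style listener records `assigned(1)` and `revoked(1)`, while the view-aware listener records `assigned(2)` and `revoked(2)`.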

Internal Implementation

The consumer creates a RebalanceConsumer that wraps itself, creating a fresh instance at each rebalance and discarding it afterwards. This works with both the classic and the new consumer group protocols. The internal rebalance machinery changes from:

// Before 
listener.onPartitionsAssigned(partitions); 
listener.onPartitionsRevoked(partitions); 
listener.onPartitionsLost(partitions);

to:

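// After: a fresh view is created and invalidated when the try block exits
try (var view = createNewRebalanceConsumer()) {
    listener.onPartitionsAssigned(partitions, view);
}
// onPartitionsRevoked(partitions, view) and onPartitionsLost(partitions, view)
// are invoked the same way.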

For AsyncKafkaConsumer, this means changing the calls in invokeRebalanceCallbacks* within ConsumerRebalanceListenerInvoker.

Any call to a closed RebalanceConsumer throws IllegalStateException.

Threading is unchanged: callbacks fire on the consumer's poll thread, as they do today.

A Note on the consumer view

View lifetime: The view is valid only for the duration of the callback invocation. The internal implementation sets a validity flag to false when the callback returns. Any method call on a stale view throws IllegalStateException. This prevents the common misuse of storing the view reference for later use:

// This will throw IllegalStateException when someApplicationMethod() is called
private RebalanceConsumer storedView;

void onPartitionsAssigned(Collection<TopicPartition> partitions,
                          RebalanceConsumer consumer) {
    this.storedView = consumer; // Stores reference
}

void someApplicationMethod() {
    storedView.commitSync(); // throws IllegalStateException: view expired
}
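The validity flag and the try-with-resources invocation from the Internal Implementation section can be sketched together. All types here are hypothetical stubs reduced to a single commitSync() operation: `FullConsumerStub` stands in for the consumer, `RestrictedView` for RebalanceConsumer, and `CallbackInvoker.invoke` for the consumer's internal invocation. The view's close() only clears the flag; it never closes the delegate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the full consumer, including operations the
// restricted view deliberately leaves out.
final class FullConsumerStub {
    final List<String> log = new ArrayList<>();

    void commitSync() { log.add("commitSync"); }
    void poll() { log.add("poll"); }
    void close() { log.add("close"); }
}

// Hypothetical stand-in for RebalanceConsumer, reduced to one operation.
// The consumer's poll() and close() are absent, so calling them through
// the view is a compile error.
interface RestrictedView extends AutoCloseable {
    void commitSync();

    @Override
    void close(); // invalidates the view only; declares no checked exception
}

// Delegating view guarded by a validity flag.
final class GuardedView implements RestrictedView {
    private final FullConsumerStub delegate;
    // volatile: a stored view may later be used from another thread
    private volatile boolean valid = true;

    GuardedView(FullConsumerStub delegate) { this.delegate = delegate; }

    private void ensureValid() {
        if (!valid) {
            throw new IllegalStateException("RebalanceConsumer view expired");
        }
    }

    @Override
    public void commitSync() {
        ensureValid();
        delegate.commitSync();
    }

    @Override
    public void close() {
        valid = false; // never closes the delegate
    }
}

interface ViewCallback {
    void run(RestrictedView view);
}

// One view per callback, invalidated when the try block exits.
final class CallbackInvoker {
    static void invoke(FullConsumerStub consumer, ViewCallback callback) {
        try (GuardedView view = new GuardedView(consumer)) {
            callback.run(view);
        }
    }
}
```

A commit made inside the callback reaches the stub; the same view used after invoke() returns throws IllegalStateException, and the stub is never closed.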

Exception Handling

If a callback throws an exception, the behavior matches the existing ConsumerRebalanceListener behavior:

  • The exception propagates to the poll() caller.
  • The rebalance completes regardless, and the consumer joins the group with its new assignment.
  • Other callbacks are not affected (e.g., if onPartitionsRevoked throws, onPartitionsAssigned is still called for the new assignment).
  • The consumer remains in a usable state after the exception.
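The exception rules above can be illustrated with a toy model. `ToyConsumer`, `SimpleListener`, and `CallbackException` are hypothetical; the model assumes the consumer remembers the first callback failure, finishes applying the new assignment and running the remaining callbacks, and only then rethrows the failure to the poll() caller. The real consumer's exception type and internal ordering may differ.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in listener with String partitions.
interface SimpleListener {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical wrapper surfaced to the poll() caller.
class CallbackException extends RuntimeException {
    CallbackException(Throwable cause) {
        super("User rebalance callback threw an error", cause);
    }
}

// Toy model of a consumer completing a rebalance inside poll().
final class ToyConsumer {
    private final Set<String> assignment = new HashSet<>();
    private final SimpleListener listener;

    ToyConsumer(SimpleListener listener) { this.listener = listener; }

    Set<String> assignment() { return new HashSet<>(assignment); }

    // Stands in for the part of poll() that applies a new assignment.
    void pollWithRebalance(Collection<String> revoked, Collection<String> added) {
        RuntimeException first = null;
        try {
            listener.onPartitionsRevoked(revoked);
        } catch (RuntimeException e) {
            first = e; // remember, but keep going
        }
        assignment.removeAll(revoked); // the rebalance completes regardless
        assignment.addAll(added);
        try {
            listener.onPartitionsAssigned(added); // still invoked after a revoke failure
        } catch (RuntimeException e) {
            if (first == null) {
                first = e;
            }
        }
        if (first != null) {
            throw new CallbackException(first); // propagates to the poll() caller
        }
    }
}
```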

Compatibility, Deprecation, and Migration Plan

The existing ConsumerRebalanceListener methods are not deprecated. The one-arg and two-arg variants coexist indefinitely:

  • onPartitions*(partitions) for callbacks that don't need consumer access (logging, metrics, notification)
  • onPartitions*(partitions, view) for callbacks that need to commit, seek, pause/resume, or query consumer state

Remove all subscribe() variants that take a rebalance listener in AK 5.0; 4.4.x supports both the new and the deprecated methods.

Test Plan

  • Unit tests for RebalanceConsumer delegation of all permitted operations
  • Unit tests for RebalanceConsumer throwing IllegalStateException after callback returns
  • Unit tests for new ConsumerRebalanceListener lifecycle (assign, revoke, lost)
  • Unit tests for consumer state visibility during each callback (assignment includes/excludes correct partitions)
  • Unit tests for pause state persistence after onPartitionsAssigned returns
  • Unit tests for exception propagation from callbacks
  • Integration tests with cooperative and eager assignors
  • Integration tests for seek and commit from within callbacks
  • Integration tests under incremental cooperative rebalancing

Rejected Alternatives

New interface with new subscribe() overloads. A separate ConsumerRebalanceCallback / RebalanceHandler interface with new subscribe() overloads on Consumer. This would require new methods on Consumer, KafkaConsumer, AsyncKafkaConsumer, and MockConsumer, and every future KIP touching rebalance callbacks would have to update both interfaces. Default methods on the existing interface avoid the parallel hierarchy and the extra subscribe() overloads. The new callback would also have to be propagated through SubscriptionState for the classic consumer and appEventHandler for the async consumer, which adds unnecessary complexity.

Pass the full Consumer<K, V>. This relies on documentation to prevent misuse, whereas a restricted view makes dangerous operations a compile error.

Runtime enforcement via UnsupportedOperationException on the full Consumer. Compile-time enforcement is safer and self-documenting, since the IDE shows exactly what is available.
