Status

Current state: ACCEPTED

Discussion thread: here 

JIRA: KAFKA-17600
Related tickets: KAFKA-16248, KAFKA-17622

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In this KIP, we intend to expose the next offset to be fetched after an invocation of Consumer.poll(). The new ConsumerRecords constructor initializes the next offsets together with the leader epoch, each wrapped in an OffsetAndMetadata object.

An example use case: when a Kafka Streams application commits offsets after processing all records in its internal buffers, the straightforward approach would be to commit the offset of the last record in the ConsumerRecords object plus one. However, this approach results in a constant non-zero lag for the consumer, because the last data record is usually followed by one or more control records, which prevents the end offset of the topic from being committed. To mitigate this, Kafka Streams commits the offset returned by Consumer.position(), which is the next offset to be fetched after the last fetched batch, including any potential commit control records. However, Consumer.position() does not include the leader epoch. Moreover, in some special cases, a race condition between the application thread (the StreamThread) and the consumer heartbeat threads can cause Consumer.position() to return an incorrect next offset.
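The lag arithmetic in the use case above can be made concrete with a small, self-contained Java sketch. The partition layout (five data records at offsets 0 to 4, followed by a single transaction commit marker at offset 5) is a made-up example, and CommitLagExample, lag, and lastRecordPlusOne are hypothetical names; no Kafka client code is involved.

```java
import java.util.List;

public class CommitLagExample {

    /** Consumer lag for one partition: log end offset minus committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    /** The naive commit offset: offset of the last data record returned by poll(), plus one. */
    static long lastRecordPlusOne(List<Long> dataRecordOffsets) {
        return dataRecordOffsets.get(dataRecordOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        // Hypothetical partition: data records at offsets 0..4, then a
        // transaction commit marker (control record) at offset 5.
        List<Long> dataRecordOffsets = List.of(0L, 1L, 2L, 3L, 4L);
        long controlRecordOffset = 5L;
        long logEndOffset = controlRecordOffset + 1; // 6

        long naiveCommit = lastRecordPlusOne(dataRecordOffsets); // 5

        // The next offset to fetch already skips past the control record,
        // assuming the fetched batch included it (this is what position() tracks).
        long nextFetchOffset = controlRecordOffset + 1; // 6

        // prints "lag after committing last offset + 1: 1"
        System.out.println("lag after committing last offset + 1: " + lag(logEndOffset, naiveCommit));
        // prints "lag after committing next fetch offset: 0"
        System.out.println("lag after committing next fetch offset: " + lag(logEndOffset, nextFetchOffset));
    }
}
```

The naive commit can never reach the log end offset because no data record exists at offset 5, so the reported lag stays at 1 indefinitely.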

Public Interfaces

In this KIP, we propose to add a new constructor to the ConsumerRecords class. In addition, the nextOffsets() method that was suggested and accepted in KIP-320 will be implemented in the same class. To prevent potential bugs in the future, the current ConsumerRecords constructor will be deprecated.

Proposed Changes

ConsumerRecords.java
// The current constructor, deprecated by this KIP
@Deprecated
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records)

// The new constructor, which also receives the next offset (and leader epoch) of each partition
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, Map<TopicPartition, OffsetAndMetadata> nextOffsets)

/**
 * Get the next offsets that the consumer will consume.
 */
public Map<TopicPartition, OffsetAndMetadata> nextOffsets()
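To show how the two constructors could relate, here is a simplified, self-contained model of the proposed API. It is a sketch under stated assumptions, not the actual ConsumerRecords implementation: PartitionKey and NextOffset are hypothetical stand-ins for TopicPartition and OffsetAndMetadata, records are plain values rather than ConsumerRecord<K, V>, and the deprecated constructor is assumed to delegate to the new one with an empty next-offsets map.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class NextOffsetsSketch {

    // Hypothetical stand-in for Kafka's TopicPartition.
    record PartitionKey(String topic, int partition) {}

    // Hypothetical stand-in for Kafka's OffsetAndMetadata: next offset plus optional leader epoch.
    record NextOffset(long offset, Optional<Integer> leaderEpoch) {}

    // Simplified model of ConsumerRecords after this KIP (not the real class).
    static final class RecordsWithNextOffsets<V> {
        private final Map<PartitionKey, List<V>> records;
        private final Map<PartitionKey, NextOffset> nextOffsets;

        /** Old-style constructor: no position is known, so the next-offsets map is empty. */
        @Deprecated
        RecordsWithNextOffsets(Map<PartitionKey, List<V>> records) {
            this(records, Map.of());
        }

        /** New-style constructor: the caller supplies the next offset of each returned partition. */
        RecordsWithNextOffsets(Map<PartitionKey, List<V>> records,
                               Map<PartitionKey, NextOffset> nextOffsets) {
            // Defensive, unmodifiable copies (Map.copyOf rejects null keys and values).
            this.records = Map.copyOf(records);
            this.nextOffsets = Map.copyOf(nextOffsets);
        }

        List<V> records(PartitionKey partition) {
            return records.getOrDefault(partition, List.of());
        }

        /** Next offsets the consumer will fetch, keyed by partition. */
        Map<PartitionKey, NextOffset> nextOffsets() {
            return nextOffsets;
        }
    }

    public static void main(String[] args) {
        PartitionKey tp = new PartitionKey("orders", 0);
        RecordsWithNextOffsets<String> batch = new RecordsWithNextOffsets<>(
                Map.of(tp, List.of("a", "b")),
                Map.of(tp, new NextOffset(6L, Optional.of(3))));

        // A committer would use this value instead of "last record offset + 1".
        NextOffset toCommit = batch.nextOffsets().get(tp);
        System.out.println(tp + " -> commit offset " + toCommit.offset()
                + ", leader epoch " + toCommit.leaderEpoch().orElse(-1));
    }
}
```

Having the deprecated constructor delegate to the new one keeps a single initialization path, and the empty map makes "no position known" explicit instead of relying on null.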


The nextOffsets object is a map from TopicPartition to OffsetAndMetadata, where each OffsetAndMetadata contains the next offset and the last leader epoch. Both values (the next offset and the last leader epoch) are retrieved from the CompletedFetch object.
ShareConsumerImpl uses the current constructor (i.e., with an empty nextOffsets map), since consumers in share groups do not have a position.
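The mapping from per-partition fetch state to nextOffsets can be sketched as follows. FetchState is a hypothetical stand-in loosely modeled on the two values this KIP says are read from CompletedFetch (the next offset and the last leader epoch); it is not the real internal class. The "latest fetch wins" rule for repeated partitions is an assumption of this sketch, and the share-consumer helper simply returns an empty map, mirroring the behavior described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class BuildNextOffsetsSketch {

    // Hypothetical stand-ins for TopicPartition and OffsetAndMetadata.
    record PartitionKey(String topic, int partition) {}
    record NextOffset(long offset, Optional<Integer> leaderEpoch) {}

    // Hypothetical per-partition fetch state, loosely modeled on the two values
    // this KIP reads from CompletedFetch: the next offset and the last leader epoch.
    record FetchState(PartitionKey partition, long nextFetchOffset, Optional<Integer> lastEpoch) {}

    /** Builds the nextOffsets map; assumes fetches are in fetch order, so a later entry overwrites an earlier one. */
    static Map<PartitionKey, NextOffset> buildNextOffsets(List<FetchState> fetches) {
        Map<PartitionKey, NextOffset> result = new HashMap<>();
        for (FetchState fetch : fetches) {
            result.put(fetch.partition(), new NextOffset(fetch.nextFetchOffset(), fetch.lastEpoch()));
        }
        return Map.copyOf(result);
    }

    /** Share-group consumers have no position, so they report no next offsets. */
    static Map<PartitionKey, NextOffset> shareConsumerNextOffsets() {
        return Map.of();
    }

    public static void main(String[] args) {
        PartitionKey p0 = new PartitionKey("orders", 0);
        List<FetchState> fetches = List.of(
                new FetchState(p0, 4L, Optional.of(3)),
                new FetchState(p0, 6L, Optional.of(3)));
        // Two fetches for the same partition: the later one determines the next offset.
        System.out.println(buildNextOffsets(fetches).get(p0).offset()); // prints 6
        System.out.println(shareConsumerNextOffsets().isEmpty());       // prints true
    }
}
```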

Compatibility, Deprecation, and Migration Plan

The current constructor is deprecated but not yet removed, so there is no compatibility issue for now. The new constructor will be used throughout Apache Kafka (AK).

Test Plan

Unit tests will be added for the affected classes. Moreover, the existing integration tests continuing to pass indicates that the change works as intended.

Rejected Alternatives

  • Caching the leader epoch offset ranges inside the consumer is not sufficient, because in the case of EOS (exactly-once semantics) it is the Producer, not the Consumer, that commits the offsets.
  • Adding a new Consumer.positionWithMetadata() method could work, but it complicates the consumer interface and also suffers from a race condition (by the time we want to commit, the position may no longer be valid).

