Status
Current state: ACCEPTED
Discussion thread: here
JIRA: KAFKA-17600
Related tickets: KAFKA-16248, KAFKA-17622
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In this KIP, we intend to expose the next offset to be fetched after an invocation of Consumer.poll(). The new ConsumerRecords constructor initializes the next offsets as well as the leader epoch, wrapped in an OffsetAndMetadata object.
An example use case: when a Kafka Streams application commits offsets after processing all records in its internal buffers, the straightforward approach would be to commit the offset of the last record in the ConsumerRecords object plus one. However, this approach results in a constant non-zero lag for the consumer, because the last data record is usually followed by one or more control records, which prevents the end offset of the topic from being committed. To mitigate this, Kafka Streams commits the offset returned by Consumer.position(), which is the next offset to be fetched after the last fetched batch, including any commit control records. However, Consumer.position() does not include the leader epoch. Moreover, in some special cases, a race condition between the application/StreamThread and the consumer heartbeat threads causes Consumer.position() to return an incorrect next offset.
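The constant lag described above can be illustrated with a small simulation. This is only a sketch: it does not use the Kafka client, and the class LagSketch, its Entry type, and both helper methods are hypothetical names invented for this example. It assumes a fetched batch of three data records followed by a single commit marker.

```java
import java.util.List;

public class LagSketch {
    // Hypothetical model of one log entry: an offset plus a flag marking control
    // records (such as commit markers), which are never handed to the application.
    static final class Entry {
        final long offset;
        final boolean control;

        Entry(long offset, boolean control) {
            this.offset = offset;
            this.control = control;
        }
    }

    // Naive commit offset: offset of the last data record the application saw, plus one.
    static long lastRecordPlusOne(List<Entry> fetched) {
        long last = -1L;
        for (Entry e : fetched) {
            if (!e.control) {
                last = e.offset;
            }
        }
        return last + 1;
    }

    // Next offset to fetch: one past the whole fetched batch, control records included.
    static long nextOffsetToFetch(List<Entry> fetched) {
        return fetched.get(fetched.size() - 1).offset + 1;
    }

    public static void main(String[] args) {
        // Offsets 0..2 are data records, offset 3 is a commit marker; log end offset is 4.
        List<Entry> fetched = List.of(
            new Entry(0, false), new Entry(1, false), new Entry(2, false), new Entry(3, true));
        long logEndOffset = 4;

        System.out.println("lag when committing last record + 1: "
            + (logEndOffset - lastRecordPlusOne(fetched)));
        System.out.println("lag when committing next offset:     "
            + (logEndOffset - nextOffsetToFetch(fetched)));
    }
}
```

In this model the naive approach commits offset 3 and leaves a lag of 1, while committing the next offset to fetch commits offset 4 and leaves no lag.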
Public Interfaces
In this KIP, we propose to add a new constructor to the ConsumerRecords class. In addition, the method nextOffsets(), which was suggested and accepted in KIP-320, will be implemented for the same class. To avoid introducing bugs in the future, the current ConsumerRecords constructor should be deprecated.
Proposed Changes
// the current constructor
@Deprecated
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records)

public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
                       Map<TopicPartition, OffsetAndMetadata> nextOffsets) {}

/**
 * Get the next offsets that the consumer will consume.
 */
public Map<TopicPartition, OffsetAndMetadata> nextOffsets();
The nextOffsets object is a map from TopicPartition to OffsetAndMetadata, which contains the next offset and the last leader epoch. Both values (the next offset and the last leader epoch) are retrieved from the CompletedFetch object.
The ShareConsumerImpl uses the current constructor, without nextOffsets (which is then an empty map), since consumers in share groups do not have a position.
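The relationship between the two constructors and the nextOffsets() accessor can be sketched as follows. This is a simplified model under stated assumptions, not the actual implementation: TopicPartition and OffsetAndMetadata below are minimal hypothetical stand-ins for the real Kafka classes, RecordsSketch is an invented name standing in for ConsumerRecords, and plain values are used in place of ConsumerRecord<K, V>.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class NextOffsetsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical stand-in for OffsetAndMetadata: next offset plus optional leader epoch.
    static final class OffsetAndMetadata {
        final long offset;
        final Optional<Integer> leaderEpoch;

        OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Simplified model of the proposed ConsumerRecords shape; V stands in for a record.
    static final class RecordsSketch<V> {
        private final Map<TopicPartition, List<V>> records;
        private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;

        // Models the deprecated constructor: no positions are supplied, so the map is empty.
        @Deprecated
        RecordsSketch(Map<TopicPartition, List<V>> records) {
            this(records, Collections.emptyMap());
        }

        // Models the proposed constructor, which also receives the next offsets.
        RecordsSketch(Map<TopicPartition, List<V>> records,
                      Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
            this.records = records;
            this.nextOffsets = Collections.unmodifiableMap(nextOffsets);
        }

        List<V> records(TopicPartition partition) {
            return records.getOrDefault(partition, Collections.emptyList());
        }

        Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
            return nextOffsets;
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 0);
        RecordsSketch<String> polled = new RecordsSketch<>(
            Map.of(tp, List.of("a", "b", "c")),
            Map.of(tp, new OffsetAndMetadata(4L, Optional.of(7))));

        // An application could hand this map to its commit call instead of
        // computing "last record offset + 1" for each partition.
        polled.nextOffsets().forEach((p, om) -> System.out.println(
            p.topic + "-" + p.partition + " -> offset " + om.offset
                + ", leader epoch " + om.leaderEpoch));
    }
}
```

A caller that still goes through the deprecated path, as the share consumer does, simply observes an empty nextOffsets() map in this model.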
Compatibility, Deprecation, and Migration Plan
The current constructor is deprecated but not yet removed, so there is no compatibility issue for now. The new constructor will be used throughout Apache Kafka (AK).
Test Plan
Unit tests will be added to the affected classes. Moreover, passing the existing integration tests will indicate that the change works as intended.
Rejected Alternatives
- Caching the leader epoch offset ranges inside the consumer is not sufficient, because it is the Producer that commits the offset in the case of EOS.
- Adding a new Consumer.positionWithMetadata() could work, but it complicates the consumer interface and also suffers from a race condition (by the time we want to commit, the position may be gone).