
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

The Kafka consumer pipelines the fetching of data in order to maximise performance. Whenever poll(Duration)/poll(long) is called, another fetch is issued before any results are returned. Although this benefits performance, in some circumstances, when combined with the pause/resume API, the optimisation can result in a substantial amount of duplicate data being transferred over the wire. This happens because, whenever poll is called, any prefetched data for a topic partition is thrown away if that partition is paused. To illustrate the effect with a simple example, imagine that a single KafkaConsumer instance is assigned two topic partitions, TP1 and TP2. Since the client interested in TP1 cannot handle records as fast as the one interested in TP2, we resort to pausing TP1 whenever we are not interested in receiving records for it. This results in the following behavior:

  1. TP1 is resumed and poll is called on it, where poll returns some data
  2. The consumer issues a fetch request in order to pre-fetch the next batch of records for TP1
  3. TP2 is resumed and TP1 paused (as the consumer of TP1 is not ready for more records)
  4. All prefetched records for TP1 are now thrown away.
  5. This cycle repeats indefinitely

This KIP proposes an improvement that lets us control whether prefetched data for paused partitions is thrown away or instead returned along with the records from partitions that are not paused. Returning it allows the client of the API to store that data and process it when ready, instead of causing it to be fetched again.
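The client-side storage mentioned above could take many forms. The following is a minimal sketch, assuming the application keys held records by a partition name; the class PausedRecordBuffer and its methods are hypothetical and are not part of the Kafka client API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: holds records that were returned for a paused
// partition until the application is ready to process them.
class PausedRecordBuffer<R> {
    // Records per partition name, kept in arrival order.
    private final Map<String, Deque<R>> held = new HashMap<>();

    // Store records handed back by poll for a partition that is paused.
    void hold(String partition, List<R> records) {
        held.computeIfAbsent(partition, k -> new ArrayDeque<>()).addAll(records);
    }

    // Hand over everything held for a partition, in arrival order,
    // and forget it. Returns an empty list if nothing is held.
    List<R> drain(String partition) {
        Deque<R> queue = held.remove(partition);
        return queue == null ? new ArrayList<>() : new ArrayList<>(queue);
    }

    // Number of records currently held for a partition.
    int heldCount(String partition) {
        Deque<R> queue = held.get(partition);
        return queue == null ? 0 : queue.size();
    }
}
```

In the TP1/TP2 example, the application would call hold for TP1 records that arrive while TP1 is paused, then call drain once the slower client is ready. The buffer in this sketch is unbounded; a real application would likely cap it.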

Public Interfaces

We add a boolean configuration parameter, return.prefetched.data.for.paused, which defaults to false. When set to true, it causes the KafkaConsumer to return data that has already been prefetched even if the topic partition is in the paused state. With the default value of false, the existing behavior of discarding such data is unchanged.
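As an illustration of how a client might opt in, here is a minimal sketch that builds consumer properties with the proposed parameter. The parameter name comes from this KIP and is not a setting in released Kafka clients; the class PrefetchConfigSketch, the broker address, and the group id are placeholders assumed for the example.

```java
import java.util.Properties;

// Hypothetical helper for building consumer properties with the proposed flag.
class PrefetchConfigSketch {
    // Name proposed by this KIP; not a setting in released Kafka clients.
    static final String RETURN_PREFETCHED = "return.prefetched.data.for.paused";

    static Properties consumerProps(boolean returnPrefetched) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        props.setProperty(RETURN_PREFETCHED, Boolean.toString(returnPrefetched));
        return props;
    }

    static boolean isEnabled(Properties props) {
        // An absent key falls back to the proposed default, false.
        return Boolean.parseBoolean(props.getProperty(RETURN_PREFETCHED, "false"));
    }
}
```

Because the default is false, a client that never sets the key would see no change in behavior.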

Proposed Changes

The proposed change is simply to use the configuration parameter in the Fetcher class in order to opt out of always throwing away prefetched data for partitions that are paused.
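The decision this describes can be sketched in isolation. The code below is not the actual Fetcher implementation; it is a simplified model, with the hypothetical class PrefetchFilterSketch, partitions represented as strings, and records as lists of strings, showing how a flag could gate whether prefetched data for paused partitions is kept.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the proposed decision; not Kafka's Fetcher code.
class PrefetchFilterSketch {
    // Returns the prefetched batches that would be handed to the caller.
    // Batches for paused partitions are kept only when the flag is true.
    static Map<String, List<String>> selectReturnable(
            Map<String, List<String>> prefetched,
            Set<String> paused,
            boolean returnPrefetchedForPaused) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : prefetched.entrySet()) {
            boolean isPaused = paused.contains(entry.getKey());
            if (!isPaused || returnPrefetchedForPaused) {
                result.put(entry.getKey(), entry.getValue());
            }
            // Otherwise the batch is dropped, which models the current behavior.
        }
        return result;
    }
}
```

With the flag set to false, the model drops the TP1 batch while TP1 is paused, matching the behavior described in the Motivation section; with the flag set to true, both batches are returned.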

Compatibility, Deprecation, and Migration Plan

No breaking changes.

Rejected Alternatives

None
