Status

Current state: Discuss

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the old producer/consumer, a socket timeout is associated with each request: the producer/consumer sends a request and then waits for the response. If no response is received from a broker within the specified timeout, the request fails.

...

  1. Clear timeout configurations - Configurations should be intuitive and do exactly what they mean.
  2. Bounded blocking time for send() - users should be able to bound how long a call to send() can block.
  3. No message/batch/request should be able to reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788), and requests can stay in flight until the TCP timeout (KAFKA-2120). In this KIP, we want to see how we can make sure messages/batches/requests expire.

Public Interfaces

From discussion on the mailing list and within LinkedIn, we now have three options for the end state of the timeout configuration for KafkaProducer:

Option 1

  1. max.buffer.full.block.ms   - Replaces block.on.buffer.full. The maximum time to block when the buffer is full.
  2. metadata.fetch.timeout.ms  - Reuse the metadata timeout as batch.timeout.ms, because a batch waiting on leader information is essentially a case of metadata not being available.
  3. network.request.timeout.ms - Used when the producer sends requests to brokers over TCP connections. It specifies how long the producer should wait for the response.

...

  • Need to educate users about metadata and the buffer.
  • The request timeout is less explicit but will be affected by retries, linger.ms, retry backoff time, request timeout, etc.

Option 2

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response to a message after the message has been appended to the accumulator, including:
    1. linger.ms
    2. actual network RTT
    3. server replication time
    4. retries
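To illustrate how a single max.enqueue.block.ms could bound all the blocking stages of send() listed above, here is a minimal sketch. The class EnqueueBudget, its methods and its exception type are hypothetical names made up for this example; they are not part of the producer API or of this proposal. The idea is only that each stage (metadata fetch, buffer allocation, and so on) asks for the time that is left rather than getting its own independent timeout.

```java
// Hypothetical helper, not part of the Kafka API: one time budget shared by
// every blocking stage of send() under a single max.enqueue.block.ms.
final class EnqueueBudget {

    // Unchecked exception used by this sketch when the budget is used up.
    static final class BudgetExceededException extends RuntimeException {
        BudgetExceededException(String message) {
            super(message);
        }
    }

    private final long deadlineMs;

    EnqueueBudget(long startMs, long maxEnqueueBlockMs) {
        if (maxEnqueueBlockMs < 0) {
            throw new IllegalArgumentException("maxEnqueueBlockMs must be >= 0");
        }
        long deadline = startMs + maxEnqueueBlockMs;
        // Saturate instead of overflowing when the budget is effectively unbounded.
        this.deadlineMs = (deadline < startMs) ? Long.MAX_VALUE : deadline;
    }

    // Returns the time a stage may still block, or throws if nothing is left.
    long remainingMs(long nowMs, String stage) {
        long remaining = deadlineMs - nowMs;
        if (remaining <= 0) {
            throw new BudgetExceededException("send() budget exhausted before: " + stage);
        }
        return remaining;
    }
}
```

With a budget created at the start of send(), a metadata fetch that takes 200 ms out of a 500 ms budget would leave at most 300 ms for waiting on buffer space.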

...

  • Less accurate expiry time - a message can time out anywhere between (2) and 2*(2) + (1).
  • Might be a little difficult to tune for some users (e.g. some users might be willing to wait for the buffer but not for metadata).

Option 3

Similar to option 2, but slightly different with respect to request.timeout.ms.

...

3. Remove METADATA_FETCH_TIMEOUT_CONFIG, TIMEOUT_CONFIG, BLOCK_ON_BUFFER_FULL_CONFIG.

Proposed Changes

Because the NetworkClient works asynchronously, we need to keep track of the send time for each request. The implementation will do the following:

...

The request timeout will be set to a reasonable value, say 60 seconds.
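As a rough illustration of tracking the send time per request in an asynchronous client, the sketch below records when each request was sent and, on each pass, reports those that have waited longer than the request timeout. InFlightTimeoutTracker and its methods are hypothetical names for this example only and do not describe the actual NetworkClient code. The sketch assumes send times are non-decreasing, so the oldest request is always at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not the real NetworkClient: remembers the send time of
// each in-flight request so that expired ones can be found without blocking.
final class InFlightTimeoutTracker {

    private static final class Entry {
        final int correlationId;
        final long sendTimeMs;

        Entry(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    InFlightTimeoutTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Record the send time when a request is written to the network.
    void onSend(int correlationId, long nowMs) {
        inFlight.addLast(new Entry(correlationId, nowMs));
    }

    // Forget a request once its response has arrived.
    void onResponse(int correlationId) {
        inFlight.removeIf(e -> e.correlationId == correlationId);
    }

    // Remove and return the ids of requests older than the request timeout.
    // Assumes onSend was called with non-decreasing times (oldest at the head).
    List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs > requestTimeoutMs) {
            expired.add(inFlight.pollFirst().correlationId);
        }
        return expired;
    }
}
```

A caller would invoke expire(now) from its poll loop and then apply whatever timeout handling it needs to the returned requests.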

Actions after request timeout

When the request timeout has been reached, we do the following:

...

In most cases, a metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, we should not see a dramatic increase in TCP connections as long as the request timeout is set to a reasonable value.

Compatibility, Deprecation, and Migration Plan

To remove these configurations, we will do the following:

  1. In 0.8.2.2 or 0.8.3, we will:
    1. Keep all the deprecated configurations in place.
    2. If the user sets TIMEOUT_CONFIG, override REQUEST_TIMEOUT_CONFIG with it, show a deprecation warning and advertise the new configuration.
    3. If the user sets METADATA_FETCH_TIMEOUT_CONFIG, override MAX_ENQUEUE_TIMEOUT_CONFIG with METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, override MAX_ENQUEUE_TIMEOUT_CONFIG with Long.MAX_VALUE and warn about the deprecation.
  2. In 0.9, we will remove TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG.
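The override rules for the transition release can be sketched as a small translation step over the user's configuration. This is only an illustration: the class DeprecatedTimeoutConfigs and the method translate are hypothetical, and the string keys are assumptions (in particular, that MAX_ENQUEUE_TIMEOUT_CONFIG corresponds to max.enqueue.block.ms and REQUEST_TIMEOUT_CONFIG to request.timeout.ms). The plan above does not say which rule wins when both METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG are set; the sketch arbitrarily lets block.on.buffer.full win.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the deprecation mapping; key strings are assumptions.
final class DeprecatedTimeoutConfigs {
    static final String TIMEOUT = "timeout.ms";
    static final String METADATA_FETCH_TIMEOUT = "metadata.fetch.timeout.ms";
    static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
    static final String REQUEST_TIMEOUT = "request.timeout.ms";
    static final String MAX_ENQUEUE_BLOCK = "max.enqueue.block.ms";

    // Returns a copy of the user configs with the new keys overridden from the
    // deprecated ones; one message is appended to warnings per deprecated key.
    static Map<String, Object> translate(Map<String, Object> userConfigs,
                                         List<String> warnings) {
        Map<String, Object> resolved = new HashMap<>(userConfigs);
        if (userConfigs.containsKey(TIMEOUT)) {
            resolved.put(REQUEST_TIMEOUT, userConfigs.get(TIMEOUT));
            warnings.add(TIMEOUT + " is deprecated, use " + REQUEST_TIMEOUT);
        }
        if (userConfigs.containsKey(METADATA_FETCH_TIMEOUT)) {
            resolved.put(MAX_ENQUEUE_BLOCK, userConfigs.get(METADATA_FETCH_TIMEOUT));
            warnings.add(METADATA_FETCH_TIMEOUT + " is deprecated, use " + MAX_ENQUEUE_BLOCK);
        }
        // Applied last, so it wins over metadata.fetch.timeout.ms (an assumption).
        Object block = userConfigs.get(BLOCK_ON_BUFFER_FULL);
        if (Boolean.parseBoolean(String.valueOf(block))) {
            resolved.put(MAX_ENQUEUE_BLOCK, Long.MAX_VALUE);
            warnings.add(BLOCK_ON_BUFFER_FULL + " is deprecated, use " + MAX_ENQUEUE_BLOCK);
        }
        return resolved;
    }
}
```

The deprecated keys are left in the resolved map, matching the rule that the deprecated configurations are kept during the transition release.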

Rejected Alternatives

batch.timeout.ms - In some cases, the leader broker information for a batch could be unavailable after the batch is ready to be sent (i.e. the batch is full or has reached linger.ms). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batch.