You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current state: Discuss

Discussion thread: here

JIRA: KAFKA-2120

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In old producer/consumer, we have a socket timeout associated with each request, the producer/consumer will send a request and then wait for response. If no response is received from a broker within specified timeout, the request will fail.

In the NetworkClient of new producer/consumer, currently we don't have a similar timeout for requests. Adding a client side request timeout in NetworkClient would be useful for the following reasons:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implict timeout.

2. Currently the producer sends requests in a non-blocking way and does not wait for response. If a socket is disconnected the network client will fail the requests with a NetworkException. However, It might take very long for TCP connection to timeout. And this mechanism is not sufficient in some cases. For example, if a broker is behind a VIP, it is possibel that the broker is down but the connection between client and VIP is still alive. In this case the client will never be able to receive the response. Adding a client side request timeout will help solve such kind of issues.

Public Interfaces

Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";

private static final String REQUEST_TIMEOUT_DOC = "The configuration controls the maximum amount of time the producer will wait for the response of a request. If the "

                                              + "response is not received before the timeout elapses the producer will resend the request if necessary or fail the request if retries are exhausted."

If a request is timeout and retries have not been exhausted, the request will be retried.

When a request is timeout and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of a request and callback will be fired.

Proposed Changes

Because the NetworkClient works in an async way. We need to keep track of the send time for each request. So the implementation will the following:

  1. associate a time stamp with each inflight requests and retry
  2. The poll() will go through the inflight requests and expire those requests if necessary.
  3. The timeout for poll() will need to consider the request timeout for inflight requests.

Compatibility, Deprecation, and Migration Plan

The changes should be backward compatible.

Rejected Alternatives

None

  • No labels