Current state: Accepted
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Consumer API currently has one close() method without a timeout. KAFKA-3703 added graceful close of the consumer with a hard-coded timeout of 5 seconds to ensure that consumers complete pending commits and leave the group gracefully before closing connection to the coordinator.
Producer interface has two close() methods.
public void close(); public void close(long timeout, TimeUnit unit);
The first variant waits for graceful close without a time limit (
timeout=Long.MAX_VALUE). The second attempts to close gracefully, but terminates the produce, aborting any pending requests after the timeout.
For consistency with producers, consumers should also expose the same two variants for close.
A new close method with timeout will be added to the
Consumer interface. The existing
close() method will attempt to close gracefully with a default timeout of 30 seconds.
/** * Tries to close the consumer cleanly within the specified timeout. If the close does not complete within the * timeout, force close the consumer. */ public void close(long timeout, TimeUnit unit);
close methods will attempt to commit offsets if necessary and leave group before the coordinator connection is closed. The maximum time allowed to complete each request initiated during close will be the request timeout. This ensures that close with large timeout doesn't wait forever if requests are never completed, for example when brokers are shutdown. Note that this KIP only adds timeouts to the close code path. Unbounded waits in the consumer during normal operation are not addressed by this KIP.
close(long timeout, TimeUnit unit) method will use the specified timeout as the maximum time for attempting to close gracefully. If commit or leave group request is not complete within the timeout, the connection is closed and the requests may not be sent to the broker.
close() method without a timeout will attempt to close the consumer gracefully with a default timeout of 30 seconds. This is different from the producer default of Long.MAX_VALUE since consumers don't have large requests. Applications can specify a timeout of
Long.MAX_VALUE to attempt graceful close without a timeout. Since commit and leave group requests are timed out after the request timeout, the upper bound will be approximately
2*request.timeout.ms (around 10 minutes by default).
The hard-coded 5 second timeout to send requests to the broker before closing the coordinator connection will be removed. This will be replaced with the lower of request timeout or the time remaining from the close timeout specified.
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
Versions 0.10.1.0 and earlier closed consumers without waiting for commits or leave group requests to be sent. With this change,
consumer.close() method waits for all pending requests to be sent for upto 30 seconds and hence
close() could take longer to complete.
Unit tests added in KAFKA-3703 to test the hard-coded timeout will be altered to work with the new close timeout. Existing integration and system tests already test close with default timeout.
Use a small hard-coded timeout
A single close method with a small hard-coded timeout can be used to attempt to close gracefully if possible. But this is inconsistent with the
Producer where applications can choose to wait for a longer time or not wait at all for graceful close.
Use a Long.MAX_VALUE as default timeout
Producer.close() uses Long.MAX_VALUE as default timeout. Consumers could also use the same default. But this could result in waits of upto 10 minutes for the default close() method to complete with the default request timeout of 5 minutes. Since consumers only attempt to commit offsets and leave group during close() on a best-effort basis, a smaller default value is more suitable.