Overview
Many of the Consumer APIs allow the user to provide a timeout. For example, when a user invokes Consumer.poll(10000), the timeout is set to 10000 milliseconds. The user's expectation is that for 10 seconds the application will block, waiting for a response. After 10 seconds, the operation will time out and control will return to the application.
The user treats the timeout like a black box; it is the upper-bound on the length of time spent executing the API call. Under the covers, all of the operations performed by the client to satisfy the API call must, in aggregate, execute within the timeout provided by the user.
Consumer APIs that allow the user to provide a timeout include a Duration object in the API parameters.
Here's an example of an API call that allows for a timeout:
public void commitSync();
public void commitSync(Duration timeout);
Affected APIs
Not all of the APIs support timeouts, but the following APIs either require or allow timeouts:
- beginningOffsets
- clientInstanceId
- close
- commitSync
- committed
- endOffsets
- listTopics
- offsetsForTimes
- partitionsFor
- poll
- position
Default Timeouts
Some of the Consumer APIs provide overloaded versions of the method that allow for an optional timeout. What happens when a user invokes the version of the method that doesn't provide a timeout? In this case there is an implied timeout, as provided by the default.api.timeout.ms configuration option:
Specifies the timeout (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a timeout parameter.
So the implementation of a method such as commitSync() essentially just calls its sibling version (commitSync(Duration timeout)) like this:
public void commitSync() {
    Duration timeout = Duration.ofMillis(defaultApiTimeoutMs);
    commitSync(timeout);
}
Design Tenets
The timeout mechanism for the LegacyKafkaConsumer does not appear to be documented. Understanding it requires digging through the source code, which results in different interpretations of the intended behavior.
This document proposes a design for timeouts for the AsyncKafkaConsumer, with the following requirements.
Timeouts must be respected
Any timeout provided by the user should be enforced. This includes the implicit default timeout configured using default.api.timeout.ms.
Timeout accuracy is not exact
Apache Kafka does not have realtime semantics, so the precision with which a timeout is enforced is undefined. Enforcement is expected to be "close, but not exact." For example, a supplied timeout of 0 is impossible to enforce strictly.
Operations will be performed at least once
All operations must be attempted at least once. The ordering is attempt-then-check, not check-then-attempt: the timeout is not consulted before the first attempt at an operation.
Additionally, checking for timeout expiration happens only after an attempt has failed. As long as a series of operations succeeds, the timeout is not checked or enforced. Consumer.close() is a good example: it's better to take a few more milliseconds than expected, if necessary, to close resources properly.
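To make the ordering concrete, here is a minimal, hypothetical sketch of an attempt-then-check loop. The names and functional parameters are illustrative only, not actual client code: the operation is always attempted at least once, and the deadline is consulted only after a failed attempt.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical illustration of attempt-then-check ordering.
public class AttemptThenCheck {

    // attempt: the operation to perform; clockMs: the current-time source;
    // deadlineMs: absolute time after which no further retries are made.
    public static boolean runUntilDeadline(BooleanSupplier attempt,
                                           LongSupplier clockMs,
                                           long deadlineMs) {
        do {
            // First: attempt the operation, regardless of the deadline.
            if (attempt.getAsBoolean())
                return true;
            // Then: only after a failure, check whether time remains.
        } while (clockMs.getAsLong() < deadlineMs);

        return false;
    }
}
```

Note that even if the deadline has already passed when the method is entered, the loop body still executes once, matching the "operations will be performed at least once" tenet.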
The network I/O thread determines enforcement
The application thread does not have the information and context to determine the status of ongoing operations, so it falls to the background thread to check and enforce.
When the network I/O thread determines that an event has passed its expiration, the event will be completed by the network thread invoking CompletableFuture.completeExceptionally() with a TimeoutException.
When a RequestManager is used as part of an operation, it will be provided with the timeout for its reference. It is the responsibility of the RequestManager to ensure that any of the network requests it issues respect the timeout. As mentioned above, the first try is allowed to be performed without checking the timeout first, but if the initial attempt fails, the RequestManager must check the timeout before enqueuing/preparing any retries.
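The retry gating described above might be sketched like this. RetryGate is a hypothetical illustration, not an actual Kafka class: the first attempt is unconditional, while retries must fit within the deadline.

```java
// Hypothetical sketch of how a request manager might gate retries on the
// event's deadline: the first try is never blocked by the timeout.
public class RetryGate {
    private final long deadlineMs;
    private int attempts = 0;

    public RetryGate(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    // Called when deciding whether a (re)try may be issued at currentTimeMs.
    public boolean mayAttempt(long currentTimeMs) {
        if (attempts == 0) {
            attempts++;
            return true;            // first try: always allowed
        }

        if (currentTimeMs >= deadlineMs)
            return false;           // a retry would exceed the user's timeout

        attempts++;
        return true;
    }
}
```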
Enforcing Timeouts for Events
Throughout AsyncKafkaConsumer we have adopted a pattern when performing time-bound operations:
- Create an instance of a CompletableApplicationEvent (an event with an attached CompletableFuture)
- Add the event to the application event queue (these events are dequeued and processed by the network I/O thread)
- Call Future.get(timeout)
However, this introduced a subtle timing difference between the AsyncKafkaConsumer and the LegacyKafkaConsumer. The issue can be easily reproduced via any of the KafkaConsumerTest unit tests that invoke the consumer.poll(0) API. Here's a bit of code that illustrates the difference between the two approaches.
LegacyKafkaConsumer performs most of its network I/O operations directly on the application thread. Here is a little snippet which shows the pattern that the LegacyKafkaConsumer uses:
public int foo(Timer timer) {
    do {
        final RequestFuture<Integer> future = sendSomeRequest();
        client.poll(future, timer);

        if (future.isDone())
            return future.get();
    } while (timer.notExpired());

    return -1;
}
The LegacyKafkaConsumer does not wait for an absolute number of milliseconds for an operation in another thread to complete. Instead, it performs the NetworkClient polling and Timer expiration checking directly inline on the same thread. The end effect is that the LegacyKafkaConsumer is more lenient in how it interprets the timeout provided by the user.
When implementing the AsyncKafkaConsumer, we apply similar logic, but structure it like this:
private int foo(Timer timer) {
    try {
        FooEvent event = new FooEvent();
        applicationEventQueue.add(event);
        Future<Integer> future = event.future();
        return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        return -1;
    }
}
The call to add enqueues the network operation, and then the code immediately invokes Future.get() with the timeout to implement a time-bounded blocking call. When this method is called with a timeout value of 0, as in the consumer.poll(0) tests mentioned above, it immediately throws a TimeoutException.
The key design choice is to emulate the existing behavior of the LegacyKafkaConsumer in the AsyncKafkaConsumer. But because the logic is spread across two threads, the application thread has no way to know when the event has had a chance to perform at least one NetworkClient polling cycle. That can only be determined by the network I/O thread, so it is now up to the background thread to correctly enforce the timeouts.
To keep the logic together for enforcing timeouts for events, we introduce a new class CompletableEventReaper, which has the following methods:
public class CompletableEventReaper {

    public void add(CompletableEvent<?> event);

    public void reapExpiredAndCompleted(long currentTimeMs);

    public <T> void reapIncomplete(BlockingQueue<T> eventQueue);
}
The CompletableEventReaper is responsible for tracking any events that were processed, making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper bound on the amount of time the event logic will execute.
The reapExpiredAndCompleted "completes" any events that have either expired or completed normally. So this is a two-step process:
- For each event that is not complete, check to see if it has exceeded its deadline. If so, create an instance of TimeoutException and pass it to CompletableFuture.completeExceptionally(Throwable).
- For each event that is already "done", remove it from the list of tracked events.
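A simplified sketch of this two-step pass might look like the following. The nested Event type here is a minimal stand-in for the real CompletableEvent, and the class is an illustration rather than the actual CompletableEventReaper implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified sketch of the reaper's two-step pass.
public class ReaperSketch {

    // Minimal stand-in for a completable event with a deadline.
    public static class Event {
        public final CompletableFuture<Void> future = new CompletableFuture<>();
        public final long deadlineMs;

        public Event(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<Event> tracked = new ArrayList<>();

    public void add(Event event) {
        tracked.add(event);
    }

    public void reapExpiredAndCompleted(long currentTimeMs) {
        Iterator<Event> it = tracked.iterator();

        while (it.hasNext()) {
            Event e = it.next();

            // Step 1: expire incomplete events that passed their deadline.
            if (!e.future.isDone() && currentTimeMs >= e.deadlineMs)
                e.future.completeExceptionally(new TimeoutException());

            // Step 2: stop tracking any event that is now done.
            if (e.future.isDone())
                it.remove();
        }
    }

    public int size() {
        return tracked.size();
    }
}
```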
This method should be called at regular intervals in the background thread. Here is ConsumerNetworkThread's main loop:
void runOnce() {
    List<ApplicationEvent> events = . . .

    for (ApplicationEvent event : events) {
        if (event instanceof CompletableEvent) {
            applicationEventReaper.add((CompletableEvent<?>) event);
        }
    }

    // Existing processing logic omitted for brevity...

    applicationEventReaper.reapExpiredAndCompleted(currentTimeMs);
}
Before we begin the main processing, we make sure to track applicable events. Then, at the end of the method, we invoke the reaper to "complete" any events that have expired. This cleanup step should only be called after the network I/O thread has made at least one call to NetworkClient.poll(). This is done to emulate the behavior of LegacyKafkaConsumer's handling of timeouts. LegacyKafkaConsumer makes at least one attempt to satisfy any network requests before checking if a timeout has expired.
Now, our application thread in AsyncKafkaConsumer looks like this:
private int foo(Timer timer) {
    try {
        FooEvent event = new FooEvent(timer);
        applicationEventQueue.add(event);
        Future<Integer> future = event.future();
        return future.get();
    } catch (TimeoutException e) {
        return -1;
    }
}
The change here is subtle—instead of invoking Future.get(long, TimeUnit), the code now calls Future.get(). The code in AsyncKafkaConsumer in the application thread doesn't attempt to enforce the timeout at all. Instead, it defers completely to the background thread. The call to Future.get() blocks for an indefinite length of time.
Risky Design!
Might we be blocking the application thread forever by accident?
The application thread is completely reliant on the CompletableEventReaper in the background thread to complete the Future in a timely manner. If something happens to the background thread, or there's a subtle bug in the timeout enforcement logic, the user's application could hang forever.
Well, until it's restarted.
Enforcing Timeouts for Network Requests
In practice, user-provided timeouts are largely used to time-bound network I/O. Communication between the client and brokers over the network constitutes the bulk of the time for many operations. Allowing the user to provide an upper bound on the total time of these operations offers some protection against network issues.
For API calls that require network I/O, one or more RequestManager instances will issue network requests to the Kafka cluster. While network requests include their own notion of a timeout, that value is dictated at initialization time via the request.timeout.ms configuration. However, there is interplay between the API timeout and the network request timeout.
Let's look at a couple of examples to highlight the difference between these two timeouts.
In our first example, let's imagine that the user has invoked a Consumer API call with a very generous timeout:
In this example, to perform the API call, the RequestManager makes a network request to the Kafka cluster, noted as attempt #1 in the diagram. Unfortunately, attempt #1 fails to receive a response within the configured network request timeout. The RequestManager receives a callback to its response handler method and notes the failure. At this point, the RequestManager checks to see if there is enough time within the overall API timeout to retry the request. Because there is, the RequestManager submits a second network request, attempt #2. Unfortunately, this attempt also fails. Again, the RequestManager checks the remaining value for the API timeout, and it issues a request for attempt #3. Fortunately, the network issue has resolved itself and the third attempt succeeds.
In our second example, let's imagine that the user has invoked the same Consumer API call, but with a much shorter timeout:
As in the first example, here the RequestManager makes a network request to the Kafka cluster, attempt #1. And again, attempt #1 fails to receive a response within the configured network request timeout. Upon noticing the failure, the RequestManager checks to see if there is enough time to retry the request. Because there is, the RequestManager submits attempt #2. As before, attempt #2 also fails, so the RequestManager checks the remaining value for the API timeout, and it issues a request for attempt #3. Notice, though, that the request timeout for attempt #3 is shorter than the previous two attempts' timeouts. Why is that? As mentioned above, under normal conditions the network I/O timeout would be determined by the request.timeout.ms configuration. However, in order to ensure the client abides by the overall API timeout, the request timeout is reduced. When expressed in code, the logic must ensure this:
int requestTimeoutMs = Math.min(apiTimeoutRemainingMs, configuredRequestTimeoutMs);
Note that this retry functionality is implemented within each RequestManager as not all requests should be retried in all cases. That said, we should explore ways to enforce the timeout throughout most/all of the RequestManager implementations to improve consistency and correctness. There is now a new class TimedRequestState that is a subclass of RequestState that takes a Timer. The request managers that implement retries can now uniformly use that subclass to keep their expiration logic consistent.
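The interplay between the two timeouts can be sketched as a small helper. The class and method names below are illustrative only; the real logic lives inside the request managers and TimedRequestState.

```java
// Hypothetical sketch: cap each network request's timeout by the time
// remaining on the overall API call.
public class RequestTimeouts {

    // The request.timeout.ms value from the user's configuration.
    private final int configuredRequestTimeoutMs;

    public RequestTimeouts(int configuredRequestTimeoutMs) {
        this.configuredRequestTimeoutMs = configuredRequestTimeoutMs;
    }

    // Timeout to use for the next network request, given the API deadline.
    public int nextRequestTimeoutMs(long deadlineMs, long currentTimeMs) {
        long apiTimeoutRemainingMs = Math.max(0, deadlineMs - currentTimeMs);
        return (int) Math.min(apiTimeoutRemainingMs, configuredRequestTimeoutMs);
    }
}
```

Early attempts use the full request.timeout.ms; only as the API deadline approaches does the per-request timeout shrink, which is exactly the behavior shown for attempt #3 in the second example.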
Why Enforce Timeouts in Two Places?
A question about enforcement arose during discussions:
Should the timeout enforcement logic live solely in the request managers instead of in both the event reaper and the request managers?
Given the current design of the AsyncKafkaConsumer, I would argue the answer is "no" for the following reasons:
- Maintaining loose coupling between the application thread and the network thread allows us to more easily reason about the system, eases testing, etc. A consequence of the separation is that events and network requests—while related—are separate entities with separate lifecycles and separate Future instances. In some cases, an event may time out before the request manager is even invoked, meaning the request manager won't have a chance to handle its expiration logic.
- The current implementations of RequestManager are functionally distinct enough regarding handling of timeouts, retries, etc. that uniform expiration would be difficult to implement.
- To stay consistent with the LegacyKafkaConsumer implementation, each network request uses the request.timeout.ms value from the user's configuration, regardless of how much time is left in the event's timeout. Because the expiration is only checked after the response is received, this could be well past the user's timeout.
Despite the foregoing, let's rephrase the question slightly:
Should the timeout enforcement logic live in _________ instead of in the event reaper and the request managers?
In that case, I would say, "yes!" It's best practice to implement cross-cutting concerns (like handling timeouts) in a single, unified chunk of code.
A Quick Note about Timer Reuse
When a user provides a timeout value to a Consumer API, a Timer object is immediately created to track the elapsed and remaining time for the operation. While a Duration object provides an immutable value of the overall timeout, the Timer is mutable. At certain points during processing, the Timer.update() API is invoked to determine the elapsed/remaining time for processing.
The logic in the Timer class does not in itself magically enforce any timeouts. The code that uses the Timer object must interact with it explicitly to update it (update()) and query it (remainingMs(), isExpired(), and notExpired()) to determine the remaining value of the timeout.
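To illustrate the update-then-query pattern, here is a minimal stand-in for the Timer class. The real class lives in the Kafka client library (org.apache.kafka.common.utils) and has a richer API; this sketch only shows that nothing advances unless the caller explicitly updates the timer.

```java
// Minimal, hypothetical stand-in for Kafka's Timer, illustrating the
// explicit update-then-query usage pattern.
public class SimpleTimer {
    private final long deadlineMs;
    private long currentTimeMs;

    public SimpleTimer(long startMs, long timeoutMs) {
        this.currentTimeMs = startMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    // Callers must update the timer explicitly; time never advances on its own.
    public void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    public long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    public boolean isExpired() {
        return remainingMs() == 0;
    }

    public boolean notExpired() {
        return !isExpired();
    }
}
```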
The Timer class is not designed to be thread-safe. Although it might be useful to reuse the same Timer object in both the application and network I/O threads, this is currently ill-advised due to the lack of thread safety. Instead of using a single Timer, we effectively need to maintain two instances—one for each thread—which is less than ideal.
Specifying Timeouts for Events
For API methods that provide a timeout, a Timer will be created near the start of the method. Most API methods that take a timeout will end up enqueuing one or more CompletableApplicationEvent instances. The CompletableApplicationEvent class will now take a Timer instance in its constructor:
public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> {

    private final CompletableFuture<T> future;
    private final long deadlineMs;

    protected CompletableApplicationEvent(Type type, Timer timer) {
        super(type);
        this.future = new CompletableFuture<>();
        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
    }

    public CompletableFuture<T> future() {
        return future;
    }

    public long deadlineMs() {
        return deadlineMs;
    }

    public T get() {
        return ConsumerUtils.getResult(future);
    }

    // Rest of class...
}
The CompletableApplicationEvent does not use the Timer except in the constructor. This is intentional: we need to ensure that the Timer is not used by the network I/O thread. Instead, we calculate a deadline from the Timer, from which the remaining timeout can later be reconstructed.
Specifying Timeouts for Network Requests
In the ApplicationEventProcessor class, we will create a new utility method to create a Timer from the CompletableApplicationEvent:
private Timer eventTimer(final CompletableApplicationEvent<?> event) {
    // Prevent the timer from being negative.
    long diff = Math.max(0, event.deadlineMs() - time.milliseconds());
    return time.timer(diff);
}
Note: we create a new Timer instance and we do not reuse the event's Timer. That is, we've intentionally designed the event to not include the Timer, just to ensure it isn't accidentally reused.
This new Timer should now be passed into a relevant RequestManager to indicate that any operations should be bound by the given Timer.
Conclusion
The proposed design swaps the AsyncKafkaConsumer's current check-then-attempt approach for the more relaxed attempt-then-check approach of the LegacyKafkaConsumer. As a result, we are able to maintain semantics and user expectations around timeouts. With this design, timeouts, expiration, requests, and related logic are more uniformly implemented. While this may not be the final design of timeout management for the AsyncKafkaConsumer, it's a necessary improvement.
The decision to split up the logic into a separate application thread and network thread introduced an inconsistency related to timeouts between AsyncKafkaConsumer and LegacyKafkaConsumer.
Hopefully this document can serve as a "living document" for the AsyncKafkaConsumer implementation for maintainers.
References
See the following Jiras that track this design document and related implementation:
- KAFKA-15848
- KAFKA-15974
- KAFKA-16200