...
Note that this retry functionality is implemented within each RequestManager as not all requests should be retried in all cases. That said, we should explore ways to enforce the timeout throughout most/all of the RequestManager implementations to improve consistency and correctness. One suggestion is to add an optional Timer to the RequestState class and provide logic therein so that the timeout is enforced automatically by default, or overridden/substituted if an alternate approach is needed.
A Quick Note about Timer Reuse
When a user provides a timeout value to a Consumer API, a Timer object is immediately created to track the elapsed and remaining time for the operation. While a Duration object provides an immutable value of the overall timeout, the Timer is mutable. At certain points during processing, the Timer.update() API is invoked to determine the elapsed/remaining time for processing.
The logic in the Timer class does not in itself magically enforce any timeouts. The code that uses the Timer object must interact with it explicitly to update it (update()) and query it (remainingMs(), isExpired(), and notExpired()) to determine the remaining value of the timeout.
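The point about explicit interaction can be illustrated with a minimal sketch. It uses hypothetical stand-ins (`ManualClock` and `ManualTimer`), not Kafka's actual Timer class, and the clock is advanced by hand so the behavior is deterministic. Only the method names update(), remainingMs(), isExpired(), and notExpired() come from the text above; everything else is an assumption made for illustration.

```java
// Hypothetical stand-ins for illustration; NOT Kafka's Time/Timer classes.
final class ManualClock {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    long milliseconds() {
        return nowMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }
}

final class ManualTimer {
    private final ManualClock clock;
    private final long deadlineMs;
    private long currentTimeMs; // cached value, refreshed only by update()

    ManualTimer(ManualClock clock, long timeoutMs) {
        this.clock = clock;
        this.currentTimeMs = clock.milliseconds();
        this.deadlineMs = currentTimeMs + timeoutMs;
    }

    void update() {
        currentTimeMs = clock.milliseconds();
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    boolean notExpired() {
        return !isExpired();
    }
}

final class TimerUsageSketch {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock(1_000);
        ManualTimer timer = new ManualTimer(clock, 100);

        clock.advance(150); // time passes, but the timer has not been updated
        System.out.println(timer.isExpired()); // false: the cached time is stale

        timer.update(); // the caller must refresh the timer explicitly
        System.out.println(timer.isExpired()); // true
    }
}
```

In this sketch, nothing happens when the deadline passes; the expiration only becomes visible once the calling code updates and queries the timer.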
The Timer class is not designed to be thread-safe. Although it might be useful to reuse the same Timer object in both the application and network I/O threads, this is currently ill-advised due to the lack of thread safety. Instead of using a single Timer, we effectively need to maintain two instances, one for each thread, which is less than ideal.
Specifying Timeouts for Events
For API methods that provide a timeout, a Timer will be created near the start of the method. Most API methods that take a timeout will end up enqueuing one or more CompletableApplicationEvent instances. The CompletableApplicationEvent class will now take a Timer instance in its constructor:
```java
public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> {

    private final CompletableFuture<T> future;
    private final long deadlineMs;

    protected CompletableApplicationEvent(Type type, Timer timer) {
        super(type);
        this.future = new CompletableFuture<>();
        // Capture an absolute deadline; the Timer itself is not retained.
        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
    }

    public CompletableFuture<T> future() {
        return future;
    }

    public long deadlineMs() {
        return deadlineMs;
    }

    public T get() {
        return ConsumerUtils.getResult(future);
    }

    // Rest of class...
}
```
The CompletableApplicationEvent does not use the Timer except in the constructor. This is done intentionally because we need to ensure that the Timer is not used by the network I/O thread. Instead, we calculate an absolute deadline from the Timer; the network I/O thread later uses that deadline to reconstruct the remaining timeout.
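As a worked example of the deadline arithmetic, the sketch below computes an absolute deadline from a snapshot of the current time and the remaining time, then derives the remaining time again at a later instant. The class `DeadlineSketch` and its method names are hypothetical. The overflow guard is an extra precaution assumed here for very large timeouts; the constructor above simply adds the two values.

```java
final class DeadlineSketch {

    // Absolute deadline from a timer snapshot. Inputs are assumed non-negative.
    // The overflow guard is an assumption for illustration, not part of the
    // constructor shown in the document.
    static long deadlineMs(long currentTimeMs, long remainingMs) {
        long deadline = currentTimeMs + remainingMs;
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    // Remaining time at a later instant, never negative.
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        long deadline = deadlineMs(10_000, 500);           // 10_500
        System.out.println(remainingMs(deadline, 10_200)); // 300
        System.out.println(remainingMs(deadline, 10_700)); // 0
    }
}
```

Because the deadline is a plain long, it can be read from another thread without sharing any mutable timer state.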
Specifying Timeouts for Network Requests
In the ApplicationEventProcessor class, we will create a new utility method to create a Timer from the CompletableApplicationEvent:
```java
private Timer eventTimer(final CompletableApplicationEvent<?> event) {
    // Prevent the timer from being negative.
    long diff = Math.max(0, event.deadlineMs() - time.milliseconds());
    return time.timer(diff);
}
```
Note: we create a new Timer instance and we do not reuse the event's Timer. That is, we've intentionally designed the event to not include the Timer, just to ensure it isn't accidentally reused.
This new Timer should now be passed into the relevant RequestManager to indicate that any operations should be bounded by the given Timer.
Conclusion
The decision to split up the logic into a separate application thread and network thread introduced an inconsistency related to timeouts between AsyncKafkaConsumer and LegacyKafkaConsumer.
A negative consequence of this split is that the logic for timeouts is now spread across multiple areas of the code:
- Consumer implementation
- Network thread polling loop
- RequestManager implementations
Why Enforce Timeouts in Two Places?
A question about enforcement arose during discussions:
Should the timeout enforcement logic live in the request managers instead of in the event reaper and the request managers?
As proposed, I would argue the answer is "no" for the following reasons:
- Maintaining loose coupling between the application thread and the network thread is preferred (reasoning on the system, testing, etc.). A consequence of this is that events and network requests, while related, are separate entities and have separate lifecycles; they create separate Future instances. Passing the event or the event's Future directly to the request manager would muddy that separation. Events may time out before the request manager is invoked. This means that it won't have a chance to handle its expiration logic.
- The current implementations of RequestManager are functionally distinct enough regarding request management that expiration would be difficult to implement solely at that layer.
- To keep with the current consumer, each network request uses the request.timeout.ms value from the user's configuration, regardless of how much time is left in the event's timeout. Because the expiration is only checked after the response is received, this could be well past the user's timeout. Note the pathological case: the default value of request.timeout.ms is 30 seconds and the default Consumer.close() method has a timeout of 0. It's technically possible for the request to take the full 30 seconds before timing out, calling the response handler, and then checking the expiration.
Despite the foregoing, let's rephrase the question slightly:
Should the timeout enforcement logic live in X, instead of in the event reaper and the request managers?
In that case, I would say, "yes!" It's usually not the best practice to have cross-cutting concerns like handling timeouts implemented in multiple places.
There is now a new class TimedRequestState that is a subclass of RequestState that takes a Timer. The request managers that implement retries can now uniformly use that subclass to keep their expiration logic consistent.
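A rough sketch of the idea follows, assuming heavily simplified, hypothetical classes (`SimpleRequestState` and `SimpleTimedRequestState`) rather than the actual Kafka classes. For self-containment it tracks an absolute deadline instead of holding a Timer, and the `poll` helper and its return values are invented for illustration.

```java
// Hypothetical, simplified stand-ins for illustration only;
// these are NOT the Kafka RequestState/TimedRequestState classes.
class SimpleRequestState {
    private final long retryBackoffMs;
    private long lastSentMs = -1; // -1 means nothing has been sent yet

    SimpleRequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // A request may be sent if none was sent yet or the backoff has elapsed.
    boolean canSendRequest(long nowMs) {
        return lastSentMs < 0 || nowMs - lastSentMs >= retryBackoffMs;
    }

    void onSendAttempt(long nowMs) {
        lastSentMs = nowMs;
    }
}

// Adds expiration to the retry state so every manager asks the same question.
class SimpleTimedRequestState extends SimpleRequestState {
    private final long deadlineMs;

    SimpleTimedRequestState(long retryBackoffMs, long nowMs, long timeoutMs) {
        super(retryBackoffMs);
        this.deadlineMs = nowMs + timeoutMs;
    }

    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}

final class TimedRequestStateSketch {
    // One decision per poll: expire, wait out the backoff, or send.
    static String poll(SimpleTimedRequestState state, long nowMs) {
        if (state.isExpired(nowMs)) {
            return "EXPIRED";
        }
        if (!state.canSendRequest(nowMs)) {
            return "WAIT";
        }
        state.onSendAttempt(nowMs);
        return "SEND";
    }

    public static void main(String[] args) {
        // 100 ms backoff, created at t=0 with a 250 ms timeout.
        SimpleTimedRequestState state = new SimpleTimedRequestState(100, 0, 250);
        for (long now : new long[] {0, 50, 100, 250}) {
            System.out.println(now + " ms: " + poll(state, now));
        }
    }
}
```

Keeping the expiration check on the shared state class means each retrying manager does not need its own copy of the deadline logic.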
Why Enforce Timeouts in Two Places?
A question about enforcement arose during discussions:
Should the timeout enforcement logic live in the request managers instead of in the event reaper and the request managers?
Given the current design of the AsyncKafkaConsumer, I would argue the answer is "no" for the following reasons:
- Maintaining loose coupling between the application thread and the network thread allows us to more easily reason on the system, eases testing, etc. A consequence of the separation is that events and network requests, while related, are separate entities, have separate lifecycles, and create separate Future instances. In some cases, events may time out before the request manager is invoked. This means that it won't have a chance to handle its expiration logic.
- The current implementations of RequestManager are functionally distinct enough regarding handling of times, retries, etc. that expiration would be difficult to implement.
- To stay consistent with the LegacyKafkaConsumer implementation, each network request uses the request.timeout.ms value from the user's configuration, regardless of how much time is left in the event's timeout. Because the expiration is only checked after the response is received, this could be well past the user's timeout.
Despite the foregoing, let's rephrase the question slightly:
Should the timeout enforcement logic live in _________ instead of in the event reaper and the request managers?
In that case, I would say, "yes!" It's best practice to implement cross-cutting concerns (like handling timeouts) in a single, unified chunk of code.
Conclusion
The proposed design swaps the AsyncKafkaConsumer's current check-then-attempt approach for the more relaxed attempt-then-check approach of the LegacyKafkaConsumer. As a result, we are able to maintain semantics and user expectations around timeouts. With this design, timeouts, expiration, requests, and related logic are more uniformly implemented. While this may not be the final design of timeout management for the AsyncKafkaConsumer, it's a necessary improvement.
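The difference between the two orderings can be sketched as follows. The class `ExpirationOrderSketch`, the `Outcome` enum, and the boolean "attempt" callback are hypothetical simplifications and do not mirror the consumer's real event-processing code.

```java
import java.util.function.BooleanSupplier;

final class ExpirationOrderSketch {

    enum Outcome { COMPLETED, PENDING, EXPIRED }

    // Check-then-attempt: an already-expired event fails without being tried.
    static Outcome checkThenAttempt(long deadlineMs, long nowMs, BooleanSupplier attempt) {
        if (nowMs >= deadlineMs) {
            return Outcome.EXPIRED;
        }
        return attempt.getAsBoolean() ? Outcome.COMPLETED : Outcome.PENDING;
    }

    // Attempt-then-check: the event is always tried once; the deadline is only
    // consulted if that attempt did not complete it.
    static Outcome attemptThenCheck(long deadlineMs, long nowMs, BooleanSupplier attempt) {
        if (attempt.getAsBoolean()) {
            return Outcome.COMPLETED;
        }
        return nowMs >= deadlineMs ? Outcome.EXPIRED : Outcome.PENDING;
    }

    public static void main(String[] args) {
        long now = 5_000;
        long deadline = now; // a zero timeout: the deadline has already been reached
        BooleanSupplier succeedsImmediately = () -> true;

        System.out.println(checkThenAttempt(deadline, now, succeedsImmediately)); // EXPIRED
        System.out.println(attemptThenCheck(deadline, now, succeedsImmediately)); // COMPLETED
    }
}
```

With a zero timeout, the first ordering never runs the operation, while the second gives it one chance to complete before the deadline is considered.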
The decision to split up the logic into a separate application thread and network thread introduced an inconsistency related to timeouts between AsyncKafkaConsumer and LegacyKafkaConsumer. From the perspective of the original issue, we're addressing the problem where Consumer API calls are stricter about timeouts with the new consumer than with the existing consumer. The design as stated is not perfect, but it is an improvement. The proposed design allows events to process at least once before being expired. The expiration logic in request managers that allow retries has been refactored to be more uniform. This may not be the final design of the new consumer, but it's a step in the right direction.
Hopefully this document can serve as a "living document" for the AsyncKafkaConsumer implementation for maintainers.
...