API and user requirement analysis
The chart above analyzes the new consumer API by looking at the requirements a user might have for each request. We use this chart to exhaustively list:
- All the functions the consumer can possibly perform.
- All the potential user requirements.
The goal is a clean and intuitive API design with a clear rationale for each user requirement (each cell in the chart above).
Threading Model and Sync/Async Semantics
Currently the new consumer follows a single-threaded model. While this provides simplicity by avoiding some synchronization effort, it also forces users to care about things they should not need to care about. For example:
- When a user commits offsets asynchronously, the offsets are not committed until poll() is called.
- If the user does not call poll() frequently enough, the broker considers the consumer dead. If the user sets a larger session timeout to compensate, failure detection also takes longer.
- Callbacks do not fire until poll() is called.
We fixed some of these problems in KAFKA-2123, which introduced a task queue and temporarily reuses the caller thread as an "execution thread" to run tasks from that queue. But it does not solve the fundamental issue: users essentially have to provide a dedicated thread that keeps calling poll(), even if they don't actually want to consume data. So although the new consumer is described as single-threaded, most users will likely have to write their own multi-threaded wrapper.
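To make the problem concrete, here is a minimal Java model of the behavior described above, assuming a single task queue that is drained only inside poll(). The class SingleThreadedConsumerModel and its methods are hypothetical and simplified; they are not the actual KafkaConsumer or KAFKA-2123 code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the current single-threaded design: there is no
// background thread, so queued work only runs when the caller invokes poll().
class SingleThreadedConsumerModel {
    private final Queue<Runnable> pendingTasks = new ArrayDeque<>();
    private final List<Long> committedOffsets = new ArrayList<>();

    // "Async" commit: only enqueues a task; nothing is sent yet.
    void commitAsync(long offset) {
        pendingTasks.add(() -> committedOffsets.add(offset));
    }

    // poll() borrows the caller thread to drain the task queue
    // (commits, heartbeats, callbacks) before returning data.
    List<String> poll() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
        return new ArrayList<>(); // data fetching is omitted in this sketch
    }

    List<Long> committedOffsets() {
        return committedOffsets;
    }
}
```

In this model an asynchronous commit stays queued until the caller invokes poll(), which is why users end up dedicating a thread to calling poll().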
Async semantics also become confusing with the single-threaded model. We currently force users to do something after they fire an async call, which defeats one important purpose of async: the freedom to do something else after firing the call. From the user's point of view, it is as if they are still blocked on that call, because they have to keep calling poll().
The main benefit of the single-threaded model is that it ties consumer liveness to actual data fetching, so we can detect a client failure when the client stops consuming data. The downside is that the current threading model essentially "hijacks" the user thread to act as an execution thread whenever it calls poll(). We want to change the threading model so that it keeps this benefit while solving the issues mentioned above.
We propose a threading model very similar to the one used in the new producer, which has been well received by users. At the same time, we can keep the benefit of the single-threaded model with little effort.
The major changes in this threading model are:
- Asynchronous calls follow the usual convention and are easy to implement.
- poll() becomes very intuitive: it simply means the user wants some data.
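A minimal sketch of the producer-style model, assuming a dedicated background sender thread that drains a task queue. BackgroundSenderConsumer, a commitAsync() that returns a CompletableFuture, and the thread name are illustrative assumptions, not the proposed API; fetching and heartbeating are left as a comment.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical producer-style consumer: a dedicated sender thread executes
// queued work, so async calls complete without the user calling poll().
class BackgroundSenderConsumer implements AutoCloseable {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread sender;
    private volatile boolean running = true;
    private volatile long lastCommitted = -1L;

    BackgroundSenderConsumer() {
        sender = new Thread(this::runLoop, "consumer-sender");
        sender.setDaemon(true);
        sender.start();
    }

    private void runLoop() {
        try {
            while (running) {
                Runnable task = tasks.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
                // A real client would also send heartbeats and fetches here.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit the loop on shutdown
        }
    }

    // Async call: enqueue and return immediately; the caller is free afterwards.
    CompletableFuture<Long> commitAsync(long offset) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        tasks.add(() -> {
            lastCommitted = offset; // stand-in for sending the commit request
            result.complete(offset);
        });
        return result;
    }

    long lastCommitted() {
        return lastCommitted;
    }

    @Override
    public void close() {
        running = false;
        sender.interrupt();
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the sender thread runs independently, the future returned by commitAsync() completes even though the caller never calls poll(), which is the conventional async behavior described above.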
To preserve the association between liveness and data fetching, the sender thread can track how long it has been since the user thread last called poll() (which generates a DataFetchTask). If the user thread has not fetched data for session.timeout, the sender thread can stop sending heartbeats. This feature can be controlled by a boolean config, e.g. liveliness.detection.enabled. If it is turned on, the liveness definition is the same as in the current new consumer. If it is turned off, the liveness definition is the same as in the old high-level consumer.
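The liveness check could be sketched as follows, assuming the user thread records a timestamp on each poll() and the sender thread consults it before every heartbeat. LivenessTracker and its methods are hypothetical; only session.timeout and liveliness.detection.enabled come from the proposal, and the current time is passed in as a parameter so the logic is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical liveness check shared between the user thread and the sender thread.
class LivenessTracker {
    private final long sessionTimeoutMs;
    // Mirrors the proposed liveliness.detection.enabled config.
    private final boolean livelinessDetectionEnabled;
    // Written by the user thread, read by the sender thread.
    private final AtomicLong lastPollMs;

    LivenessTracker(long sessionTimeoutMs, boolean livelinessDetectionEnabled, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.livelinessDetectionEnabled = livelinessDetectionEnabled;
        this.lastPollMs = new AtomicLong(nowMs);
    }

    // Called on the user thread whenever poll() is invoked (the DataFetchTask).
    void onPoll(long nowMs) {
        lastPollMs.set(nowMs);
    }

    // Called on the sender thread before sending a heartbeat.
    boolean shouldHeartbeat(long nowMs) {
        if (!livelinessDetectionEnabled) {
            return true; // liveness is not tied to polling when the feature is off
        }
        return nowMs - lastPollMs.get() < sessionTimeoutMs;
    }
}
```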
Sync or Async, that is the question...
Personally, I think a call should be sync if:
- the user expects information back, or
- subsequent actions depend on the result of the call.
Otherwise, the method can be async.
The assumption is that if a user calls a method to get some information back, they will use that information for further actions. For methods that have both use cases, e.g. commit(), we provide both sync and async interfaces.
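One way to express this rule, sketched under the assumption that commitSync() is simply commitAsync() followed by a blocking wait. ConsumerApiSketch, InMemoryConsumer, and the String partition keys are hypothetical simplifications, not the actual consumer interface.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface illustrating the sync/async rule.
interface ConsumerApiSketch {
    // Returns information the caller will act on, so it is synchronous.
    long committed(String partition);

    // Fire-and-forget is acceptable: the caller may do other work meanwhile.
    CompletableFuture<Void> commitAsync(Map<String, Long> offsets);

    // Used when subsequent actions depend on the commit having completed.
    default void commitSync(Map<String, Long> offsets) {
        commitAsync(offsets).join(); // a failed commit surfaces as CompletionException
    }
}

// Trivial in-memory implementation so the sketch can be exercised.
class InMemoryConsumer implements ConsumerApiSketch {
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    public long committed(String partition) {
        return store.getOrDefault(partition, -1L); // -1 means "nothing committed"
    }

    public CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        return CompletableFuture.runAsync(() -> store.putAll(offsets));
    }
}
```

Building the sync variant on top of the async one keeps a single code path for the commit itself; the two methods differ only in whether the caller waits.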
The proposed API follows this reasoning, but I'm not sure every method is classified correctly, so we can discuss any further concerns.
With the threading model above, both sync and async calls are straightforward to implement.
Throwing Exceptions Or Not?
In the current implementation, the consumer tries to handle exceptions itself when possible. For example, if a user subscribes to a non-existent topic, the consumer just waits until the topic exists. However, after talking to several users, we found that they actually want to know when a topic does not exist. In that case, throwing an exception might be better than handling it for the user: a user who wants to ignore the error can always catch the exception and retry, whereas if we handle it silently, a user who cares might never learn that they subscribed to the wrong topic. The interface above follows this reasoning. However, one could argue that with a checking interface available, a user who cares whether a topic exists can always call listTopics() before subscribing. So I am not yet sure whether we should throw an exception in that case.
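A minimal sketch of the "throw" option, assuming topic metadata is available as a simple set of names. UnknownTopicException, TopicAwareConsumer, and the simplified listTopics() returning Set<String> are hypothetical; the real client's exception types and metadata API may differ.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical unchecked exception for subscribing to a topic that does not exist.
class UnknownTopicException extends RuntimeException {
    UnknownTopicException(String topic) {
        super("Topic does not exist: " + topic);
    }
}

// Hypothetical consumer that fails fast instead of waiting for the topic to appear.
class TopicAwareConsumer {
    private final Set<String> existingTopics; // stand-in for cluster metadata
    private final Set<String> subscriptions = new HashSet<>();

    TopicAwareConsumer(Set<String> existingTopics) {
        this.existingTopics = existingTopics;
    }

    // Simplified checking interface: just the topic names.
    Set<String> listTopics() {
        return existingTopics;
    }

    void subscribe(String topic) {
        if (!existingTopics.contains(topic)) {
            throw new UnknownTopicException(topic);
        }
        subscriptions.add(topic);
    }

    Set<String> subscriptions() {
        return subscriptions;
    }
}
```

A user who prefers the current behavior can catch UnknownTopicException and retry with a backoff, while a user who wants to avoid exceptions can check listTopics() first; the open question above is which default to choose.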