In 0.8.1.1, with built-in offset management, the server-side architecture and the life cycle of produce/fetch requests are summarized below (caller --> callee):
As shown above, delayed produce needs to access KafkaApis.maybeUnblockDelayedFetchRequests() and similar functions, and delayed fetch needs to read the data to form its response. As a result, we ended up keeping the appending and fetching logic inside KafkaApis, and also keeping the purgatories and delayed requests inside KafkaApis so that they can access these functions and variables. This causes the following problems:
1) The append-message / read-message logic that belongs to Replica Manager leaks into KafkaApis, and KafkaApis itself becomes very large, containing all the purgatory and delayed request classes.
2) The logic for satisfying delayed fetch requests is not correct: it needs to be tied to high watermark (HW) modifications. Hence it also needs to access Partition, which will leak even more logic if we follow the current architecture.
3) With built-in offset management, we have to hack KafkaApis and its corresponding delayed requests as follows:
The architecture diagram is shown below:
The proposal is to refactor KafkaApis along with Replica Manager and Offset Manager (Coordinator) such that the produce/fetch purgatories are moved to Replica Manager and are isolated from requests (i.e. they become purgatories for append and fetch operations only). By doing so:
1) KafkaApis becomes thinner: it only handles request-level application logic and talks to Request Channel.
2) Read message and append message logic is moved to Replica Manager, which handles the logic of "committed" appending and fetching "committed data".
3) Offset Manager (Coordinator) only needs to talk to Replica Manager to handle offset commits; there is no need to hack KafkaApis and the delayed fetch requests.
This refactoring will also benefit the subsequent development of the new consumer / coordinator.
Purgatories and Delayed Operations
We refactor the purgatories and delayed requests (which should now be called operations) as follows. All module names remain the same; the suggested new names are given in brackets for clarity only.
Delayed Request (Kafka Operation)
The base kafka operation just contains:
Note that a new callback instance needs to be created for each operation, since the callback must remember the request object it responds to. The base callback class can be extended, with the basic parameters:
Besides these fields, a Kafka operation also has the following interface:
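As a rough illustration of the shape described above, here is a minimal Python sketch of a base operation that holds a timeout and a per-operation callback. All names (DelayedOperation, is_satisfied, try_complete, expire, make_respond_callback, ReadyWhen) are hypothetical and chosen for this sketch only; they are not taken from the Kafka code base, which is written in Scala.

```python
import time
from typing import Callable, List, Tuple


class DelayedOperation:
    """Hypothetical base operation: a timeout plus a per-operation callback."""

    def __init__(self, timeout_ms: float, callback: Callable[[bool], None]):
        self.timeout_ms = timeout_ms
        self.callback = callback              # one fresh callback per operation
        self.created_at = time.monotonic()
        self.completed = False

    def is_satisfied(self) -> bool:
        """Subclasses decide when the operation may complete before its timeout."""
        raise NotImplementedError

    def is_expired(self) -> bool:
        elapsed_ms = (time.monotonic() - self.created_at) * 1000
        return elapsed_ms >= self.timeout_ms

    def _finish(self, satisfied: bool) -> bool:
        # Guard so the callback fires at most once per operation.
        if self.completed:
            return False
        self.completed = True
        self.callback(satisfied)
        return True

    def try_complete(self) -> bool:
        if not self.is_satisfied():
            return False
        return self._finish(True)

    def expire(self) -> bool:
        return self._finish(False)


def make_respond_callback(request: str, sent: List[Tuple[str, bool]]):
    """Build a callback that remembers the request it has to respond to."""
    def respond(satisfied: bool) -> None:
        sent.append((request, satisfied))
    return respond


class ReadyWhen(DelayedOperation):
    """Toy subclass: satisfied once an arbitrary predicate returns True."""

    def __init__(self, timeout_ms, callback, predicate: Callable[[], bool]):
        super().__init__(timeout_ms, callback)
        self.predicate = predicate

    def is_satisfied(self) -> bool:
        return bool(self.predicate())
```

The closure returned by make_respond_callback is the reason a callback cannot be shared between operations: each one captures its own request.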
Delayed Produce Request (Message Append Operation)
Maintains the append metadata, including the replicate condition:
In addition, it implements the interface as:
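One way to picture such an append operation is the Python sketch below. It assumes the replicate condition can be expressed as "each partition has been replicated up to a required offset"; the class name, the replicated_offset lookup function, and the REQUEST_TIMED_OUT marker are hypothetical and introduced only for illustration.

```python
from typing import Callable, Dict


class MessageAppendOperation:
    """Hypothetical append operation: waits until every partition is replicated."""

    def __init__(
        self,
        timeout_ms: int,
        required_offsets: Dict[str, int],          # partition -> offset to replicate
        replicated_offset: Callable[[str], int],   # assumed lookup, e.g. the HW
        callback: Callable[[Dict[str, str]], None],
    ):
        self.timeout_ms = timeout_ms
        self.pending = dict(required_offsets)      # partitions still waiting
        self.replicated_offset = replicated_offset
        self.callback = callback
        self.completed = False

    def is_satisfied(self) -> bool:
        # Drop every partition whose required offset has been replicated.
        for partition, required in list(self.pending.items()):
            if self.replicated_offset(partition) >= required:
                del self.pending[partition]
        return not self.pending

    def try_complete(self) -> bool:
        if self.completed or not self.is_satisfied():
            return False
        self.completed = True
        self.callback({})                          # empty dict: no partition failed
        return True

    def expire(self) -> bool:
        if self.completed:
            return False
        self.completed = True
        # Report a timeout for the partitions that were still pending.
        self.callback({p: "REQUEST_TIMED_OUT" for p in self.pending})
        return True
```

In this sketch the operation never touches a request object; it only reports per-partition status to whatever callback it was given.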
Delayed Fetch Request (Message Fetch Operation)
Maintains the fetch metadata, including the fetch min bytes:
And implements the interface as:
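A fetch operation of this kind could look roughly like the Python sketch below. It assumes a hypothetical read_committed(partition, offset) function that returns the committed bytes available from a given offset, and it assumes that an expired fetch responds with whatever data is available at that point; neither detail is taken from the Kafka code base.

```python
from typing import Callable, Dict


class MessageFetchOperation:
    """Hypothetical fetch operation: waits until min_bytes of data is available."""

    def __init__(
        self,
        timeout_ms: int,
        min_bytes: int,
        fetch_offsets: Dict[str, int],                 # partition -> start offset
        read_committed: Callable[[str, int], bytes],   # assumed read function
        callback: Callable[[Dict[str, bytes]], None],
    ):
        self.timeout_ms = timeout_ms
        self.min_bytes = min_bytes
        self.fetch_offsets = dict(fetch_offsets)
        self.read_committed = read_committed
        self.callback = callback
        self.completed = False

    def _read_all(self) -> Dict[str, bytes]:
        return {p: self.read_committed(p, off) for p, off in self.fetch_offsets.items()}

    def is_satisfied(self) -> bool:
        # Satisfied once the accumulated bytes across partitions reach min_bytes.
        return sum(len(data) for data in self._read_all().values()) >= self.min_bytes

    def _respond(self) -> bool:
        if self.completed:
            return False
        self.completed = True
        self.callback(self._read_all())
        return True

    def try_complete(self) -> bool:
        return self.is_satisfied() and self._respond()

    def expire(self) -> bool:
        # Assumption: on timeout the fetch returns whatever is available.
        return self._respond()
```

Completion and expiry share the same response path here, which is the main difference from the append sketch, where a timeout is reported as an error.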
Request Purgatory (Kafka Purgatory)
The base purgatory provides the following APIs:
Its expiry reaper purges the watch lists and triggers operation.expire() for each expired operation. This purgatory is generic and can be used for different kinds of operations.
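The watch-and-reap behaviour can be sketched in Python as follows. The method names (try_complete_else_watch, check_and_complete, expire_due), the explicit deadline_ms field, and the toy ThresholdOperation are assumptions made for this sketch; a real reaper would run on its own thread, whereas here the reaping step is called explicitly.

```python
from collections import defaultdict


class Purgatory:
    """Hypothetical generic purgatory: watch lists keyed by an arbitrary key."""

    def __init__(self):
        self._watchers = defaultdict(list)

    def try_complete_else_watch(self, op, keys) -> bool:
        # Complete immediately if possible, otherwise watch on every key.
        if op.try_complete():
            return True
        for key in keys:
            self._watchers[key].append(op)
        return False

    def check_and_complete(self, key) -> int:
        # Re-check the operations watching this key; return how many completed.
        completed, remaining = 0, []
        for op in self._watchers.pop(key, []):
            if op.completed:
                continue                  # already finished via another key
            if op.try_complete():
                completed += 1
            else:
                remaining.append(op)
        if remaining:
            self._watchers[key] = remaining
        return completed

    def expire_due(self, now_ms) -> int:
        # One reaper pass: purge finished operations and expire overdue ones.
        expired = 0
        for key in list(self._watchers):
            remaining = []
            for op in self._watchers[key]:
                if op.completed:
                    continue
                if now_ms >= op.deadline_ms:
                    if op.expire():
                        expired += 1
                else:
                    remaining.append(op)
            if remaining:
                self._watchers[key] = remaining
            else:
                del self._watchers[key]
        return expired

    def watched(self) -> int:
        return sum(len(ops) for ops in self._watchers.values())


class ThresholdOperation:
    """Toy operation: satisfied once a shared counter reaches a threshold."""

    def __init__(self, deadline_ms, threshold, state, on_done):
        self.deadline_ms, self.threshold = deadline_ms, threshold
        self.state, self.on_done = state, on_done
        self.completed = False

    def try_complete(self) -> bool:
        if self.completed or self.state["value"] < self.threshold:
            return False
        self.completed = True
        self.on_done("satisfied")
        return True

    def expire(self) -> bool:
        if self.completed:
            return False
        self.completed = True
        self.on_done("expired")
        return True
```

Because the purgatory only calls try_complete() and expire(), it does not need to know whether it is holding append operations, fetch operations, or anything else.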
Kafka Server Modules
With these purgatories and operations, server side modules can be refactored as follows:
Replica Manager maintains the metadata of partitions, including their local and remote replicas, and talks to Log Manager for log operations such as log truncation. Here is the proposed API:
Coordinator / Offset Manager
Coordinator's offset manager will talk to the replica manager for appending messages.
KafkaApis now becomes:
Request Handling Workflow
With the above refactoring, the request life cycle becomes:
As we can see, fetch requests and offset commit requests use a nested callback (a RespondCallback from KafkaApis for sending the response through the channel, and a FetchCallback / OffsetCommitCallback for fetching the data for the response / putting the offset into the cache). This nesting is not ideal, but it is necessary if we want to have a strictly layered architecture.
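The nesting for an offset commit can be illustrated with the Python sketch below. Every class and method here (RequestChannel.send_response, ReplicaManager.append_messages, OffsetManager.commit_offsets, KafkaApis.handle_offset_commit) is a hypothetical stand-in, and the append is treated as committed immediately rather than waiting in a purgatory, so this shows only the layering and not the delayed path.

```python
class RequestChannel:
    """Stand-in for the request channel: records the responses sent."""

    def __init__(self):
        self.responses = []

    def send_response(self, request_id, payload):
        self.responses.append((request_id, payload))


class ReplicaManager:
    """Stand-in replica manager: appends and then invokes the callback."""

    def __init__(self):
        self.log = []

    def append_messages(self, messages, callback):
        self.log.extend(messages)
        callback(True)    # sketch only: assume the append is committed at once


class OffsetManager:
    """Stand-in offset manager: talks only to the replica manager."""

    def __init__(self, replica_manager):
        self.replica_manager = replica_manager
        self.cache = {}

    def commit_offsets(self, group, offsets, respond_callback):
        def offset_commit_callback(appended_ok):
            # Inner layer: update the offset cache, then hand over to the outer layer.
            if appended_ok:
                for partition, offset in offsets.items():
                    self.cache[(group, partition)] = offset
            respond_callback(appended_ok)

        messages = [(group, p, o) for p, o in offsets.items()]
        self.replica_manager.append_messages(messages, offset_commit_callback)


class KafkaApis:
    """Stand-in request layer: only builds the response and uses the channel."""

    def __init__(self, channel, offset_manager):
        self.channel = channel
        self.offset_manager = offset_manager

    def handle_offset_commit(self, request_id, group, offsets):
        def respond_callback(ok):
            # Outer layer: remembers the request and sends the response.
            self.channel.send_response(request_id, "OK" if ok else "ERROR")

        self.offset_manager.commit_offsets(group, offsets, respond_callback)
```

Each layer only knows the callback it was handed: the replica manager never sees the request, and the request layer never sees the offset cache.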
The new architectural diagram will be: