Design of a new PrioritizationAwareBlockingQueue

We propose to change the requestQueue field in the RequestChannel class from ArrayBlockingQueue to a PrioritizationAwareBlockingQueue. To make this class more generic and have a better chance of being reused for other purposes in the future, we DO NOT limit the new class to support only 2 priorities.

More specifically, we think the new class needs to support the following features. In the examples below, a higher number represents a higher priority; that is, priority 1 is more urgent than priority 0.

  • One global capacity: ONE global capacity limits the total number of items stored across all priorities. We do not want separate capacities for different priorities because they would expose more config options to operators, which may be unnecessary and confusing.

  • Support filling the queue with items in any priority: To understand this requirement, consider an alternative design where the max number of items in every priority is capped at capacity/(number of priorities). Under that design, if there are no slots left for a given priority, e.g. priority 5, but plenty of slots available for other priorities, e.g. priorities 2, 3 and 6, enqueuing a priority 5 item into the PrioritizationAwareBlockingQueue needs to be blocked. We DO NOT want to put such limitations on this new class: as long as there are empty slots anywhere in the PrioritizationAwareBlockingQueue, we should be able to enqueue an item with any priority.

  • Prioritization upon enqueuing: During enqueuing, if the number of items has already reached the capacity, i.e. there are no empty slots anywhere in the PrioritizationAwareBlockingQueue, the current thread needs to be blocked. If multiple threads are blocked trying to enqueue items with different priorities, the thread with the highest priority item is woken up when a slot becomes available.

  • Prioritization upon dequeuing: During dequeuing, higher priority items are always dequeued before lower priority ones.

  • Returning the count of items in each priority level: This will be useful for monitoring the sub-queue size of each priority level.
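To make the difference between the two capacity policies concrete, here is a minimal sketch of the admission check under each of them. The class and method names are hypothetical and exist only for illustration; they are not part of the proposed class.

```java
// Illustrative helper only; the class and method names are hypothetical.
final class CapacityPolicies {
    private CapacityPolicies() {}

    /** Rejected alternative: each priority owns capacity / (number of priorities) slots. */
    static boolean hasRoomWithPerPriorityCap(int[] countsByPriority, int capacity, int priority) {
        return countsByPriority[priority] < capacity / countsByPriority.length;
    }

    /** Proposed behavior: an item of any priority fits while the total is below the global capacity. */
    static boolean hasRoomWithGlobalCapacity(int[] countsByPriority, int capacity) {
        int total = 0;
        for (int c : countsByPriority) {
            total += c;
        }
        return total < capacity;
    }
}
```

For example, with a capacity of 8, two priorities, no items at priority 0 and four items at priority 1, the per-priority cap would block a further priority 1 item even though four slots are free, while the global capacity check admits it.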

We propose to implement the new class PrioritizationAwareBlockingQueue using techniques shown in the following diagram:

  • Items with different priorities are stored in different sub-queues. Furthermore, non-empty sub-queues are organized in a SubQueuePriorityQueue, so that items can be retrieved according to their priorities. Currently we do not support having multiple sub-queues with the same priority.

  • There is a global variable count representing the total number of items in all sub-queues.

  • For each sub-queue, there is a NotFullCondition used for blocking and notifying threads that try to enqueue an item with the corresponding priority. Furthermore, if a NotFullCondition has threads blocked on it, it is inserted into a NotFullConditionsPriorityQueue. The purpose of using the priority queue is that when an empty slot becomes available, only the NotFullCondition with the highest priority is signaled. In the diagram, we use gray circles to represent the NotFullConditions that have threads blocked on them, and hence are maintained in the NotFullConditionsPriorityQueue.

  • There is one NotEmptyCondition shared by all the sub-queues. To poll an item out of the PrioritizationAwareBlockingQueue, a polling thread waits until the NotEmptyCondition is satisfied, and then polls an item from the sub-queue with the highest priority. If the sub-queue becomes empty after polling, it will be removed from the SubQueuePriorityQueue. Next, the NotFullCondition with the highest priority in NotFullConditionsPriorityQueue will be signaled.
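The structure described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed implementation: the class name, the put/take/size method names, and the use of two TreeMaps in place of the SubQueuePriorityQueue and NotFullConditionsPriorityQueue are all choices made for brevity. The sketch also does not try to stop a newly arriving producer from taking a freed slot ahead of a blocked one.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: the class name, method names and the TreeMap-based
// bookkeeping are assumptions, not the proposed implementation.
final class PrioritizationAwareBlockingQueueSketch<E> {
    private final int capacity;
    private int count = 0; // total number of items in all sub-queues
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // shared by all sub-queues

    // Non-empty sub-queues, highest priority first (plays the role of SubQueuePriorityQueue).
    private final TreeMap<Integer, ArrayDeque<E>> subQueues = new TreeMap<>(Comparator.reverseOrder());
    // One NotFullCondition per priority, created on first use.
    private final Map<Integer, Condition> notFull = new HashMap<>();
    // Blocked-producer count per priority, highest first
    // (plays the role of NotFullConditionsPriorityQueue).
    private final TreeMap<Integer, Integer> blockedProducers = new TreeMap<>(Comparator.reverseOrder());

    PrioritizationAwareBlockingQueueSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Blocks while the queue is globally full, then appends the item to its priority's sub-queue. */
    void put(E item, int priority) throws InterruptedException {
        if (item == null) throw new NullPointerException("item must not be null");
        lock.lockInterruptibly();
        try {
            Condition cond = notFull.computeIfAbsent(priority, p -> lock.newCondition());
            while (count == capacity) {
                blockedProducers.merge(priority, 1, Integer::sum);
                try {
                    cond.await();
                } finally {
                    // Unregister this waiter; drop the entry when no waiter is left.
                    blockedProducers.computeIfPresent(priority, (p, n) -> n == 1 ? null : n - 1);
                }
            }
            subQueues.computeIfAbsent(priority, p -> new ArrayDeque<>()).addLast(item);
            count++;
            notEmpty.signal();
        } finally {
            // Pass the wake-up on if a slot is still free (also covers an interrupted waiter).
            if (count < capacity) signalHighestPriorityProducer();
            lock.unlock();
        }
    }

    /** Blocks while the queue is empty, then removes the oldest item of the highest non-empty priority. */
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();
            Map.Entry<Integer, ArrayDeque<E>> head = subQueues.firstEntry();
            E item = head.getValue().pollFirst();
            if (head.getValue().isEmpty()) subQueues.remove(head.getKey());
            count--;
            signalHighestPriorityProducer();
            return item;
        } finally {
            lock.unlock();
        }
    }

    /** Number of items currently queued at the given priority. */
    int size(int priority) {
        lock.lock();
        try {
            ArrayDeque<E> queue = subQueues.get(priority);
            return queue == null ? 0 : queue.size();
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held.
    private void signalHighestPriorityProducer() {
        if (!blockedProducers.isEmpty()) notFull.get(blockedProducers.firstKey()).signal();
    }
}
```

A single lock guards all state in this sketch, which keeps the count, the sub-queues and the waiter bookkeeping consistent with each other at the cost of serializing producers and consumers.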

Using the PrioritizationAwareBlockingQueue

As mentioned above, in the RequestChannel class, we propose to change the type of the requestQueue field from ArrayBlockingQueue to PrioritizationAwareBlockingQueue.

Three types of requests from the controller, i.e. LeaderAndIsr requests, UpdateMetadata requests, and StopReplica requests, are enqueued with a higher priority, i.e. priority 1, while all other types of requests are enqueued with a lower priority, i.e. priority 0.
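The mapping from request type to priority could look like the sketch below. The RequestType enum is a hypothetical stand-in used only for this example (PRODUCE and FETCH are included merely as examples of other request types); the real RequestChannel would derive the type from the request itself.

```java
// Illustrative sketch; RequestType is a hypothetical stand-in, not an existing class.
final class RequestPriorities {
    static final int CONTROLLER_REQUEST_PRIORITY = 1;
    static final int DEFAULT_PRIORITY = 0;

    enum RequestType { LEADER_AND_ISR, UPDATE_METADATA, STOP_REPLICA, PRODUCE, FETCH }

    private RequestPriorities() {}

    /** Returns priority 1 for the three controller request types and priority 0 otherwise. */
    static int priorityOf(RequestType type) {
        switch (type) {
            case LEADER_AND_ISR:
            case UPDATE_METADATA:
            case STOP_REPLICA:
                return CONTROLLER_REQUEST_PRIORITY;
            default:
                return DEFAULT_PRIORITY;
        }
    }
}
```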
