Highly Available Client Event Mechanism
One of the key features of Geode is reliable asynchronous event notification. The publish/subscribe system offers a data distribution service where new events are published into the system and routed to all interested subscribers in a reliable manner.
Clients can register interest using keys or continuous queries (CQs).
The product achieves reliability and HA (high availability) of subscription events using HARegionQueue.
HARegionQueue
HARegionQueue is an implementation of a queue using a Geode region as the underlying data structure.
When a client connects to the server with the subscription-enabled flag set, an HARegionQueue for that client proxy is created on the server side.
Class Diagrams
This next diagram focuses on cardinality:
- between HARegionQueue and DACE (DispatchedAndCurrentEvents) objects (through the misnamed HARegionQueue.eventsMap)
- between HARegion and the two kinds of entries it holds: <Position,Conflatable> and <ThreadIdentifier,SequenceID>
The diagram introduces two types not actually present in the Java code. Both are depicted as subtypes of Long: SequenceID to stand for an event's sequence id; and Position to stand for a position in a queue. In the source code these are both just Longs.
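To make the two kinds of entries concrete, here is a minimal sketch of a queue that keeps position-to-event entries alongside thread-to-sequence-ID entries. It is not the Geode implementation: the class BackingRegionSketch and its methods are hypothetical, events are reduced to strings, and two in-memory maps stand in for the single backing region.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the region backing an HARegionQueue (not Geode code).
class BackingRegionSketch {
    // Position -> event; a plain Long plays the role of "Position".
    private final TreeMap<Long, String> eventsByPosition = new TreeMap<>();
    // Thread identifier -> highest sequence ID seen; a plain Long plays the role of "SequenceID".
    private final Map<String, Long> lastSequenceByThread = new HashMap<>();
    private long nextPosition = 0L;

    // Appends an event at the tail and records the sequence ID for its originating thread.
    long put(String threadId, long sequenceId, String event) {
        long position = nextPosition++;
        eventsByPosition.put(position, event);
        lastSequenceByThread.merge(threadId, sequenceId, Math::max);
        return position;
    }

    // Returns the event at the head of the queue without removing it, or null if empty.
    String peek() {
        Map.Entry<Long, String> head = eventsByPosition.firstEntry();
        return head == null ? null : head.getValue();
    }

    Long lastSequence(String threadId) {
        return lastSequenceByThread.get(threadId);
    }

    int size() {
        return eventsByPosition.size();
    }
}
```

Keeping positions in a sorted map means the head of the queue is always the entry with the smallest position, which is what a peek needs.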
Sequence Diagrams
Adding events to HARegionQueue:
When an operation is performed on the cache, “LocalRegion.notifyBridgeClients()” is called to deliver events to the interested clients. The list of interested clients is obtained by calling “generateLocalFilterRouting()” and events are queued through “CacheClientNotifier.notifyClients()”.
For each client whose registered interest the event satisfies, a ClientMessage is created and added to that client's HARegionQueue (“CacheClientProxy.enqueueMessage()”). Once the event is added to the HARegionQueue, the cache operation thread returns to the caller.
There are cases where the events are not simply added to the HARegionQueue. Specifically, there currently are two cases where the event is put into a temporary queue.
1. The server is providing an initial image (see HARegionQueue.giiQueue)
2. The server is initializing the message dispatcher as part of client queue initialization logic (see CacheClientProxy.queuedEvents)
After these operations are completed, the temporary queues are drained and the events are added to the HARegionQueue. The diagrams below show the different put paths in detail.
These diagrams show more detail around the special handling during client queue initialization or providing an initial image.
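The buffer-then-drain pattern used in both cases can be sketched as follows. This is an illustration only: InitializingQueueSketch and its methods are hypothetical names, and the real code paths in HARegionQueue and CacheClientProxy involve locking and state that are not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a client queue that buffers events until it is ready (not Geode code).
class InitializingQueueSketch {
    private final Queue<String> temporaryQueue = new ArrayDeque<>();
    private final List<String> mainQueue = new ArrayList<>();
    private boolean initializing = true;

    // Events arriving during initialization go to the temporary queue.
    synchronized void enqueue(String event) {
        if (initializing) {
            temporaryQueue.add(event);
        } else {
            mainQueue.add(event);
        }
    }

    // Drains buffered events in arrival order, then switches to direct enqueueing.
    synchronized void finishInitialization() {
        String event;
        while ((event = temporaryQueue.poll()) != null) {
            mainQueue.add(event);
        }
        initializing = false;
    }

    // Returns a copy of the main queue contents in order.
    synchronized List<String> snapshot() {
        return new ArrayList<>(mainQueue);
    }
}
```

Draining and flipping the flag under the same lock as enqueue() keeps an event that arrives mid-drain from overtaking the buffered ones.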
Delivering/Dispatching Events to Client:
Events are delivered asynchronously to the clients by the MessageDispatcher thread ("CacheClientProxy.MessageDispatcher"). The dispatcher thread peeks an event from the HARegionQueue and delivers it to the client.
Ack from Client:
Client:
Upon receiving the event, the client updates its local cache or invokes the CqListeners (“CacheClientUpdater.processMessages()”).
The client also periodically sends the server an acknowledgement carrying the received event ID (ThreadId, sequence ID), so that the events can be removed from the HARegionQueue.
Server:
The server processes the ack command from the client and records it in the HARegionQueue.ackedEvents map.
Removing Events:
Events are removed from the HARegionQueue based on the client acknowledgement (“HARegionQueue.remove()”).
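The peek, acknowledge and remove cycle described above can be sketched as follows. The class AckQueueSketch is hypothetical, and the sketch assumes that an acknowledgement for a (ThreadId, sequence ID) pair covers every event from that thread with an equal or lower sequence ID; the source does not state this rule explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of acknowledgement-driven removal (not Geode code).
class AckQueueSketch {
    // One queued event, identified by originating thread and sequence ID.
    static final class Event {
        final String threadId;
        final long sequenceId;
        final String payload;

        Event(String threadId, long sequenceId, String payload) {
            this.threadId = threadId;
            this.sequenceId = sequenceId;
            this.payload = payload;
        }
    }

    private final List<Event> queue = new ArrayList<>();
    // Thread identifier -> highest acknowledged sequence ID.
    private final Map<String, Long> ackedEvents = new HashMap<>();

    void put(Event event) {
        queue.add(event);
    }

    // The dispatcher looks at the head without removing it.
    Event peek() {
        return queue.isEmpty() ? null : queue.get(0);
    }

    // Records the highest sequence ID the client has acknowledged for a thread.
    void ack(String threadId, long sequenceId) {
        ackedEvents.merge(threadId, sequenceId, Math::max);
    }

    // Removes every event covered by an acknowledgement and returns how many were removed.
    int removeAcked() {
        int removed = 0;
        for (Iterator<Event> it = queue.iterator(); it.hasNext();) {
            Event event = it.next();
            Long acked = ackedEvents.get(event.threadId);
            if (acked != null && event.sequenceId <= acked) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return queue.size();
    }
}
```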
Reliable Event Delivery:
Reliable event delivery (in case of node failure) is achieved through the subscription redundancy level. A client can have its queues created on multiple server nodes by setting the redundancy level.
With redundancy set, HARegionQueues are created on multiple servers. One of the queues is treated as the primary queue and the others as secondary queues.
When a cache operation is performed, it is distributed to the other nodes. Based on the interest subscription, the events are added to the primary and secondary HARegionQueues (as explained above). The HARegionQueues themselves do not replicate events between one another.
Events are dispatched to clients from the primary HARegionQueue. If the node hosting the primary queue goes down, one of the nodes hosting a secondary queue becomes the primary and starts dispatching the events. In this scenario duplicate events could be sent to the client. The client checks for duplicate events and ignores them.
When a message is delivered and successfully removed from the primary HARegionQueue, an acknowledgement is sent to the other servers hosting the queues (“QueueRemovalMessage”).
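The client-side duplicate check mentioned above can be sketched as follows. DuplicateFilterSketch is a hypothetical class, and the sketch assumes that sequence IDs increase monotonically per originating thread, so that any event at or below the highest sequence ID already seen for its thread is a duplicate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side duplicate filter (not Geode code).
class DuplicateFilterSketch {
    // Thread identifier -> highest sequence ID already applied.
    private final Map<String, Long> highestSeen = new HashMap<>();

    // Returns true if the event is new and should be applied, false if it is a duplicate.
    boolean accept(String threadId, long sequenceId) {
        Long last = highestSeen.get(threadId);
        if (last != null && sequenceId <= last) {
            return false; // already delivered, for example by the previous primary
        }
        highestSeen.put(threadId, sequenceId);
        return true;
    }
}
```

After a failover, the new primary may resend events the old primary had already delivered; a filter of this kind lets the client drop them without applying them twice.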
Creating Secondary HAQueues:
When secondary queues are created, they perform initial data replication from one of the existing queues in order to be in sync with the primary queue (using the backing region’s GII mechanism, “HARegionQueue.createHARegion()”).
During queue initialization, any events satisfying the interest criteria are queued temporarily in “CacheClientProxy.queuedEvents”. Once the initialization is over, the events from the temporary queue are added to HARegionQueue for delivery.
After events are delivered and removed from the primary queue, the primary node sends a “QueueRemovalMessage” to the nodes hosting the secondary queues. As part of message processing, each secondary queue removes the events from its own queue (“QueueRemovalMessage.process().removeDispatchedEvents()”).
The diagram below shows a simplified view of the queue removal logic.
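The removal flow can also be sketched in code. This is a simplified model, not the Geode implementation: QueueRemovalSketch and its nested types are hypothetical, and the message is assumed to carry, per originating thread, the last sequence ID dispatched by the primary.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of primary-to-secondary queue removal (not Geode code).
class QueueRemovalSketch {
    // Assumed message shape: thread identifier -> last sequence ID dispatched by the primary.
    static final class RemovalMessage {
        final Map<String, Long> lastDispatched = new HashMap<>();

        RemovalMessage with(String threadId, long sequenceId) {
            lastDispatched.put(threadId, sequenceId);
            return this;
        }
    }

    static final class Entry {
        final String threadId;
        final long sequenceId;

        Entry(String threadId, long sequenceId) {
            this.threadId = threadId;
            this.sequenceId = sequenceId;
        }
    }

    static final class SecondaryQueue {
        private final List<Entry> events = new ArrayList<>();

        void put(String threadId, long sequenceId) {
            events.add(new Entry(threadId, sequenceId));
        }

        // Removes every event the primary reports as already dispatched.
        void process(RemovalMessage message) {
            events.removeIf(e -> {
                Long last = message.lastDispatched.get(e.threadId);
                return last != null && e.sequenceId <= last;
            });
        }

        int size() {
            return events.size();
        }
    }

    // The primary sends the same removal message to every secondary.
    static void broadcast(RemovalMessage message, List<SecondaryQueue> secondaries) {
        for (SecondaryQueue secondary : secondaries) {
            secondary.process(message);
        }
    }
}
```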
Memory Optimization:
Depending on the number of client queues and the rate at which events are delivered, the individual queues may consume a lot of memory. To reduce the memory footprint, the events are stored in a shared container (“HAContainer”) and all the client queues refer to it instead of holding their own copies (“HARegionQueue.putEntryConditionallyIntoHAContainer()”).
The reason for doing this is that in most cases clients have common interests, so keeping a single copy of each client message reduces memory use significantly.
Once an event has been delivered to all the interested clients, it is removed from the HAContainer (“HARegionQueue.decAndRemoveFromHAContainer()”).
Code path for adding event to HAContainer:
HARegionQueue.basicPut()->putObject()->putEventInHARegion()->putEntryConditionallyIntoHAContainer()
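The shared-copy idea can be sketched with a reference-counted map. SharedContainerSketch and its methods are hypothetical names chosen to mirror the put and decrement steps described above; the sketch assumes one reference per client queue and does not reflect how HAContainer is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reference-counted container shared by all client queues (not Geode code).
class SharedContainerSketch {
    private static final class Holder {
        final String message;
        int refCount;

        Holder(String message) {
            this.message = message;
        }
    }

    private final Map<String, Holder> byEventId = new HashMap<>();

    // Stores the message only if absent, counts one more referencing queue,
    // and returns the single shared instance.
    synchronized String putConditionally(String eventId, String message) {
        Holder holder = byEventId.computeIfAbsent(eventId, id -> new Holder(message));
        holder.refCount++;
        return holder.message;
    }

    // Drops one reference; removes the entry when no queue refers to it any more.
    // Returns true only when the entry was actually removed.
    synchronized boolean decAndRemove(String eventId) {
        Holder holder = byEventId.get(eventId);
        if (holder == null) {
            return false;
        }
        if (--holder.refCount == 0) {
            byEventId.remove(eventId);
            return true;
        }
        return false;
    }

    synchronized int size() {
        return byEventId.size();
    }
}
```

With N interested clients, the container holds one message instance and a count of N instead of N copies, and the instance is released when the last queue has delivered it.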
Event Conflation:
Clients can configure events to be conflated: when a new event arrives, the older event for the same key still in the HARegionQueue is destroyed and the new event is added to the tail of the queue.
This is done at the end of “HARegionQueue.putObject()”.
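A minimal sketch of conflation follows, assuming that events are conflated per key and that the index from key to latest position is kept in a separate map. ConflatingQueueSketch is a hypothetical class and not the logic of HARegionQueue.putObject().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical conflating queue (not Geode code).
class ConflatingQueueSketch {
    // Position -> {key, value}, ordered from head to tail.
    private final TreeMap<Long, String[]> byPosition = new TreeMap<>();
    // Key -> position of the latest queued event for that key.
    private final Map<String, Long> latestPositionByKey = new HashMap<>();
    private long nextPosition = 0L;

    void put(String key, String value) {
        long position = nextPosition++;
        // The new event always goes to the tail.
        byPosition.put(position, new String[] {key, value});
        // Conflation step, done at the end of the put: drop the older event for the same key.
        Long oldPosition = latestPositionByKey.put(key, position);
        if (oldPosition != null) {
            byPosition.remove(oldPosition);
        }
    }

    // Returns the queued events from head to tail as "key=value" strings.
    List<String> contents() {
        List<String> out = new ArrayList<>();
        for (String[] entry : byPosition.values()) {
            out.add(entry[0] + "=" + entry[1]);
        }
        return out;
    }
}
```

Note that the conflated event moves to the tail, so a frequently updated key can be delivered after keys that were updated only once and earlier.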