Highly Available Client Event Mechanism
One of the key features of Geode is reliable asynchronous event notification. The publish/subscribe system offers a data distribution service where new events are published into the system and routed to all interested subscribers in a reliable manner.
...
The product achieves reliability and HA (high availability) of the subscription event using HARegionQueue.
HARegionQueue
HARegionQueue is an implementation of a queue that uses a Geode region as the underlying data structure.
When a client connects to the server with the subscription-enabled flag set, an HARegionQueue for that client proxy is created on the server side.
Class Diagrams
PlantUML |
---|
skinparam dpi 80
@startuml
hide empty members
title Server to Client Queues - Class Diagram
class CacheClientNotifier {
Singleton for a Cache.
}
class CacheClientProxy {
One for each client. |
...
Keeps track of the subscriptions for the client.
}
class MessageDispatcher {
Thread that reads events from the queue.
}
class HARegionQueue {
Holds the list of events to |
...
dispatch to a single client.
}
class HAContainerMap {
Holds the actual values for events to dispatch.
Used to store only a single copy of each value across all queues.
}
interface HAContainerMap
CacheClientNotifier "1 " o-- "*" CacheClientProxy
CacheClientProxy o-- MessageDispatcher
CacheClientNotifier o-- HAContainerMap
MessageDispatcher o-- HARegionQueue
HARegionQueue "*" --> "1" HAContainerMap
HARegionQueue o-- HARegion
@enduml |
This next diagram focuses on cardinality:
- between HARegionQueue and DACE (DispatchedAndCurrentEvents) objects (through the mis-named HARegionQueue.eventsMap)
- between HARegion and the two kinds of entries it holds: <Position,Conflatable> and <ThreadIdentifier,SequenceID>
In both cases, the Java Maps are depicted as associations in the diagram (the Map objects are not shown explicitly). So, for instance, HARegionQueue.eventsMap is a Map. It's depicted as a 1-to-(0..1 per Position) association from HARegionQueue to Position. Similarly for HARegion to Conflatable and HARegion to SequenceID.
The diagram introduces two types not actually present in the Java code. Both are depicted as subtypes of Long: SequenceID to stand for an event's sequence id; and Position to stand for a position in a queue. In the source code these are both just Longs.
PlantUML |
---|
@startuml
title HARegionQueue - Class Diagram\nHARegion and DACE Cardinalities
class SequenceID <<design class>>
Long <|-- SequenceID
class Position <<design class>>
Long <|-- Position
class HARegionQueue {
Position tailKey
}
class DispatchedAndCurrentEvents {
SequenceID lastDispatchedSequenceID
}
' these are together because I want to highlight the similarity (potential subtype relationship)
together {
' putting this first causes it to be laid out to the right of ThreadIdentifier, making the
' association (line) from Conflatable shorter
class EventID {
byte[] membershipId
long threadId
SequenceID sequenceId
int bucketId
}
class ThreadIdentifier {
byte[] membershipId
long threadId
}
}
interface Conflatable
' these subtypes of Conflatable are grouped together
together {
interface ClientMessage
class HAEventWrapper
class ConflatableObject
}
Conflatable <|-- HAEventWrapper
Conflatable <|-- ConflatableObject
Conflatable <|-- ClientMessage
interface ClientUpdateMessage
ClientMessage <|-- ClientUpdateMessage
ClientUpdateMessage <|-- ClientUpdateMessageImpl
ClientUpdateMessageImpl <|-- ClientInstantiatorMessage
ClientMessage <|-- ClientMarkerMessageImpl
Conflatable --> EventID
interface RegionQueue
RegionQueue <|-- HARegionQueue
HARegionQueue <|-- BlockingHARegionQueue
BlockingHARegionQueue <|-- DurableHARegionQueue
DistributedRegion <|-- HARegion
' while method signatures in HARegionQueue take Object, there's downcasting to Conflatable
HARegion "*" *-- "0..1 per Position" Conflatable
HARegion "*" *-- "0..1 per ThreadIdentifier" SequenceID
HARegionQueue *-- HARegion
HARegionQueue "1" *-- "0..1 per ThreadIdentifier" DispatchedAndCurrentEvents
DispatchedAndCurrentEvents "1" *-- "0..1 per Position" Position
' this hidden assoc changes layout such that cardinality text on SequenceID and Position isn't overlapping as much
ConflatableObject -[hidden]- HARegion
@enduml |
Sequence Diagrams
PlantUML |
---|
skinparam dpi 80
title Put into Queue (for Partitioned Regions)
actor user
user -> "PR Primary" : put
"PR Primary" -> "PR Secondary" : UpdateMessage
"PR Primary" -> "Adjunct Receiver": PutMessage
note right
The primary notifies all members that have
clients with interest
end note
"PR Primary" -> CacheClientNotifier: notifyClients()
"Adjunct Receiver" -> CacheClientNotifier: notifyClients()
note right
All members with interest
call notifyClients locally
end note
"PR Secondary" -> CacheClientNotifier: notifyClients()
CacheClientNotifier -> CacheClientProxy: deliverMessage()
CacheClientProxy -> MessageDispatcher : enqueueMessage()
MessageDispatcher -> HARegionQueue: basicPut()
|
...
Adding events to HARegionQueue:
...
For each client whose registered interest the event satisfies, a ClientMessage is created and added to that client's HARegionQueue (“CacheClientProxy.enqueueMessage()”). Once the event is added to the HARegionQueue, the cache operation thread returns to the caller.
There are cases where events are not added directly to the HARegionQueue. Currently there are two cases in which the event is first put into a temporary queue:
1. The server is providing an initial image (see HARegionQueue.giiQueue)
2. A client is in the process of registering, and its message dispatcher and queue are not yet fully created and initialized
After these operations complete, the temporary queues are drained and the events are added to the HARegionQueue. The diagrams below show the different put paths in detail, including the special handling during client queue initialization and while providing an initial image.
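The temporary-queue behavior described above can be sketched in plain Java. This is a simplified illustration, not Geode's code: the class `InitializingProxySketch` and its methods are hypothetical names, events are plain strings, and a list stands in for the HARegionQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model: buffers events until the client's queue is ready. */
class InitializingProxySketch {
    private final Queue<String> queuedEvents = new ArrayDeque<>(); // temporary queue
    private final List<String> haQueue = new ArrayList<>();        // stands in for the HARegionQueue
    private boolean initialized = false;

    /** Called by cache operation threads for each event of interest. */
    synchronized void enqueue(String event) {
        if (initialized) {
            haQueue.add(event);
        } else {
            queuedEvents.add(event); // queue not ready yet: park the event
        }
    }

    /** Called once the dispatcher and queue are fully created. */
    synchronized void finishInitialization() {
        // Drain in arrival order so parked events precede later ones.
        while (!queuedEvents.isEmpty()) {
            haQueue.add(queuedEvents.remove());
        }
        initialized = true;
    }

    /** Returns a snapshot of the events currently in the (simulated) queue. */
    synchronized List<String> queueContents() {
        return new ArrayList<>(haQueue);
    }
}
```

Draining under the same lock that guards `enqueue` is one simple way to keep a late event from overtaking the parked ones; the real implementation may coordinate this differently.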
Delivering/Dispatching Events to Client:
...
The client also periodically acknowledges to the server the received event ID (thread ID, sequence ID) so that events can be removed from the HARegionQueue.
Server:
The server processes the ack command from the client and records it in the HARegionQueue.ackedEvents map.
...
An event is removed from the HARegionQueue based on the client acknowledgement (“HARegionQueue.remove()”).
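A minimal sketch of acknowledgement-driven removal follows. It assumes that an ack for (thread ID, sequence ID) covers every event from that thread with an equal or lower sequence ID; that cumulative behavior, the class `AckRemovalSketch`, and its methods are illustrative assumptions rather than the actual HARegionQueue API.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of a queue whose entries are removed by client acks. */
class AckRemovalSketch {
    /** Minimal event: the originating thread and its per-thread sequence ID. */
    static final class Event {
        final long threadId;
        final long sequenceId;

        Event(long threadId, long sequenceId) {
            this.threadId = threadId;
            this.sequenceId = sequenceId;
        }
    }

    // Position -> event, kept in insertion (queue) order.
    private final Map<Long, Event> eventsByPosition = new LinkedHashMap<>();
    private long tail = 0;

    void put(long threadId, long sequenceId) {
        eventsByPosition.put(tail++, new Event(threadId, sequenceId));
    }

    /** Removes events from threadId with sequence ID <= ackedSequenceId; returns how many were removed. */
    int acknowledge(long threadId, long ackedSequenceId) {
        int removed = 0;
        Iterator<Event> it = eventsByPosition.values().iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.threadId == threadId && e.sequenceId <= ackedSequenceId) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return eventsByPosition.size();
    }
}
```

A repeated ack for the same (thread ID, sequence ID) finds nothing left to remove, so in this model acknowledgements are safe to resend.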
Reliable Event Delivery:
Reliable event delivery (in case of node failure) is achieved with the subscription redundancy level. A client can have its queues created on multiple server nodes by setting the redundancy level.
With redundancy set, HARegionQueues are created on multiple servers; one of the queues is treated as the primary queue and the others as secondary queues.
...
Events are dispatched to clients from the primary HARegionQueue. If the node hosting the primary queue goes down, one of the nodes hosting a secondary queue becomes the primary and starts dispatching the events. In this scenario, duplicate events could be sent to the client. The client checks for duplicate events and ignores them.
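The text does not say how the client recognizes a duplicate. One plausible approach, shown here purely as an assumption, is to remember the highest sequence ID seen per originating thread and drop anything at or below it. The class `DuplicateFilterSketch` is hypothetical, and a single `long` stands in for the full thread identifier (membership ID plus thread ID).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side filter that drops events it has already seen. */
class DuplicateFilterSketch {
    // Highest sequence ID seen so far, keyed by originating thread.
    private final Map<Long, Long> lastSeenByThread = new HashMap<>();

    /** Returns true if the event is new and should be applied, false if it is a duplicate. */
    boolean accept(long threadId, long sequenceId) {
        Long last = lastSeenByThread.get(threadId);
        if (last != null && sequenceId <= last) {
            return false; // already delivered, e.g. re-sent after a failover
        }
        lastSeenByThread.put(threadId, sequenceId);
        return true;
    }
}
```

This relies on sequence IDs increasing per thread, so an event re-sent by a newly promoted primary always compares as "not newer" than what the client already holds.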
When a message is delivered and successfully removed from the primary HARegionQueue, an acknowledgement is sent to the other servers hosting the queues (“QueueRemovalMessage”).
...
When secondary queues are created, they perform an initial data replication from one of the existing queues to get in sync with the primary queue (using the backing region’s GII mechanism, “HARegionQueue.createHARegion()”).
During queue initialization, any events that satisfy the interest criteria are queued temporarily in “CacheClientProxy.queuedEvents”. Once the initialization is over, the events from the temporary queue are added to the HARegionQueue for delivery.
After events are delivered and removed from the primary queue, the primary node sends a “QueueRemovalMessage” to the nodes hosting the secondary queues; as part of message processing, each secondary queue removes the events from its queue (“QueueRemovalMessage.process().removeDispatchedEvents()”).
The diagram below shows a simplified view of the queue removal logic.
Memory Optimization:
Depending on the number of client queues and the rate at which events are delivered, the individual queues may consume a lot of memory. To reduce the memory footprint, the events are stored in a container (“HAContainer”) and all the client queues refer to it instead of holding their own copy (“HARegionQueue.putEntryConditionallyIntoHAContainer()”).
The reason for doing this is that in most cases clients have common interests, so keeping a single copy of the client message reduces memory use significantly.
Once the event is delivered to all the interested clients, it is removed from the HAContainer (“HARegionQueue.decAndRemoveFromHAContainer()”).
Code path for adding event to HAContainer:
HARegionQueue.basicPut()->putObject()->putEventInHARegion()->putEntryConditionallyIntoHAContainer().
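The single-copy idea can be illustrated with a reference-counted map. This is a sketch under assumptions, not the HAContainer implementation: the class `SharedContainerSketch`, its method names, and the use of string event IDs and payloads are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shared container holding one copy of each event payload. */
class SharedContainerSketch {
    private static final class Entry {
        final String payload;
        int refCount;

        Entry(String payload) {
            this.payload = payload;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Stores the payload only if absent; each calling queue takes one reference to the shared copy. */
    synchronized String putConditionally(String eventId, String payload) {
        Entry e = entries.computeIfAbsent(eventId, id -> new Entry(payload));
        e.refCount++;
        return e.payload;
    }

    /** Drops one reference and removes the entry once no queue refers to it. */
    synchronized void decAndRemove(String eventId) {
        Entry e = entries.get(eventId);
        if (e != null && --e.refCount == 0) {
            entries.remove(eventId);
        }
    }

    synchronized int size() {
        return entries.size();
    }
}
```

With two client queues interested in the same event, both calls to `putConditionally` return the same object, and the entry disappears only after both queues have called `decAndRemove`.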
Event Conflation:
Clients can configure events to be conflated. In this case, the old events in the HARegionQueue are destroyed and the new event is added to the tail of the queue.
This is done at the end of “HARegionQueue.putObject()”.
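Conflation can be sketched with an insertion-ordered map. The example assumes events are conflated per entry key, which the text above does not state explicitly; `ConflatingQueueSketch` and its methods are hypothetical names, not part of HARegionQueue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical queue that keeps only the latest event per key. */
class ConflatingQueueSketch {
    // Insertion-ordered: iteration order is queue order, head first.
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    void put(String key, String event) {
        latestByKey.remove(key);     // destroy the older event for this key, if any
        latestByKey.put(key, event); // add the new event at the tail
    }

    /** Returns the queued events head first and empties the queue. */
    List<String> drain() {
        List<String> out = new ArrayList<>(latestByKey.values());
        latestByKey.clear();
        return out;
    }
}
```

Removing before re-inserting matters: a plain `put` on an existing key would keep the entry at its old position, whereas the description above places the new event at the tail.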