Highly Available Client Event Mechanism
One of the key features of Geode is reliable asynchronous event notification. The publish/subscribe system offers a data distribution service where new events are published into the system and routed to all interested subscribers in a reliable manner.
...
The product achieves reliability and HA (high availability) of the subscription event using HARegionQueue.
HARegionQueue
HARegionQueue is an implementation of a queue using a Geode region as the underlying data structure.
When a client connects to the server with the subscription-enabled flag set, an HARegionQueue for that client proxy is created on the server side.
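As a rough illustration of what "a queue on top of a region" means, the sketch below layers FIFO behaviour over a key/value map, using an increasing Long as the queue position. The ConcurrentHashMap merely stands in for a Geode region, and the class name RegionBackedQueueSketch and its methods are hypothetical; the real HARegionQueue is considerably more involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a FIFO queue layered over a key/value map.
// The map stands in for the Geode region; this is NOT the real HARegionQueue.
class RegionBackedQueueSketch<E> {
  private final Map<Long, E> region = new ConcurrentHashMap<>();
  private long headKey = 0; // position of the next entry to read
  private long tailKey = 0; // position at which the next entry is stored

  // Append an event at the tail position.
  synchronized void put(E event) {
    region.put(tailKey++, event);
  }

  // Return the oldest event without removing it, or null if the queue is empty.
  synchronized E peek() {
    return headKey < tailKey ? region.get(headKey) : null;
  }

  // Remove and return the oldest event, or null if the queue is empty.
  synchronized E remove() {
    if (headKey == tailKey) {
      return null;
    }
    return region.remove(headKey++);
  }

  synchronized int size() {
    return region.size();
  }
}
```

Because every entry lives in the backing map under its position key, anything that can replicate the map can also replicate the queue, which is the property the rest of this page relies on.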
Class Diagrams
PlantUML |
---|
skinparam dpi 80
@startuml
hide empty members
title Server to Client Queues - Class Diagram
class CacheClientNotifier {
Singleton for a Cache.
}
class CacheClientProxy {
One for each client. |
...
Keeps track of the subscriptions for the client.
}
class MessageDispatcher {
Thread that reads events from the queue.
}
class HARegionQueue {
Holds the list of events to |
...
dispatch to a single client.
}
class HAContainerMap {
Holds the actual values for events to dispatch.
Used to store only a single copy of each value across all queues.
}
interface HAContainerMap
CacheClientNotifier "1 " o-- "*" CacheClientProxy
CacheClientProxy o-- MessageDispatcher
CacheClientNotifier o-- HAContainerMap
MessageDispatcher o-- HARegionQueue
HARegionQueue "*" --> "1" HAContainerMap
HARegionQueue o-- HARegion
@enduml |
This next diagram focuses on cardinality:
- between HARegionQueue and DACE (DispatchedAndCurrentEvents) objects (through the mis-named HARegionQueue.eventsMap)
- between HARegion and the two kinds of entries it holds: <Position,Conflatable> and <ThreadIdentifier,SequenceID>
In both cases, the Java Maps are depicted as associations in the diagram (the Map objects are not shown explicitly). So, for instance, HARegionQueue.eventsMap is a Map. It's depicted as a 1-to-(0..1 per Position) association from HARegionQueue to Position. Similarly for HARegion to Conflatable and HARegion to SequenceID.
The diagram introduces two types not actually present in the Java code. Both are depicted as subtypes of Long: SequenceID to stand for an event's sequence id; and Position to stand for a position in a queue. In the source code these are both just Longs.
PlantUML |
---|
@startuml
title HARegionQueue—Class Diagram\nHARegion and DACE Cardinalities
class SequenceID <<design class>>
Long <|-- SequenceID
class Position <<design class>>
Long <|-- Position
class HARegionQueue {
Position tailKey
}
class DispatchedAndCurrentEvents {
SequenceID lastDispatchedSequenceID
}
' these are together because I want to highlight the similarity (potential subtype relationship)
together {
' putting this first causes it to be laid out to the right of ThreadIdentifier, making the
' association (line) from Conflatable shorter
class EventID {
byte[] membershipId
long threadId
SequenceID sequenceId
int bucketId
}
class ThreadIdentifier {
byte[] membershipId
long threadId
}
}
interface Conflatable
' these subtypes of Conflatable are grouped together
together {
interface ClientMessage
class HAEventWrapper
class ConflatableObject
}
Conflatable <|-- HAEventWrapper
Conflatable <|-- ConflatableObject
Conflatable <|-- ClientMessage
interface ClientUpdateMessage
ClientMessage <|-- ClientUpdateMessage
ClientUpdateMessage <|-- ClientUpdateMessageImpl
ClientUpdateMessageImpl <|-- ClientInstantiatorMessage
ClientMessage <|-- ClientMarkerMessageImpl
Conflatable --> EventID
interface RegionQueue
RegionQueue <|-- HARegionQueue
HARegionQueue <|-- BlockingHARegionQueue
BlockingHARegionQueue <|-- DurableHARegionQueue
DistributedRegion <|-- HARegion
' while method signatures in HARegionQueue take Object, there's downcasting to Conflatable
HARegion "*" *-- "0..1 per Position" Conflatable
HARegion "*" *-- "0..1 per ThreadIdentifier" SequenceID
HARegionQueue *-- HARegion
HARegionQueue "1" *-- "0..1 per ThreadIdentifier" DispatchedAndCurrentEvents
DispatchedAndCurrentEvents "1" *-- "0..1 per Position" Position
' this hidden assoc changes layout such that cardinality text on SequenceID and Position isn't overlapping as much
ConflatableObject -[hidden]- HARegion
@enduml |
Sequence Diagrams
PlantUML |
---|
skinparam dpi 80
title Put into Queue (for Partitioned Regions)
actor user
user -> "PR Primary" : put
"PR Primary" -> "PR Secondary" : UpdateMessage
"PR Primary" -> "Adjunct Receiver": PutMessage
note right
The primary notifies all members that have
clients with interest
end note
"PR Primary" -> CacheClientNotifier: notifyClients()
"Adjunct Receiver" -> CacheClientNotifier: notifyClients()
note right
All members with interest
call notifyClients locally
end note
"PR Secondary" -> CacheClientNotifier: notifyClients()
CacheClientNotifier -> CacheClientProxy: deliverMessage()
CacheClientProxy -> MessageDispatcher : enqueueMessage()
MessageDispatcher -> HARegionQueue: basicPut()
|
...
Adding events to HARegionQueue:
...
For each client whose registered interest the event satisfies, a ClientMessage is created and added to that client's HARegionQueue (“CacheClientProxy.enqueueMessage()”). Once the event is added to the HARegionQueue, the cache operation thread returns to the caller.
There are cases where the events are not simply added to the HARegionQueue. Specifically, there currently are two cases where the event is put into a temporary queue.
1. The server is in the middle of providing an initial image (GII) of the queue (see HARegionQueue.giiQueue)
2. A client is in the process of registering and its message dispatcher/queue are not fully created and initialized
After these operations are completed, the temporary queues are drained and the events are added to the HARegionQueue. The diagrams below show the different put paths in detail.
These diagrams show more detail around the special handling during client queue initialization or providing an initial image.
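The "park events in a temporary queue, then drain" pattern used in both special cases can be sketched as follows. This is a minimal illustration under simplifying assumptions (a single lock, a single temporary queue); the class BufferingQueueSketch and its methods are hypothetical and do not mirror the actual giiQueue or queuedEvents handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of "buffer while initializing, then drain".
class BufferingQueueSketch<E> {
  private final Queue<E> temporary = new ArrayDeque<>();
  private final List<E> mainQueue = new ArrayList<>();
  private boolean initializing = true;

  // While initialization is in progress, park the event in the temporary queue.
  synchronized void put(E event) {
    if (initializing) {
      temporary.add(event);
    } else {
      mainQueue.add(event);
    }
  }

  // Called once initialization completes: drain parked events in arrival order,
  // then let later puts go straight to the main queue.
  synchronized void initializationComplete() {
    E event;
    while ((event = temporary.poll()) != null) {
      mainQueue.add(event);
    }
    initializing = false;
  }

  // Snapshot of the main queue, head first.
  synchronized List<E> contents() {
    return new ArrayList<>(mainQueue);
  }
}
```

Draining under the same lock that guards put() is what keeps arrival order intact in this sketch: no event can slip into the main queue ahead of the parked ones.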
Delivering/Dispatching Events to Client:
...
The client also acknowledges the server periodically with the received event ID (ThreadId, sequence ID), so that the events can be removed from the HARegionQueue.
Server:
The server processes the ack command from the client and maintains it in the HARegionQueue.ackedEvents map.
...
The event is removed from the HARegionQueue based on the client acknowledgement (“HARegionQueue.remove()”).
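A simplified view of acknowledgement-driven removal is sketched below. It assumes that sequence IDs increase per originating thread, so that acknowledging sequence ID n for a thread covers every earlier event from that thread. The class AckRemovalSketch, its Event type, and its method names are hypothetical; only the idea of an "acked events" map keyed by thread comes from the text above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of acknowledgement-driven removal.
class AckRemovalSketch {
  // A queued event, identified by the originating thread and a per-thread sequence ID.
  static final class Event {
    final long threadId;
    final long sequenceId;

    Event(long threadId, long sequenceId) {
      this.threadId = threadId;
      this.sequenceId = sequenceId;
    }
  }

  private final List<Event> queue = new ArrayList<>();
  // Highest acknowledged sequence ID per thread (loosely modeled on "ackedEvents").
  private final Map<Long, Long> ackedEvents = new HashMap<>();

  synchronized void put(Event e) {
    queue.add(e);
  }

  // Record a client ack; keep only the highest sequence ID seen per thread.
  synchronized void ack(long threadId, long sequenceId) {
    ackedEvents.merge(threadId, sequenceId, Long::max);
  }

  // Drop every queued event covered by an ack; returns the number removed.
  synchronized int removeAcked() {
    int removed = 0;
    for (Iterator<Event> it = queue.iterator(); it.hasNext();) {
      Event e = it.next();
      Long acked = ackedEvents.get(e.threadId);
      if (acked != null && e.sequenceId <= acked) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  synchronized int size() {
    return queue.size();
  }
}
```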
Reliable Event Delivery:
Reliable event delivery (in case of node failure) is achieved with the subscription redundancy level. A client can have queues created on multiple server nodes by setting the redundancy level.
With redundancy set, the HARegionQueues are created on multiple servers; one of the queues is treated as the primary queue and the others as secondary queues.
...
The events are dispatched to clients from the primary HARegionQueue. If the node hosting the primary queue goes down, one of the nodes hosting secondary queues becomes the primary and starts dispatching the events. During this scenario, duplicate events could be sent to the client. The client checks for duplicate events and ignores them.
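The client-side duplicate check can be pictured with the sketch below. It is a minimal illustration, assuming that each originating thread produces strictly increasing sequence IDs, so any event whose sequence ID is not greater than the last one processed for that thread is a duplicate. The class DuplicateFilterSketch and its accept() method are hypothetical names, not Geode API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of client-side duplicate filtering after a failover.
class DuplicateFilterSketch {
  // Last sequence ID processed for each originating thread.
  private final Map<Long, Long> lastSeen = new HashMap<>();

  // Returns true if the event is new and should be processed,
  // false if it is a duplicate and should be ignored.
  synchronized boolean accept(long threadId, long sequenceId) {
    Long previous = lastSeen.get(threadId);
    if (previous != null && sequenceId <= previous) {
      return false;
    }
    lastSeen.put(threadId, sequenceId);
    return true;
  }
}
```

With this in place, a new primary can safely re-send events that the old primary had already delivered but not yet reported as removed; the client simply drops them.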
When the message is delivered and successfully removed from the primary HARegionQueue, an acknowledgement is sent to the other servers hosting the queues (“QueueRemovalMessage”).
...
When the secondary queues are created, they perform initial data replication from one of the existing queues in order to be in sync with the primary queue (using the backing region’s GII mechanism - “HARegionQueue.createHARegion()”).
During queue initialization, if there are any events satisfying the interest criteria, those events are queued temporarily in “CacheClientProxy.queuedEvents”. Once the initialization is over, the events from the temporary queue are added to the HARegionQueue for delivery.
After the events are delivered and removed from the primary queue (node), the primary sends the “QueueRemovalMessage” to the secondary queues (nodes); as part of message processing, each secondary queue removes the events from its queue (“QueueRemovalMessage.process().removeDispatchedEvents()”).
The diagram below shows a simplified view of the queue removal logic.
Memory Optimization:
Depending on the number of client queues and the rate at which events are delivered, the individual queues may consume a lot of memory. In order to reduce the memory footprint, the events are stored in a container (“HAContainer”) and all the client queues refer to this instead of having their own copy (“HARegionQueue.putEntryConditionallyIntoHAContainer()”).
The reason for doing this is that in most cases clients will have common interests, so having a single copy of the client message reduces memory use significantly.
Once the event is delivered to all the interested clients, the event is removed from the HAContainer (“HARegionQueue.decAndRemoveFromHAContainer()”).
Code path for adding event to HAContainer:
HARegionQueue.basicPut()->putObject()->putEventInHARegion()->putEntryConditionallyIntoHAContainer().
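The sharing scheme amounts to a reference-counted container: the first queue to enqueue an event stores it, later queues only bump a counter, and the entry disappears when the last queue is done with it. The sketch below illustrates that idea only. SharedContainerSketch, putConditionally() and decAndRemove() are hypothetical names that loosely echo the methods cited above; they are not the actual Geode signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a shared, reference-counted event container.
class SharedContainerSketch<K, V> {
  private static final class Entry<V> {
    final V value;
    int refCount;

    Entry(V value) {
      this.value = value;
    }
  }

  private final Map<K, Entry<V>> entries = new HashMap<>();

  // Store the value only if absent; either way, count one more referencing queue.
  // Returns the single shared instance that the queue should hold on to.
  synchronized V putConditionally(K eventId, V value) {
    Entry<V> entry = entries.computeIfAbsent(eventId, k -> new Entry<>(value));
    entry.refCount++;
    return entry.value;
  }

  // Called when one queue is done with the event; the entry is removed
  // once no queue refers to it any more.
  synchronized void decAndRemove(K eventId) {
    Entry<V> entry = entries.get(eventId);
    if (entry != null && --entry.refCount == 0) {
      entries.remove(eventId);
    }
  }

  synchronized boolean contains(K eventId) {
    return entries.containsKey(eventId);
  }
}
```

In this sketch, two client queues that enqueue the same event end up holding the same object, so the memory cost of the payload is paid once regardless of how many clients are interested.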
Event Conflation:
Clients can configure events to be conflated. In this case, the old events in the HARegionQueue are destroyed and the new event is added to the tail of the queue.
This is done at the end of “HARegionQueue.putObject()”.
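A minimal sketch of conflation follows, assuming that events are conflated per key, so that a newer event replaces any queued event for the same key. The class ConflatingQueueSketch is hypothetical and uses a LinkedHashMap purely for illustration; it says nothing about how putObject() is actually implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of conflation: at most one queued event per key.
class ConflatingQueueSketch<K, V> {
  // LinkedHashMap iterates in insertion order, so iteration runs head to tail.
  private final Map<K, V> events = new LinkedHashMap<>();

  // Destroy any older event for the same key, then append the new one at the tail.
  // The explicit remove matters: re-putting an existing key would keep its old position.
  synchronized void put(K key, V value) {
    events.remove(key);
    events.put(key, value);
  }

  // Snapshot of the queued values in dispatch order.
  synchronized List<V> values() {
    return new ArrayList<>(events.values());
  }
}
```

The trade-off is visible in the sketch: the client sees only the latest value for a key, in exchange for a shorter queue when the same key is updated repeatedly.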