Purpose

The purpose of this article is to explain some details about how the Geode Asynchronous Event Distribution component works under the hood. The goal is to provide UML diagrams and explanations of the main classes and interactions involved in the process, so the reader can better understand and troubleshoot issues.

Prerequisites

It's recommended to have the source code at hand while reading this article, so the reader can check the actual implementation whenever needed. The reader might also want to have a look at the WAN Gateway article first: it contains the basic diagram and flows of the Geode WAN architecture, while the current article goes deeper into the details.

Introduction

The core algorithms and default implementations for remote asynchronous event distribution and local asynchronous event distribution are shared; the main difference lies in how the events are distributed and how they are removed from the internal structure after a successful dispatch. For this reason, and to avoid confusion, the term gateway-sender will be used from now on, no matter whether the discussion is about asynchronous event distribution within the local cluster (async-event-queue) or asynchronous event distribution to remote clusters (WAN replication).
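
Both flavors are configured through the same public API. The snippet below is a minimal sketch, assuming a local distributed-system-id of 1 and a remote cluster with distributed-system-id 2; identifiers, region name and batch size are illustrative only, and the remote-locator configuration needed for an actual WAN setup is omitted.

  import org.apache.geode.cache.Cache;
  import org.apache.geode.cache.CacheFactory;
  import org.apache.geode.cache.Region;
  import org.apache.geode.cache.RegionShortcut;
  import org.apache.geode.cache.wan.GatewaySender;
  import org.apache.geode.cache.wan.GatewaySenderFactory;

  public class SenderConfigSketch {
    public static void main(String[] args) {
      // Assumed local cluster id; remote locators are omitted for brevity.
      Cache cache = new CacheFactory().set("distributed-system-id", "1").create();

      // WAN replication flavor: a serial gateway-sender targeting cluster 2.
      GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
      senderFactory.setParallel(false); // true would create a parallel sender instead
      senderFactory.setBatchSize(100);
      GatewaySender sender = senderFactory.create("wan-sender", 2);

      // Attaching the sender to a region makes every update on that region be
      // wrapped and enqueued for asynchronous distribution.
      Region<String, String> region = cache
          .<String, String>createRegionFactory(RegionShortcut.PARTITION)
          .addGatewaySenderId(sender.getId())
          .create("example-region");
    }
  }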

Main Classes

The most important classes are listed below, along with a UML class diagram showing the relevant methods; it's important to note that almost every class included has two possible implementations: parallel and serial.

GatewaySender: primary interface used by all gateway-sender implementations.

AbstractGatewaySender: abstract class containing the basic implementation shared by both serial and parallel senders. The most relevant implementations are SerialGatewaySenderImpl, SerialAsyncEventQueueImpl, ParallelGatewaySenderImpl and ParallelAsyncEventQueueImpl.

AbstractGatewaySenderEventProcessor: enqueues the events within the internal queue (backed up by an actual Geode region) and processes the queue. The class itself extends the Java Thread class, so it is always running within the JVM; there's one instance per dispatcher thread. The main implementations are ParallelGatewaySenderEventProcessor and SerialGatewaySenderEventProcessor.

RegionQueue: primary interface used to describe the semantics of a queue-like structure backed up by actual Geode regions. There is one instance per AbstractGatewaySenderEventProcessor. The main implementations are ParallelGatewaySenderQueue and SerialGatewaySenderQueue.

GatewaySenderEventDispatcher: primary interface to dispatch actual batches of events. There is one instance per AbstractGatewaySenderEventProcessor. The main implementations are GatewaySenderEventRemoteDispatcher (dispatches events through the WAN to another cluster) and GatewaySenderEventCallbackDispatcher (dispatches events locally to the configured AsyncEventListeners).

AckReaderThread: inner class declared entirely within GatewaySenderEventRemoteDispatcher; its purpose is to receive acks from the gateway receivers in the remote cluster for the received batches, and to delete those already-distributed batches from the local region queue.

GatewayReceiver: primary interface used by all gateway receiver implementations.

GatewayReceiverCommand: class responsible for applying the events received through WAN replication from batches sent by the gateway-senders (a minimal receiver example is sketched right below).
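
As an illustration of how the receiving side is wired through the public API, the sketch below starts a GatewayReceiver on a member of the remote cluster; the distributed-system-id and port range are assumptions, not defaults. Incoming batches are then applied on that member by GatewayReceiverCommand.

  import java.io.IOException;

  import org.apache.geode.cache.Cache;
  import org.apache.geode.cache.CacheFactory;
  import org.apache.geode.cache.wan.GatewayReceiver;

  public class ReceiverSketch {
    public static void main(String[] args) throws IOException {
      // Assumed id for the remote (receiving) cluster.
      Cache cache = new CacheFactory().set("distributed-system-id", "2").create();

      // The receiver binds to a free port within [startPort, endPort] and
      // accepts batches from the gateway-senders of other clusters.
      GatewayReceiver receiver = cache.createGatewayReceiverFactory()
          .setStartPort(5500)
          .setEndPort(5600)
          .setManualStart(true)
          .create();
      receiver.start();
    }
  }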

Shadow Region Queues

Every time an association is made between a region and a gateway-sender, a shadow region is created on the member hosting the original region. This region acts as the internal queue-like structure used to store, retrieve and delete the events to distribute. The type of this internal region depends on the distribution strategy configured: the region will be created as REPLICATED when using serial distribution (SerialGatewaySenderQueue), and it will be created as PARTITIONED when using parallel distribution (ParallelGatewaySenderQueue).

For the sake of simplicity and to avoid mixing/confusing terms, from now on the name used to refer to this shadow region or internal queue will be shadow-region-queue.

Adding Events to the Shadow Region Queue

Serial Gateway Sender

Only one serial gateway-sender can be actively updating the shadow-region-queue within a cluster at any given time; it's known as the primary. The rest of the configured serial gateway-senders are also alive and running, but they don't actively consume the shadow-region-queue at all; they are known as backups or secondaries.

The process to determine which gateway-sender becomes the primary is not orchestrated by a central entity. Instead, every gateway-sender has a dedicated thread that keeps trying to acquire a distributed lock; the one that gets the lock becomes the primary (generally the first one to come up), and the rest just keep waiting and retrying to acquire the lock forever.
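
The sketch below illustrates that election pattern using Geode's public DistributedLockService API. It's a minimal illustration of the idea only, not the actual gateway-sender code; the service and lock names are made up.

  import org.apache.geode.cache.Cache;
  import org.apache.geode.cache.CacheFactory;
  import org.apache.geode.distributed.DistributedLockService;

  public class PrimaryElectionSketch {
    public static void main(String[] args) {
      Cache cache = new CacheFactory().create();
      DistributedLockService lockService = DistributedLockService
          .create("sender-primary-election", cache.getDistributedSystem());

      // Keep retrying until the lock is obtained; -1 means the lease never expires.
      while (!lockService.lock("my-sender", 10000, -1)) {
        // Another member holds the lock, so this one stays secondary and retries.
      }
      // From here on, this member would act as the primary gateway-sender.
    }
  }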

Considering that the shadow-region-queue for a serial gateway-sender is created as a REPLICATED REGION, there are no primary or secondary buckets: the data is simply replicated to all members where the region is hosted. Moreover, no matter the type of the region to which the gateway-sender is attached, the actual update on the original region doesn't need to be executed on the same member where the primary gateway-sender is running, so Geode needs to handle these situations and guarantee that no events are lost. To provide this guarantee, the SerialGatewaySenderEventProcessor running as secondary maintains a map of unprocessedEvents to track the wrapped-events that have been received by the secondary but not yet by the primary. The SerialSecondaryGatewayListener, running only on the secondary gateway-senders and attached to the shadow-region-queue, is responsible for removing the wrapped-events from this map and keeping it in sync whenever the primary gateway-sender receives and/or removes the wrapped-event from the actual shadow-region-queue.

The diagrams below show the flow of steps for an event fired on a region (the actual region type doesn't matter), numbered in sequence to better understand the whole process: the first flow covers an event fired on a member hosting a secondary gateway-sender, and the second covers an event fired on the member hosting the primary. Keep in mind that the entire flow is processed synchronously with the original event fired on the region, except for the listener itself, whose execution is delegated to another thread.

  1. The original event is fired on a region hosted by a certain member, where the secondary gateway-sender is running.
  2. The gateway-sender, acting as secondary, adds the wrapped-event to the unprocessedEvents map within the SerialGatewaySenderEventProcessor.
  3. The replication takes place for the original event, until it reaches the member on which the primary gateway-sender is running.
  4. The wrapped-event is added to the shadow-region-queue by the primary SerialGatewaySenderEventProcessor.
  5. Replication for the shadow-region-queue, backed up by a REPLICATED region, kicks in and the wrapped-event is distributed across the members hosting the secondary gateway-senders.
  6. The SerialSecondaryGatewayListener is invoked on all members hosting the secondary gateway-senders.
  7. The SerialSecondaryGatewayListener instructs the SerialGatewaySenderEventProcessor to remove the wrapped-event from the unprocessedEvents map.

  1. The original event is fired on a region hosted by a certain member, where the primary gateway-sender is running.
  2. The wrapped-event is added to the shadow-region-queue by the primary SerialGatewaySenderEventProcessor.
  3. Replication for the shadow-region-queue, backed up by a REPLICATED region, kicks in and the wrapped-event is distributed across the members hosting the secondary gateway-senders.
  4. The SerialSecondaryGatewayListener is invoked on all members hosting the secondary gateway-senders.
  5. The wrapped-event is added to the map of unprocessedEvents by the SerialSecondaryGatewayListener.
  6. The replication takes place for the original event, until it reaches the member on which the secondary gateway-sender is running.
  7. The SerialGatewaySenderEventProcessor, running as secondary, detects that the wrapped-event has already been added to the shadow-region-queue by the primary gateway-sender, so it removes the wrapped-event from the unprocessedEvents map.

Due to the nature of these distributed operations, it's easy to visualize some race conditions within the above flows, related to the time at which the wrapped-events are added to the unprocessedEvents map by the secondary gateway-sender and the time at which the wrapped-events are actually processed by the primary gateway-sender. A race condition only becomes a bug when the interleaving of actions can happen in an order the designer did not intend, which is certainly not the case here: the SerialGatewaySenderEventProcessor class has all the checks in place to avoid any unexpected outcome and event loss.

For more details about how the logic to avoid any inconsistencies is actually implemented, the reader can have a look at the SerialGatewaySenderEventProcessor class, especially the methods handleSecondaryEvent (invoked by the secondary gateway-sender whenever enqueueEvent is called) and handlePrimaryEvent (invoked on the secondary gateway-sender through the SerialSecondaryGatewayListener whenever the replication kicks in after a wrapped-event is added to the shadow-region-queue by the primary gateway-sender).
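
The sketch below is a deliberately simplified illustration of that idea, not the actual Geode source: it shows how a secondary processor could keep the unprocessedEvents map consistent no matter which notification arrives first, the local callback for the original event (handleSecondaryEvent) or the replicated wrapped-event coming from the primary (handlePrimaryEvent). The event-id type and the internal structures are assumptions made for the example.

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;

  public class UnprocessedEventsSketch {
    // Wrapped-events seen locally but not yet enqueued by the primary.
    private final Map<String, Object> unprocessedEvents = new HashMap<>();
    // Tokens for events already enqueued by the primary but not yet seen locally.
    private final Set<String> unprocessedTokens = new HashSet<>();

    // Invoked on the secondary when the original event reaches this member.
    public synchronized void handleSecondaryEvent(String eventId, Object wrappedEvent) {
      if (unprocessedTokens.remove(eventId)) {
        // The primary already added this event to the shadow-region-queue:
        // there's nothing left to track.
        return;
      }
      unprocessedEvents.put(eventId, wrappedEvent);
    }

    // Invoked on the secondary, through the SerialSecondaryGatewayListener, when
    // the primary has added the wrapped-event to the shadow-region-queue.
    public synchronized void handlePrimaryEvent(String eventId) {
      if (unprocessedEvents.remove(eventId) == null) {
        // The local event hasn't arrived yet: remember that the primary already
        // handled it so it won't be tracked later.
        unprocessedTokens.add(eventId);
      }
    }
  }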

Parallel Gateway Sender

The parallel gateway-sender is deployed to multiple Geode members: each member hosting primary buckets for a partitioned region will be actively distributing events. This basically means that, at any given time, there can be multiple active gateway-senders consuming and updating the shadow-region-queue within the cluster, each one only for the primary buckets hosted locally; as a consequence, there's no guarantee about the ordering of events for operations executed on different buckets.

Considering that the shadow-region-queue for a parallel gateway-sender is created as a PARTITIONED REGION, it's worth noting that, to guarantee no data loss, the wrapped-events are added first to the redundant/secondary bucket and then to the primary one. This behavior changed recently: previously Geode used to write the wrapped-events to the primary bucket first, as it does for any other regular PARTITIONED REGION. The reader can have a look at the BucketRegion class, especially the method virtualPut, to get a better understanding of how this logic is implemented.

The diagram below shows the flow of steps for an event fired on a region, numbered in sequence to better understand the whole process; keep in mind that the entire flow is processed synchronously with the original event fired on the region.

  1. The original event is fired on the primary bucket for a given region hosted by a certain member where there's a gateway-sender running.
  2. The internal replication kicks in and the entry is added to the redundant buckets for the original partitioned region, where there's also a secondary/backup gateway-sender running.
  3. The wrapped-event is added to the redundant bucket for the shadow-region-queue. Note the difference with the default behavior here: the shadow-region-queue is a PARTITIONED REGION, but Geode adds the wrapped-event to the redundant bucket first.
  4. Replication finishes.
  5. The wrapped-event is added to the primary bucket within the shadow-region-queue on the member where the primary gateway-sender is running.

Processing the Shadow Region Queue

Within this section the focus will be on the interactions occurring within the system to add, process and distribute the events from the shadow-region-queue. Even though the core implementation is shared by WAN Replication and Asynchronous Event Distribution, a sequence diagram will be shown for each scenario to better understand how the process works under the hood.

Local Event Distribution (async-event-queue)

The batches are dispatched to the configured AsyncEventListener implementations asynchronously, after the changes have been applied to the original region. In general this distribution strategy is used as a write-behind cache event handler to synchronize the Geode region with an external data source.
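
As a minimal sketch of such a write-behind handler (the class name is made up, and the call to the external data source is replaced by a log line for illustration):

  import java.util.List;

  import org.apache.geode.cache.asyncqueue.AsyncEvent;
  import org.apache.geode.cache.asyncqueue.AsyncEventListener;

  public class WriteBehindListener implements AsyncEventListener {

    @Override
    public boolean processEvents(List<AsyncEvent> events) {
      try {
        for (AsyncEvent event : events) {
          // A real write-behind handler would forward the change to the
          // external data source here (e.g. a JDBC upsert); this sketch
          // just logs the queued operation.
          System.out.println(event.getOperation() + " " + event.getKey()
              + " -> " + event.getDeserializedValue());
        }
        // true: the batch was processed successfully and can be removed
        // from the shadow-region-queue.
        return true;
      } catch (RuntimeException failure) {
        // false: the dispatch failed, so the batch stays in the queue and
        // will be retried.
        return false;
      }
    }

    @Override
    public void close() {
      // Nothing to clean up in this sketch.
    }
  }

Such a listener is registered when the queue is created, for example through cache.createAsyncEventQueueFactory().create("queue-id", new WriteBehindListener()), and the queue is attached to the region through its async-event-queue id.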

The diagram below shows the regular interactions between the intervening parts.

It's worth noting that the events are not removed from the queue if the dispatch fails, which means that Geode will basically try to dispatch the same batch over and over again until it succeeds. The implementation of the remove method, on the other hand, is delegated to the actual class implementing the RegionQueue interface, which depends on the distribution strategy used (parallel or serial).

Keep in mind that the RegionQueue follows, to some extent, regular queue semantics, so Geode can only remove the head of the queue, meaning that it can only dequeue or peek the first N element(s). Long story short: the shadow-region-queue doesn't have a real notion of what's stored in it and can't access random entries; it can just enqueue/dequeue a certain amount of elements, determined by the batch-size and identified by the batchId.
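
Putting the previous two paragraphs together, the dispatch cycle can be sketched as the loop below. This is simplified pseudo-Java, not the actual processor code: RegionQueueLike and BatchDispatcher are placeholder interfaces standing in for RegionQueue and GatewaySenderEventDispatcher, and details such as the batch-time-interval, conflation and waiting are left out.

  import java.util.List;

  public class DispatchLoopSketch {

    interface RegionQueueLike {
      List<Object> peek(int batchSize); // read (but don't remove) the first N events
      void remove(int numberOfEvents);  // drop the first N events from the head
    }

    interface BatchDispatcher {
      boolean dispatchBatch(List<Object> batch); // true when the dispatch succeeds
    }

    static void processQueue(RegionQueueLike queue, BatchDispatcher dispatcher, int batchSize) {
      while (true) {
        // Only the head of the queue is visible: peek the next batch-size events.
        List<Object> batch = queue.peek(batchSize);
        if (batch.isEmpty()) {
          continue; // nothing queued yet; the real code waits instead of spinning
        }
        // Retry the same batch until the dispatch succeeds; the events stay in
        // the shadow-region-queue in the meantime.
        while (!dispatcher.dispatchBatch(batch)) {
          // keep retrying
        }
        // Only after a successful dispatch is the batch removed from the queue.
        queue.remove(batch.size());
      }
    }
  }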

Remote Event Replication (WAN)

The batches are dispatched to remote Geode clusters asynchronously, after the changes have been applied to the original region. This distribution strategy is used to scale horizontally between disparate, loosely-coupled distributed systems over a WAN.

The diagram below shows the regular interactions between the intervening parts.

The most important fact to notice here is that the gateway-sender will always remove the events from its queue once the ack for the batch has been received, no matter whether the ack contains exceptions or not, and no matter whether the user has set the GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION property to false. If no ack is received, for whatever reason, then the gateway-sender will keep trying indefinitely to dispatch the batch.

The above behavior has changed recently and it works like this by design (check GEODE-3730 for details): the new retry process that makes sure Geode doesn't lose events is executed on the receiver side, within the GatewayReceiverCommand class, and only for certain exceptions.

Removing Events from the Shadow Region Queue

To remove processed events from the shadow-region-queue, the GatewaySenderEventProcessor just executes the RegionQueue.remove() method but, even though the backing structure itself is implemented through an actual Geode region, the internal behavior differs from the default one, where the destroy operation is automatically distributed to the rest of the members. The goal of this section is to explore how the removal of events is done for the available implementations: Serial and Parallel.

Serial Gateway Sender (SerialGatewaySenderQueue)

By default the underlying REPLICATED region is created using the DISTRIBUTED_ACK scope, meaning that any distributed operation waits for the remote acknowledgment before returning to the caller; this can be manually changed through the gemfire.gateway-queue-no-ack system property. The rest of the attributes (persistence, overflow, eviction, etc.) are configured based on the properties set for the actual gateway-sender.
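
For illustration only, and assuming the property is read when the member creates the gateway-sender and its queue, it could be set as shown below; in practice it's usually passed as a -Dgemfire.gateway-queue-no-ack=true JVM argument when starting the member.

  public class NoAckQueueSketch {
    public static void main(String[] args) {
      // Must be set before the cache and the serial gateway-sender are created,
      // otherwise the default DISTRIBUTED_ACK scope is used for the queue region.
      System.setProperty("gemfire.gateway-queue-no-ack", "true");
      // ... then create the cache and the serial gateway-sender as usual.
    }
  }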

The diagrams below show the regular interactions between the intervening parts for an execution of the SerialGatewaySenderQueue.remove() method on the primary gateway-sender, and how the destroy operation is distributed to the secondary gateway-senders. For the sake of simplicity, either way, only the main interactions are shown: putting them all in one single diagram would make it unreadable and hard to understand (especially those used for the internal communication process between the members).

As shown in the figure, the remove call executes the Region.localDestroy() method on the primary gateway-sender, which only destroys the wrapped-event entry locally, without applying the regular distribution to delete the entry from the rest of the copies hosted by other members.

The distributed destroy is executed by the BatchRemovalThread instead, which creates an instance of the BatchDestroyOperation class; this operation handles the distribution for destroying a batch of entries in the shadow-region-queue. The internal message has a key that represents the lastDestroyedKey and a tailKey that represents the lastDispatchedKey; both fields are used by every member receiving the message to locally destroy the entries between key and tailKey from its copy of the shadow-region-queue.
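
Conceptually, the effect of that message on each receiving member can be sketched as below. This is only an illustration, not the actual BatchDestroyOperation code, and it assumes the serial shadow-region-queue is keyed by monotonically increasing Long values with an inclusive upper bound.

  import java.util.Map;

  public class BatchDestroySketch {
    // Destroy, locally only, the entries between lastDestroyedKey (key) and
    // lastDispatchedKey (tailKey) from this member's copy of the queue.
    static void applyBatchDestroy(Map<Long, Object> localQueueCopy,
        long lastDestroyedKey, long lastDispatchedKey) {
      for (long key = lastDestroyedKey + 1; key <= lastDispatchedKey; key++) {
        localQueueCopy.remove(key); // local destroy, no further distribution
      }
    }
  }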

Parallel Gateway Sender (ParallelGatewaySenderQueue)

The underlying PARTITIONED region will be co-located with the original region to which the gateway-sender is attached, and its attributes (number of buckets, redundant copies, maximum local memory, startup recovery delay, persistence, etc.) will be configured using the values obtained from the original region.

The diagrams below show the regular interactions between the intervening parts for an execution of the ParallelGatewaySenderQueue.remove() method. The removal of an event from the ParallelGatewaySenderQueue is way more complex than its counterpart in the SerialGatewaySenderQueue, so two different diagrams are presented for easier readability: the first one for the member hosting the primary bucket of the shadow-region-queue where the wrapped-event to be removed is located, and the second one for a member hosting a redundant copy of that bucket. For the sake of simplicity, however, only the main interactions are shown: putting them all in the diagrams would make them unreadable and hard to understand (especially those used for the internal communication process between the members).

As shown in the figure, the remove() call ends up executing the AbstractRegionMap.destroy() method, which calls LocalRegion.basicDestroyBeforeRemoval() to execute all the required logic before actually removing the entry from the internal map. The default behavior for regular PARTITIONED REGIONS is defined within the BucketRegion class, and the implementation basically ensures that the entry is fully removed from the redundant copies before proceeding to delete the local entry. This behavior, however, is overridden within the AbstractBucketRegionQueue class to do nothing in terms of distribution: it just logs a message (only visible when log-level=debug) stating that the distribution of the destroy operation will be executed later by the BatchRemovalThread.

The distributed destroy, as explained before, is executed by the BatchRemovalThread instead, which creates an instance of the ParallelQueueRemovalMessage class; this message handles the distribution for destroying a batch of entries in the shadow-region-queue. The internal message has a Map containing the list of dispatched keys per bucket per shadow-region-queue (more than one region can be associated with a gateway-sender) that must be removed, and it's used by every receiving member to destroy (locally) the wrapped-events from its redundant local buckets.