
Purpose

The purpose of this article is to explain some details about how the Geode WAN and Asynchronous Event Distribution components work under the hood. The goal is to provide UML diagrams and explanations of the main classes and interactions involved in the process, so that the reader can better understand and troubleshoot issues.

Prerequisites

Introduction

It's recommended to have the source code at hand while reading this article, so the reader can check the actual implementation whenever needed. The reader might also want to read WAN Gateway first: it contains the basic diagrams and flows of the Geode WAN architecture, while the current article goes into deeper detail.

The core algorithms and default implementations are shared by remote asynchronous event distribution and local asynchronous event distribution. The main difference lies in how the events are distributed and how they are removed from the internal structure after a successful dispatch. For this reason, and to avoid confusion, the term gateway-sender will be used from now on, regardless of whether the discussion concerns asynchronous event distribution within the local cluster (async-event-queue) or asynchronous event distribution to remote clusters (WAN replication).

Main Classes

The most important classes are listed below, along with a UML class diagram showing their relevant methods. Note that almost all of the included classes have two possible implementations: parallel and serial.

...

GatewayReceiverCommand: class responsible for applying the events received through WAN replication, from the batches sent by the gateway-senders.

Shadow Region Queues

Every time an association is made between a region and a gateway-sender, a shadow region is created on the member hosting the original region. This region acts as the internal queue-like structure used to store, retrieve and delete the events to distribute. The type of this internal region depends on the configured distribution strategy: it is created as a REPLICATED region when using serial distribution (SerialGatewaySenderQueue), and as a PARTITIONED region when using parallel distribution (ParallelGatewaySenderQueue).
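The rule above can be summarized in a small Java sketch. It is only an illustration. SenderMode is a hypothetical enum, and plain strings stand in for Geode's own types.

```java
// Hypothetical summary of the rule above; plain strings stand in for Geode's own types.
enum SenderMode {
    SERIAL("REPLICATED", "SerialGatewaySenderQueue"),
    PARALLEL("PARTITIONED", "ParallelGatewaySenderQueue");

    final String shadowRegionType; // type of the region backing the shadow-region-queue
    final String queueClass;       // queue implementation named in the article

    SenderMode(String shadowRegionType, String queueClass) {
        this.shadowRegionType = shadowRegionType;
        this.queueClass = queueClass;
    }

    public static void main(String[] args) {
        for (SenderMode mode : values()) {
            System.out.println(mode + " -> " + mode.shadowRegionType + " via " + mode.queueClass);
        }
    }
}
```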

For the sake of simplicity and to avoid mixing/confusing terms, from now on the name used to refer to this shadow region or internal queue will be shadow-region-queue.

Adding Events to the Shadow Region Queue

Serial Gateway Sender

Within a cluster, only one serial gateway-sender can actively update the shadow-region-queue at any given time; it is known as the primary. The rest of the configured serial gateway-senders are also alive and running, but they don't actively consume the shadow-region-queue at all; they are known as backups or secondaries.

...

For more details about how the logic that avoids inconsistencies is implemented, the reader can have a look at the SerialGatewaySenderEventProcessor class. The relevant methods are handleSecondaryEvent, which is invoked by the secondary gateway-sender whenever enqueueEvent is called, and handlePrimaryEvent. The latter is invoked on the secondary gateway-sender through the SerialSecondaryGatewayListener whenever replication kicks in after the primary gateway-sender adds a wrapped-event to the shadow-region-queue.
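The bookkeeping done by the secondary can be sketched as follows. The sketch assumes a simplified model in which every event has a unique id and the two callbacks can arrive in either order. SecondaryReconciler and its two collections are hypothetical names chosen for this illustration. The real SerialGatewaySenderEventProcessor holds more state and handles concurrency, which this single-threaded sketch ignores.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of the secondary's bookkeeping; not Geode code.
final class SecondaryReconciler {
    // Events seen locally that the primary has not confirmed yet (id -> non-null payload).
    private final Map<String, String> unprocessedEvents = new HashMap<>();
    // Ids the primary already replicated before the local copy showed up.
    private final Set<String> unprocessedTokens = new HashSet<>();

    /** Local path: the secondary sees the event when enqueueEvent is called. */
    void handleSecondaryEvent(String eventId, String payload) {
        // If the primary got there first, consume its token; otherwise remember the event.
        if (!unprocessedTokens.remove(eventId)) {
            unprocessedEvents.put(eventId, payload);
        }
    }

    /** Replication path: the primary's wrapped-event reaches this member. */
    void handlePrimaryEvent(String eventId) {
        // If the local copy is pending, the primary now covers it; otherwise leave a token.
        if (unprocessedEvents.remove(eventId) == null) {
            unprocessedTokens.add(eventId);
        }
    }

    int pendingEvents() {
        return unprocessedEvents.size();
    }

    int pendingTokens() {
        return unprocessedTokens.size();
    }

    public static void main(String[] args) {
        SecondaryReconciler secondary = new SecondaryReconciler();
        secondary.handleSecondaryEvent("e1", "put k1"); // local copy first...
        secondary.handlePrimaryEvent("e1");             // ...then confirmed by the primary
        secondary.handlePrimaryEvent("e2");             // primary's copy first this time
        System.out.println("events=" + secondary.pendingEvents() + ", tokens=" + secondary.pendingTokens());
        // prints events=0, tokens=1
    }
}
```

In this model, whichever side arrives second cancels the entry left by the first. Anything still sitting in unprocessedEvents is an event the primary has not yet confirmed.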

Parallel Gateway Sender

The parallel gateway-sender is deployed to multiple Geode members, and each member hosting primary buckets for a partitioned region actively distributes events. This means that, at any given time, there can be multiple active gateway-senders consuming and updating the shadow-region-queue within the cluster, each one only for the primary buckets it hosts locally. As a consequence, there's no guarantee about the ordering of events for operations executed on different buckets.
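The following minimal sketch shows why ordering holds within a bucket but not across buckets. It assumes that each bucket of the shadow-region-queue behaves as an independent FIFO. It also routes keys to buckets by hash code modulo the bucket count, which is a simplification and not Geode's exact routing. BucketedQueue is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model: every bucket of the shadow-region-queue is an independent FIFO.
final class BucketedQueue {
    private final List<Deque<String>> buckets = new ArrayList<>();

    BucketedQueue(int totalBuckets) {
        for (int i = 0; i < totalBuckets; i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    /** Simplified routing: hash code modulo bucket count (not Geode's exact resolver). */
    int bucketFor(String key) {
        return Math.floorMod(key.hashCode(), buckets.size());
    }

    void enqueue(String key, String operation) {
        buckets.get(bucketFor(key)).addLast(key + ":" + operation);
    }

    /** Drains one bucket in FIFO order, as the member hosting its primary would. */
    List<String> drain(int bucketId) {
        List<String> drained = new ArrayList<>(buckets.get(bucketId));
        buckets.get(bucketId).clear();
        return drained;
    }

    public static void main(String[] args) {
        BucketedQueue queue = new BucketedQueue(4);
        queue.enqueue("order-1", "create");
        queue.enqueue("order-2", "create");
        queue.enqueue("order-1", "update");
        int bucket = queue.bucketFor("order-1");
        System.out.println("bucket " + bucket + ": " + queue.drain(bucket));
    }
}
```

Operations on the same key always land in the same bucket and are drained in order. Keys that fall into different buckets are drained by whichever member hosts each primary, so their relative order is not preserved.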

...

  1. The original event is fired on the primary bucket for a given region hosted by a certain member where there's a gateway-sender running.
  2. The internal replication kicks in and the entry is added to the redundant buckets for the original partitioned region, where there's also a secondary/backup gateway-sender running.
  3. The wrapped-event is added to the redundant bucket of the shadow-region-queue. Note the difference from the default behavior here: although the shadow-region-queue is a PARTITIONED region, Geode adds the wrapped-event to the redundant bucket first.
  4. Replication finishes.
  5. The wrapped-event is added to the primary bucket within the shadow-region-queue on the member where the primary gateway-sender is running.
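The ordering of the steps above can be modeled with a single-threaded sketch. ParallelEnqueueModel is hypothetical and collapses the members involved into one object. The model makes one consequence visible: if the primary fails between steps 4 and 5, the wrapped-event is already present in the redundant bucket of the shadow-region-queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the five steps; members are collapsed into one object.
final class ParallelEnqueueModel {
    final List<String> log = new ArrayList<>();
    final List<String> redundantShadowBucket = new ArrayList<>();
    final List<String> primaryShadowBucket = new ArrayList<>();

    /** Runs the steps in order; failBeforeStep5 simulates the primary failing mid-way. */
    void put(String event, boolean failBeforeStep5) {
        log.add("1: event fired on primary data bucket");
        log.add("2: entry replicated to redundant data bucket");
        redundantShadowBucket.add(event);
        log.add("3: wrapped-event added to redundant shadow bucket");
        log.add("4: replication finished");
        if (failBeforeStep5) {
            return; // the redundant shadow bucket already holds the wrapped-event
        }
        primaryShadowBucket.add(event);
        log.add("5: wrapped-event added to primary shadow bucket");
    }

    public static void main(String[] args) {
        ParallelEnqueueModel model = new ParallelEnqueueModel();
        model.put("e1", true);
        model.log.forEach(System.out::println);
        System.out.println("redundant=" + model.redundantShadowBucket + ", primary=" + model.primaryShadowBucket);
    }
}
```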

Processing the Shadow Region Queue

This section focuses on the interactions that occur within the system to add, process and distribute the events from the shadow-region-queue. Even though WAN replication and asynchronous event distribution share the core implementation, a sequence diagram is shown for each scenario to better explain how the process works under the hood.

Local Event Distribution (async-event-queue)

The batches are dispatched to the configured AsyncEventListener implementations asynchronously, after the changes have been applied to the original region. In general, this distribution strategy is used as a write-behind cache event handler to synchronize a Geode region with an external data source.
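The following sketch gives a rough illustration of a write-behind listener. It mimics the shape of Geode's AsyncEventListener.processEvents method, which receives a batch and returns a boolean. To keep the example self-contained, it does not depend on Geode. BatchListener and WriteBehindListener are hypothetical, events are plain key/value pairs instead of AsyncEvent instances, and a Map stands in for the external data source.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface shaped like AsyncEventListener.processEvents; not the Geode type.
interface BatchListener {
    boolean processEvents(List<Map.Entry<String, String>> events);
}

// Hypothetical write-behind listener that copies each batch into an "external store".
final class WriteBehindListener implements BatchListener {
    final Map<String, String> externalStore = new HashMap<>(); // stand-in for a database
    boolean storeAvailable = true;

    @Override
    public boolean processEvents(List<Map.Entry<String, String>> events) {
        if (!storeAvailable) {
            return false; // batch not processed: in this model it stays queued for redelivery
        }
        for (Map.Entry<String, String> event : events) {
            externalStore.put(event.getKey(), event.getValue());
        }
        return true; // batch processed: it can be removed from the shadow-region-queue
    }

    public static void main(String[] args) {
        WriteBehindListener listener = new WriteBehindListener();
        boolean accepted = listener.processEvents(List.of(Map.entry("k1", "v1")));
        System.out.println("accepted=" + accepted + ", store=" + listener.externalStore);
        // prints accepted=true, store={k1=v1}
    }
}
```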

...

Keep in mind that the RegionQueue follows, to some extent, regular queue data type semantics. Geode can only remove the head of the queue, meaning that it can only dequeue or peek the first N element(s). Long story short: the shadow-region-queue has no real notion of what's stored in it and can't access random entries. It can only enqueue/dequeue a certain number of elements, as instructed by the batch-size and identified by the batchId.
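The queue semantics described above can be sketched as a head-only queue. HeadOnlyBatchQueue and Batch are hypothetical classes, not Geode's RegionQueue. They only show that a batch is peeked from the head and tagged with a batchId, then later removed by count, without any random access.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// A batch as handed to the dispatcher: an id plus the events peeked from the head.
final class Batch<E> {
    final int batchId;
    final List<E> events;

    Batch(int batchId, List<E> events) {
        this.batchId = batchId;
        this.events = events;
    }
}

// Hypothetical head-only queue: no lookup or removal of arbitrary entries is offered.
final class HeadOnlyBatchQueue<E> {
    private final Deque<E> elements = new ArrayDeque<>();
    private final int batchSize;
    private int nextBatchId = 0;

    HeadOnlyBatchQueue(int batchSize) {
        this.batchSize = batchSize;
    }

    void put(E element) {
        elements.addLast(element);
    }

    /** Copies up to batchSize elements from the head without removing them. */
    Batch<E> peekBatch() {
        List<E> events = new ArrayList<>();
        for (E element : elements) {
            if (events.size() == batchSize) {
                break;
            }
            events.add(element);
        }
        return new Batch<>(nextBatchId++, events);
    }

    /** Removes the first count elements, e.g. after the batch was dispatched successfully. */
    void remove(int count) {
        for (int i = 0; i < count && !elements.isEmpty(); i++) {
            elements.pollFirst();
        }
    }

    int size() {
        return elements.size();
    }

    public static void main(String[] args) {
        HeadOnlyBatchQueue<String> queue = new HeadOnlyBatchQueue<>(2);
        for (String event : List.of("e1", "e2", "e3")) {
            queue.put(event);
        }
        Batch<String> batch = queue.peekBatch();
        System.out.println("batch " + batch.batchId + " -> " + batch.events); // batch 0 -> [e1, e2]
        queue.remove(batch.events.size());
        System.out.println("remaining " + queue.size()); // remaining 1
    }
}
```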

Remote Event Replication (WAN)

The batches are dispatched to remote Geode clusters asynchronously, after the changes have been applied to the original region. This distribution strategy is used to scale horizontally between disparate, loosely-coupled distributed systems over a WAN.

...

The above behavior changed recently and works like this by design (check GEODE-3730 for details). The new retry process, which makes sure Geode doesn't lose events, is executed on the receiver side within the GatewayReceiverCommand class, and only for certain exceptions.
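The receiver-side retry can be pictured with the following sketch. It is an illustration built on assumptions, not the GatewayReceiverCommand code. That class decides which exceptions are retried, so the retryable predicate here is a placeholder. The maxAttempts parameter is a hypothetical bound added only to keep the loop finite.

```java
import java.util.function.Predicate;

// Hypothetical receiver-side retry loop; not the GatewayReceiverCommand implementation.
final class ReceiverRetry {
    /** One attempt at applying a received event locally. */
    interface ApplyAction {
        void run() throws Exception;
    }

    /**
     * Runs the action, retrying only while the failure is retryable and the (hypothetical)
     * attempt budget is not exhausted. Returns the number of attempts that were needed.
     */
    static int applyWithRetry(ApplyAction action, Predicate<Exception> retryable, int maxAttempts)
            throws Exception {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                if (!retryable.test(e) || attempt >= maxAttempts) {
                    throw e; // non-retryable, or out of attempts: report the failure
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        int attempts = applyWithRetry(
                () -> { if (++calls[0] < 3) throw new IllegalStateException("transient"); },
                ex -> ex instanceof IllegalStateException,
                5);
        System.out.println("applied after " + attempts + " attempts"); // applied after 3 attempts
    }
}
```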

Removing Events from the Shadow Region Queue

To remove processed events from the shadow-region-queue, the GatewaySenderEventProcessor just executes the RegionQueue.remove() method. However, even though the backing structure is implemented through an actual Geode region, the internal behavior differs from the default one, in which the destroy operation is automatically distributed to the rest of the members. This section explores how events are removed in each of the available implementations: serial and parallel.

Serial Gateway Sender (SerialGatewaySenderQueue)

By default, the underlying REPLICATED region is created with the DISTRIBUTED_ACK scope, meaning that any distributed operation waits for the remote acknowledgment before returning to the caller. This value can be changed manually through the gemfire.gateway-queue-no-ack system property. The rest of the attributes (persistence, overflow, eviction, etc.) are configured based on the properties set for the actual gateway-sender.
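The following minimal sketch shows the scope selection described above. It assumes the system property is read as a plain boolean. QueueScope is a local stand-in for Geode's Scope, and SerialQueueScope is a hypothetical helper.

```java
// Local stand-in for Geode's Scope values relevant here; not org.apache.geode.cache.Scope.
enum QueueScope { DISTRIBUTED_ACK, DISTRIBUTED_NO_ACK }

// Hypothetical helper: DISTRIBUTED_ACK by default, DISTRIBUTED_NO_ACK when the property is true.
final class SerialQueueScope {
    static final String NO_ACK_PROPERTY = "gemfire.gateway-queue-no-ack";

    static QueueScope resolve() {
        // Boolean.getBoolean is true only if the property exists and equals "true" (ignoring case).
        return Boolean.getBoolean(NO_ACK_PROPERTY) ? QueueScope.DISTRIBUTED_NO_ACK : QueueScope.DISTRIBUTED_ACK;
    }

    public static void main(String[] args) {
        System.out.println("shadow-region-queue scope: " + resolve());
    }
}
```

With this sketch, starting the JVM with -Dgemfire.gateway-queue-no-ack=true would select DISTRIBUTED_NO_ACK. Without the property, the default DISTRIBUTED_ACK applies.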

...

Instead, the distributed destroy is executed by the BatchRemovalThread. This thread creates an instance of the BatchDestroyOperation class, which handles the distribution of the destroy for a batch of entries in the shadow-region-queue. The internal message has a key that represents the lastDestroyedKey and a tailKey that represents the lastDispatchedKey. Every member receiving the message uses both fields to locally destroy the entries between key and tailKey from its own copy of the shadow-region-queue.
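The local work triggered by the batch-destroy message can be sketched with a sorted map per member. The sketch assumes that queue keys are increasing Long values and that both bounds are inclusive. SerialQueueReplica is a hypothetical class, and the distribution of the message itself is not modeled.

```java
import java.util.TreeMap;

// Hypothetical model of one member's copy of the serial shadow-region-queue.
final class SerialQueueReplica {
    // Entries keyed by an increasing sequence number (an assumption for this sketch).
    final TreeMap<Long, String> entries = new TreeMap<>();

    void put(long key, String wrappedEvent) {
        entries.put(key, wrappedEvent);
    }

    /**
     * Local work for a batch-destroy message: destroy every entry between key and tailKey.
     * Both bounds are treated as inclusive (an assumption); keys already gone are skipped.
     */
    void applyBatchDestroy(long key, long tailKey) {
        if (tailKey < key) {
            return;
        }
        entries.subMap(key, true, tailKey, true).clear();
    }

    public static void main(String[] args) {
        SerialQueueReplica memberA = new SerialQueueReplica();
        SerialQueueReplica memberB = new SerialQueueReplica();
        for (long k = 1; k <= 5; k++) {
            memberA.put(k, "event-" + k);
            if (k != 2) {
                memberB.put(k, "event-" + k); // memberB never received key 2
            }
        }
        // The same (key, tailKey) pair reaches every member holding a copy.
        memberA.applyBatchDestroy(1, 3);
        memberB.applyBatchDestroy(1, 3);
        System.out.println(memberA.entries.keySet() + " " + memberB.entries.keySet()); // [4, 5] [4, 5]
    }
}
```

Each member only needs the two keys, so a single message replaces one distributed destroy per entry. In this model, a member that is missing some of the entries still ends up with the same contents as the others.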

Parallel Gateway Sender (ParallelGatewaySenderQueue)

The underlying PARTITIONED region will be co-located with the original region to which the gateway-sender is attached, and its attributes (number of buckets, redundant copies, maximum local memory, startup recovery delay, persistence, etc.) will be configured using the values obtained from the original region.
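The attribute derivation can be sketched as follows. RegionPartitioning is a hypothetical holder that covers only some of the attributes listed above, and the queue region path in main is made up. The point of the sketch is that the shadow-region-queue reuses the data region's bucket count and is colocated with it, so bucket N of both regions is hosted on the same member.

```java
// Hypothetical, simplified holder; not Geode's PartitionAttributes API.
final class RegionPartitioning {
    final String regionPath;
    final String colocatedWith; // null when the region is not colocated with another one
    final int totalBuckets;
    final int redundantCopies;
    final int localMaxMemoryMb;
    final long startupRecoveryDelayMs;

    RegionPartitioning(String regionPath, String colocatedWith, int totalBuckets,
                       int redundantCopies, int localMaxMemoryMb, long startupRecoveryDelayMs) {
        this.regionPath = regionPath;
        this.colocatedWith = colocatedWith;
        this.totalBuckets = totalBuckets;
        this.redundantCopies = redundantCopies;
        this.localMaxMemoryMb = localMaxMemoryMb;
        this.startupRecoveryDelayMs = startupRecoveryDelayMs;
    }

    /** Builds the partitioning of a shadow-region-queue attached to this data region. */
    RegionPartitioning shadowQueueFor(String queueRegionPath) {
        return new RegionPartitioning(queueRegionPath, regionPath, totalBuckets,
                redundantCopies, localMaxMemoryMb, startupRecoveryDelayMs);
    }

    /** Same bucket count plus colocation: bucket N of both regions lives on the same member. */
    boolean bucketsAlignWith(RegionPartitioning other) {
        return totalBuckets == other.totalBuckets
                && (regionPath.equals(other.colocatedWith) || other.regionPath.equals(colocatedWith));
    }

    public static void main(String[] args) {
        RegionPartitioning data = new RegionPartitioning("/orders", null, 113, 1, 100, 0L);
        RegionPartitioning queue = data.shadowQueueFor("/orders-sender-queue"); // hypothetical path
        System.out.println(queue.regionPath + " colocated with " + queue.colocatedWith
                + ", buckets=" + queue.totalBuckets + ", aligned=" + data.bucketsAlignWith(queue));
    }
}
```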

...