Overview
The WAN replication feature allows two remote data centers, or two availability zones, to maintain data consistency. If one data center cannot process incoming events for any reason, the other data center should retain the failed events so that no data is lost. Currently, if data center 1 (DC1) is able to connect to data center 2 (DC2) and send it events, those events are removed from the queue on DC1 when the ack from DC2 is received, regardless of what happens to them on DC2. This behavior is controlled by the internal system property REMOVE_FROM_QUEUE_ON_EXCEPTION, which defaults to true. The most common exceptions thrown from a receiving site include:
- LowMemoryException - when one or more members is low on memory
- CacheWriterException - when a CacheWriter before* method throws an exception
- PartitionOfflineException - when all the members defining a persistent bucket are offline
- RegionDestroyedException - when the region doesn't exist
- Malformed data exception (unable to deserialize)
Goals
We will provide a mechanism for users to preserve events on the gateway sender that do not get successfully processed on the receiving data center. Our example implementation will store these events on disk at the sending data center and notify the user what events did not get transmitted.
- Deprecate (and later remove) the internal system property REMOVE_FROM_QUEUE_ON_EXCEPTION, but detect if it is set to false and support existing behavior (infinite retries)
- Create a new callback API that will be executed when an exception is returned with the acknowledgement from the receiver
- Provide an example implementation of the callback that saves events with exceptions returned from the receiver in a 'dead-letter' queue on the sender (on disk)
- Add a new property for the gateway sender to specify the path to the custom implementation of the callback.
  - If no path is provided, use the default (example) implementation
  - If the property is not specified, revert to the existing behavior (removing events from the queue when the ack is received, ignoring batch exceptions)
- Add two new properties for the gateway receiver to control when to send the acknowledgement with the exceptions:
  - the number of retries for events failing with an exception
  - the wait time between retries
Not in Scope
- Providing the ability to directly replay events from the dead-letter queue.
Approach
Our current design approach is as follows:
- Deprecate the existing internal boolean system property REMOVE_FROM_QUEUE_ON_EXCEPTION
  - Continue to support the existing behavior if the boolean is set to false by setting the number of retries on the receiver to -1
- Create a new Java API
  - Define a callback API so senders can set a callback on their dispatchers
  - If a sender is configured with a callback, invoke the callback when a batch exception occurs, prior to batch removal
  - Implement a default callback (see the example implementation item below)
  - Add properties on the gateway receiver factory to specify the number of retries for a failed event and the wait time between retries
- Modify gfsh commands
  - Add an option to the gfsh 'create gateway-sender' command to specify a custom callback
  - Add options to the gfsh 'create gateway-receiver' command to set the number of retries and the wait time between retries
- Store the new options in cluster configuration
  - Sender: callback implementation
  - Receiver: number of retries and wait time between retries
- Add support in cache.xml for specifying the new callback for the gateway sender and setting the new options for the gateway receiver
- Create an example implementation of the sender callback that writes event(s) and associated exceptions to a file
- Security features
  - Define the privileges needed to deploy and configure the sender callback
  - With security enabled, the callback should only write event IDs and exceptions, i.e. no entry values should be written to disk
- Add logging and statistics for the callback
  - Log messages on the gateway receiver for the start time and results of retries
  - Add statistics and an MBean for callbacks in progress and completed (count and duration)
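The backward-compatibility rule above (an explicit REMOVE_FROM_QUEUE_ON_EXCEPTION=false preserves the legacy "retry forever, never drop" behavior by mapping to -1 receiver retries) could be sketched as follows. The class and method names are hypothetical, not part of the proposed API:

```java
// Hypothetical helper illustrating the proposed backward-compatibility rule:
// an explicit REMOVE_FROM_QUEUE_ON_EXCEPTION=false maps to -1 retry attempts,
// i.e. retry indefinitely on the receiver (the legacy behavior).
final class LegacyCompat {

  static final int RETRY_FOREVER = -1;

  static int effectiveRetryAttempts(String removeFromQueueOnException, int configuredRetryAttempts) {
    if ("false".equalsIgnoreCase(removeFromQueueOnException)) {
      return RETRY_FOREVER; // legacy mode: never drop events, keep retrying
    }
    return configuredRetryAttempts; // otherwise honor the new receiver setting
  }
}
```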
New workflow for setting up WAN gateway using gfsh:
- Create gateway receiver including new options for specifying # of retries and wait time between retries
- Deploy jar on gateway sender(s) containing callback implementation
- Create gateway sender with option to add callback
New API + Changes to existing APIs
Java Definition
The new GatewayEventFailureListener interface is defined like:
```java
public interface GatewayEventFailureListener extends CacheCallback {

  /**
   * Callback invoked on the GatewaySender when an event fails to be processed by the
   * GatewayReceiver
   *
   * @param event The event that failed
   * @param exception The exception that occurred
   */
  void onFailure(GatewayQueueEvent event, Throwable exception);
}
```
Example:
```java
public class LoggingGatewayEventFailureListener implements GatewayEventFailureListener, Declarable {

  private Cache cache;

  public void onFailure(GatewayQueueEvent event, Throwable exception) {
    this.cache.getLogger().warning("LoggingGatewayEventFailureListener onFailure: region="
        + event.getRegion().getName() + "; operation=" + event.getOperation() + "; key="
        + event.getKey() + "; value=" + event.getDeserializedValue() + "; exception=" + exception);
  }

  public void initialize(Cache cache, Properties properties) {
    this.cache = cache;
  }
}
```
This LoggingGatewayEventFailureListener will log warnings like:
```
[warning 2018/11/05 17:30:41.613 PST ln-1 <AckReaderThread for : Event Processor for GatewaySender_ny_3> tid=0x75] LoggingGatewayEventFailureListener onFailure: region=data; operation=CREATE; key=8360; value=Trade[id=8360; cusip=PVTL; shares=100; price=18]; exception=org.apache.geode.cache.persistence.PartitionOfflineException: Region /data bucket 73 has persistent data that is no longer online stored at these locations: [...]
```
Java Configuration
GatewaySender
The GatewaySenderFactory adds the ability to add a GatewayEventFailureListener:
```java
/**
 * Sets the provided <code>GatewayEventFailureListener</code> in this GatewaySenderFactory.
 *
 * @param listener The <code>GatewayEventFailureListener</code>
 */
GatewaySenderFactory setGatewayEventFailureListener(GatewayEventFailureListener listener);
```
The GatewaySender adds the ability to get a GatewayEventFailureListener:
```java
/**
 * Returns this <code>GatewaySender's</code> <code>GatewayEventFailureListener</code>.
 *
 * @return this <code>GatewaySender's</code> <code>GatewayEventFailureListener</code>
 */
GatewayEventFailureListener getGatewayEventFailureListener();
```
Example:
```java
GatewaySender sender = cache.createGatewaySenderFactory()
    .setParallel(true)
    .setGatewayEventFailureListener(new FileGatewayEventFailureListener(new File(...)))
    .create("ln", 2);
```
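The FileGatewayEventFailureListener used above is not defined anywhere in this proposal. A minimal sketch of its core behavior, appending one record per failed event to a dead-letter file on the sender, might look like the following; the class name and record format are assumptions, and under the security constraint noted in the Approach section the record would carry only event IDs and exceptions rather than keys and values:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical core of a file-based failure listener: append one line per
// failed event to a dead-letter file on the sending side. A real listener
// would call this from onFailure(event, exception).
class DeadLetterFileWriter {

  private final Path file;

  DeadLetterFileWriter(Path file) {
    this.file = file;
  }

  void recordFailure(String region, Object key, Throwable exception) throws IOException {
    String line = String.format("region=%s; key=%s; exception=%s%n", region, key, exception);
    Files.write(file, line.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }
}
```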
GatewayReceiver
The GatewayReceiverFactory adds the ability to set retry attempts and wait time between retry attempts:
```java
/**
 * Sets the number of retry attempts to apply failing events from remote GatewaySenders
 *
 * @param retryAttempts The retry attempts
 */
GatewayReceiverFactory setRetryAttempts(int retryAttempts);

/**
 * Sets the wait time between retry attempts to apply failing events from remote GatewaySenders
 *
 * @param waitTimeBetweenRetryAttempts The wait time in milliseconds
 */
GatewayReceiverFactory setWaitTimeBetweenRetryAttempts(long waitTimeBetweenRetryAttempts);
```
The GatewayReceiver adds the ability to get retry attempts and wait time between retry attempts:
```java
/**
 * Returns the number of times to retry a failing event before throwing an exception.
 *
 * @return the number of times to retry a failing event before throwing an exception
 */
int getRetryAttempts();

/**
 * Returns the amount of time in milliseconds to wait between attempts to apply a failing event.
 *
 * @return the amount of time in milliseconds to wait between attempts to apply a failing event
 */
long getWaitTimeBetweenRetryAttempts();
```
Example:
```java
GatewayReceiver receiver = cache.createGatewayReceiverFactory()
    .setRetryAttempts(10)
    .setWaitTimeBetweenRetryAttempts(100)
    .create();
```
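The receiver-side semantics these two settings imply might be sketched as the following retry loop. This is an illustration, not the actual GatewayReceiver implementation, and the helper name is hypothetical; a retryAttempts of -1 retries indefinitely, matching the legacy REMOVE_FROM_QUEUE_ON_EXCEPTION=false behavior:

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry loop implied by setRetryAttempts and
// setWaitTimeBetweenRetryAttempts: apply the event, and on failure retry up
// to retryAttempts times, sleeping between attempts. Once retries are
// exhausted, the exception propagates and is returned with the batch ack.
final class RetrySupport {

  static <T> T applyWithRetries(Callable<T> operation, int retryAttempts, long waitMillis)
      throws Exception {
    int attempt = 0;
    while (true) {
      try {
        return operation.call();
      } catch (Exception e) {
        if (retryAttempts >= 0 && attempt >= retryAttempts) {
          throw e; // exhausted: surface the exception to the sender
        }
        attempt++;
        Thread.sleep(waitMillis); // wait time between retry attempts
      }
    }
  }
}
```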
Gfsh Configuration
gateway-sender
The create gateway-sender command defines this new parameter:
Name | Description
---|---
gateway-event-failure-listener | The fully qualified class name of the GatewayEventFailureListener to be set on the GatewaySender
Example:
```
Cluster-1 gfsh>create gateway-sender --id=ln --parallel=true --remote-distributed-system-id=2 --gateway-event-failure-listener=LoggingGatewayEventFailureListener
Member | Status
------ | ------------------------------------
ny-1   | GatewaySender "ln" created on "ny-1"
```
gateway-receiver
The create gateway-receiver command defines these new parameters:
Name | Description
---|---
retry-attempts | The number of retry attempts for failed events processed by the GatewayReceiver
wait-time-between-retry-attempts | The amount of time (in milliseconds) to wait between retry attempts for failed events processed by the GatewayReceiver
Example:
```
Cluster-2 gfsh>create gateway-receiver --retry-attempts=10 --wait-time-between-retry-attempts=100
Member | Status | Message
------ | ------ | ---------------------------------------------------------------------------
ln-1   | OK     | GatewayReceiver created on member "ln-1" and will listen on the port "5296"
```
XML Configuration
gateway-sender
The <gateway-sender> element defines the <gateway-event-failure-listener> sub-element. The <gateway-event-failure-listener> sub-element is like any other Declarable.
Example:
```xml
<gateway-sender id="...">
  <gateway-event-failure-listener>
    <class-name>FileGatewayEventFailureListener</class-name>
  </gateway-event-failure-listener>
</gateway-sender>
```
gateway-receiver
The <gateway-receiver> element defines the retry-attempts and wait-time-between-retry-attempts attributes.
Example:
```xml
<gateway-receiver retry-attempts="5" wait-time-between-retry-attempts="100"/>
```
Risks and Unknowns
- How to handle a ClassNotFoundException for the sender callback
- Default behavior when no callback is provided for the sender? Should be the same as the current behavior
- Backward compatibility behavior:
  - old sender connected to a new receiver using the new options
  - new sender with a callback implemented connected to an old receiver
- Sort out the security privileges needed for deploying vs. installing with the sender vs. reading values for failed events written to disk
Potential Future Enhancements
- Ability to modify batch removal to remove specific events from the batch
- Ability to resend events saved in dead-letter queue
14 Comments
John Blum
Couldn't we just define a general GatewayEventListener interface with methods like onSuccess(..) and onFailure(..), etc., rather than defining multiple interfaces for each possible condition?
Barry Oglesby
Great idea. An API like below added to GatewayEventFilter simplifies this configuration a lot.
Udo Kohlmeyer
I don't agree with this proposal.
I see this problem to be more of a management problem. There should be a queue inspector, where the data has to be manually removed... This could be done in batch or single transactions, but this should not happen in an automagical process.
I think we have to take a step back and rethink this whole use case. I believe addressing the 1% case of blocked queues whilst waiting for human intervention is a far better solution than painting all problems the same color.
Diane Hardman
Udo Kohlmeyer I think we agree a lot more than you may think. The intent of this proposal is to address the current WAN gateway behavior which results in data inconsistency between 2 data centers. Today any exception returned with the ack from a gateway receiver is ignored and events are removed from the sender queue which introduces data loss/inconsistency between the sites. This new callback would allow a customer to write code to handle exceptions and preserve those data events, in whatever way they wish.
Perhaps you are recommending the default/example callback implementation do more than simply store the events on disk?
Or are you suggesting we should set the internal system property REMOVE_FROM_QUEUE_ON_EXCEPTION to false by default and allow the sender queue to fill up until a human notices and intervenes? I think that is unacceptable.
Udo Kohlmeyer
I think what I'm trying to say is that....
Every failure scenario is a valid scenario where sending the messages to a dead-letter queue could result in data inconsistency.
I believe the solution should be:
None of this should be automagic... at least not initially.
Anthony Baker
WAN replication is already eventually consistent. Adding a DLQ does not change that. If you were an operator, would you rather recover data from a DLQ or deal with a multi-site outage?
The problem this proposal addresses is providing a "circuit-breaker" that can prevent an unhealthy downstream WAN site from causing upstream clusters to also become unhealthy.
I don't think relying on timely intervention from an operator is sufficient.
Udo Kohlmeyer
Correct. WAN is eventually consistent. It is also in order. As soon as you introduce DLQ the "in-order" capability is lost.
I don't understand how the upstream clusters are supposed to become unhealthy, could you explain?
In the current system, the batches are removed from the upstream cluster (as described above), so no data is backing up. Also, the WAN queues are designed to deal with the backing up of data, as downstream clusters could be offline. I have seen these queues grow to many millions deep, without any impact on the running cluster.
With the introduction of a DLQ, the world becomes more complicated. E.g., given that Key:123 → Val:987 is moved to the DLQ (for whatever reason), and a new update is then received and correctly processed, now Key:123 → Val:456 but Key:123 → Val:987 is in the DLQ. The operator has to decide what the correct value of Key:123 is supposed to be. Now multiply this scenario by 1 million. That would take a long time for anyone to triage. How does the operator decide what to do?
In the case of LowMemoryExceptions: if the upstream site pushes all messages to a DLQ until the problem is resolved, what happens once the LowMemoryException is resolved? Potentially thousands to millions of messages could be in the DLQ, which would need to be replayed. Also, what happens where there are data dependencies (colocated regions)? There might be cases where some data is in the upstream DLQ while data dependent on it is on the downstream site.
I'm not opposed to the idea of the DLQ. I just don't believe we've thought about or even covered many of the use cases that result from this. A blanket statement of "move messages to the DLQ until the exception is resolved" is not sufficient.
Bruce J Schuchardt
It sounds like we already have consistency issues if the remove-from-queue system property isn't set to false.
As a user, I think my next question would be "how could I get back to a consistent state?" If the EventID and WAN VersionTag of the events were stored in the DLQ, we might be able to offer a way to load the DLQ events into the remote sites. This information could be given to the DLQ listener in serialized "metadata" form that they should persist with the rest of the event's data.
Diane Hardman
This has been a great discussion, so thank you!
I think Udo and Bruce have pointed out something we chose to avoid in our initial definition and scoping for this project, but now realize we cannot defer in the MVP definition: as a user, what can I do with events in the DLQ.
We had originally left this out of the scope of this project, but this discussion has made it clear we need some strategy to support users reviewing and processing (removing or replaying?) these events.
Ideas and suggestions welcome!
Juan Ramos
I like John Blum's suggestion: since we need to do all this work anyway, wouldn't it be better to unify all GatewayListeners into just one and provide default empty methods so users don't need to implement any unnecessary logic?
Bruce J Schuchardt
How will this affect PDX-type propagation? This is currently done through propagation of modifications to the PDX type-registry region.
Barry Oglesby
This is a really good question. When I looked into it, I found some issues in the current processing. If processing of a PdxType on the receiver fails, the consequences are pretty bad.
In the normal case, new PdxTypes are added to the front of every batch and sent to the receiver where they are added to the registry. When the sender receives the ack back, it marks each PdxType as acked which causes it to not be sent again. Although, whenever a new connection is created, the PdxTypes are resent.
If an exception occurs whose cause is a PdxRegistryMismatchException (from PeerTypeRegistration.beforeUpdate), the sender's AckReaderThread shuts down, effectively killing the sender, so data that can't be deserialized is not processed on the receiver.
If an exception occurs whose cause is an IllegalStateException containing the string 'Unknown pdx type', the AckReaderThread logs the message as any other BatchException. Then AckReaderThread.logBatchExceptions sets isAcked=false, but then AckReaderThread.handleSuccessBatchAck sets it to true. So the PdxType is not sent again. Also, AckReaderThread.logBatchExceptions throws a java.lang.ArrayIndexOutOfBoundsException. Any data on the receiver that attempts to use that PdxType will fail to be deserialized.
If an exception occurs whose cause is something else, the same behavior as above occurs except AckReaderThread.logBatchExceptions doesn't set isAcked=false.
This proposal adds retries on the receiver before it gets into the states above.
Regardless of this proposal, we need to clean up the above behavior.
Mike Stolz
I think the best thing to do is to only protect the sending system in the event that it is running low on resources. In other words, we should not ever discard any event from an AsyncEventQueue or GatewayQueue unless and until keeping them is going to cause the sending cluster to become unhealthy.
We have some thresholds on queue management already. Maybe this should be some additional threshold that says "this sender is about to go into an unhealthy state due to receiver exceptions". Once that threshold is crossed, if there's an exception we could do some kind of dead-letter handling like the above.
Udo Kohlmeyer
I still believe that the discarding of events should be manual... Make it a conscious activity where they knowingly remove data.
The monitoring of the product should tell us when the queues have grown too large. Then something can be done about it. Also, realistically, how much data can we back up in the queues before it starts impacting the health of the sending side?
The monitoring should have alerted to the unhealthy state LONG before that happens.
In addition to this, if there is a DLQ and we overflow everything to disk, does someone monitor the disk usage? It is a well-known fact that running out of disk space is almost worse than having to deal with GC and memory pressures.