PartitionedRegion Clear()
To be Reviewed By: Dec 20, 2019
Authors: Xiaojian Zhou
Status: Draft | Discussion | Development | Active | Dropped | Superseded
Superseded by: N/A
Related: N/A
Problem
Support the clear() operation on Partitioned Regions (PR; a partitioned region is referred to as a PR throughout this doc).
clear() operation: Removes all entries from this region. Clear will be distributed to other caches if the scope is not Scope.LOCAL (from the Region.clear Javadoc).
Currently non-partitioned regions support the clear() operation; supporting this capability on PRs enables consistent API support and user experience across cache region types.
The clear() operation will be supported in both the client-server and peer-to-peer topologies.
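For reference, a minimal client-side usage sketch (the region name and locator address below are illustrative, not part of this proposal):

    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.client.ClientCache;
    import org.apache.geode.cache.client.ClientCacheFactory;
    import org.apache.geode.cache.client.ClientRegionShortcut;

    public class ClearExample {
      public static void main(String[] args) {
        // Connect to the cluster; the locator host/port are illustrative.
        ClientCache cache = new ClientCacheFactory()
            .addPoolLocator("localhost", 10334)
            .create();
        Region<String, String> region = cache
            .<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
            .create("examplePartitionedRegion");

        region.put("k1", "v1");
        region.clear(); // removes all entries; distributed to the servers hosting the region
        cache.close();
      }
    }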
Considerations/Challenges in supporting partitioned region clear():
- A PR is composed of a number of distributed bucket regions with 0 or more secondary (redundant) copies; clear has to be performed on both primary and secondary copies, keeping the data consistent between them.
- During the operation, a bucket region could be moved by a rebalance operation or by members joining/leaving the cluster, or its state could change (primary/secondary).
- Updating/clearing OQL indexes (which are maintained both synchronously and asynchronously).
- Clearing persistent/overflowed data and managing data consistency across primary and secondary copies and disk stores, which could be offline.
- Handling clear during region initialization, from both the initial image provider's and the requester's point of view.
- Handling concurrent clear and cache operations (put/destroy), in synchronization between primary and secondary copies.
- Notifying clients subscribed to the PR on the clear event, and keeping them consistent with server-side data.
- Managing transactional operations with and during clear.
- Updating/clearing Lucene indexes, which are asynchronously maintained via an AEQ. We rely on GEODE-9133 to allow a RegionEvent to be sent to a gateway sender; until then, PR clear will throw UnsupportedOperationException.
Anti-Goals
- Sending a clear event for WAN replication. As this has implications for the entire region data on the remote cluster, it is kept out of the scope of this work, in line with WAN GII and region destroy operations.
Solution
When clear() is executed from a client or a peer member, one of the members is selected as the clear message co-ordinator (typically the member where the command originated or was received). The co-ordinator member propagates the clear message to all the members hosting that region, and becomes responsible for handling success or failure conditions while processing the local or distributed clear message.
Summary
At a high level, the steps involved in initiating and completing the clear operation are as follows (a rough coordinator-side sketch follows these steps):
- The co-ordinator member takes a distributed lock at the PR level, ensuring that only one clear() operation on that region is in progress at a time.
- Sends clear messages to the other members hosting the PR's primary buckets. Re-sends/retries a clear message if the primary bucket region has moved; if it has not moved, locks it to prevent movement.
- At the primary bucket, does a local clear and distributes it to the secondary buckets with region version vector (RVV) info.
- The RVV info is used to accept or reject concurrent cache operations (operations in flight during the clear), keeping primary and secondary bucket data consistent.
- Persists the clear event (with version info) for persistent regions.
After a successful clear() operation on the PR:
- Update OQL indexes
- Notify clients at PR level (with RVV?)
- Return to caller
- Until GEODE-9133 is implemented, clear on a PR with a Lucene index will throw UnsupportedOperationException.
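A rough coordinator-side sketch of the flow above, assuming per-bucket clear messages (all names and types are illustrative, not actual Geode internals; a java.util.concurrent lock stands in for the distributed lock):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Function;

    // Hypothetical coordinator-side flow; names and types are illustrative.
    class ClearCoordinatorSketch {
      private final Lock distributedClearLock = new ReentrantLock(); // stands in for the dlock

      void clear(Set<Integer> primaryBucketIds,
                 Function<Integer, Boolean> sendClearToPrimary) {
        distributedClearLock.lock(); // only one clear() on this PR at a time
        try {
          Set<Integer> remaining = new HashSet<>(primaryBucketIds);
          while (!remaining.isEmpty()) {
            Set<Integer> failed = new HashSet<>();
            for (Integer bucketId : remaining) {
              // A send can fail if the primary moved (rebalance, member departure);
              // such buckets are retried against the new primary.
              if (!sendClearToPrimary.apply(bucketId)) {
                failed.add(bucketId);
              }
            }
            remaining = failed;
          }
          // After all buckets are cleared: update OQL indexes, notify clients,
          // return to the caller (omitted here).
        } finally {
          distributedClearLock.unlock();
        }
      }
    }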
Scenarios/options considered in supporting clear operation:
At communication level:
1) Co-ordinator member sends a single PR clear message to each member hosting the region (data store)
2) Co-ordinator member sends a PR clear message for each primary bucket (similar to the region.removeAll() operation)
At data store level:
3) Renaming of the PR - rename the PR to a temp PR and perform data removal in a background thread (if needed).
4) Recreate the PR's low-level map - clear/remove data from the old map in a background thread (if needed).
Considering the pros and cons of each option (explained below), option-1 and option-2 were picked as probable choices, and option-2 was finally chosen as the viable option.
Option-1: Co-ordinator member sends a single PR clear message to each member hosting the region (data store)
In this case the co-ordinator member will send a single clear region message to all other members hosting the region; the receiving member will process this message serially or in parallel using a thread/executor pool. The co-ordinator member gets the primary bucket info and sends the clear message to each remote member with its primary bucket list. The co-ordinator member has to obtain and manage the primary bucket list in order to address any bucket region movement during the clear (due to destroy or rebalance) and retry the clear operation. This is similar to how the query engine executes queries on remote bucket regions.
Pros:
Fewer messages communicated between the members (compared to option-2).
Cons/issues:
Processing clear on the receiver with a single thread may be slower; potential for deadlock with transactions.
Processing clear with multiple threads introduces thread management and error handling complexities; potential for deadlock with transactions.
Potential deadlock with TX (illustrated below):
A member contains primary bucket1 and bucket2. Both clear and TX take the rvvLock at the bucket level; if the clear thread processes bucket1 and then bucket2, while a TX thread operates on bucket2 first and then on bucket1, they will end up in a deadlock.
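The scenario can be reproduced with two plain Java locks standing in for the two bucket-level rvvLocks; both threads block forever:

    import java.util.concurrent.locks.ReentrantLock;

    // Illustration of the lock-ordering deadlock between clear and TX.
    class BucketDeadlockDemo {
      static final ReentrantLock bucket1Lock = new ReentrantLock();
      static final ReentrantLock bucket2Lock = new ReentrantLock();

      public static void main(String[] args) {
        Thread clearThread = new Thread(() -> {
          bucket1Lock.lock();   // clear processes bucket1 first...
          sleep(100);
          bucket2Lock.lock();   // ...then blocks here, waiting for bucket2
          bucket2Lock.unlock();
          bucket1Lock.unlock();
        });
        Thread txThread = new Thread(() -> {
          bucket2Lock.lock();   // TX operates on bucket2 first...
          sleep(100);
          bucket1Lock.lock();   // ...then blocks here, waiting for bucket1
          bucket1Lock.unlock();
          bucket2Lock.unlock();
        });
        clearThread.start();
        txThread.start();       // deadlock: each thread holds the lock the other needs
      }

      static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      }
    }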
Option-2: Co-ordinator member sends a clear message for each primary bucket
In this case the co-ordinator member will send a separate clear region message to every primary bucket, and handles success and failure (retry) based on the responses to those messages. This is similar to how region.removeAll() works.
Pros:
Messages are handled at the bucket level, which makes failure conditions easy to handle.
No need to wait for responses for all buckets from a single member (failure/retry can be performed sooner).
Most of the removeAll logic related to messaging, failure handling, event routing and client messaging can be leveraged, and that logic is well tested.
No deadlock scenarios between concurrent clear and transaction operations.
Cons:
The number of messages could be large, depending on the number of buckets. But the payload (message size) of a clear message is small, as no data is sent as part of it.
Based on the pros and cons of option-1 and option-2, option-2 is the recommended solution to implement.
Note: The complexity of option-3 and option-4 ruled them out as a solution at this time; a brief explanation of these options is included at the end of this doc (see Prior Art).
Flow Chart - Sending clear message at bucket level
Implementation Details
(addressing the considerations/challenges listed above)
- The member after receiving the clear request:
Acquires the distributed lock and elects itself as the co-ordinator member. This prevents multiple clear() operations from executing concurrently.
Gets the primary bucket list. Sends a clear message to the primary buckets (members).
The primary buckets, upon receiving (processing) the message, take a distributed lock to prevent losing primary bucket status, then take RVV.lockForClear (a local write lock to block on-going operations, GII, and transactions).
Upon completion of clear on a primary bucket, sends the clear message to the secondary buckets (members).
When a secondary bucket receives the RVV, it will wait for/check that its local RVV dominates the received RVV, which makes sure concurrent cache operations are applied/rejected accordingly (a simplified dominance check is sketched below).
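A simplified sketch of the dominance check a secondary might perform (the RVV is reduced to a member-to-version map here; the real RVV also tracks per-member exception lists, which are omitted):

    import java.util.Map;

    // Simplified dominance check: the local RVV dominates the received RVV
    // when it has seen at least every version the sender has seen.
    final class RvvSketch {
      static boolean dominates(Map<String, Long> localRvv, Map<String, Long> receivedRvv) {
        for (Map.Entry<String, Long> e : receivedRvv.entrySet()) {
          long localVersion = localRvv.getOrDefault(e.getKey(), 0L);
          if (localVersion < e.getValue()) {
            return false; // an operation known to the sender has not arrived here yet
          }
        }
        return true; // all known operations have been applied; safe to apply the clear
      }
    }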
NOTE:
The cache operations are synchronized under the RVV lock. For a non off-heap region the clear will call map.clear(), which should have minimal or no impact on performance. For an off-heap region, map.clear() will iterate over the entries to clear the off-heap entries; this could have an impact on cache operation performance. This will be documented, and in the future the iteration over region entries could be done in a background thread.
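A sketch of the contrast (OffHeapValue and release() are placeholders for Geode's off-heap reference counting, not actual internals):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative contrast between heap and off-heap clear.
    class RegionMapClearSketch {
      interface OffHeapValue { void release(); }

      final Map<Object, Object> heapEntries = new ConcurrentHashMap<>();
      final Map<Object, OffHeapValue> offHeapEntries = new ConcurrentHashMap<>();

      void clearHeapRegion() {
        heapEntries.clear(); // no per-entry work; minimal impact on concurrent cache ops
      }

      void clearOffHeapRegion() {
        // Must iterate to release each entry's off-heap memory; this is the
        // part that could move to a background thread in the future.
        for (OffHeapValue value : offHeapEntries.values()) {
          value.release();
        }
        offHeapEntries.clear();
      }
    }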
- Handling Transaction (TX) during clear
As clear and transactions are both handled at the bucket region level, they operate by taking the rvvLock.
If the TX gets the lock first, the clear thread will wait until the TX finishes and releases the rvvLock.
If clear gets the rvvLock first, the TX will fail and roll back.
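This interaction can be sketched with a read-write lock standing in for the bucket-level rvvLock: TX operations hold the read side, clear takes the write side, and a TX that cannot acquire the lock fails and rolls back (the exception type here is a stand-in):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the rvvLock interaction between clear and TX.
    class RvvLockSketch {
      private final ReentrantReadWriteLock rvvLock = new ReentrantReadWriteLock();

      void clearBucket(Runnable doClear) {
        rvvLock.writeLock().lock(); // waits for in-flight TX holders to finish
        try {
          doClear.run();
        } finally {
          rvvLock.writeLock().unlock();
        }
      }

      void applyTxOperation(Runnable doTxOp) {
        if (!rvvLock.readLock().tryLock()) {
          // clear holds the write lock: fail the TX so it rolls back
          throw new IllegalStateException("conflict with in-progress clear; TX rolls back");
        }
        try {
          doTxOp.run();
        } finally {
          rvvLock.readLock().unlock();
        }
      }
    }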
- Updating OQL Index
The indexes are maintained both synchronously and asynchronously. The clear will update both synchronous and asynchronous indexes under a lock, by clearing both the index data structures and the queues used for asynchronous maintenance.
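A minimal sketch of that locking discipline (the data structures are illustrative):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Both the index data structure and the queue feeding asynchronous
    // maintenance are emptied under the same lock.
    class IndexClearSketch {
      private final Object indexLock = new Object();
      private final Map<Object, Object> indexedValues = new ConcurrentHashMap<>();
      private final Queue<Object> asyncMaintenanceQueue = new ConcurrentLinkedQueue<>();

      void clearIndexes() {
        synchronized (indexLock) {
          indexedValues.clear();          // synchronous index structure
          asyncMaintenanceQueue.clear();  // pending asynchronous maintenance events
        }
      }
    }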
- Disk Region: the clear version tag is written to the oplog for each bucket region.
- CacheListener.afterRegionClear(), CacheWriter.beforeRegionClear() will be invoked at the PR level.
- Update to client queue, i.e. notify clients
The subscription clients will be notified with a clear region event (with version info?) at the PR level.
- Off-heap entries: Reuse the current non-partitioned region clear logic to release off-heap entries in an iteration.
- GII: Reuse the current non-partitioned region clear logic, where GII competes with clear for the rvvLock.
- Region Destroy
The clear will throw RegionDestroyedException at the bucket region level. The co-ordinator should collect the exception and rethrow it to the caller.
- LRU: Clear will remove all the expiration tasks at the bucket level.
- PartitionOfflineException: If clear fails on some buckets with PartitionOfflineException, PR.clear should return a PartialResultException to the caller, letting the user decide whether to call clear again.
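A hypothetical sketch of surfacing per-bucket offline failures (the exception types here are local stand-ins, not the actual Geode classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Collect per-bucket failures and report a partial result to the caller.
    class ClearResultSketch {
      static class PartitionOfflineException extends RuntimeException {}
      static class PartialResultException extends RuntimeException {
        PartialResultException(List<Integer> failedBuckets) {
          super("clear failed on buckets " + failedBuckets + "; caller may retry");
        }
      }

      void checkResults(Map<Integer, Exception> perBucketResults) {
        List<Integer> offline = new ArrayList<>();
        for (Map.Entry<Integer, Exception> e : perBucketResults.entrySet()) {
          if (e.getValue() instanceof PartitionOfflineException) {
            offline.add(e.getKey());
          }
        }
        if (!offline.isEmpty()) {
          throw new PartialResultException(offline); // user decides whether to retry
        }
      }
    }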
- Updating Lucene Index
As part of clear(), the Lucene indexes need to be recreated via a RegionEvent passing through the AEQ. Until GEODE-9133 supports RegionEvent on the AEQ, we will throw UnsupportedOperationException for the time being.
Test Cases to consider
- TX in PR
- Test TX with one or more primary buckets in the same or different members, while clear is on-going
- Retry at the accessor (a member is killed; need to find the new primary target)
- Retry at the client (the co-ordinator is killed)
- PR clear ends with a partial result, due to PartitionOfflineException
- While PR.clear is on-going, restart some members.
- While PR.clear is on-going, rebalance to move a primary.
- Benchmark test to measure how long local clear takes vs. distribution when off-heap is configured.
- Region destroy (a bucket or the whole PR) while clear is on-going
- While PR.clear is on-going, try to create an OQL index on value/key, try to create a Lucene index, or try to alter the region (a JUnit-style sketch of one concurrent-operation case follows this list).
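As an example, the concurrent-operation cases could take a shape like the following sketch (region wiring omitted; the test harness is assumed to provide the PR and to verify primary/secondary agreement):

    import java.util.concurrent.CountDownLatch;
    import org.apache.geode.cache.Region;

    // Sketch of a "cache ops racing with clear" case.
    class ConcurrentClearPutTestSketch {
      void concurrentClearAndPut(Region<String, String> region) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        Thread putter = new Thread(() -> {
          try {
            start.await();
            for (int i = 0; i < 10_000; i++) {
              region.put("key-" + i, "value-" + i);
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        putter.start();
        start.countDown();
        region.clear(); // races with the puts
        putter.join();
        // Invariant to assert (via the harness): every key is either fully
        // present or absent, and primaries agree with secondaries.
      }
    }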
Changes and Additions to Public Interfaces
Currently PartitionedRegion.clear() throws UnsupportedOperationException. It will be updated to perform the clear operation.
Performance Impact
Cache operations on an off-heap region during clear could be impacted.
Backwards Compatibility and Upgrade Path
Clear will throw an exception if any older-version member is running in the cluster.
Assumptions
Data consistency is not guaranteed if the region does not enable concurrency checks (there will be no RVV and rvvLock). This is also the current behavior of non-partitioned region clear().
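For reference, concurrency checks are a per-region setting; a minimal server-side sketch (the region name is illustrative):

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.RegionShortcut;

    public class ConcurrencyChecksExample {
      public static void main(String[] args) {
        Cache cache = new CacheFactory().create();
        Region<String, String> region = cache
            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
            // On by default; without it there is no RVV/rvvLock, and clear()
            // loses its consistency guarantees.
            .setConcurrencyChecksEnabled(true)
            .create("examplePartitionedRegion");
        region.clear(); // the operation this proposal enables on PRs
        cache.close();
      }
    }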
Prior Art
Besides option-1 and option-2, the following options were considered. They were considered to limit the performance impact of data iteration during clear.
Option-3: Rename the PartitionedRegion
The idea is to rename the PR being cleared to a temp name and create a new PR with the old PR's name.
Option-4: Recreate the low level region map
The idea is to destroy/rename the low-level bucket region maps and attach a new map.
Errata
Change to the initial proposal (above) on clear implementation:
During implementation we came across design limitations/challenges in preserving cache (entry-level) operation ordering, with respect to client notification and cache listener invocation, when entry operations are in progress concurrently with clear. Initially the plan was to adopt approaches similar to the existing non-partitioned region clear and partitioned region destroy operations. As we studied these code paths in depth, we realized the same approach cannot be adopted for the partitioned clear operation; in the case of region destroy, concurrent operations are handled by raising region destroy exceptions.
Based on our new understanding, to satisfy the goal of "Notifying clients subscribed to the PR on clear events, and keeping them consistent with server side data", we plan to adopt "Solution 1", detailed in the following section.
Partitioned Region Clear operation messaging requirement
With the clear operation, the following callbacks/event notifications need to be handled:
- Cache Writer invocation (before cache op) - on any one node hosting the PR.
- Cache Listener invocation (after cache op) - on all the nodes hosting the PR that have listeners.
- Client event delivery (notify clients) - add a clear event to the clients' HA region queues (both primary and redundant). With the region destroy operation, the events are created and added into the client queues (if present) on all the nodes hosting the PR.
- AEQ - clear events will NOT be added to primary and secondary AEQs.
- WAN - no clear event is sent across the WAN gateway.
As discussed in the RFC, the plan was to follow/adopt the approach used with bulk operations (putAll, removeAll), where clear is applied on all the primary PR buckets.
The messaging involved with this approach:
Messaging for Cache Writer:
A local CacheWriter is invoked first. If no writer is found locally, a message is sent to one of the peer nodes hosting the PR; if that is not successful, the other nodes are tried (one at a time) until the writer is successfully invoked or all the peer nodes hosting the PR have been tried (this may be optimized to determine which node has the CacheWriter).
Messaging for clear operation:
It is applied on the local primary buckets, and a clear message for each remote primary bucket is sent from the co-ordinator node. The primary buckets take care of sending distribution clear messages to the redundant copies.
Messaging for Cache Listener and Notifying Clients
Once the clear is done on the primary buckets, each primary bucket holder will send a distribution clear message to the peer nodes.
After the clear is done on all buckets, the co-ordinator will send a new message to all servers (including accessors) to trigger listeners and notify clients. Upon receiving this new message (PRClearNotificationMessage), the servers will invoke the cache listener (if present) and notify the client queue if the server hosts one.
Challenges:
In this approach, it could happen that after clear has finished on one primary bucket (say B1), while clear is still in progress on other primary buckets, a cache op on B1 triggers a cache listener invocation and a client notification event before the clear callback is performed, changing the ordering of the notifications and leaving clients out of sync with the servers.
Solution to address the issue:
Solution 1:
- Take the local clear lock (to avoid multiple threads starting clear on the same node)
- Take the distributed lock (to avoid concurrent clears in the cluster)
The following steps are the new changes from the initial RFC (a sketch of the two paths follows these steps):
Check if the PR has clients interested in it (interests and CQs) and has listeners:
If it has:
- Take a clear-write lock on all primary buckets, local and remote (sending messages to all the remote nodes hosting the PR).
The co-ordinator will become the write lock manager for all primary buckets.
This blocks any cache operation on that PR.
- On each node, perform a primary bucket clear, invoke the cache listener, and add the event into the client queue (if one exists).
- Release the clear-write lock, local and remote, by sending the message.
If it does not:
- Do the local primary bucket clears by taking clear-write locks locally.
- Send a clear message to all the nodes hosting the PR. Remote nodes will clear the primary bucket regions they own by taking clear-write locks locally.
In this case the co-ordinator node doesn't manage locks on local and remote primary buckets.
Cache operations are not blocked at the PR level, only at the bucket region level.
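A sketch of the two paths (all names are illustrative stubs, not actual Geode internals):

    // Hypothetical sketch of Solution 1's two paths.
    class Solution1Sketch {
      void clear(boolean hasClientInterestOrListeners) {
        takeLocalClearLock();       // one clear thread per node
        takeDistributedClearLock(); // one clear per cluster
        try {
          if (hasClientInterestOrListeners) {
            // The co-ordinator manages clear-write locks on ALL primary buckets,
            // local and remote, so cache ops are blocked PR-wide.
            lockAllPrimaryBucketsClusterWide();
            try {
              clearPrimariesInvokeListenersAndQueueClientEvents();
            } finally {
              unlockAllPrimaryBucketsClusterWide();
            }
          } else {
            // Each node takes its own bucket-level clear-write locks; cache ops
            // are blocked only per bucket, not PR-wide.
            clearLocalPrimariesWithLocalLocks();
            sendClearMessageToRemoteNodes();
          }
        } finally {
          releaseDistributedClearLock();
          releaseLocalClearLock();
        }
      }

      // Stubs so the sketch is self-contained; the real bodies are messaging/locking code.
      void takeLocalClearLock() {}
      void releaseLocalClearLock() {}
      void takeDistributedClearLock() {}
      void releaseDistributedClearLock() {}
      void lockAllPrimaryBucketsClusterWide() {}
      void unlockAllPrimaryBucketsClusterWide() {}
      void clearPrimariesInvokeListenersAndQueueClientEvents() {}
      void clearLocalPrimariesWithLocalLocks() {}
      void sendClearMessageToRemoteNodes() {}
    }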
Assumption:
This approach blocks any cache operation during PR clear when the PR has cache listeners or interested clients. The assumption here is that clear will mostly be called when there is no major activity in the region. As the clear is done by calling map.clear() (non off-heap), the clear op won't be blocking the ops for a long time (we will do benchmarks/measurements of this).
Pros:
Messaging can be added/supported at each task.
Blocks all the cache ops, so the ordering of listener and client events is maintained.
Risk:
All the cache ops are blocked on that PR till the clear is completed.
Increased complexity. Scenarios that need to be addressed include:
- The co-ordinator node departs before releasing the primary bucket locks it has taken.
- Making sure no new primary buckets are elected and created during the clear.
- Timeouts while trying to lock/unlock remote primary buckets.
- A non co-ordinator member getting kicked out during the clear.
- Clients retrying the clear operation.
Solution 2:
- Take the local clear lock (to avoid multiple threads starting clear on the same node)
- Take the distributed lock (to avoid concurrent clears in the cluster)
- Take locks on all local primary buckets; under the lock, invoke the listener and add the client event.
- Send a clear message to the remote nodes hosting the PR; on each remote node, take local locks on its primary buckets, and under the lock invoke the listener and add the client event.
In this approach the primary bucket locks are held by the peer nodes that host the buckets.
With this approach the ordering of events within a node is preserved, but the ordering is not guaranteed when the events are distributed to the redundant buckets and client queues.
- Say clear is in progress on Node/Peer 1 and, at the same time, a cache op (say a put) is performed on Node/Peer 2. When the events are replicated between them, it could happen that Node1 invokes the cache listeners in the order "clear" then "put", while Node2 sees "put" then "clear".
Solution 3:
Similar to RegionDestroy, for any concurrent cache operation during clear, an exception is thrown to the caller.
Comments
Gang Yan
for "Test cases", will it be considered to run a test "When PR.clear is on-going, try to create index on value/key"? and another case is "When PR.clear is on-going, try to alter region".
Xiaojian Zhou
We can add the 2 test cases.
Dan Smith
Did you consider just removing all of the entries in the region (using batched removals in a function, perhaps)? It might take a little while, but it wouldn't require much in the way of special logic that might introduce new data inconsistencies.
Xiaojian Zhou
We considered removeAll. It's not only the performance issue: removeAll is very different from clear(). It triggers entry-level listeners while clear() does not, and it creates a new entry version for each entry, while clear creates only one version.
RemoveAll is still available for customers to choose as an alternative, but we need to implement PR.clear() as the formal solution.
Dan Smith
Having a central coordinator that is responsible for sending multiple messages for multiple phases seems potentially error prone - what if the coordinator is killed in the middle of the clear? Seems like it is better to send a single message to the primaries, which will then do the rest.
There is already a lock to pause operations on a region - the BucketAdvisor.primaryMoveWriteLock. If you get that lock, you are guaranteed that there are no write operations in progress on the bucket. That might simplify the logic of clearing the bucket - no need for RVVs, or new distributed locks, or anything else like that.
Anilkumar Gingade
Dan Smith Good points and suggestions... About multiple messages, the idea is to leverage the existing removeAll logic; if we find it harder we will look at the single message (that was one of the options we seriously discussed). About the locking, we will consider the primaryMoveWriteLock; we need to make sure this helps in taking care of in-flight operations that are slow to arrive (on the secondary).
Xiaojian Zhou
The primaryMoveWriteLock will prevent moving the primary bucket (and also sync with on-going operations), so we will employ it. This is what is called the "new distributed lock" in the design. However, we still need the rvvLock, which synchronizes with GII and TX.