Background
A lengthy discussion on replay in January 2007 (page #7, Thread) highlighted a number of requirements and possible implementation options for adding replay to Qpid and AMQP. The requirements come from the desire to increase the rate at which a consumer can read messages and to simplify its recovery when it starts. This page gives some background, a proposal and finally some implementation options for discussion.
History
When an application updates the state of a single Resource Manager, e.g. a database or queue manager, it normally does so within the context of a local transaction, and this transaction exhibits the following ACID properties:
- Atomicity: The results of the transaction are either all committed or all rolled back.
- Consistency: The completed transaction transforms the resource from one known state to another. Inserting a row into a database or removing a message from a queue are common examples.
- Isolation: Changes to the resource's state effected by the transaction do not become visible outside of the transaction until the transaction commits.
- Durability: The changes that result from the transaction's commitment survive subsequent system or media failures.
Distributed Transaction
A distributed transaction is typically implemented by performing a Two Phase Commit (2PC), of which there are several variants, the best known being the X/Open XA specification. Where both the middleware and the consumer support XA, a separate Transaction Manager is used to coordinate the local transactions. The transaction manager coordinates atomicity at the global level whilst each resource manager is responsible for the ACID properties of its local transactions.
These benefits do not come without cost:
- Increased transaction processing latency, typically due to the additional forced disk writes.
- Applications can become blocked pending the resolution of an in-doubt global transaction.
- Reduced concurrency
- Multi-system deadlock
- Administration complexity
- Backing up the transaction manager requires either co-ordinating all transaction logs at the same time or suspending processing.
Idempotence and Replaying
When a message is being moved from system A to system B (e.g. from WebSphere MQ to Oracle), distributed transactions can be avoided if:
- System B can handle duplicates (or can detect them and deal with them accordingly) - i.e. it is idempotent.
- System A can replay messages from a known stable point in history.
The most common way of doing this is simply managing the local transactions so that system B commits before system A. The start of replay is then the end of the last transaction committed on system A.
Typically a MOM will immediately delete a message once it has been committed by all of its consumers.
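The commit ordering above can be illustrated with a minimal Python sketch. The names SourceQueue, IdempotentTarget and transfer are hypothetical stand-ins for system A, system B and the code that moves messages between them; they are not part of Qpid or any real product. The sketch assumes duplicates are detected by message ID and simulates a failure that occurs after system B has committed but before system A has.

```python
class SourceQueue:
    """Hypothetical system A: keeps messages and can replay from its last commit."""

    def __init__(self, messages):
        self._messages = list(messages)  # (message_id, payload) pairs
        self.committed = 0               # offset just past the last commit on A

    def replay(self):
        """Yield (offset, message_id, payload) starting at the last commit point."""
        for offset in range(self.committed, len(self._messages)):
            message_id, payload = self._messages[offset]
            yield offset, message_id, payload

    def commit(self, offset):
        self.committed = offset + 1


class IdempotentTarget:
    """Hypothetical system B: detects duplicates by message ID and ignores them."""

    def __init__(self):
        self.rows = {}
        self.duplicates = 0

    def apply(self, message_id, payload):
        if message_id in self.rows:
            self.duplicates += 1  # already applied, so the replayed copy is dropped
            return False
        self.rows[message_id] = payload
        return True


def transfer(source, target, fail_before_source_commit_at=None):
    """Move messages from A to B, always committing on B before A."""
    for offset, message_id, payload in source.replay():
        target.apply(message_id, payload)           # system B commits first
        if offset == fail_before_source_commit_at:
            return                                  # simulated crash: A never commits
        source.commit(offset)                       # system A commits second


source = SourceQueue([("m1", "a"), ("m2", "b"), ("m3", "c")])
target = IdempotentTarget()

transfer(source, target, fail_before_source_commit_at=1)  # crash after B stored m2
transfer(source, target)                                   # restart: A replays m2, m3
```

After the restart, m2 is delivered a second time because system A never committed it, and system B discards it as a duplicate. No message is lost or applied twice, and no transaction manager is involved.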
Commit is Not The End
If messages are made available for replay to a consumer after they have been committed, we can stretch the point in time the consumer recovers from back to any earlier point, for example:
- A few minutes ago
- Trade ID FFS987654321
- Start of day
- End of yesterday
- Friday
This larger recovery window gives downstream consumers the flexibility to recover from more failure scenarios:
- Retry an end of day batch job.
- Replay due to reference data problem in target system.
- Replay due to database or application failure.
Replay as a First Class Service
When a traditional queue is opened for reading, the next message delivered is the oldest one that has not been destructively read (i.e. read and committed).
In isolation, a consumer manages its own local transactions with the message broker to confirm when a message or group of messages is processed, stored and stable. The local transaction leads to a disk write in the queue storage to mark the messages as read.
In this XA-free world the consumer relies on the messaging infrastructure to replay messages from the application's last known good state. As it is always reading from a queue, the only extension to the queue's semantics is to let it be opened for reading from a known message, irrespective of whether that message has been committed or not. Replayed messages should arrive in the same order in which they were originally delivered.
Many consumers of a guaranteed message flow write to a database, and this database is the consumer's view of its state; it is certainly where the consumer recovers from when it starts up. The traditional model of using a transaction manager, typically XA, to co-ordinate the local transactions on the database and the message broker is slow and not without its problems.
Another model is to have the messaging infrastructure support replay of messages from a known point in history, i.e. to correlate the current state of the consumer's database with the last received message from this channel that caused an update to the database. This is not an all-encompassing pattern but rather complements other ways to synchronize state between a message broker and a database.
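One way a consumer might correlate its database state with the last received message is to store the message ID in the same local database transaction as the business update. The Python sketch below uses an in-memory SQLite database for this. The table names, the channel name "trades-feed" and the messages_after helper (a stand-in for the broker's replay) are all assumptions made for illustration, as is the choice to resume with the message after the recorded one.

```python
import sqlite3


def open_store():
    """Create the consumer's database: business data plus a replay point per channel."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE trades (trade_id TEXT PRIMARY KEY, amount INTEGER)")
    db.execute(
        "CREATE TABLE replay_point (channel TEXT PRIMARY KEY, last_message_id TEXT)"
    )
    db.commit()
    return db


def last_message_id(db, channel):
    """Return the ID of the last message reflected in the database, or None."""
    row = db.execute(
        "SELECT last_message_id FROM replay_point WHERE channel = ?", (channel,)
    ).fetchone()
    return row[0] if row else None


def process(db, channel, message_id, trade_id, amount):
    """Apply one message; the update and the replay point commit together."""
    with db:  # a single local transaction: commit on success, rollback on error
        db.execute("INSERT INTO trades VALUES (?, ?)", (trade_id, amount))
        db.execute(
            "INSERT OR REPLACE INTO replay_point VALUES (?, ?)",
            (channel, message_id),
        )


def messages_after(log, message_id):
    """Stand-in for broker replay: messages following message_id, in order."""
    if message_id is None:
        return list(log)
    ids = [message[0] for message in log]
    return log[ids.index(message_id) + 1:]


CHANNEL = "trades-feed"  # hypothetical channel name
log = [("msg-1", "T1", 100), ("msg-2", "T2", 250), ("msg-3", "T3", 75)]
db = open_store()

# The consumer handles the first two messages and then stops.
for message_id, trade_id, amount in log[:2]:
    process(db, CHANNEL, message_id, trade_id, amount)

# On restart it asks its own database where to resume, not the broker.
resume_from = last_message_id(db, CHANNEL)
for message_id, trade_id, amount in messages_after(log, resume_from):
    process(db, CHANNEL, message_id, trade_id, amount)

trade_count = db.execute("SELECT COUNT(*) FROM trades").fetchone()[0]
```

Because the replay point lives in the consumer's own database, the only forced write on the critical path is the one the consumer already performs for its business data.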
Requirements
- Replay messages from a queue from a given message identified by a message ID or a header property.
- Administrative support to purge messages from a queue as part of a business process such as End of Day.
- Zero impact on other queues and their consumers.
Proposal: A Replayable Queue
Queues are the storage agents in AMQP, so they are the logical point to provide replay. A Replayable Queue (RQ) is not the default queue behavior but rather has to be explicitly configured. In many ways an RQ is somewhere between a traditional queue and a transaction log such as HOWL.
An RQ has the following properties:
- An RQ can only have a single consumer. Multiple consumers complicate the problem so I propose discounting them for now.
- Messages are not deleted when consumed by a regular consumer. The act of acknowledging the message is just another property on the message. Indeed, the consumer may never acknowledge the message, as acknowledging implies a write on the message broker to update the message's state.
- When an RQ is opened for reading, the consumer must give a selector that will identify a point in the queue to begin message delivery from.
- Administratively defined points in the queue exist. These points can be defined by an administration API and associated tooling and used as points to replay from.
- Queues are purged of messages by the administration API or associated tooling. This allows external processes such as End of Day to initiate message archiving or deletion when it is safe to do so.
- An RQ can be replicated. Implementation options? SAN replication, dual writes?
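To make the properties above concrete, here is a minimal in-memory Python sketch of how an RQ might behave. The class and method names (ReplayableQueue, open, mark_point, open_from_point, purge_before) are hypothetical and do not correspond to any existing Qpid or AMQP API. The selector is assumed to be a predicate over a message, delivery is assumed to begin at the first matching message, and replication is left out.

```python
class ReplayableQueue:
    """Hypothetical single-consumer queue that retains messages after delivery."""

    def __init__(self):
        self._messages = []     # retained in arrival order
        self._points = {}       # administratively defined name -> message ID
        self._attached = False  # an RQ has at most one consumer

    def publish(self, message_id, body, **headers):
        self._messages.append({"id": message_id, "body": body, "headers": headers})

    def _index_of(self, selector):
        for index, message in enumerate(self._messages):
            if selector(message):
                return index
        raise LookupError("no message matches the selector")

    def open(self, selector):
        """Attach the consumer; deliver from the first matching message onwards."""
        if self._attached:
            raise RuntimeError("an RQ allows only a single consumer")
        start = self._index_of(selector)
        self._attached = True
        return list(self._messages[start:])  # reading writes nothing to the queue

    def close(self):
        self._attached = False

    def mark_point(self, name, message_id):
        """Administration: name a point in the queue that can be replayed from."""
        self._index_of(lambda m: m["id"] == message_id)  # must exist in the queue
        self._points[name] = message_id

    def open_from_point(self, name):
        message_id = self._points[name]
        return self.open(lambda m: m["id"] == message_id)

    def purge_before(self, message_id):
        """Administration: delete every message older than message_id."""
        start = self._index_of(lambda m: m["id"] == message_id)
        del self._messages[:start]
        return start  # number of messages removed


rq = ReplayableQueue()
for n in range(1, 6):
    rq.publish(f"m{n}", f"payload {n}", trade_id=f"T{n}")
rq.mark_point("start-of-day", "m2")

# Replay from a header property, then from an administratively defined point.
by_header = [m["id"] for m in rq.open(lambda m: m["headers"].get("trade_id") == "T3")]
rq.close()
by_point = [m["id"] for m in rq.open_from_point("start-of-day")]
rq.close()

# An End of Day process decides that everything before m4 is safe to delete.
removed = rq.purge_before("m4")
remaining = [m["id"] for m in rq.open(lambda m: True)]
```

Opening the queue never modifies it in this sketch, which is the property that lets the consumer avoid a broker-side write per message. Only publish and the administrative purge change the stored data.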
Benefits
- An RQ, by virtue of a single consumer, does not need to be written to when a consumer reads messages as it is the responsibility of the consumer to provide the synchronization point when it first connects. This can significantly speed up the consumer as its bottleneck will be its own database write.
- A complete record of all messaging activity is available.
Downsides
- The size of the store needs careful management so that the implementation does not suffer performance issues as the store grows.