This page collects ideas and proposals for a Message Store as an architectural concept in Camel. No implementation has started yet. You can participate by adding your input here or by posting to the dev mailing list.

For the initial brainstorming on the dev mailing list, please check this thread.

From the Camel 3.0 ideas page

(+1: hadrian, claus)
We should make this EIP easier to use for end users by offering a better public API. We should also have a pluggable message store, with filters that decide what is stored, as well as pluggable marshallers so people can marshal data from the Exchange into a format the message store can store (BLOB, XML, JSON etc.).
A Message Store could provide transparent persistence to various EIP patterns (or be used directly). Implementations would handle the mapping to the underlying database, file system, NoSQL store, memory or whatever.
Message Store implementations already exist in various places, using different approaches, such as Stream Caching (file system only), AggregationRepository or IdempotentRepository. A Message Store has also been requested elsewhere, e.g. for reliable stream resequencing (CAMEL-949) or a persistent Dead Letter Queue (CAMEL-4575).
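
To make the filter and marshaller idea more concrete, here is a minimal sketch in plain Java. All names (PayloadMarshaller, Utf8StringMarshaller, FilteringStore) are hypothetical and not part of Camel; the payload type stands in for whatever part of the Exchange would be stored, and the filter is modelled as a java.util.function.Predicate.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical: converts a payload into the bytes a store persists, and back.
interface PayloadMarshaller<T> {
    byte[] marshal(T payload);

    T unmarshal(byte[] stored);
}

// One possible marshaller: plain UTF-8 text.
final class Utf8StringMarshaller implements PayloadMarshaller<String> {
    @Override
    public byte[] marshal(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String unmarshal(byte[] stored) {
        return new String(stored, StandardCharsets.UTF_8);
    }
}

// Hypothetical store front end: the filter decides what is stored,
// the marshaller decides how it is represented.
final class FilteringStore<T> {
    private final Predicate<T> filter;
    private final PayloadMarshaller<T> marshaller;
    private final List<byte[]> entries = new ArrayList<>();

    FilteringStore(Predicate<T> filter, PayloadMarshaller<T> marshaller) {
        this.filter = filter;
        this.marshaller = marshaller;
    }

    // Returns true if the payload passed the filter and was stored.
    boolean offer(T payload) {
        if (!filter.test(payload)) {
            return false;
        }
        entries.add(marshaller.marshal(payload));
        return true;
    }

    int size() {
        return entries.size();
    }

    // Reads an entry back in its unmarshalled form.
    T get(int index) {
        return marshaller.unmarshal(entries.get(index));
    }
}
```

With this split, swapping the storage format (BLOB, XML, JSON) would only mean supplying a different marshaller, while the filter stays untouched.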

Summary

In Camel 2.x, a Message Store is not a "first class citizen" in the sense of an architectural concept that can be applied consistently wherever needed.
Camel 3.0 should introduce a unified, generic, pluggable Message Store that consolidates the different approaches and allows persistence to be configured in the same way across the various EIP patterns.
It should be usable independently of EIP patterns as well.
Implementations would handle the mapping to the underlying persistence layer, which can be in-memory, RDBMS, NoSQL-based etc., and can be chosen based on the requirements for reliability (survive system shutdowns without losing messages) and scalability (reduce memory consumption when processing many or large messages).
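
As a starting point for discussion, a unified store contract could look roughly like the sketch below. The interface name, its three operations and the in-memory implementation are assumptions made for illustration; nothing like this has been agreed on or exists in Camel yet.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical contract; the operations are a guess at the minimum an EIP would need.
interface MessageStore<T> {
    // Stores the message and returns a generated unique ID.
    String put(T message);

    // Looks a message up without removing it.
    Optional<T> get(String id);

    // Removes a message and returns it, if present.
    Optional<T> remove(String id);
}

// In-memory variant: fast, but entries do not survive a shutdown.
// Note: ConcurrentHashMap rejects null, so null messages are not supported here.
final class InMemoryMessageStore<T> implements MessageStore<T> {
    private final Map<String, T> entries = new ConcurrentHashMap<>();

    @Override
    public String put(T message) {
        String id = UUID.randomUUID().toString();
        entries.put(id, message);
        return id;
    }

    @Override
    public Optional<T> get(String id) {
        return Optional.ofNullable(entries.get(id));
    }

    @Override
    public Optional<T> remove(String id) {
        return Optional.ofNullable(entries.remove(id));
    }
}
```

An RDBMS or NoSQL implementation would implement the same interface, so a route could switch between reliability and speed by exchanging the store instance only.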

Side note: Spring Integration provides a corresponding concept.

List of proposed features

Feel free to add, edit and comment. At some point we will probably need to assign priorities as to what is indispensable for Camel 3.0 and what could be delivered in a later patch release.

Message Store Data

In order to disambiguate stored exchanges and make them retrievable again, message store entries must carry attributes in addition to the marshaled exchange.

| Item | Description/Reason |
|---|---|
| ID | Generated unique ID of the entry |
| Exchange | Exchange (or parts thereof), usually in some marshalled form (except in-memory stores) |
| CamelContext | A message store may be used by several CamelContext instances |
| Correlation | Correlation ID to be able to aggregate related exchanges |
| Source | Identifier that uniquely describes the point in the route from which an exchange was stored. E.g. there might be more than one aggregator processor in a route. Could also be used to manually or automatically refeed exchanges back into the route |
| Status | Status of an exchange (e.g. PROCESSING, DONE, FAILED) |
| CreationTime | Timestamp when the entry was created |
| ExpirationTime | Timestamp when the entry can be considered as expired and picked up by some cleaning process |
| ... | ... more? |
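
The attributes listed above could be carried by an entry type along the lines of the following sketch. The class name, field types and the isExpiredAt helper are assumptions; in particular, representing the CamelContext and the source by plain strings is just one option.

```java
import java.time.Instant;
import java.util.UUID;

// Status values taken from the examples given for the Status item.
enum EntryStatus { PROCESSING, DONE, FAILED }

// Hypothetical entry layout; one field per proposed item.
final class StoreEntry {
    final String id = UUID.randomUUID().toString(); // generated unique ID
    final byte[] exchange;         // marshalled exchange (or parts thereof)
    final String camelContext;     // name of the CamelContext that stored the entry
    final String correlationId;    // used to aggregate related exchanges
    final String source;           // point in the route that stored the exchange
    final Instant creationTime;
    final Instant expirationTime;
    EntryStatus status = EntryStatus.PROCESSING;

    StoreEntry(byte[] exchange, String camelContext, String correlationId,
               String source, Instant creationTime, Instant expirationTime) {
        this.exchange = exchange;
        this.camelContext = camelContext;
        this.correlationId = correlationId;
        this.source = source;
        this.creationTime = creationTime;
        this.expirationTime = expirationTime;
    }

    // True once the expiration time has been reached; a cleaning
    // process could use this to select entries for removal.
    boolean isExpiredAt(Instant now) {
        return !now.isBefore(expirationTime);
    }
}
```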

Code examples

Sometimes it is easier to express an idea with a fictional piece of code and a few comments.

This section demonstrates how a Message Store might be used through hypothetical code snippets, e.g.

...
from(...)
   .aggregate()
       .correlationExpression(header("id"))
       .aggregationStrategy(myStrategy)
       .completionTimeout(10000)
       .messageStore(myStore)
...

Claim check

The claim check pattern temporarily reduces the data volume of the message by storing content in a message store in exchange for a claim check token. The content is retrieved later on, before it is needed again.


// Optionally: override default store from context
// (ohr:) IMHO I don't think that this configuration level is really necessary
defaultMessageStore(myStore);

// 1) Store body.
// 2) Set body to null.
// 3) Set Exchange.CLAIM_CHECK header to unique claim id.
from(...)
   .checkIn()  // store body in default store
   // .checkIn(header("bigHeader"), customStore) : store header in custom store
   .to(...);

// 1) Look up the Exchange.CLAIM_CHECK header value.
// 2) Read the message from the store.
// 3) Set body to the value fetched from the store.
from(...)
   // .setHeader(Exchange.CLAIM_CHECK, constant("id")) : header should still contain the claim id
   .reclaim() // read body from default store
   // .reclaim().aggregationStrategy(myStrategy) : more generically, using an aggregation strategy
   // .reclaim(customStore).aggregationStrategy(myStrategy) : reclaim from custom store using an aggregation strategy
   .to(...);
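
The numbered steps in the comments above can be simulated without Camel. In the sketch below, SimpleMessage is a stand-in for a Camel message, ClaimCheck is a hypothetical helper, and the header name "CamelClaimCheck" is an assumed value for Exchange.CLAIM_CHECK. Whether reclaim should delete the stored content, as it does here, is one of the design choices still to be made.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Stand-in for a Camel message: a body plus headers.
final class SimpleMessage {
    Object body;
    final Map<String, Object> headers = new HashMap<>();

    SimpleMessage(Object body) {
        this.body = body;
    }
}

// Hypothetical helper that mimics checkIn() and reclaim() on a default store.
final class ClaimCheck {
    // Assumed header name; the snippets only refer to Exchange.CLAIM_CHECK.
    static final String CLAIM_CHECK = "CamelClaimCheck";

    private final Map<String, Object> store = new HashMap<>();

    void checkIn(SimpleMessage message) {
        String claimId = UUID.randomUUID().toString();
        store.put(claimId, message.body);           // 1) store body
        message.body = null;                        // 2) set body to null
        message.headers.put(CLAIM_CHECK, claimId);  // 3) set claim check header
    }

    void reclaim(SimpleMessage message) {
        Object claimId = message.headers.get(CLAIM_CHECK);  // 1) look up header value
        if (claimId == null || !store.containsKey(claimId)) {
            throw new IllegalStateException("No stored content for claim " + claimId);
        }
        // 2) read the stored content and 3) set it as the body again;
        // the entry is removed from the store in the same step.
        message.body = store.remove(claimId);
    }
}
```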

Open issues: