Camel supports a more complex asynchronous processing model. The asynchronous processors implement the
org.apache.camel.AsyncProcessor interface which is derived from the more synchronous
org.apache.camel.Processor interface. There are advantages and disadvantages when using asynchronous processing when compared to using the standard synchronous processing model.
- Processing routes that are composed fully of asynchronous processors do not use up threads waiting for processors to complete on blocking calls. This can increase the scalability of your system by reducing the number of threads needed to process the same workload.
- Processing routes can be broken up into SEDA processing stages where different thread pools can process the different stages. This means that your routes can be processed concurrently.
- Implementing asynchronous processors is more complex than implementing the synchronous versions.
When to Use
We recommend that processors and components be implemented the more simple synchronous APIs unless you identify a performance of scalability requirement that dictates otherwise. A Processor whose
process() method blocks for a long time would be good candidates for being converted into an asynchronous processor.
AsyncProcessor defines a single
process() method which is very similar to it's synchronous
Here are the differences:
- A non-null
AsyncCallbackMUST be supplied which will be notified when the exchange processing is completed.
- It MUST not throw any exceptions that occurred while processing the exchange. Any such exceptions must be stored on the exchange's
- It MUST know if it will complete the processing synchronously or asynchronously. The method will return
trueif it does complete synchronously, otherwise it returns
- When the processor has completed processing the exchange, it must call the
- The sync parameter MUST match the value returned by the
Implementing Processors that Use the AsyncProcessor API
All processors, even synchronous processors that do not implement the
AsyncProcessor interface, can be coerced to implement the
AsyncProcessor interface. This is usually done when you are implementing a Camel component consumer that supports asynchronous completion of the exchanges that it is pushing through the Camel routes. Consumers are provided a
Processor object when created. All Processor object can be coerced to a
AsyncProcessor using the following API:
For a route to be fully asynchronous and reap the benefits to lower Thread usage, it must start with the consumer implementation making use of the asynchronous processing API. If it called the synchronous
process() method instead, the consumer's thread would be forced to be blocked and in use for the duration that it takes to process the exchange.
It is important to take note that just because you call the asynchronous API, it does not mean that the processing will take place asynchronously. It only allows the possibility that it can be done without tying up the caller's thread. If the processing happens asynchronously is dependent on the configuration of the Camel route.
Normally, the the process call is passed in an inline inner
AsyncCallback class instance which can reference the exchange object that was declared final. This allows it to finish up any post processing that is needed when the called processor is done processing the exchange.
Asynchronous Route Sequence Scenarios
Now that we have understood the interface contract of the
AsyncProcessor, and have seen how to make use of it when calling processors, let's looks a what the thread model/sequence scenarios will look like for some sample routes.
The Jetty component's consumers support asynchronous processing through the use of continuations. Suffice to say it can take a HTTP request and pass it to a Camel route for asynchronous processing. If the processing is indeed asynchronous, it uses a Jetty continuation so that the HTTP request is 'parked' and the thread is released. Once the Camel route finishes processing the request, the Jetty component uses the
AsyncCallback to tell Jetty to 'un-park' the request. Jetty un-parks the request, the HTTP response returned using the result of the exchange processing.
Notice that the jetty continuations feature is only used "If the processing is indeed async". This is why
AsyncProcessor.process() implementations must accurately report if request is completed synchronously or not.
jhc component's producer allows you to make HTTP requests and implement the
AsyncProcessor interface. A route that uses both the jetty asynchronous consumer and the
jhc asynchronous producer will be a fully asynchronous route and has some nice attributes that can be seen if we take a look at a sequence diagram of the processing route.
For the route:
The sequence diagram would look something like this:
The diagram simplifies things by making it looks like processors implement the
AsyncCallback interface when in reality the
AsyncCallback interfaces are inline inner classes, but it illustrates the processing flow and shows how two separate threads are used to complete the processing of the original HTTP request. The first thread is synchronous up until processing hits the
jhc producer which issues the HTTP request. It then reports that the exchange processing will complete asynchronously using NIO to get the response back. Once the
jhc component has received a full response it uses
AsyncCallback.done() method to notify the caller. These callback notifications continue up until it reaches the original Jetty consumer which then un-parks the HTTP request and completes it by providing the response.
Mixing Synchronous and Asynchronous Processors
It is totally possible and reasonable to mix the use of synchronous and asynchronous processors/components. The pipeline processor is the backbone of a Camel processing route. It glues all the processing steps together. It is implemented as an
AsyncProcessor and supports interleaving synchronous and asynchronous processors as the processing steps in the pipeline.
Let's say we have two custom asynchronous processors, namely:
MyTransformation. Let's say we want to load file from the data/in directory validate them with the
MyValidator() processor, transform them into JPA Java objects using
MyTransformation and then insert them into the database using the JPA component. Let's say that the transformation process takes quite a bit of time and we want to allocate
20 threads to do parallel transformations of the input files. The solution is to make use of the thread processor. The thread is
AsyncProcessor that forces subsequent processing in asynchronous thread from a thread pool.
The route might look like:
The sequence diagram would look something like this:
You would actually have multiple threads executing the second part of the thread sequence.
Staying Synchronous in an
Generally speaking you get better throughput processing when you process things synchronously. This is due to the fact that starting up an asynchronous thread and doing a context switch to it adds a little bit of of overhead. So it is generally encouraged that
AsyncProcessor's do as much work as they can synchronously. When they get to a step that would block for a long time, at that point they should return from the process call and let the caller know that it will be completing the call asynchronously.