Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Info
titleSupported versions

The information on this page applies for Camel 2.4 onwards or later.

Before Camel 2.4 the asynchronous processing is only implemented for JBI where as in Camel 2.4 onwards we have implemented it in many other areas. See more at Asynchronous Routing Engine.

Camel supports a more complex asynchronous processing model. The asynchronous processors implement the org.apache.camel.AsyncProcessor interface which is derived from the more synchronous org.apache.camel.Processor interface. There are advantages and disadvantages when using asynchronous processing when compared to using the standard synchronous processing model.

Advantages:

  • Processing routes that are composed fully of asynchronous processors do not use up threads waiting for processors to complete on blocking calls. This can increase the scalability of your system by reducing the number of threads needed to process the same workload.
  • Processing routes can be broken up into SEDA processing stages where different thread pools can process the different stages. This means that your routes can be processed concurrently.

Disadvantages:

  • Implementing asynchronous processors is more complex than implementing the synchronous versions.

...

We recommend that processors and components be implemented the more simple synchronous APIs unless you identify a performance of scalability requirement that dictates otherwise. A Processor whose whose process() method blocks for a long time would be good candidates for being converted into an asynchronous processor.

Interface Details

Code Block
java
java

public interface AsyncProcessor extends Processor {
   boolean process(Exchange exchange, AsyncCallback callback);
}

The The AsyncProcessor defines a single process() method which is very similar to it's synchronous synchronous Processor.process() brethren.

Here are the differences:

  • A non-null null AsyncCallback MUST be supplied which will be notified when the exchange processing is completed.
  • It MUST not throw any exceptions that occurred while processing the exchange. Any such exceptions must be stored on the exchange's Exception property.
  • It MUST know if it will complete the processing synchronously or asynchronously. The method will return true if it does complete synchronously, otherwise it returns false.
  • When the processor has completed processing the exchange, it must call the callback.done(boolean sync) method.
  • The sync parameter MUST match the value returned by the process() method.

Implementing Processors that Use the AsyncProcessor API

All processors, even synchronous processors that do not implement the the AsyncProcessor interface, can be coerced to implement the the AsyncProcessor interface. This is usually done when you are implementing a Camel component consumer that supports asynchronous completion of the exchanges that it is pushing through the Camel routes. Consumers are provided a Processor object when created. All Processor object can be coerced to a AsyncProcessor using the following API:

Code Block
java
languagejava

Processor processor = ...
AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);

For a route to be fully asynchronous and reap the benefits to lower Thread usage, it must start with the consumer implementation making use of the asynchronous processing API. If it called the synchronous synchronous process() method instead, the consumer's thread would be forced to be blocked and in use for the duration that it takes to process the exchange.

...

Normally, the the process call is passed in an inline inner inner AsyncCallback class instance which can reference the exchange object that was declared final. This allows it to finish up any post processing that is needed when the called processor is done processing the exchange. See below for an example

Example.

Code Block
java
java

final Exchange exchange = ...
AsyncProcessor asyncProcessor = ...
asyncProcessor.process(exchange, new AsyncCallback() {
    public void done(boolean sync) {

        if (exchange.isFailed()) {
            ... // do failure processing.. perhaps rollback etc.
        } else {
            ... // processing completed successfully, finish up 
                // perhaps commit etc.
        }
    }
});

...

Now that we have understood the interface contract of the AsyncProcessor, and have seen how to make use of it when calling processors, lets let's looks a what the thread model/sequence scenarios will look like for some sample routes.

The Jetty component's consumers support async processing by using asynchronous processing through the use of continuations. Suffice to say it can take a http HTTP request and pass it to a camel Camel route for async asynchronous processing. If the processing is indeed asyncasynchronous, it uses a Jetty continuation so that the http HTTP request is 'parked' and the thread is released. Once the camel Camel route finishes processing the request, the jetty Jetty component uses the the AsyncCallback to tell Jetty to 'un-park' the request. Jetty un-parks the request, the http HTTP response returned using the result of the exchange processing.

Notice that the jetty continuations feature is only used "If the processing is indeed async". This is why why AsyncProcessor.process() implementations MUST must accurately report if request is completed synchronously or not.

The The jhc component's producer allows you to make HTTP requests and implement the the AsyncProcessor interface. A route that uses both the jetty asynchronous consumer and the the jhc asynchronous producer will be a fully asynchronous route and has some nice attributes that can be seen if we take a look at a sequence diagram of the processing route.

For the route:

Code Block

from("jetty:http://localhost:8080/service")
    .to("jhc:http://localhost/service-impl");

...

The diagram simplifies things by making it looks like processors implement the the AsyncCallback interface when in reality the the AsyncCallback interfaces are inline inner classes, but it illustrates the processing flow and shows how 2 two separate threads are used to complete the processing of the original http HTTP request. The first thread is synchronous up until processing hits the the jhc producer which issues the http the HTTP request. It then reports that the exchange processing will complete async since it will use a asynchronously using NIO to complete getting get the response back. Once the the jhc component has received a full response it uses AsyncCallback.done() method to notify the caller. These callback notifications continue up until it reaches the original jetty Jetty consumer which then un-parks the http HTTP request and completes it by providing the response.

...

It is totally possible and reasonable to mix the use of synchronous and asynchronous processors/components. The pipeline processor is the backbone of a Camel processing route. It glues all the processing steps together. It is implemented as an an AsyncProcessor and supports interleaving synchronous and asynchronous processors as the processing steps in the pipeline.

Lets Let's say we have 2 two custom asynchronous processors, namely: MyValidator and MyTransformation, both of which are synchronous processors. Lets . Let's say we want to load file from the data/in directory validate them with the the MyValidator() processor, Transform transform them into JPA java Java objects using using MyTransformation and then insert them into the database using the JPA component. Lets Let's say that the transformation process takes quite a bit of time and we want to allocate allocate 20 threads to do parallel transformations of the input files. The solution is to make use of the thread processor. The thread is is AsyncProcessor that forces subsequent processing in asynchronous thread from a thread pool.

The route might look like:

Code Block

from("file:data/in")
  .process(new MyValidator())
  .threads(20)
  .process(new MyTransformation())
  .to("jpa:PurchaseOrder");

...

You would actually have multiple threads executing the 2nd second part of the thread sequence.

Staying

...

Synchronous in

...

an AsyncProcessor

Generally speaking you get better throughput processing when you process things synchronously. This is due to the fact that starting up an asynchronous thread and doing a context switch to it adds a little bit of of overhead. So it is generally encouraged that AsyncProcessors AsyncProcessor's do as much work as they can synchronously. When they get to a step that would block for a long time, at that point they should return from the process call and let the caller know that it will be completing the call asynchronously.