ToAsync
Available as of Camel 2.1
To be removed
Unfortunately this feature did not work out, and is going to be replaced with a new fully asynchronous routing engine in Camel 2.4.
ToAsync will be removed from Camel 2.4 onwards. Do not use this feature, or be prepared to migrate when upgrading to future Camel releases!
Background
ToAsync is a new feature built into the core of Camel 2.1. It lays the foundation for non blocking asynchronous Request Reply messaging with Camel. By foundation we mean that it provides the moving pieces that let Camel Components leverage this when they natively support non blocking request reply. At the time of writing only the Jetty component offers this. Over time we will incorporate this into other components. However, don't despair: Camel has fallback support in the core for simulating asynchronous request/reply in cases where the Camel component used does not support it natively.
How does it work?
Camel now has a new DSL named toAsync in the Java DSL, and in Spring XML there is a new async boolean attribute on the <to> XML tag. So in a Camel route you can leverage this simply by using it when sending to an endpoint with Request Reply messaging.
The route below is from a unit test in Camel. Notice the new toAsync where we use this in action. What happens is that the message is sent asynchronously to the direct:bar endpoint. When the reply comes back, the message continues being routed on the other side, where it is routed to the mock:result endpoint. The number 5 indicates the thread pool size, so we have 5 concurrent consumers to route the replies when they come back.
from("direct:start").to("mock:a").toAsync("direct:bar", 5).to("mock:result"); from("direct:bar").to("mock:b").transform(constant("Bye World"));
This unit test uses the direct endpoint when sending with toAsync, and the direct component does not natively support non blocking request reply messaging. Therefore Camel falls back to simulating it by transferring the Exchange to an internal thread pool which handles the task of sending the Exchange. This frees the original thread from the burden of sending the request: its work is offloaded and it can continue, which means the original thread is not blocked. In the meantime the internal thread pool sends the request and hands the exchange over to the completed task queue when the reply comes back, and then the Exchange can continue being routed along the route path. In this example it will just be routed to the mock:result endpoint.
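To make the idea of the fallback concrete, here is a small, hypothetical sketch in plain Java (it is not the actual SendAsyncProcessor code) showing how a blocking send can be offloaded to a thread pool and the reply handed over to a completed task queue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of the fallback: the blocking request/reply is
// performed by a pool thread, so the original caller thread never blocks.
public class SimulatedAsyncSend {

    // pool of 5 threads doing the actual (blocking) request/reply
    private final ExecutorService pool = Executors.newFixedThreadPool(5);

    // replies are handed over here when they come back
    private final BlockingQueue<String> completed = new LinkedBlockingQueue<String>();

    // returns immediately; the original thread continues with its own work
    public void sendAsync(final String request) {
        pool.submit(new Runnable() {
            public void run() {
                String reply = blockingRequestReply(request); // blocks on a pool thread
                completed.add(reply);                         // hand over to the completed task queue
            }
        });
    }

    private String blockingRequestReply(String request) {
        return "Bye World"; // stand-in for sending to the real endpoint
    }
}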
By using a component that truly supports non blocking Request Reply, such as Jetty, it is in fact Jetty itself that handles the Request Reply, so the internal thread pool in Camel is not used.
The HTTP Async Example
In this example we use the following route:
<route>
    <!-- we route from a direct endpoint -->
    <from uri="direct:start"/>
    <!-- log the request -->
    <to uri="log:+++ request +++"/>
    <!-- then doing a non blocking async request/reply to the http server with a pool of 10 threads -->
    <to uri="jetty://http://0.0.0.0:9123/myapp" async="true" poolSize="10"/>
    <!-- the reply from the server is logged -->
    <to uri="log:+++ reply +++"/>
    <!-- and to our mock so we can assert we got all responses -->
    <to ref="result"/>
</route>
Notice how we have specified async="true" and poolSize="10" in the <to> tag to leverage this.
To learn more, see the HTTP Async Example and try this example yourself.
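For comparison, a rough Java DSL equivalent of the route above might look like the following sketch (it assumes the same jetty endpoint and pool size; mock:result stands in for the result mock referenced in the XML):

from("direct:start")
    // log the request
    .to("log:+++ request +++")
    // non blocking async request/reply to the http server with a pool of 10 threads
    .toAsync("jetty://http://0.0.0.0:9123/myapp", 10)
    // the reply from the server is logged
    .to("log:+++ reply +++")
    // and to our mock so we can assert we got all responses
    .to("mock:result");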
How this works inside Camel
Camel uses the org.apache.camel.AsyncProcessor which is responsible for processing the message in a non blocking fashion.
void process(Exchange exchange, AsyncCallback callback) throws Exception;
The idea is that you invoke the callback when the reply is ready.
The org.apache.camel.AsyncCallback has the following method
void onTaskCompleted(Exchange exchange);
which is the method to invoke when the reply is ready.
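To illustrate how the two pieces fit together, here is a minimal, hypothetical processor assuming the signatures quoted above (it is not taken from the Camel code base):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;

// Hypothetical processor: the reply is produced on a separate thread and the
// callback is invoked once it has been populated on the Exchange.
public class MyAsyncProcessor implements AsyncProcessor {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // regular synchronous processing (blocks until the reply is ready)
    public void process(Exchange exchange) throws Exception {
        exchange.getOut().setBody("Bye World");
    }

    // non blocking processing: return immediately and invoke the callback later
    public void process(final Exchange exchange, final AsyncCallback callback) throws Exception {
        executor.submit(new Runnable() {
            public void run() {
                // compute the reply and store it on the Exchange
                exchange.getOut().setBody("Bye World");
                // signal that the reply is ready so routing can continue
                callback.onTaskCompleted(exchange);
            }
        });
    }
}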
All the other moving parts are built directly into the core of Camel, so you do not have to worry about them.
If you want to take a look, it is implemented in the org.apache.camel.processor.SendAsyncProcessor class.
JettyHttpProducer
This class is currently the only implementation of this feature.
Basically you implement the AsyncProcessor interface
public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor
Then you invoke the callback when the reply is ready and has been populated on the Exchange.
See the JettyContentExchange class for how this is implemented with Jetty.
What if a Component does not support non blocking?
Many components do not naturally support non blocking Request Reply. The ones that do need to be improved in Camel to support the AsyncProcessor in order to integrate with ToAsync. If a component does not support non blocking (i.e. does not use AsyncProcessor), then Camel will automatically fall back and simulate it using an internal thread pool that sends the request and blocks until the reply is ready, just as it did in the very first example on this page using the direct endpoint.
Configuring thread pooling
You configure the thread pool on the toAsync either by supplying a pool size as the 2nd parameter, as we saw in the first example, or by referring to a java.util.concurrent.ExecutorService as follows:
from("direct:start").to("mock:a").toAsync("direct:bar").executorServiceRef("mySharedPool").to("mock:result");
In the Spring XML there is an executorServiceRef attribute to refer to the thread pool:
<to uri="jetty://http://0.0.0.0:9123/myapp" async="true" executorServiceRef="mySharedPool"/>
See also
- HTTP Async Example which shows non blocking asynchronous request/reply in action using the Jetty component (natively supported)
- CXF Async Example which shows non blocking asynchronous request/reply in action using the CXF component (simulated in Camel)
- Async for asynchronous support in Camel routes and as Camel client API