CookBook
This document describes various recipes for working with Camel
- Bean Integration describes how to work with beans and Camel in a loosely coupled way so that your beans do not have to depend on any Camel APIs
- Annotation Based Expression Language binds expressions to method parameters
- Bean Binding defines which methods are invoked and how the Message is converted into the parameters of the method when it is invoked
- Bean Injection for injecting Camel related resources into your POJOs
- Parameter Binding Annotations for extracting various headers, properties or payloads from a Message
- POJO Consuming for consuming and possibly routing messages from Camel
- POJO Producing for producing camel messages from your POJOs
- RecipientList Annotation for creating a Recipient List from a POJO method
- Using Exchange Pattern Annotations describes how pattern annotations can be used to change the behaviour of method invocations
- Hiding Middleware describes how to avoid your business logic being coupled to any particular middleware APIs allowing you to easily switch from in JVM SEDA to JMS, ActiveMQ, Hibernate, JPA, JDBC, iBatis or JavaSpace etc.
- Visualisation describes how to visualise your Enterprise Integration Patterns to help you understand your routing rules
- Business Activity Monitoring (BAM) for monitoring business processes across systems
- Extract Transform Load (ETL) to load data into systems or databases
- Testing for testing distributed and asynchronous systems using a messaging approach
- Camel Test for creating test cases using a single Java class for all your configuration and routing
- Spring Testing uses Spring Test together with either XML or Java Config to dependency inject your test classes
- Guice uses Guice to dependency inject your test classes
- Templating is a great way to create service stubs to be able to test your system without some back end system.
- Database for working with databases
- Parallel Processing and Ordering describes how parallel processing and SEDA or JMS based load balancing can be achieved.
- Asynchronous Processing in Camel Routes.
- Implementing Virtual Topics on other JMS providers shows how to get the effect of Virtual Topics and avoid issues with JMS durable topics
- Camel Transport for CXF describes how to put the Camel context into the CXF transport layer.
- Fine Grained Control Over a Channel describes how to deliver a sequence of messages over a single channel and then stop any more messages being sent over that channel. Typically used for sending data over a socket and then closing the socket.
- EventNotifier to log details about all sent Exchanges shows how to let Camel's EventNotifier log all sent-to-endpoint events and how long each took.
- Loading routes from XML files into an existing CamelContext.
- Using MDC logging with Camel
- Running Camel standalone and have it keep running shows how to keep Camel running when you run it standalone.
- Hazelcast Idempotent Repository Tutorial shows how to avoid consuming duplicate messages in a clustered environment.
- How to use Camel as a HTTP proxy between a client and server shows how to use Camel as an HTTP adapter/proxy between a client and an HTTP service.
Bean Integration
Camel supports the integration of beans and POJOs in a number of ways
Annotations
If a bean is defined in Spring XML or scanned using the Spring component scanning mechanism, and a <camelContext> or a CamelBeanPostProcessor is used, then we process a number of Camel annotations to do various things such as injecting resources or producing, consuming or routing messages.
The following annotations are supported and injected by Camel's CamelBeanPostProcessor:
Annotation | Description |
---|---|
@EndpointInject | To inject an endpoint, see more details at POJO Producing. |
@BeanInject | Camel 2.13: To inject a bean obtained from the Registry. See Bean Injection. |
@PropertyInject | Camel 2.12: To inject a value using property placeholder. |
@Produce | To inject a producer to send message to an endpoint. See POJO Producing. |
@Consume | To inject a consumer on a method. See POJO Consuming. |
See more details at:
- POJO Consuming to consume and possibly route messages from Camel
- POJO Producing to make it easy to produce camel messages from your POJOs
- DynamicRouter Annotation for creating a Dynamic Router from a POJO method
- RecipientList Annotation for creating a Recipient List from a POJO method
- RoutingSlip Annotation for creating a Routing Slip for a POJO method
- Bean Injection to inject Camel related resources into your POJOs
- Using Exchange Pattern Annotations describes how the pattern annotations can be used to change the behaviour of method invocations with Spring Remoting or POJO Producing
Example
See the POJO Messaging Example for how to use the annotations for routing and messaging.
Bean Component
The Bean component allows one to invoke a particular method. Alternatively, the Bean component supports the creation of a proxy via ProxyHelper to a Java interface, whose implementation just sends a message containing a BeanInvocation to some Camel endpoint.
Spring Remoting
We support a Spring Remoting provider which uses Camel as the underlying transport mechanism. The nice thing about this approach is we can use any of the Camel transport Components to communicate between beans. It also means we can use Content Based Router and the other Enterprise Integration Patterns in between the beans; in particular we can use Message Translator to be able to convert what the on-the-wire messages look like in addition to adding various headers and so forth.
Bean binding
Whenever Camel invokes a bean method via one of the above methods (Bean component, Spring Remoting or POJO Consuming) then the Bean Binding mechanism is used to figure out what method to use (if it is not explicit) and how to bind the Message to the parameters possibly using the Parameter Binding Annotations or using a method name option.
Annotation Based Expression Language
You can also use any of the Languages supported in Camel to bind expressions to method parameters when using Bean Integration. For example you can use any of these annotations:
Annotation | Description |
---|---|
@Bean | Inject a Bean expression |
@BeanShell | Inject a BeanShell expression |
@Constant | Inject a Constant expression |
@EL | Inject an EL expression |
@Groovy | Inject a Groovy expression |
@Header | Inject a Header expression |
@JavaScript | Inject a JavaScript expression |
@MVEL | Inject an MVEL expression |
@OGNL | Inject an OGNL expression |
@PHP | Inject a PHP expression |
@Python | Inject a Python expression |
@Ruby | Inject a Ruby expression |
@Simple | Inject a Simple expression |
@XPath | Inject an XPath expression |
@XQuery | Inject an XQuery expression |
Example:
public class Foo {
    // @Consume replaces the older @MessageDriven annotation in Camel 2.x
    @Consume(uri = "activemq:my.queue")
    public void doSomething(@XPath("/foo/bar/text()") String correlationID, @Body String body) {
        // process the inbound message here
    }
}
Advanced example using @Bean
And an example of using the @Bean binding annotation, where you can use a POJO in which you can run whatever Java code you like:
public class Foo {
    // @Consume replaces the older @MessageDriven annotation in Camel 2.x
    @Consume(uri = "activemq:my.queue")
    public void doSomething(@Bean("myCorrelationIdGenerator") String correlationID, @Body String body) {
        // process the inbound message here
    }
}
And then we can have a Spring bean with the id myCorrelationIdGenerator where we can compute the id.
public class MyIdGenerator {
    private UserManager userManager;
    // the header parameter is named userName so it does not clash with the local User variable
    public String generate(@Header("user") String userName, @Body String payload) throws Exception {
        User user = userManager.lookupUser(userName);
        String userId = user.getPrimaryId();
        String id = userId + generateHashCodeForPayload(payload);
        return id;
    }
}
The POJO MyIdGenerator has one public method that accepts two parameters. However we have also annotated this one with the @Header and @Body annotation to help Camel know what to bind here from the Message from the Exchange being processed.
Of course this could be simplified a lot if you for instance just have a simple id generator. But we wanted to demonstrate that you can use the Bean Binding annotations anywhere.
public class MySimpleIdGenerator {
    public static int generate() {
        // generate a unique id
        return 123;
    }
}
And finally we just need to remember to have our bean registered in the Spring Registry:
<bean id="myCorrelationIdGenerator" class="com.mycompany.MySimpleIdGenerator"/>
Example using Groovy
In this example we have an Exchange that has a User object stored in the in header. This User object has methods to get some user information. We want to use Groovy to inject an expression that extracts and concatenates the full name of the user into the fullName parameter.
public void doSomething(@Groovy("$request.header['user'].firstName $request.header['user'].familyName") String fullName, @Body String body) {
    // process the inbound message here
}
Groovy supports GStrings, which are like templates in which we can insert $ placeholders that will be evaluated by Groovy.
Visualisation
This functionality is deprecated and to be removed in future Camel releases.
Camel supports the visualisation of your Enterprise Integration Patterns using the GraphViz DOT files which can either be rendered directly via a suitable GraphViz tool or turned into HTML, PNG or SVG files via the Camel Maven Plugin.
Here is a typical example of the kind of thing we can generate
If you click on the actual generated HTML you will see that you can navigate from an EIP node to its pattern page, along with getting hover-over tool tips etc.
How to generate
See Camel Dot Maven Goal or the other Maven goals of the Camel Maven Plugin
For OS X users
If you are using OS X then you can open the DOT file using graphviz which will then automatically re-render if it changes, so you end up with a real time graphical representation of the topic and queue hierarchies!
Also if you want to edit the layout a little before adding it to a wiki to distribute to your team, open the DOT file with OmniGraffle then just edit away
Mock Component
The Mock component provides a powerful declarative testing mechanism, which is similar to jMock in that it allows declarative expectations to be created on any Mock endpoint before a test begins. Then the test is run, which typically fires messages to one or more endpoints, and finally the expectations can be asserted in a test case to ensure the system worked as expected.
This allows you to test various things like:
- The correct number of messages are received on each endpoint,
- The correct payloads are received, in the right order,
- Messages arrive on an endpoint in order, using some Expression to create an order testing function,
- Messages that arrive match some kind of Predicate, such as that specific headers have certain values, or that parts of the messages match some predicate, such as by evaluating an XPath or XQuery Expression.
Note that there is also the Test endpoint which is a Mock endpoint, but which uses a second endpoint to provide the list of expected message bodies and automatically sets up the Mock endpoint assertions. In other words, it's a Mock endpoint that automatically sets up its assertions from some sample messages in a File or database, for example.
Remember that Mock is designed for testing. When you add Mock endpoints to a route, each Exchange sent to the endpoint will be stored (to allow for later validation) in memory until explicitly reset or the JVM is restarted. If you are sending high volume and/or large messages, this may cause excessive memory use. If your goal is to test deployable routes inline, consider using NotifyBuilder or AdviceWith in your tests instead of adding Mock endpoints to routes directly.
From Camel 2.10 onwards there are two new options, retainFirst and retainLast, that can be used to limit the number of messages the Mock endpoints keep in memory.
URI format
mock:someName[?options]
Where someName can be any string that uniquely identifies the endpoint.
You can append query options to the URI in the following format, ?option=value&option=value&...
Options
Option | Default | Description |
---|---|---|
reportGroup | | A size to use a throughput logger for reporting |
retainFirst | | Camel 2.10: To only keep first X number of messages in memory. |
retainLast | | Camel 2.10: To only keep last X number of messages in memory. |
Simple Example
Here's a simple example of Mock endpoint in use. First, the endpoint is resolved on the context. Then we set an expectation, and then, after the test has run, we assert that our expectations have been met.
You typically call the assertIsSatisfied() method after running a test to check that the expectations were met.
By default, Camel waits up to 10 seconds when assertIsSatisfied() is invoked. This can be configured by calling the setResultWaitTime(millis) method.
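The expect, run, assert cycle described above can be sketched without any Camel dependency. The TinyMock class below is a hypothetical stand-in written for this illustration; it is not Camel's MockEndpoint, it does not wait for late messages, and its method names only loosely echo the ones mentioned in the text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a mock endpoint; this is NOT Camel's MockEndpoint.
final class TinyMock {
    private final List<String> expected = new ArrayList<>();
    private final List<String> received = new ArrayList<>();

    // Step 1: before the test runs, declare which bodies should arrive, in order.
    public void expectedBodiesReceived(String... bodies) {
        expected.clear();
        expected.addAll(Arrays.asList(bodies));
    }

    // Step 2: the code under test delivers messages to the mock.
    public void receive(String body) {
        received.add(body);
    }

    // Step 3: after the test has run, compare what arrived with what was expected.
    public boolean isSatisfied() {
        return expected.equals(received);
    }

    public static void main(String[] args) {
        TinyMock mock = new TinyMock();
        mock.expectedBodiesReceived("a", "b");
        mock.receive("a");
        mock.receive("b");
        System.out.println("satisfied: " + mock.isSatisfied());
    }
}
```

The point of the pattern is that the expectation is stated up front and checked once at the end, so the test body itself only has to send messages.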
Using assertPeriod
Available as of Camel 2.7
When the assertion is satisfied, Camel stops waiting and continues from the assertIsSatisfied method. That means if a new message arrives on the mock endpoint just a bit later, that arrival will not affect the outcome of the assertion. If you do want to test that no new messages arrive during a period thereafter, you can do that by calling the setAssertPeriod method, for example:
Setting expectations
You can see from the javadoc of MockEndpoint the various helper methods you can use to set expectations. The main methods are as follows:
Method | Description |
---|---|
expectedMessageCount(int) | To define the expected message count on the endpoint. |
expectedMinimumMessageCount(int) | To define the minimum number of expected messages on the endpoint. |
expectedBodiesReceived(...) | To define the expected bodies that should be received (in order). |
expectedHeaderReceived(...) | To define the expected header that should be received. |
expectsAscending(Expression) | To add an expectation that messages are received in ascending order, using the given Expression to compare messages. |
expectsDescending(Expression) | To add an expectation that messages are received in descending order, using the given Expression to compare messages. |
expectsNoDuplicates(Expression) | To add an expectation that no duplicate messages are received, using an Expression to calculate a unique identifier for each message. |
Here's another example:
Adding expectations to specific messages
In addition, you can use the message(int messageIndex) method to add assertions about a specific message that is received.
For example, to add expectations of the headers or body of the first message (using zero-based indexing like java.util.List), you can use the following code:
There are some examples of the Mock endpoint in use in the camel-core processor tests.
Mocking existing endpoints
Available as of Camel 2.7
Camel now allows you to automatically mock existing endpoints in your Camel routes.
Important: The endpoints are still in action. What happens differently is that a Mock endpoint is injected and receives the message first and then delegates the message to the target endpoint. You can view this as a kind of intercept and delegate or endpoint listener.
Suppose you have the given route below:
You can then use the adviceWith feature in Camel to mock all the endpoints in a given route from your unit test, as shown below:
Notice that the mock endpoints are given the URI mock:<endpoint>, for example mock:direct:foo. Camel logs at INFO level the endpoints being mocked:
Endpoints which are mocked will have their parameters stripped off. For example the endpoint "log:foo?showAll=true" will be mocked to the following endpoint "mock:log:foo". Notice the parameters have been removed.
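The naming rule can be sketched as a small helper. This is only an illustration of the rule as described in the text; the class and method names are hypothetical and this is not the code Camel itself uses.

```java
// Illustrative helper; the names are hypothetical and this is not Camel's implementation.
final class MockUriSketch {

    // Drops everything from '?' onwards and prefixes the rest with "mock:".
    public static String mockUriFor(String endpointUri) {
        int query = endpointUri.indexOf('?');
        String withoutParameters = query >= 0 ? endpointUri.substring(0, query) : endpointUri;
        return "mock:" + withoutParameters;
    }

    public static void main(String[] args) {
        System.out.println(mockUriFor("log:foo?showAll=true")); // prints mock:log:foo
        System.out.println(mockUriFor("direct:foo"));           // prints mock:direct:foo
    }
}
```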
It's also possible to mock only certain endpoints using a pattern. For example, to mock all log endpoints you do as shown:
The pattern supported can be a wildcard or a regular expression. See more details about this at Intercept, as it's the same matching function used by Camel.
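As a rough sketch of what "a wildcard or a regular expression" can mean in practice, the helper below tries an exact match first, then a trailing-star wildcard, then a regular expression. The order of the checks and the treatment of invalid regular expressions are assumptions made for this illustration; the class name is hypothetical and the real matching function is the one documented under Intercept.

```java
import java.util.regex.PatternSyntaxException;

// Illustrative matcher; the check order is an assumption, not Camel's documented behaviour.
final class EndpointPatternSketch {

    public static boolean matches(String uri, String pattern) {
        // 1. exact match
        if (uri.equals(pattern)) {
            return true;
        }
        // 2. wildcard: a pattern ending in '*' matches any URI starting with the prefix
        if (pattern.endsWith("*") && uri.startsWith(pattern.substring(0, pattern.length() - 1))) {
            return true;
        }
        // 3. regular expression over the whole URI; an invalid expression simply does not match
        try {
            return uri.matches(pattern);
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(matches("log:foo", "log*"));                // true
        System.out.println(matches("direct:foo", "log*"));             // false
        System.out.println(matches("seda:bar", "(direct|seda):.*"));   // true
    }
}
```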
Mind that mocking endpoints causes the messages to be copied when they arrive on the mock.
That means Camel will use more memory. This may not be suitable when you send in a lot of messages.
Mocking existing endpoints using the camel-test component
Instead of using adviceWith to instruct Camel to mock endpoints, you can easily enable this behavior when using the camel-test Test Kit.
The same route can be tested as follows. Notice that we return "*" from the isMockEndpoints method, which tells Camel to mock all endpoints.
If you only want to mock all log endpoints you can return "log*" instead.
Mocking existing endpoints with XML DSL
If you do not use the camel-test component for unit testing (as shown above) you can use a different approach when using XML files for routes.
The solution is to create a new XML file used by the unit test and then include the intended XML file which has the route you want to test.
Suppose we have the route in the camel-route.xml file:
Then we create a new XML file as follows, where we include the camel-route.xml file and define a Spring bean with the class org.apache.camel.impl.InterceptSendToMockEndpointStrategy, which tells Camel to mock all endpoints:
Then in your unit test you load the new XML file (test-camel-route.xml) instead of camel-route.xml.
To only mock all Log endpoints you can define the pattern in the constructor for the bean:
Mocking endpoints and skip sending to original endpoint
Available as of Camel 2.10
Sometimes you want to mock certain endpoints and skip sending to them, so that the message is detoured and sent to the mock endpoint only. From Camel 2.10 onwards you can use the mockEndpointsAndSkip method using AdviceWith or the Test Kit. The example below will skip sending to the two endpoints "direct:foo" and "direct:bar".
The same example using the Test Kit
Limiting the number of messages to keep
Available as of Camel 2.10
The Mock endpoints will by default keep a copy of every Exchange they receive. So if you test with a lot of messages, they will consume memory.
From Camel 2.10 onwards we have introduced two options, retainFirst and retainLast, that can be used to keep only the first N and/or the last N Exchanges.
For example in the code below, we only want to retain a copy of the first 5 and last 5 Exchanges the mock receives.
Using this has some limitations. The getExchanges() and getReceivedExchanges() methods on the MockEndpoint will return only the retained copies of the Exchanges. So in the example above, the list will contain 10 Exchanges: the first five and the last five.
The retainFirst and retainLast options also have limitations on which expectation methods you can use. For example the expectedXXX methods that work on message bodies, headers, etc. will only operate on the retained messages. In the example above they can test only the expectations on the 10 retained messages.
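To make the retention behaviour concrete, here is a small self-contained model of a buffer that keeps only the first N and last M items while still counting everything it sees. RetainSketch is a hypothetical class written for this illustration; it does not show how Camel's Mock endpoint is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only; not Camel's MockEndpoint.
final class RetainSketch<T> {
    private final int retainFirst;
    private final int retainLast;
    private final List<T> first = new ArrayList<>();
    private final Deque<T> last = new ArrayDeque<>();
    private int received;

    public RetainSketch(int retainFirst, int retainLast) {
        this.retainFirst = retainFirst;
        this.retainLast = retainLast;
    }

    public void add(T message) {
        received++; // every message is counted, even if it is not kept
        if (first.size() < retainFirst) {
            first.add(message);
        } else if (retainLast > 0) {
            if (last.size() == retainLast) {
                last.removeFirst(); // evict the oldest of the "last" window
            }
            last.addLast(message);
        }
    }

    public int receivedCount() {
        return received;
    }

    // Only the retained messages are available for inspection.
    public List<T> retained() {
        List<T> all = new ArrayList<>(first);
        all.addAll(last);
        return all;
    }

    public static void main(String[] args) {
        RetainSketch<Integer> mock = new RetainSketch<>(5, 5);
        for (int i = 0; i < 100; i++) {
            mock.add(i);
        }
        System.out.println(mock.receivedCount()); // 100
        System.out.println(mock.retained());      // [0, 1, 2, 3, 4, 95, 96, 97, 98, 99]
    }
}
```

The model shows why count-based checks can still see all 100 messages while body or header checks can only look at the 10 retained ones.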
Testing with arrival times
Available as of Camel 2.7
The Mock endpoint stores the arrival time of the message as a property on the Exchange.
You can use this information to know when the message arrived on the mock. It also provides the foundation for knowing the time interval between the previous and the next message that arrived on the mock. You can use this to set expectations using the arrives DSL on the Mock endpoint.
For example to say that the first message should arrive between 0-2 seconds before the next you can do:
You can also define this as that 2nd message (0 index based) should arrive no later than 0-2 seconds after the previous:
You can also use between to set a lower bound. For example suppose that it should be between 1-4 seconds:
You can also set the expectation on all messages, for example to say that the gap between them should be at most 1 second:
In the example above we use seconds as the time unit, but Camel offers milliseconds and minutes as well.
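The idea behind these time expectations is simply a check on the gaps between consecutive arrival times. The sketch below models that check on a plain array of timestamps; ArrivalGapSketch and gapsBetween are hypothetical names, and this is not the arrives DSL itself.

```java
import java.util.concurrent.TimeUnit;

// Illustrative model only; Camel records arrival times on the Exchange itself.
final class ArrivalGapSketch {

    // True if every gap between consecutive arrivals lies within [min, max] of the given unit.
    public static boolean gapsBetween(long[] arrivalMillis, long min, long max, TimeUnit unit) {
        long minMillis = unit.toMillis(min);
        long maxMillis = unit.toMillis(max);
        for (int i = 1; i < arrivalMillis.length; i++) {
            long gap = arrivalMillis[i] - arrivalMillis[i - 1];
            if (gap < minMillis || gap > maxMillis) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 900L, 1500L, 4000L}; // gaps: 900 ms, 600 ms, 2500 ms
        System.out.println(gapsBetween(arrivals, 0, 1, TimeUnit.SECONDS)); // false
        System.out.println(gapsBetween(arrivals, 0, 3, TimeUnit.SECONDS)); // true
    }
}
```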
Testing
Testing is a crucial activity in any piece of software development or integration. Typically Camel Riders use various different technologies wired together in a variety of patterns with different expression languages together with different forms of Bean Integration and Dependency Injection, so it's very easy for things to go wrong! Testing is the crucial weapon to ensure that things work as you would expect.
Camel is a Java library so you can easily wire up tests in whatever unit testing framework you use (JUnit 3.x (deprecated), 4.x, or TestNG). However the Camel project has tried to make the testing of Camel as easy and powerful as possible so we have introduced the following features.
Testing Mechanisms
The following mechanisms are supported:
Name | Component | Description |
---|---|---|
Camel Test | camel-test | Is a standalone Java library letting you easily create Camel test cases using a single Java class for all your configuration and routing without using CDI, Spring or Guice for Dependency Injection which does not require an in-depth knowledge of Spring + Spring Test or Guice. Supports JUnit 3.x (deprecated) and JUnit 4.x based tests. |
CDI Testing | camel-test-cdi | Provides a JUnit 4 runner that bootstraps a test environment using CDI so that you don't have to be familiar with any CDI testing frameworks and can concentrate on the testing logic of your Camel CDI applications. Testing frameworks like Arquillian or PAX Exam can be used for more advanced test cases, where you need to configure your system under test in a very fine-grained way or target specific CDI containers. |
Spring Testing | camel-test-spring | Supports JUnit 3.x (deprecated) or JUnit 4.x based tests that bootstrap a test environment using Spring without needing to be familiar with Spring Test. The plain JUnit 3.x/4.x based tests work very similarly to the test support classes in camel-test. Also supports Spring Test based tests that use the declarative style of test configuration and injection common in Spring Test. The Spring Test based tests provide feature parity with the plain JUnit 3.x/4.x based testing approach. |
Blueprint Testing | camel-test-blueprint | Camel 2.10: Provides the ability to do unit testing on blueprint configurations |
Guice | camel-guice | Deprecated. Uses Guice to dependency inject your test classes |
Camel TestNG | camel-testng | Deprecated. Supports plain TestNG based tests with or without CDI, Spring or Guice for Dependency Injection which does not require an in-depth knowledge of CDI, Spring + Spring Test or Guice. From Camel 2.10: this component supports Spring Test based tests that use the declarative style of test configuration and injection common in Spring Test and described in more detail under Spring Testing. |
In all approaches the test classes look pretty much the same in that they all reuse the Camel binding and injection annotations.
Camel Test Example
Here is the Camel Test example:
The example derives from CamelTestSupport but has no CDI, Spring or Guice dependency injection configuration; instead it overrides the createRouteBuilder() method.
CDI Test Example
Here is the CDI Testing example:
See the camel-example-cdi-test example and the test classes that come with it.
Spring Test with XML Config Example
Here is the Spring Testing example using XML Config:
The example uses @DirtiesContext on the test methods to force Spring Testing to automatically reload the CamelContext after each test method. This ensures that the tests don't clash with each other, e.g., one test method sending to an endpoint that is then reused in another test method.
Also note the use of @ContextConfiguration to indicate that by default we should look for the FilterTest-context.xml on the classpath to configure the test case, which looks like this:
Spring Test with Java Config Example
Here is the Spring Testing example using Java Config.
For more information see Spring Java Config.
The ContextConfig class does all of the configuration, so your entire test case is contained in a single Java class. We currently have to reference this class by name in @ContextConfiguration, which is a bit ugly. Please vote for SJC-238 to address this and make Spring Test work more cleanly with Spring JavaConfig.
It's totally optional, but for the ContextConfig implementation we derive from SingleRouteCamelConfiguration, a helper Spring Java Config class which configures the CamelContext for us and then registers the RouteBuilder we create.
Since Camel 2.11.0 you can use the CamelSpringJUnit4ClassRunner with CamelSpringDelegatingTestContextLoader, as in this example using Java Config with CamelSpringJUnit4ClassRunner:
Spring Test with XML Config and Declarative Configuration Example
Here is a Camel test support enhanced Spring Testing example using XML Config and pure Spring Test based configuration of the Camel Context:
The @RunWith annotation is used to support the features of CamelTestSupport through annotations on the test class. See Spring Testing for a list of annotations you can use in your tests.
Blueprint Test
Here is the Blueprint Testing example using XML Config:
Note the use of getBlueprintDescriptors to indicate that by default we should look for the camelContext.xml in the package to configure the test case, which looks like this:
Testing Endpoints
Camel provides a number of endpoints which can make testing easier.
Name | Description |
---|---|
DataSet | For load & soak testing this endpoint provides a way to create huge numbers of messages for sending to Components and asserting that they are consumed correctly |
Mock | For testing routes and mediation rules using mocks and allowing assertions to be added to an endpoint |
Test | Creates a Mock endpoint which expects to receive all the message bodies that could be polled from the given underlying endpoint |
The main endpoint is the Mock endpoint which allows expectations to be added to different endpoints; you can then run your tests and assert that your expectations are met at the end.
Stubbing out physical transport technologies
If you wish to test out a route but want to avoid actually using a real physical transport (for example to unit test a transformation route rather than performing a full integration test) then the following endpoints can be useful.
Name | Description |
---|---|
Direct | Direct invocation of the consumer from the producer so that single threaded (non-SEDA) in VM invocation is performed which can be useful to mock out physical transports |
SEDA | Delivers messages asynchronously to consumers via a java.util.concurrent.BlockingQueue which is good for testing asynchronous transports |
Stub | Works like SEDA but does not validate the endpoint URI, which makes stubbing much easier. |
Testing existing routes
Camel provides some features to aid during testing of existing routes where you cannot or will not use Mock etc. For example you may have a production ready route which you want to test with some 3rd party API which sends messages into this route.
Name | Description |
---|---|
NotifyBuilder | Allows you to be notified when a certain condition has occurred. For example when the route has completed five messages. You can build complex expressions to match your criteria when to be notified. |
AdviceWith | Allows you to advise or enhance an existing route using a RouteBuilder style. For example you can add interceptors to intercept sending outgoing messages to assert those messages are as expected. |
Asynchronous Processing
Overview
Supported versions
The information on this page applies for Camel 2.4 or later.
Before Camel 2.4 asynchronous processing was only implemented for JBI, whereas in Camel 2.4 we have implemented it in many other areas. See more at Asynchronous Routing Engine.
Camel supports a more complex asynchronous processing model. The asynchronous processors implement the org.apache.camel.AsyncProcessor interface, which is derived from the synchronous org.apache.camel.Processor interface. There are advantages and disadvantages to using asynchronous processing compared with the standard synchronous processing model.
Advantages:
- Processing routes that are composed fully of asynchronous processors do not use up threads waiting for processors to complete on blocking calls. This can increase the scalability of your system by reducing the number of threads needed to process the same workload.
- Processing routes can be broken up into SEDA processing stages where different thread pools can process the different stages. This means that your routes can be processed concurrently.
Disadvantages:
- Implementing asynchronous processors is more complex than implementing the synchronous versions.
When to Use
We recommend that processors and components be implemented using the simpler synchronous APIs unless you identify a performance or scalability requirement that dictates otherwise. A Processor whose process() method blocks for a long time would be a good candidate for being converted into an asynchronous processor.
Interface Details
public interface AsyncProcessor extends Processor {
    boolean process(Exchange exchange, AsyncCallback callback);
}
The AsyncProcessor defines a single process() method which is very similar to its synchronous Processor.process() counterpart.
Here are the differences:
- A non-null AsyncCallback MUST be supplied, which will be notified when the exchange processing is completed.
- It MUST NOT throw any exceptions that occurred while processing the exchange. Any such exceptions must be stored on the exchange's Exception property.
- It MUST know if it will complete the processing synchronously or asynchronously. The method will return true if it does complete synchronously, otherwise it returns false.
- When the processor has completed processing the exchange, it must call the callback.done(boolean sync) method.
- The sync parameter MUST match the value returned by the process() method.
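The rules above can be illustrated with two small processors, one that completes on the caller's thread and one that hands the work to another thread. The Exchange, AsyncCallback and AsyncProcessor types below are simplified stand-ins defined only for this sketch (the real ones live in org.apache.camel), and the processor and demo class names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for the Camel types named in the text; not the real classes.
final class Exchange {
    volatile Object body;
    volatile Exception exception; // plays the role of the Exception property
}

interface AsyncCallback {
    void done(boolean sync);
}

interface AsyncProcessor {
    boolean process(Exchange exchange, AsyncCallback callback);
}

// Completes on the caller's thread: reports done(true) and returns true.
final class UpperCaseProcessor implements AsyncProcessor {
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            exchange.body = String.valueOf(exchange.body).toUpperCase();
        } catch (Exception e) {
            exchange.exception = e; // store the failure, never throw it
        }
        callback.done(true);
        return true;
    }
}

// Hands the work to another thread: returns false and reports done(false) later.
final class BackgroundProcessor implements AsyncProcessor {
    private final ExecutorService executor;

    BackgroundProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            executor.execute(() -> {
                try {
                    exchange.body = exchange.body + " (processed in background)";
                } catch (Exception e) {
                    exchange.exception = e;
                } finally {
                    callback.done(false);
                }
            });
            return false;
        } catch (RejectedExecutionException e) {
            // Could not go asynchronous: fail the exchange and complete synchronously.
            exchange.exception = e;
            callback.done(true);
            return true;
        }
    }
}

final class AsyncContractDemo {
    // Runs a processor, waits for its callback and summarises what was returned and reported.
    public static String run(AsyncProcessor processor, String body) {
        Exchange exchange = new Exchange();
        exchange.body = body;
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] reported = new boolean[1];
        boolean returned = processor.process(exchange, sync -> {
            reported[0] = sync;
            finished.countDown();
        });
        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                return "timed out";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return exchange.body + " returned=" + returned + " reported=" + reported[0];
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(run(new UpperCaseProcessor(), "hello"));
        System.out.println(run(new BackgroundProcessor(pool), "hello"));
        pool.shutdown();
    }
}
```

In both cases the value passed to done() equals the value returned by process(), which is the invariant callers rely on.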
Implementing Processors that Use the AsyncProcessor API
All processors, even synchronous processors that do not implement the AsyncProcessor interface, can be coerced to implement the AsyncProcessor interface. This is usually done when you are implementing a Camel component consumer that supports asynchronous completion of the exchanges that it is pushing through the Camel routes. Consumers are provided a Processor object when created. Every Processor object can be coerced to an AsyncProcessor using the following API:
Processor processor = ...
AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
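One plausible way such a coercion can work is an adapter that runs the synchronous processor on the caller's thread and then reports synchronous completion. The sketch below is an assumption about the general technique, not the implementation of AsyncProcessorTypeConverter; all types are local stand-ins and SyncToAsyncAdapter is a hypothetical name.

```java
// Simplified stand-ins for the Camel types named in the text; not the real classes.
final class Exchange {
    Object body;
    Exception exception;
}

interface Processor {
    void process(Exchange exchange) throws Exception;
}

interface AsyncCallback {
    void done(boolean sync);
}

interface AsyncProcessor extends Processor {
    boolean process(Exchange exchange, AsyncCallback callback);
}

// Hypothetical adapter: wraps a synchronous Processor so it can be called through the async API.
final class SyncToAsyncAdapter implements AsyncProcessor {
    private final Processor delegate;

    private SyncToAsyncAdapter(Processor delegate) {
        this.delegate = delegate;
    }

    // Returns the processor unchanged if it is already asynchronous, otherwise wraps it.
    public static AsyncProcessor convert(Processor processor) {
        if (processor instanceof AsyncProcessor) {
            return (AsyncProcessor) processor;
        }
        return new SyncToAsyncAdapter(processor);
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        delegate.process(exchange);
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            delegate.process(exchange);
        } catch (Exception e) {
            exchange.exception = e; // the async contract forbids throwing
        }
        callback.done(true); // the work ran on the caller's thread
        return true;
    }

    public static void main(String[] args) {
        Exchange exchange = new Exchange();
        exchange.body = "a";
        AsyncProcessor async = convert(ex -> ex.body = ex.body + "!");
        boolean sync = async.process(exchange, s -> System.out.println("done, sync=" + s));
        System.out.println(exchange.body + " " + sync);
    }
}
```

A wrapped synchronous processor always completes synchronously, so the caller's thread is still blocked while it runs; the adapter only makes the calling convention uniform.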
For a route to be fully asynchronous and reap the benefits of lower thread usage, it must start with the consumer implementation making use of the asynchronous processing API. If it called the synchronous process() method instead, the consumer's thread would be blocked and in use for as long as it takes to process the exchange.
It is important to note that just because you call the asynchronous API, it does not mean that the processing will take place asynchronously. It only allows the possibility that it can be done without tying up the caller's thread. Whether the processing happens asynchronously depends on the configuration of the Camel route.
Normally, the process call is passed an inline inner AsyncCallback class instance which can reference the exchange object that was declared final. This allows it to finish up any post-processing that is needed when the called processor is done processing the exchange.
Example.
final Exchange exchange = ...
AsyncProcessor asyncProcessor = ...
asyncProcessor.process(exchange, new AsyncCallback() {
    public void done(boolean sync) {
        if (exchange.isFailed()) {
            ... // do failure processing.. perhaps rollback etc.
        } else {
            ... // processing completed successfully, finish up
                // perhaps commit etc.
        }
    }
});
Asynchronous Route Sequence Scenarios
Now that we have understood the interface contract of the AsyncProcessor
, and have seen how to make use of it when calling processors, let's looks a what the thread model/sequence scenarios will look like for some sample routes.
The Jetty component's consumers support asynchronous processing through the use of continuations. Suffice to say it can take a HTTP request and pass it to a Camel route for asynchronous processing. If the processing is indeed asynchronous, it uses a Jetty continuation so that the HTTP request is 'parked' and the thread is released. Once the Camel route finishes processing the request, the Jetty component uses the AsyncCallback
to tell Jetty to 'un-park' the request. Jetty un-parks the request, the HTTP response returned using the result of the exchange processing.
Notice that the jetty continuations feature is only used "If the processing is indeed async". This is why AsyncProcessor.process()
implementations must accurately report whether the request is completed synchronously or not.
The jhc
component's producer allows you to make HTTP requests and implements the AsyncProcessor
interface. A route that uses both the jetty asynchronous consumer and the jhc
asynchronous producer will be a fully asynchronous route and has some nice attributes that can be seen if we take a look at a sequence diagram of the processing route.
For the route:
    from("jetty:http://localhost:8080/service")
        .to("jhc:http://localhost/service-impl");
The sequence diagram would look something like this:
The diagram simplifies things by making it look like processors implement the AsyncCallback
interface when in reality the AsyncCallback
implementations are inline inner classes, but it illustrates the processing flow and shows how two separate threads are used to complete the processing of the original HTTP request. The first thread is synchronous up until processing hits the jhc
producer which issues the HTTP request. It then reports that the exchange processing will complete asynchronously using NIO to get the response back. Once the jhc
component has received a full response, it uses the AsyncCallback.done()
method to notify the caller. These callback notifications continue up until it reaches the original Jetty consumer which then un-parks the HTTP request and completes it by providing the response.
Mixing Synchronous and Asynchronous Processors
It is totally possible and reasonable to mix the use of synchronous and asynchronous processors/components. The pipeline processor is the backbone of a Camel processing route. It glues all the processing steps together. It is implemented as an AsyncProcessor
and supports interleaving synchronous and asynchronous processors as the processing steps in the pipeline.
Let's say we have two custom asynchronous processors, namely: MyValidator
and MyTransformation
. Let's say we want to load files from the data/in directory, validate them with the MyValidator
processor, transform them into JPA Java objects using MyTransformation
and then insert them into the database using the JPA component. Let's say that the transformation process takes quite a bit of time and we want to allocate 20
threads to do parallel transformations of the input files. The solution is to make use of the thread processor. The thread processor is an AsyncProcessor
that forces subsequent processing to run asynchronously in a thread from a thread pool.
The route might look like:
    from("file:data/in")
        .process(new MyValidator())
        .threads(20)
        .process(new MyTransformation())
        .to("jpa:PurchaseOrder");
The sequence diagram would look something like this:
You would actually have multiple threads executing the second part of the thread sequence.
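The thread hand-off in the route above can be sketched with plain JDK classes. This is a simplified model and not how Camel implements threads(): the validate and transform methods are hypothetical stand-ins for MyValidator and MyTransformation, and a fixed thread pool plays the role of the thread processor. Validation runs on the calling thread, and everything after the hand-off runs on pool threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MixedPipelineSketch {

    // Hypothetical synchronous step: runs on whichever thread calls it.
    static String validate(String file, List<String> trace) {
        trace.add("validate@" + Thread.currentThread().getName());
        if (file.isEmpty()) {
            throw new IllegalArgumentException("empty file name");
        }
        return file;
    }

    // Hypothetical slow step: intended to run on a pool thread.
    static String transform(String file, List<String> trace) {
        trace.add("transform@" + Thread.currentThread().getName());
        return file.toUpperCase();
    }

    // Validates each file on the caller's thread, then hands the transformation
    // to the pool. The trace list must be thread-safe because both the caller
    // and the pool threads append to it.
    static List<String> run(List<String> files, int poolSize, List<String> trace) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String file : files) {
                String validated = validate(file, trace);
                pending.add(CompletableFuture.supplyAsync(() -> transform(validated, trace), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join()); // results keep the submission order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        System.out.println(run(Arrays.asList("a.xml", "b.xml"), 20, trace));
        System.out.println(trace);
    }
}
```

The trace shows each validate entry tagged with the caller's thread name and each transform entry tagged with a pool thread name, which is the split the sequence diagram describes.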
Staying Synchronous in an AsyncProcessor
Generally speaking you get better throughput when you process things synchronously. This is due to the fact that starting up an asynchronous thread and doing a context switch to it adds a little bit of overhead. So it is generally encouraged that AsyncProcessor
's do as much work as they can synchronously. When they get to a step that would block for a long time, at that point they should return from the process call and let the caller know that it will be completing the call asynchronously.
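As a rough illustration of this advice, the sketch below answers from a local cache on the caller's thread when it can, and only goes asynchronous when it has to perform a slow lookup. Everything here is an assumption made for the example: Callback is a simplified stand-in for AsyncCallback, slowLookup is a hypothetical placeholder for a blocking call, and the cache is just a map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaySynchronousSketch {

    // Simplified stand-in for Camel's AsyncCallback (assumption, not the real interface).
    interface Callback {
        void done(boolean sync);
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    StaySynchronousSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Hypothetical placeholder for a call that would block for a long time.
    private String slowLookup(String key) {
        return key.toUpperCase();
    }

    // Returns true when the result was produced on the caller's thread,
    // false when it will be produced later on a pool thread.
    boolean process(String key, StringBuilder result, Callback callback) {
        String cached = cache.get(key);
        if (cached != null) {
            // Cheap path: stay synchronous and avoid the thread hand-off.
            result.append(cached);
            callback.done(true);
            return true;
        }
        // Expensive path: release the caller and finish asynchronously.
        pool.execute(() -> {
            String value = slowLookup(key);
            cache.put(key, value);
            result.append(value);
            callback.done(false);
        });
        return false;
    }

    // Demo helper: calls process, waits for the callback, and returns the sync flag.
    static boolean processAndWait(StaySynchronousSketch processor, String key, StringBuilder result) {
        CompletableFuture<Boolean> finished = new CompletableFuture<>();
        boolean sync = processor.process(key, result, s -> finished.complete(s));
        finished.join();
        return sync;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            StaySynchronousSketch processor = new StaySynchronousSketch(pool);
            System.out.println(processAndWait(processor, "order", new StringBuilder())); // first call goes async
            System.out.println(processAndWait(processor, "order", new StringBuilder())); // second call is served from the cache
        } finally {
            pool.shutdown();
        }
    }
}
```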
Implementing Virtual Topics on other JMS providers
ActiveMQ supports Virtual Topics since durable topic subscriptions kinda suck (see this page for more detail) mostly since they don't support Competing Consumers.
Most folks want Queue semantics when consuming messages, so that they can support Competing Consumers for load balancing along with things like Message Groups and Exclusive Consumers to preserve ordering or partition the queue across consumers.
However if you are using another JMS provider you can implement Virtual Topics by switching to ActiveMQ or you can use the following Camel pattern.
First here's the ActiveMQ approach.
- send to activemq:topic:VirtualTopic.Orders
- for consumer A consume from activemq:Consumer.A.VirtualTopic.Orders
When using another message broker use the following pattern
- send to jms:Orders
- add this route with a to() for each logical durable topic subscriber
from("jms:Orders").to("jms:Consumer.A", "jms:Consumer.B", ...);
- for consumer A consume from jms:Consumer.A
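The effect of this pattern can be modelled in a few lines of plain Java. This is only a toy model with in-memory queues and no JMS or Camel involved, and the class and method names are invented for the illustration. Every message sent to the logical topic is copied into one queue per logical subscriber, and competing consumers of the same subscriber share that subscriber's queue, so each message is handled once per subscriber.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class VirtualTopicSketch {

    // One queue per logical durable subscriber, for example "Consumer.A".
    private final Map<String, Queue<String>> subscriberQueues = new LinkedHashMap<>();

    void addSubscriber(String name) {
        subscriberQueues.put(name, new ConcurrentLinkedQueue<>());
    }

    // Plays the role of the fan-out route: copy each message to every subscriber queue.
    void send(String message) {
        for (Queue<String> queue : subscriberQueues.values()) {
            queue.add(message);
        }
    }

    // A competing consumer takes the next message from its subscriber's queue,
    // or gets null when that queue is empty.
    String consume(String subscriber) {
        return subscriberQueues.get(subscriber).poll();
    }

    public static void main(String[] args) {
        VirtualTopicSketch orders = new VirtualTopicSketch();
        orders.addSubscriber("Consumer.A");
        orders.addSubscriber("Consumer.B");
        orders.send("order-1");
        orders.send("order-2");
        // Two competing consumers of A split the work between them.
        System.out.println(orders.consume("Consumer.A"));
        System.out.println(orders.consume("Consumer.A"));
        // B still receives its own copy of every message.
        System.out.println(orders.consume("Consumer.B"));
    }
}
```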
What's the Camel Transport for CXF
In CXF you offer or consume a web service by defining its address. The first part of the address specifies the protocol to use. For example, address="http://localhost:9000" in an endpoint configuration means your service will be offered using the http protocol on port 9000 of localhost. When you integrate the Camel Transport into CXF you get a new transport, "camel". So you can specify address="camel://direct:MyEndpointName" to bind the CXF service address to a Camel direct endpoint.
Technically speaking Camel transport for CXF is a component which implements the CXF transport API with the Camel core library. This allows you to easily use Camel's routing engine and integration patterns support together with your CXF services.
Integrate Camel into CXF transport layer
To include the Camel Transport in your CXF bus you use the CamelTransportFactory. You can do this in Java as well as in Spring.
Setting up the Camel Transport in Spring
You can use the following snippet in your application context if you want to configure anything special. If you only want to activate the Camel transport you do not have to do anything in your application context. As soon as you include the camel-cxf-transport jar (or camel-cxf.jar if your Camel version is less than 2.7.x) in your app, CXF will scan the jar and load a CamelTransportFactory for you.
Integrating the Camel Transport in a programmatic way
Camel transport provides a setContext method that you can use to set the Camel context on the transport factory. If you want this factory to take effect, you need to register the factory with the CXF bus. Here is a full example for you.
Configure the destination and conduit with Spring
Namespace
The elements used to configure a Camel transport endpoint are defined in the namespace http://cxf.apache.org/transports/camel
. It is commonly referred to using the prefix camel
. In order to use the Camel transport configuration elements, you will need to add the lines shown below to the beans element of your endpoint's configuration file. In addition, you will need to add the configuration elements' namespace to the xsi:schemaLocation
attribute.
The destination
element
You configure a Camel transport server endpoint using the camel:destination
element and its children. The camel:destination
element takes a single attribute, name
, that specifies the WSDL port element that corresponds to the endpoint. The value for the name
attribute takes the form portQName.camel-destination
. The example below shows the camel:destination
element that would be used to add configuration for an endpoint that was specified by the WSDL fragment <port binding="widgetSOAPBinding" name="widgetSOAPPort">
if the endpoint's target namespace was http://widgets.widgetvendor.net
.
The camel:destination
element for Spring has a number of child elements that specify configuration information. They are described below.
Element | Description |
---|---|
| You can specify the camel context in the camel destination |
| The camel context id which you want to inject into the camel destination |
The conduit
element
You configure a Camel transport client using the camel:conduit
element and its children. The camel:conduit
element takes a single attribute, name
, that specifies the WSDL port element that corresponds to the endpoint. The value for the name
attribute takes the form portQName.camel-conduit
. For example, the code below shows the camel:conduit
element that would be used to add configuration for an endpoint that was specified by the WSDL fragment <port binding="widgetSOAPBinding" name="widgetSOAPPort">
if the endpoint's target namespace was http://widgets.widgetvendor.net
.
The camel:conduit
element has a number of child elements that specify configuration information. They are described below.
Element | Description |
---|---|
| You can specify the camel context in the camel conduit |
| The camel context id which you want to inject into the camel conduit |
Configure the destination and conduit with Blueprint
From Camel 2.11.x, the Camel Transport can be configured with Blueprint.
If you are using Blueprint, you should use the namespace http://cxf.apache.org/transports/camel/blueprint
and import the schema as shown below.
In Blueprint, camel:conduit
and camel:destination
have only one attribute, camelContextId; they do not support specifying the camel context in the camel destination.
Example Using Camel as a load balancer for CXF
This example shows how to use the camel load balancing feature in CXF. You need to load the configuration file in CXF and publish the endpoints on the address "camel://direct:EndpointA" and "camel://direct:EndpointB"
Complete Howto and Example for attaching Camel to CXF
Introduction
When sending an Exchange to an Endpoint you can either use a Route or a ProducerTemplate. This works fine in many scenarios. However you may need to guarantee that an exchange is delivered to the same endpoint that you delivered a previous exchange on. For example in the case of delivering a batch of exchanges to a MINA socket you may need to ensure that they are all delivered through the same socket connection. Furthermore once the batch of exchanges has been delivered the protocol requirements may be such that you are responsible for closing the socket.
Using a Producer
To achieve fine-grained control over sending exchanges you will need to program directly to a Producer. Your code will look similar to:
    CamelContext camelContext = ...

    // Obtain an endpoint and create the producer we will be using.
    Endpoint endpoint = camelContext.getEndpoint("someuri:etc");
    Producer producer = endpoint.createProducer();
    producer.start();

    try {
        // For each message to send...
        Object requestMessage = ...
        Exchange exchangeToSend = producer.createExchange();
        // Set the body on the exchange's in message.
        exchangeToSend.getIn().setBody(requestMessage);
        producer.process(exchangeToSend);
        ...
    } finally {
        // Tidy the producer up.
        producer.stop();
    }
In the case of using Apache MINA the producer.stop() invocation will cause the socket to be closed.