Camel RX
Available as of Camel 2.11
The camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library so that:
- Camel users can use the RxJava API for processing messages on endpoints using a typesafe composable API
- RxJava users get to use all of the Camel transports and protocols from within the RxJava API
Background on RX
For more in-depth background on RX, see the RxJava wiki pages on Observable and the Reactive pattern, or the Microsoft RX documentation.
You can think of RX as providing an API similar to the Java 8 / Groovy / Scala collections (methods such as filter, forEach, map, reduce, zip and so on), but one that operates on an asynchronous stream of events rather than on a collection. In other words, RX is like working with asynchronous push-based collections rather than the traditional synchronous pull-based collections.
In RX you work with an Observable<T> which behaves quite like a Collection<T> in Java 8 so you can filter/map/concat and so forth. The Observable<T> then acts as a typesafe composable API for working with asynchronous events in a collection-like way.
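To make the push versus pull distinction concrete, here is a minimal sketch that uses only the JDK. PushStream is a hypothetical stand-in written for this illustration; it is not RxJava's Observable and leaves out error handling, completion and threading. The same filter/map pipeline is written once against a pull-based java.util.stream.Stream and once against the push-based PushStream, where events flow through the pipeline only when the source emits them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class PushVersusPull {

    /** Hypothetical, minimal push-based stream. NOT RxJava's Observable. */
    static final class PushStream<T> {
        private final List<Consumer<? super T>> subscribers = new ArrayList<>();

        /** Register a callback that is invoked for every emitted event. */
        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        /** Push one event to all current subscribers. */
        void emit(T event) {
            for (Consumer<? super T> s : subscribers) {
                s.accept(event);
            }
        }

        /** Returns a downstream that only receives events matching the predicate. */
        PushStream<T> filter(Predicate<? super T> predicate) {
            PushStream<T> downstream = new PushStream<>();
            subscribe(event -> {
                if (predicate.test(event)) {
                    downstream.emit(event);
                }
            });
            return downstream;
        }

        /** Returns a downstream that receives the transformed events. */
        <R> PushStream<R> map(Function<? super T, ? extends R> mapper) {
            PushStream<R> downstream = new PushStream<>();
            subscribe(event -> downstream.emit(mapper.apply(event)));
            return downstream;
        }
    }

    /** Pull: the consumer drives iteration over data that already exists. */
    static List<String> pull(List<Integer> source) {
        return source.stream()
                .filter(n -> n > 100)
                .map(n -> "order-" + n)
                .collect(Collectors.toList());
    }

    /** Push: the pipeline is wired up first, then the producer emits events into it. */
    static List<String> push(List<Integer> events) {
        PushStream<Integer> source = new PushStream<>();
        List<String> received = new ArrayList<>();
        source.filter(n -> n > 100)
              .map(n -> "order-" + n)
              .subscribe(received::add);
        for (Integer event : events) {
            source.emit(event);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> amounts = Arrays.asList(50, 150, 99, 300);
        System.out.println(pull(amounts)); // prints [order-150, order-300]
        System.out.println(push(amounts)); // prints [order-150, order-300]
    }
}
```

Both methods produce the same result, but in push() nothing happens until emit() is called, which is the property that lets an Observable sit in front of an endpoint that delivers messages at arbitrary times.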
Once you have an Observable<T> you can then
Observing events on Camel endpoints
You can create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.
    import org.apache.camel.Message;
    import org.apache.camel.rx.*;
    import rx.Observable;

    ReactiveCamel rx = new ReactiveCamel(camelContext);
    Observable<Message> observable = rx.toObservable("activemq:MyMessages");

    // we can now call filter/map/concat etc (Java 8 lambdas shown here)
    Observable<String> filtered = observable
        .filter(m -> m.getHeader("foo") != null)
        .map(m -> "Hello " + m.getBody());
If you know the type of the message payload (its body), you can use an overloaded version of toObservable() to pass in the class and get a typesafe Observable<T> back:
    import org.apache.camel.rx.*;

    ReactiveCamel rx = new ReactiveCamel(camelContext);
    Observable<Order> observable = rx.toObservable("seda:orders", Order.class);

    // now lets filter and map using Java 7
    Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
        public Boolean call(Order order) {
            return order.getAmount() > 100.0;
        }
    }).map(new Func1<Order, String>() {
        public String call(Order order) {
            return order.getId();
        }
    });
Sending Observable<T> events to Camel endpoints
If you have an Observable<T> from some other library, or have created one from a Future<T> using RxJava, and you wish to send its events to a Camel endpoint, you can use the sendTo() method on ReactiveCamel:
    import org.apache.camel.rx.*;

    // take some observable from somewhere
    Observable<T> observable = ...;
    ReactiveCamel rx = new ReactiveCamel(camelContext);

    // lets send the events to a message queue
    rx.sendTo(observable, "activemq:MyQueue");
Embedding some RxJava processing inside a Camel route
Sometimes you may wish to use a Camel route to consume messages, perform content-based routing and transformation, deal with data format marshalling and so forth, and then invoke some typesafe RxJava event processing from within the route.
One approach is to just send messages from inside the Camel route to an endpoint; then use the toObservable() method to bind the endpoint to an Observable<T>.
However, if you prefer to embed the RxJava processing of messages inside your route, there are two helper classes that wrap the RxJava processing as a Camel Processor which can easily be embedded into a Camel route.
You can use the ObservableMessage or ObservableBody classes which both have an abstract configure() method like RouteBuilder. In the configure method you can then process the Observable<T> for the Camel Message or the message body.
For example:
    public class MyObservableBody extends ObservableBody<String> {
        public MyObservableBody() {
            super(String.class);
        }

        protected void configure(Observable<String> observable) {
            // lets process the messages using the RX API
            observable.map(new Func1<String, String>() {
                public String call(String body) {
                    return "Hello " + body;
                }
            }).subscribe(new Action1<String>() {
                public void call(String body) {
                    // template and resultEndpoint are not defined in this snippet;
                    // they are assumed to be supplied by the surrounding code
                    template.sendBody(resultEndpoint, body);
                }
            });
        }
    }
    ...
    // now lets use this inside a route...
    from("seda:foo").process(new MyObservableBody());
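The configure() hook above follows the template method pattern: the subclass wires up its pipeline once, and the processor then pushes each incoming body into that pipeline. The JDK-only sketch below shows one plausible way such a wrapper could work. It is an illustration under assumptions, not the camel-rx source: BodyStream and BodyProcessor are hypothetical names, the stream is created lazily on the first message, and no thread safety is attempted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ConfigureOnceSketch {

    /** Hypothetical stand-in for an observable of message bodies. NOT RxJava. */
    static final class BodyStream<T> {
        private final List<Consumer<? super T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(T body) {
            for (Consumer<? super T> s : subscribers) {
                s.accept(body);
            }
        }
    }

    /** Hypothetical processor base class exposing a configure() hook. */
    abstract static class BodyProcessor<T> {
        private final Class<T> bodyType;
        private BodyStream<T> stream; // created lazily, on the first message

        protected BodyProcessor(Class<T> bodyType) {
            this.bodyType = bodyType;
        }

        /** Subclasses wire up their pipeline here; invoked exactly once. */
        protected abstract void configure(BodyStream<T> stream);

        /** Invoked for every message: casts the body and pushes it into the stream. */
        public final void process(Object body) {
            if (stream == null) {
                stream = new BodyStream<>();
                configure(stream);
            }
            stream.emit(bodyType.cast(body));
        }
    }

    /** Feeds the given bodies through a processor that prefixes each with "Hello ". */
    static List<String> run(List<String> bodies) {
        List<String> results = new ArrayList<>();
        BodyProcessor<String> processor = new BodyProcessor<String>(String.class) {
            @Override
            protected void configure(BodyStream<String> stream) {
                stream.subscribe(body -> results.add("Hello " + body));
            }
        };
        for (String body : bodies) {
            processor.process(body);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("James", "Claus"))); // prints [Hello James, Hello Claus]
    }
}
```

The design point is that configure() describes the processing declaratively and runs once, while process() runs per message and only feeds the stream.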
Another approach, if you are consuming directly from Camel using the Bean Integration, is to use an RxJava Subject directly:
    import org.apache.camel.Consume;
    import rx.subjects.PublishSubject;
    import rx.subjects.Subject;

    public class MyThing {
        // Subject is abstract; PublishSubject is assumed here as the concrete implementation
        private final Subject<String, String> subject = PublishSubject.create();

        public MyThing() {
            // now process the subject (which is also an Observable) somehow....
        }

        @Consume(uri="activemq:myqueue")
        public void onMessageBody(String body) {
            subject.onNext(body);
        }
    }
That said, using toObservable() on ReactiveCamel is probably a little simpler.