Composed Message Processor

The Composed Message Processor from the EIP patterns allows you to process a composite message by splitting it up, routing the sub-messages to appropriate destinations, and then re-aggregating the responses back into a single message.

Camel provides two solutions: using both a Splitter and an Aggregator, or using only a Splitter.

The difference is that the Splitter alone aggregates all the split messages back into a single aggregation group, like a fork/join pattern.
The Aggregator, in contrast, lets you group the messages into multiple groups, which gives you more options.

Using the Splitter alone is often easier and possibly the better solution, so look at it first before involving the Aggregator.

Example using both Splitter and Aggregator

In this example we want to check that a multipart order can be filled. Each part of the order requires a check at a different inventory.

[Java DSL snippet not rendered; see the test source referenced below.]

Using the Spring XML Extensions

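<route>
  <from uri="direct:start"/>
  <!-- Split the order into its individual OrderItems -->
  <split>
    <simple>${body}</simple>
    <!-- Content Based Router: pick the inventory by item type -->
    <choice>
      <when>
        <method bean="orderItemHelper" method="isWidget"/>
        <to uri="bean:widgetInventory"/>
      </when>
      <otherwise>
        <to uri="bean:gadgetInventory"/>
      </otherwise>
    </choice>
    <!-- Hand each validated item to the aggregating route -->
    <to uri="seda:aggregate"/>
  </split>
</route>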

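<route>
  <from uri="seda:aggregate"/>
  <!-- Complete a group when no new item has arrived for 1000 ms -->
  <aggregate strategyRef="myOrderAggregatorStrategy" completionTimeout="1000">
    <correlationExpression>
      <!-- Items with the same orderId header belong to the same order -->
      <simple>${header.orderId}</simple>
    </correlationExpression>
    <to uri="mock:result"/>
  </aggregate>
</route>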

To do this we split up the order using a Splitter. The Splitter then sends the individual OrderItems to a Content Based Router, which checks the item type. Widget items are sent to the widgetInventory bean for checking, and gadget items are sent to the gadgetInventory bean. Once the OrderItems have been validated by the appropriate bean, they are sent on to the Aggregator, which collects the validated OrderItems and reassembles them into an order.
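
The routes above refer to four beans by name, and each of them has to exist in the Spring context. A minimal sketch of those declarations follows. The bean ids are taken from the routes, but the class names and the com.example package are hypothetical placeholders, not the classes used in the Camel test.

```xml
<!-- Bean ids match the names used in the routes; class names are hypothetical. -->
<bean id="orderItemHelper" class="com.example.OrderItemHelper"/>
<bean id="widgetInventory" class="com.example.WidgetInventory"/>
<bean id="gadgetInventory" class="com.example.GadgetInventory"/>
<!-- This class is expected to implement Camel's AggregationStrategy interface. -->
<bean id="myOrderAggregatorStrategy" class="com.example.MyOrderAggregationStrategy"/>
```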

When an order is sent, it carries a header with the order id. The aggregation relies on this: specifying header("orderId") on the aggregate DSL instructs Camel to use the header with the key orderId as the correlation expression. The XML route above expresses the same thing in its correlationExpression element.
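
In XML, the correlation on the orderId header can also be written with the header expression element instead of simple. The fragment below is a sketch of the aggregate route in that form, reusing the endpoint and bean names from the routes above. It is meant as an equivalent alternative, not a required change.

```xml
<route>
  <from uri="seda:aggregate"/>
  <aggregate strategyRef="myOrderAggregatorStrategy" completionTimeout="1000">
    <correlationExpression>
      <!-- Group exchanges by the value of the orderId header -->
      <header>orderId</header>
    </correlationExpression>
    <to uri="mock:result"/>
  </aggregate>
</route>
```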

For full details, check the example source here:

camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java

Example using only Splitter

In this example we want to split an incoming order using the Splitter EIP, transform each order line, and then combine the order lines into a new order message.

[Java DSL snippet not rendered]

Using XML

If you use XML, the <split> tag offers the strategyRef attribute to refer to your custom AggregationStrategy.
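
As a sketch of what such a route might look like, the fragment below splits the body, sends each order line to a bean, and lets the Splitter combine the replies with the referenced strategy. The bean ids myOrderService and myOrderStrategy, their method names, the class names, and the @ delimiter are all assumptions made for illustration.

```xml
<route>
  <from uri="direct:start"/>
  <!-- strategyRef names the AggregationStrategy bean that merges the replies -->
  <split strategyRef="myOrderStrategy">
    <!-- Assumes order lines are separated by '@' in the message body -->
    <tokenize token="@"/>
    <!-- Transform each order line (hypothetical bean method) -->
    <to uri="bean:myOrderService?method=handleOrder"/>
  </split>
  <!-- After the split completes, the body holds the aggregated result -->
  <to uri="bean:myOrderService?method=buildCombinedResponse"/>
  <to uri="mock:result"/>
</route>

<!-- Declared in the Spring context, outside the camelContext element; class names are hypothetical -->
<bean id="myOrderService" class="com.example.MyOrderService"/>
<bean id="myOrderStrategy" class="com.example.MyOrderStrategy"/>
```

Because the strategy is attached to the Splitter itself, this form needs no separate aggregate route and no correlation expression.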

The bean that provides the methods to transform each order line and to process the combined order:

[Java snippet not rendered]

And the AggregationStrategy we use with the Splitter EIP to combine the order lines back into one message (that is, fork/join):

[Java snippet not rendered]

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture page useful, particularly its description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern.