Loan Broker Example

This example shows how to use Camel to implement the EIP loan broker example. The example has two versions: one for JMS and one for web services.

Implementation with message queue (JMS)

The queue version of the loan broker is based on the camel-jms component, and it shows how to use message queues to connect the different service modules (such as the credit agency and the banks).

The example should run if you type

mvn exec:java -PQueue.LoanBroker
mvn exec:java -PQueue.Client

To stop the example hit ctrl + c.

Let's take a look at how these service modules are put together.

// Put the message from loanRequestQueue to the creditRequestQueue
from("jms:queue:loanRequestQueue").to("jms:queue:creditRequestQueue");

// Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue
from("jms:queue:creditRequestQueue").process(new CreditAgency()).to("jms:queue:creditResponseQueue");

// Here we use the multicast pattern to send the message to three different bank queues
from("jms:queue:creditResponseQueue").multicast().to("jms:queue:bank1", "jms:queue:bank2", "jms:queue:bank3");

// Each bank processor will process the message and put the response message into the bankReplyQueue
from("jms:queue:bank1").process(new Bank("bank1")).to("jms:queue:bankReplyQueue");
from("jms:queue:bank2").process(new Bank("bank2")).to("jms:queue:bankReplyQueue");
from("jms:queue:bank3").process(new Bank("bank3")).to("jms:queue:bankReplyQueue");

// Now we aggregate the response message by using the Constants.PROPERTY_SSN header.
// The aggregation will be complete when all the three bank responses are received
// In Camel 2.0 we use AGGREGATED_SIZE instead of AGGREGATED_COUNT as the header
// name of the aggregated message size.
from("jms:queue:bankReplyQueue") .aggregate(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy()) .completionPredicate(header(Exchange.AGGREGATED_SIZE).isEqualTo(3)) // Here we do some translation and put the message back to loanReplyQueue .process(new Translator()).to("jms:queue:loanReplyQueue"); The CreditAgency , Bank and Translator are all the implementation of Processor interface. We implement the business logical in the void process(Exchange exchange) method. CreditAgency public class CreditAgency implements Processor { private static final transient Log LOG = LogFactory.getLog(CreditAgency.class); public void process(Exchange exchange) throws Exception { LOG.info("Receiving credit agency request"); String ssn = exchange.getIn().getHeader(Constants.PROPERTY_SSN, String.class); int score = (int) (Math.random() * 600 + 300); int hlength = (int) (Math.random() * 19 + 1); exchange.getOut().setHeader(Constants.PROPERTY_SCORE, new Integer(score)); exchange.getOut().setHeader(Constants.PROPERTY_HISTORYLENGTH, new Integer(hlength)); exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn); exchange.getOut().setBody("CreditAgency processed the request."); } } Bank public class Bank implements Processor { private static final transient Log LOG = LogFactory.getLog(Bank.class); private String bankName; private double primeRate; public Bank(String name) { bankName = name; primeRate = 3.5; } public void process(Exchange exchange) throws Exception { String ssn = exchange.getIn().getHeader(Constants.PROPERTY_SSN, String.class); Integer historyLength = exchange.getIn().getHeader(Constants.PROPERTY_HISTORYLENGTH, Integer.class); double rate = primeRate + (double)(historyLength / 12) / 10 + (double)(Math.random() * 10) / 10; LOG.info("The bank: " + bankName + " for client: " + ssn + " 's rate " + rate); exchange.getOut().setHeader(Constants.PROPERTY_RATE, new Double(rate)); exchange.getOut().setHeader(Constants.PROPERTY_BANK, bankName); 
exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn); exchange.getOut().setBody("Bank processed the request."); // Sleep some time try { Thread.sleep((int) (Math.random() * 10) * 100); } catch (InterruptedException e) { // Discard } } } Translator public class Translator implements Processor { public void process(Exchange exchange) throws Exception { String bank = (String)exchange.getIn().getHeader(Constants.PROPERTY_BANK); Double rate = (Double)exchange.getIn().getHeader(Constants.PROPERTY_RATE); String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN); exchange.getOut().setBody("Loan quotion for Client " + ssn + "." + " The lowest rate bank is " + bank + ", the rate is " + rate); } } You may found we set a custom aggregation strategy to find out the lowest loan rate from bank response message. public class BankResponseAggregationStrategy implements AggregationStrategy { private static final transient Log LOG = LogFactory.getLog(BankResponseAggregationStrategy.class); private boolean aggregatingOutMessage; public BankResponseAggregationStrategy setAggregatingOutMessage(boolean flag) { aggregatingOutMessage = flag; return this; } // Here we put the bank response together public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { LOG.debug("Get the exchange to aggregate, older: " + oldExchange + " newer:" + newExchange); // the first time we only have the new exchange if (oldExchange == null) { return newExchange; } Message oldMessage; Message newMessage; oldMessage = oldExchange.getIn(); newMessage = newExchange.getIn(); Double oldRate = oldMessage.getHeader(Constants.PROPERTY_RATE, Double.class); Double newRate = newMessage.getHeader(Constants.PROPERTY_RATE, Double.class); Exchange result; if (newRate >= oldRate) { result = oldExchange; } else { result = newExchange; } LOG.debug("Get the lower rate exchange " + result); return result; } } We start the loan broker after we start up the ActiveMq broker and the connection factory of 
Camel-JMS component. public static void main(String... args) throws Exception { CamelContext context = new DefaultCamelContext(); JmsBroker broker = new JmsBroker(); broker.start(); // Set up the ActiveMQ JMS Components ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:51616"); // Note we can explicitly name the component context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new LoanBroker()); // Start the loan broker context.start(); System.out.println("Server is ready"); Thread.sleep(5 * 60 * 1000); context.stop(); Thread.sleep(1000); broker.stop(); } Now we can send the request from client and pull the response message back public class Client extends RouteBuilder { public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); // Set up the ActiveMQ JMS Components ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:51616"); // Note we can explicit name of the component context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new Client()); ProducerTemplate template = context.createProducerTemplate(); context.start(); // send out the request message for (int i = 0; i < 2; i++) { template.sendBodyAndHeader("jms:queue:loanRequestQueue", "Quote for the lowerst rate of loaning bank", Constants.PROPERTY_SSN, "Client-A" + i); Thread.sleep(100); } // wait for the response Thread.sleep(2000); // send the request and get the response from the same queue Exchange exchange = template.send("jms:queue2:parallelLoanRequestQueue", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); exchange.getIn().setBody("Quote for the lowerst rate of loaning bank"); exchange.getIn().setHeader(Constants.PROPERTY_SSN, "Client-B"); } }); String bank = (String)exchange.getOut().getHeader(Constants.PROPERTY_BANK); 
Double rate = (Double)exchange.getOut().getHeader(Constants.PROPERTY_RATE); String ssn = (String)exchange.getOut().getHeader(Constants.PROPERTY_SSN); System.out.println("Loan quotion for Client " + ssn + "." + " The lowest rate bank is " + bank + ", the rate is " + rate); // Wait a while before stop the context Thread.sleep(1000 * 5); context.stop(); } /** * Lets configure the Camel routing rules using Java code to pull the response message */ public void configure() { from("jms:queue:loanReplyQueue").process(new Processor() { public void process(Exchange exchange) throws Exception { // Print out the response message System.out.println(exchange.getIn().getBody()); } }); } } Implementation with web serviceThe web service version of loan broker is based on the camel-cxf component which can produce and consume the SOAP message on the wire. It uses the InOut Message exchange pattern, when the client send out the message to the router , it can get the response message back from the same endpoint. The example should run if you type mvn exec:java -PWS.LoanBroker mvn exec:java -PWS.Client To stop the example hit ctrl + c First, let's go through the SEI (Service Endpoint Interface) for LoanBroker, CreditAgency and Bank. 
LoanBroker

// This SEI has no @WebService annotation, so we use the simple frontend API to create the client and server
public interface LoanBrokerWS {
    String getLoanQuote(String ssn, Double loanAmount, Integer loanDuriation);
}

CreditAgency

@WebService
public interface CreditAgencyWS {
    int getCreditScore(String ssn);
    int getCreditHistoryLength(String ssn);
}

Bank

// Since we use @WebService here, please make sure to use the JaxWs frontend API to create the client and server
@WebService
public interface BankWS {
    String getBankName();
    BankQuote getQuote(String ssn, double loanAmount, int loanDuration, int creditHistory, int creditScore);
}

Here are two routing rules in the DSL: one routes the request to the banks sequentially, the other calls the bank services in parallel.

// Router 1 to call the bank endpoints sequentially
from(Constants.LOANBROKER_URI)
    // Using the CreditScoreProcessor to call the credit agency service
    .process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
    // Set the aggregation strategy on the multicast pattern
    .multicast(new BankResponseAggregationStrategy())
    // Send out the request to three different banks sequentially
    .to(Constants.BANK1_URI, Constants.BANK2_URI, Constants.BANK3_URI);

// Router 2 to call the bank endpoints in parallel
from(Constants.PARALLEL_LOANBROKER_URI)
    .process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
    // Using the thread pool to send out messages to three different banks in parallel
    .multicast(new BankResponseAggregationStrategy())
    // Camel will create a thread pool with the default size (10)
    // for sending the message in parallel
    .parallelProcessing()
    .to(Constants.BANK1_URI, Constants.BANK2_URI, Constants.BANK3_URI);

We use the CreditScoreProcessor to send two requests to the credit agency to get the credit history length and the credit score, and to prepare the request message for the banks.
class CreditScoreProcessor implements Processor { private String creditAgencyAddress; private CreditAgencyWS proxy; public CreditScoreProcessor(String address) { creditAgencyAddress = address; proxy = getProxy(); } private CreditAgencyWS getProxy() { // Here we use JaxWs front end to create the proxy JaxWsProxyFactoryBean proxyFactory = new JaxWsProxyFactoryBean(); ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean(); clientBean.setAddress(creditAgencyAddress); clientBean.setServiceClass(CreditAgencyWS.class); clientBean.setBus(BusFactory.getDefaultBus()); return (CreditAgencyWS)proxyFactory.create(); } @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { Message requestMessage = exchange.getIn(); List<Object> request = (List<Object>) requestMessage.getBody(); String ssn = (String)request.get(0); Double amount = (Double) request.get(1); Integer loanDuriation = (Integer)request.get(2); int historyLength = proxy.getCreditHistoryLength(ssn); int score = proxy.getCreditScore(ssn); // create the invocation message for Bank client List<Object> bankRequest = new ArrayList<Object>(); bankRequest.add(ssn); bankRequest.add(amount); bankRequest.add(loanDuriation); bankRequest.add(historyLength); bankRequest.add(score); exchange.getOut().setBody(bankRequest); exchange.getOut().setHeader(CxfConstants.OPERATION_NAME, "getQuote"); } } Now we implement the Bank and CreditAgency SEI with the business logical codes. 
Bank public class Bank implements BankWS { private String bankName; private double primeRate; public Bank(String name) { bankName = name; primeRate = 3.5; } public String getBankName() { return bankName; } public BankQuote getQuote(String ssn, double loanAmount, int loanDuration, int creditHistory, int creditScore) { Double rate = primeRate + (double)(loanDuration / 12) / 10 + (double)(Math.random() * 10) / 10; // Wait for a while try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing here } BankQuote result = new BankQuote(bankName, ssn, rate); return result; } } CreditAgency public class CreditAgency implements CreditAgencyWS { public int getCreditHistoryLength(String ssn) { int creditScore = (int)(Math.random() * 600 + 300); return creditScore; } public int getCreditScore(String ssn) { int creditHistoryLength = (int)(Math.random() * 19 + 1); return creditHistoryLength; } } The below codes show how the start the loan broker. public static void main(String... args) throws Exception { CamelContext context = new DefaultCamelContext(); CreditAgencyServer creditAgencyServer = new CreditAgencyServer(); // Start the credit server creditAgencyServer.start(); // Start the bank server BankServer bankServer = new BankServer(); bankServer.start(); // Start the camel context context.addRoutes(new LoanBroker()); context.start(); // Start the loan broker Thread.sleep(5 * 60 * 1000); context.stop(); Thread.sleep(1000); bankServer.stop(); creditAgencyServer.stop(); } We can send the request by creating a client proxy with the LoanBroker SEI in the client code. BTW, you can compare the two different routing rule's performance by running the client. 
public class Client { public LoanBrokerWS getProxy(String address) { // Now we use the simple front API to create the client proxy ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean(); ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean(); clientBean.setAddress(address); clientBean.setServiceClass(LoanBrokerWS.class); clientBean.setBus(BusFactory.getDefaultBus()); return (LoanBrokerWS) proxyFactory.create(); } public static void main(String[] args) { Client client = new Client(); String result = null; LoanBrokerWS loanBroker = client.getProxy(Constants.LOANBROKER_ADDRESS); long startTime = System.currentTimeMillis(); result = loanBroker.getLoanQuote("Sequential SSN", 1000.54, 10); long endTime = System.currentTimeMillis(); System.out.println("It takes " + (endTime - startTime) + " milliseconds to call the sequential loan broker service"); System.out.println(result); LoanBrokerWS paralleLoanBroker = client.getProxy(Constants.PARALLEL_LOANBROKER_ADDRESS); startTime = System.currentTimeMillis(); result = paralleLoanBroker.getLoanQuote("Parallel SSN", 1000.54, 10); endTime = System.currentTimeMillis(); System.out.println("It takes " + (endTime - startTime) + " milliseconds to call the parallel loan broker service"); System.out.println(result); } } |