Requirements
We should support deploying a CXF client (as a service consumer) and a CXF server (as a service provider) easily into ServiceMix.
We need a CXF service engine into which CXF clients and servers can be deployed. A service engine (SE) provides some type of logic inside the JBI environment and communicates only with the NMR. If a CXF SE needs to communicate outside the JBI environment (e.g. the client is inside ServiceMix but the server is outside, or vice versa), it must send a message to a binding component (via the NMR), so we also need a CXF binding component to bridge incoming and outgoing messages. The CXF binding component should also let us leverage the WS-* features of CXF.
key components
cxf service engine
overview
The ServiceMix CXF SE component is a JBI Service Engine that exposes (annotated) POJOs as services on the JBI bus.
It reuses the JBI binding and JBI transport of Apache CXF internally to perform service invocations and XML marshalling.
See the how-to-use page for the CXF SE.
features for cxf service engine
- jsr181 annotations
- jaxb2 binding
- wsdl auto generation
- java proxy support
- MTOM / attachments support
main class for cxf service engine
CxfSeComponent
We use maven-xbean-plugin to generate the component XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="component"
 */
This class just extends the ServiceMix DefaultComponent, which is a JBI-spec-compatible component. By extending DefaultComponent, we can focus only on CXF-specific issues for our CXF service engine, since all JBI spec details are handled by DefaultComponent. Here we just initialize the CXF bus when CxfSeComponent initializes.
@Override
protected void doInit() throws Exception {
    bus = BusFactory.getDefaultBus();
    super.doInit();
}
CxfSeEndpoint
We use maven-xbean-plugin to generate the endpoint XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="endpoint"
 */
In this class we initialize and publish a JAX-WS endpoint using the CXF JBI transport and JBI binding, and also register this endpoint with ServiceMix.
/*
 * (non-Javadoc)
 *
 * @see org.apache.servicemix.common.Endpoint#validate()
 */
@Override
public void validate() throws DeploymentException {
    if (getPojo() == null) {
        throw new DeploymentException("pojo must be set");
    }
    JaxWsServiceFactoryBean serviceFactory = new JaxWsServiceFactoryBean();
    serviceFactory.setPopulateFromClass(true);
    endpoint = new EndpointImpl(getBus(), getPojo(),
            new JaxWsServerFactoryBean(serviceFactory));
    // use the CXF JBI binding, so messages use the JBI wrapper
    endpoint.setBindingUri(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING);
    endpoint.setInInterceptors(getInInterceptors());
    endpoint.setInFaultInterceptors(getInFaultInterceptors());
    endpoint.setOutInterceptors(getOutInterceptors());
    endpoint.setOutFaultInterceptors(getOutFaultInterceptors());
    endpoint.getOutInterceptors().add(new WrapperOutInterceptor());
    if (isMtomEnabled()) {
        endpoint.getInInterceptors().add(new AttachmentInInterceptor());
    }
    JaxWsImplementorInfo implInfo = new JaxWsImplementorInfo(getPojo().getClass());
    setService(implInfo.getServiceName()); // register the service name with the NMR
    setInterfaceName(implInfo.getInterfaceName()); // register the interface name (portType) with the NMR
    setEndpoint(implInfo.getEndpointName().getLocalPart()); // register the endpoint (port) name with the NMR
    super.validate();
}
We publish the endpoint when the endpoint starts in ServiceMix:
/*
 * (non-Javadoc)
 *
 * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#start()
 */
@Override
public void start() throws Exception {
    super.start();
    // use the jbi prefix for the endpoint address so that the CXF JBI transport is picked up
    address = "jbi://" + ID_GENERATOR.generateSanitizedId();
    endpoint.publish(address);
    setService(endpoint.getServer().getEndpoint().getService().getName());
    setEndpoint(endpoint.getServer().getEndpoint().getEndpointInfo()
            .getName().getLocalPart());
}
We just take the MessageExchange from the ServiceMix NMR and delegate it to the CXF JBI transport:
/*
 * (non-Javadoc)
 *
 * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#process(javax.jbi.messaging.MessageExchange)
 */
@Override
public void process(MessageExchange exchange) throws Exception {
    // load the JBITransportFactory from the CXF bus
    JBITransportFactory jbiTransportFactory = (JBITransportFactory) getBus()
            .getExtension(ConduitInitiatorManager.class)
            .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);
    QName serviceName = exchange.getService();
    if (serviceName == null) {
        serviceName = getService();
        exchange.setService(serviceName);
    }
    QName interfaceName = exchange.getInterfaceName();
    if (interfaceName == null) {
        interfaceName = getInterfaceName();
        exchange.setInterfaceName(interfaceName);
    }
    // each endpoint should have its own JBIDestination, so look it up by serviceName and interfaceName
    JBIDestination jbiDestination = jbiTransportFactory
            .getDestination(serviceName.toString() + interfaceName.toString());
    DeliveryChannel dc = getContext().getDeliveryChannel();
    jbiTransportFactory.setDeliveryChannel(dc);
    jbiDestination.setDeliveryChannel(dc);
    if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
        // delegate the MessageExchange to the CXF JBI transport; CXF is in charge of
        // message unmarshalling and service invocation
        jbiDestination.getJBIDispatcherUtil().dispatch(exchange);
    }
}
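The lookup in process() keys each JBIDestination by the concatenated string forms of the service and interface QNames. The following is a minimal sketch of what such a key looks like, using only the JDK's javax.xml.namespace.QName. The class name DestinationKeySketch and the sample QNames are illustrative assumptions, not part of the component source.

```java
import javax.xml.namespace.QName;

// Hypothetical helper, for illustration only: shows the shape of the lookup key.
final class DestinationKeySketch {

    /** Mirrors serviceName.toString() + interfaceName.toString() from process(). */
    static String destinationKey(QName serviceName, QName interfaceName) {
        return serviceName.toString() + interfaceName.toString();
    }

    public static void main(String[] args) {
        // Sample names assumed for the calculator example used elsewhere on this page.
        QName service = new QName("http://apache.org/cxf/calculator", "CalculatorService");
        QName portType = new QName("http://apache.org/cxf/calculator", "CalculatorPortType");
        System.out.println(destinationKey(service, portType));
    }
}
```

QName.toString() renders a qualified name as {namespaceURI}localPart, so the key distinguishes endpoints by both service and interface.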
CxfSeProxyFactoryBean
We use maven-xbean-plugin to generate the proxy XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="proxy" description="A CXF proxy"
 *
 */
We create the client proxy exactly as we would in plain CXF:
private Object createProxy() throws Exception {
    JaxWsProxyFactoryBean cf = new JaxWsProxyFactoryBean();
    cf.setServiceName(getService());
    cf.setServiceClass(type);
    // use the jbi prefix for the endpoint address so that the CXF JBI transport is picked up
    cf.setAddress("jbi://" + new IdGenerator().generateSanitizedId());
    cf.setBindingId(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING); // use the CXF JBI binding
    Bus bus = BusFactory.getDefaultBus();
    JBITransportFactory jbiTransportFactory = (JBITransportFactory) bus
            .getExtension(ConduitInitiatorManager.class)
            .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);
    if (getInternalContext() != null) {
        DeliveryChannel dc = getInternalContext().getDeliveryChannel();
        if (dc != null) {
            jbiTransportFactory.setDeliveryChannel(dc);
        }
    }
    return cf.create();
}
configuration
interceptors configuration
Since cxfse uses Apache CXF internally, users can configure a CXF SE endpoint with interceptors that follow the CXF interceptor API.
An example is shown below:
<cxfse:endpoint>
  <cxfse:pojo>
    <bean class="org.apache.cxf.calculator.CalculatorImpl">
    </bean>
  </cxfse:pojo>
  <cxfse:inInterceptors>
    <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
  </cxfse:inInterceptors>
  <cxfse:outInterceptors>
    <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
  </cxfse:outInterceptors>
  <cxfse:inFaultInterceptors>
    <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
  </cxfse:inFaultInterceptors>
  <cxfse:outFaultInterceptors>
    <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
  </cxfse:outFaultInterceptors>
</cxfse:endpoint>
proxy configuration
Users can obtain a proxy from a client bean, or from inside another component, and call the JBI endpoint as a plain Java object.
<bean class="org.apache.servicemix.cxfse.GreeterImplForClientProxy">
  <property name="calculator">
    <cxfse:proxy service="calculator:CalculatorService"
                 type="org.apache.cxf.calculator.CalculatorPortType" />
  </property>
</bean>
cxf binding component
overview
ServiceMix ships with a JBI-compliant HTTP/SOAP or JMS/SOAP binding component named servicemix-cxf-bc, which uses Apache CXF internally.
See the how-to-use page for the CXF BC.
features for cxf binding component
- JBI compliant Binding Component
- SOAP 1.1 and 1.2 support
- MIME attachments
- Support for InOut or InOnly MEPs as consumers or providers
- SSL support
- WS-Security support
- WS-Policy support
- WS-RM support
- WS-Addressing support
main class for cxf binding component
CxfBcComponent
We use maven-xbean-plugin to generate the component XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="component"
 */
This class just extends the ServiceMix DefaultComponent, which is a JBI-spec-compatible component. By extending DefaultComponent, we can focus only on CXF-specific issues for our CXF binding component, since all JBI spec details are handled by DefaultComponent. Here we just initialize the CXF bus when CxfBcComponent initializes.
@Override
protected void doInit() throws Exception {
    if (getBusConfig() != null) {
        // for the CXF BC we sometimes need to pass in a CXF bus configuration file,
        // e.g. to configure the bus to support WS-*
        SpringBusFactory bf = new SpringBusFactory();
        bus = bf.createBus(getBusConfig());
    } else {
        bus = BusFactory.newInstance().createBus();
    }
    super.doInit();
}
CxfBcConsumer
We use maven-xbean-plugin to generate the consumer XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="consumer"
 */
A consumer of the binding component exposes an internal JBI endpoint to outside clients, for example:
standalone cxf client ===> cxf bc consumer===> service in cxf se
CxfBcConsumer plays two roles in message processing:
- cxf server
- servicemix consumer
As a CXF server, it just initializes a dummy server without a real service class. This CXF server initializes the proper transport (HTTP or JMS) from the service model, which is built from the WSDL. Since there is no service class for this server, it performs no real service invocation; it only transforms the message format between the SOAP binding and the JBI binding.
For a request message, the SOAP message must be transformed into a JBI message. Two main interceptors are involved:
JbiInInterceptor
JbiInInterceptor mainly creates the JBI NormalizedMessage and copies headers and attachments from the SOAP message to the JBI message.
JbiInWsdl1Interceptor
JbiInWsdl1Interceptor transforms the SOAP message into a JBI message according to the service model.
For a response message, the JBI message must be transformed into a SOAP message. Again, two main interceptors are involved:
JbiOutInterceptor
JbiOutInterceptor mainly copies attachments and headers from the JBI message to the SOAP message.
JbiOutWsdl1Interceptor
JbiOutWsdl1Interceptor transforms the JBI message into a SOAP message based on the service model.
As a ServiceMix consumer, it sends the JBI message transformed from the SOAP message to the NMR and processes the response JBI message coming back from the NMR.
CxfBcConsumer uses its own JbiInvokerInterceptor to wire these two roles together.
Main part of JbiInvokerInterceptor:
public class JbiInvokerInterceptor extends AbstractPhaseInterceptor<Message> {

    public JbiInvokerInterceptor() {
        super(Phase.INVOKE);
    }

    public void handleMessage(final Message message) throws Fault {
        final Exchange cxfExchange = message.getExchange();
        final Endpoint endpoint = cxfExchange.get(Endpoint.class);
        final Service service = endpoint.getService();
        final Invoker invoker = service.getInvoker();
        MessageExchange exchange = message.getContent(MessageExchange.class);
        ComponentContext context = message.getExchange().get(ComponentContext.class);
        CxfBcConsumer.this.configureExchangeTarget(exchange);
        CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
        CxfBcConsumer.this.isOneway = message.getExchange().get(
                BindingOperationInfo.class).getOperationInfo().isOneWay();
        message.getExchange().setOneWay(CxfBcConsumer.this.isOneway);
        try {
            if (CxfBcConsumer.this.synchronous && !CxfBcConsumer.this.isOneway) {
                // pause the incoming interceptor chain of the CXF server; the message
                // transformation from SOAP to JBI is done at this point
                message.getInterceptorChain().pause();
                // send the exchange, which contains the JBI message, to the NMR
                context.getDeliveryChannel().sendSync(exchange, timeout * 1000);
                process(exchange);
            } else {
                context.getDeliveryChannel().send(exchange);
            }
        } catch (Exception e) {
            throw new Fault(e);
        }
    }
}
The code for processing the response JBI message looks like:
public void process(MessageExchange exchange) throws Exception {
    Message message = messages.remove(exchange.getExchangeId());
    // resume the CXF interceptor chain, which performs the message transformation from JBI to SOAP
    message.getInterceptorChain().resume();
    if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
        exchange.setStatus(ExchangeStatus.DONE);
        message.getExchange().get(ComponentContext.class)
                .getDeliveryChannel().send(exchange);
    }
}
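The two methods above cooperate through a map keyed by exchange ID: handleMessage() parks the CXF message before sending the exchange to the NMR, and process() removes it again to resume the chain. The following is a minimal, self-contained sketch of that correlation pattern using only JDK classes. ExchangeCorrelationSketch, PendingMessage, park and resume are hypothetical names introduced for illustration, and the explicit check for an unknown exchange ID is an addition that the original excerpt does not show.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of correlating a parked request with its response by exchange ID.
final class ExchangeCorrelationSketch {

    /** Stand-in for a paused CXF message (assumption, not a CXF type). */
    static final class PendingMessage {
        final String payload;
        boolean resumed;

        PendingMessage(String payload) {
            this.payload = payload;
        }
    }

    private final Map<String, PendingMessage> pending = new ConcurrentHashMap<>();

    /** Request side: remember the message before the exchange is sent to the NMR. */
    void park(String exchangeId, PendingMessage message) {
        pending.put(exchangeId, message);
    }

    /** Response side: look the message up by exchange ID and mark it resumed. */
    PendingMessage resume(String exchangeId) {
        PendingMessage message = pending.remove(exchangeId);
        if (message == null) {
            throw new IllegalStateException("No pending message for exchange " + exchangeId);
        }
        message.resumed = true;
        return message;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ExchangeCorrelationSketch sketch = new ExchangeCorrelationSketch();
        sketch.park("ID:1", new PendingMessage("<add/>"));
        PendingMessage m = sketch.resume("ID:1");
        System.out.println(m.resumed + " " + sketch.pendingCount());
    }
}
```

Removing the entry on resume keeps the map from growing with completed exchanges, and a concurrent map is used here because requests and responses may arrive on different threads.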
CxfBcProvider
We use maven-xbean-plugin to generate the provider XBean schema, so we first need to add a comment that specifies the XML element name:
/**
 *
 * @org.apache.xbean.XBean element="provider"
 */
A provider of the binding component bridges an internal request to an outside server, for example:
cxf proxy inside servicemix ===> cxf bc provider===> standalone cxf server outside servicemix
Similar to CxfBcConsumer, CxfBcProvider also plays two roles in message processing:
- servicemix provider
- cxf client
The message processing looks like:
The provider gets the incoming JBI message from the NMR ==> transforms it to a SOAP message ==> sends it out to the external service ==> in the registered MessageObserver, gets the response SOAP message from the external service ==> transforms it to a JBI message ==> sends it back to the NMR.
This process is split across two classes.
The first half is in CxfBcProvider:
public void process(MessageExchange exchange) throws Exception {
    NormalizedMessage nm = exchange.getMessage("in"); // get the incoming JBI message
    CxfBcProviderMessageObserver obs = new CxfBcProviderMessageObserver(exchange, this);
    conduit.setMessageObserver(obs); // register the message observer for the CXF client
    // create the outgoing CXF message based on the service model;
    // both SOAP 1.1 and SOAP 1.2 are supported
    Message message = ep.getBinding().createMessage();
    message.put(MessageExchange.class, exchange);
    Exchange cxfExchange = new ExchangeImpl();
    message.setExchange(cxfExchange);
    cxfExchange.setOutMessage(message);
    ... ...
    message.setInterceptorChain(outChain);
    InputStream is = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
    StreamSource source = new StreamSource(is);
    message.setContent(Source.class, source);
    message.setContent(InputStream.class, is);
    conduit.prepare(message);
    OutputStream os = message.getContent(OutputStream.class);
    XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
    String encoding = getEncoding(message);
    try {
        writer = StaxOutInterceptor.getXMLOutputFactory(message).createXMLStreamWriter(os, encoding);
    } catch (XMLStreamException e) {
        //
    }
    message.setContent(XMLStreamWriter.class, writer);
    message.put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
    outChain.doIntercept(message); // transform the JBI message to a SOAP message
    XMLStreamWriter xtw = message.getContent(XMLStreamWriter.class);
    if (xtw != null) {
        xtw.writeEndDocument();
        xtw.close();
    }
    os.flush();
    is.close();
    os.close(); // send the message out to the external service
}
The second half in CxfBcProviderMessageObserver
public void onMessage(Message message) {
    try {
        ... ...
        inChain.add(providerEndpoint.getInInterceptors());
        inChain.add(providerEndpoint.getInFaultInterceptors());
        soapMessage.setInterceptorChain(inChain);
        inChain.doIntercept(soapMessage); // transform the SOAP message to a JBI message
        // send the JBI message back to the NMR
        if (boi.getOperationInfo().isOneWay()) {
            messageExchange.setStatus(ExchangeStatus.DONE);
        } else if (soapMessage.get("jbiFault") != null
                && soapMessage.get("jbiFault").equals(true)) {
            ... ...
        } else {
            messageExchange.setStatus(ExchangeStatus.DONE);
        }
        boolean txSync = messageExchange.getStatus() == ExchangeStatus.ACTIVE
                && messageExchange.isTransacted()
                && Boolean.TRUE.equals(messageExchange.getProperty(JbiConstants.SEND_SYNC));
        if (txSync) {
            providerEndpoint.getContext().getDeliveryChannel().sendSync(messageExchange);
        } else {
            providerEndpoint.getContext().getDeliveryChannel().send(messageExchange);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    ... ...
}
configuration
Since cxfbc uses Apache CXF internally, users can configure a CXF BC provider or consumer with interceptors that follow the CXF interceptor API. A full example
typical use case
We have test cases that cover almost all features of the CXF BC and SE, such as:
- proxy
- ws-policy
- ws-security
- ws-rm
- ws-addressing
- MTOM
- multiple transport support
- oneway
- Exception
Go through these tests for servicemix-cxf-se and servicemix-cxf-bc.
some tips
what JBI binding message looks like
<jbi:message xmlns:jbi="http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper"
             xmlns:msg="http://apache.org/cxf/calculator"
             name="add" type="msg:add" version="1.0">
  <jbi:part>
    <add xmlns="http://apache.org/cxf/calculator/types">
      <arg0>1</arg0>
      <arg1>2</arg1>
    </add>
  </jbi:part>
</jbi:message>
what SOAP binding message looks like
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
  <soap:Body>
    <add xmlns="http://apache.org/cxf/calculator/types">
      <arg0>1</arg0>
      <arg1>2</arg1>
    </add>
  </soap:Body>
</soap:Envelope>
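The two messages above carry the same payload in different envelopes. As a rough illustration of that relationship, the sketch below re-wraps the children of a SOAP 1.2 Body into a JBI WSDL 1.1 wrapper using only the JDK DOM API. It is not the code of JbiInWsdl1Interceptor: the class JbiWrapperSketch and its methods are hypothetical, it assumes one jbi:part per Body child element, and it ignores headers, attachments and faults.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

// Hypothetical illustration: SOAP 1.2 Body payload -> JBI WSDL 1.1 wrapper message.
final class JbiWrapperSketch {
    static final String JBI_WRAPPER_NS = "http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper";
    static final String SOAP12_NS = "http://www.w3.org/2003/05/soap-envelope";
    static final String XMLNS_NS = "http://www.w3.org/2000/xmlns/";

    private static DocumentBuilderFactory factory() {
        DocumentBuilderFactory f = DocumentBuilderFactory.newInstance();
        f.setNamespaceAware(true); // needed so namespace URIs and local names are available
        return f;
    }

    static Document parse(String xml) {
        try {
            return factory().newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalStateException("Cannot parse XML", e);
        }
    }

    /** Wraps each element child of the SOAP Body in a jbi:part inside a jbi:message. */
    static Document wrap(Document soap, String messageNs, String messageName) {
        Element body = (Element) soap.getElementsByTagNameNS(SOAP12_NS, "Body").item(0);
        if (body == null) {
            throw new IllegalArgumentException("No SOAP 1.2 Body element found");
        }
        Document out;
        try {
            out = factory().newDocumentBuilder().newDocument();
        } catch (Exception e) {
            throw new IllegalStateException("Cannot create document", e);
        }
        Element message = out.createElementNS(JBI_WRAPPER_NS, "jbi:message");
        message.setAttributeNS(XMLNS_NS, "xmlns:jbi", JBI_WRAPPER_NS);
        message.setAttributeNS(XMLNS_NS, "xmlns:msg", messageNs);
        message.setAttribute("name", messageName);
        message.setAttribute("type", "msg:" + messageName);
        message.setAttribute("version", "1.0");
        out.appendChild(message);
        for (Node n = body.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE) {
                Element part = out.createElementNS(JBI_WRAPPER_NS, "jbi:part");
                part.appendChild(out.importNode(n, true)); // deep copy of the payload
                message.appendChild(part);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String soap = "<soap:Envelope xmlns:soap=\"" + SOAP12_NS + "\"><soap:Body>"
                + "<add xmlns=\"http://apache.org/cxf/calculator/types\">"
                + "<arg0>1</arg0><arg1>2</arg1></add></soap:Body></soap:Envelope>";
        Document jbi = wrap(parse(soap), "http://apache.org/cxf/calculator", "add");
        System.out.println(jbi.getDocumentElement().getNodeName());
    }
}
```

In the real component the message name and type come from the service model built from the WSDL; here they are passed in explicitly to keep the sketch free of CXF dependencies.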
can cxf component work with other component inside servicemix
Yes, provided those components can handle the normalized JBI message.