This Confluence has been LDAP enabled. If you are an ASF Committer, please use your LDAP credentials to log in. If you have any problems, please file an INFRA Jira ticket.

Child pages
  • Using Service Stream Handler

Access to add and change pages is restricted. See: https://cwiki.apache.org/confluence/display/OFBIZ/Wiki+access

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Description

The Service Stream handler allows services to use raw streams when receiving requests from clients. The InputStream and OutputStream are the only required parameters.

Developing the service

When developing a service that uses the stream handler, the best way to implement it is as a static Java method invoked through the java service engine.

The service definition

The service should always implement serviceStreamInterface, which provides the two parameters inputStream and outputStream. As always, you can override these parameters if necessary.

services.xml
<services>
    <!-- Routing service to handle all incoming messages and send them to the appropriate processing service -->
    <!-- Runs on the java engine: "location" names the class and "invoke" the static method to call.
         NOTE(review): transaction-timeout is presumably in seconds; confirm against the service engine docs. -->
    <service name="oagisMessageHandler" engine="java" transaction-timeout="300"
        location="org.ofbiz.oagis.OagisServices" invoke="oagisMessageHandler" auth="false">
        <!-- Brings in the inputStream and outputStream parameters used by the stream handler -->
        <implements service="serviceStreamInterface"/>
        <implements service="oagisMessageIdOutInterface"/>
        <!-- Optional flag read by the implementation; when TRUE an already-recorded message is processed again as a retry -->
        <attribute name="isErrorRetry" type="Boolean" mode="IN" optional="true"/>
        <!-- Overrides the inherited outputStream parameter so that it is optional for this service -->
        <override name="outputStream" optional="true"/>
    </service>
</services>

The service implementation

OagisServices.java
    /**
     * Routing service for incoming OAGIS messages received as a raw stream.
     *
     * <p>Reads the complete request body from the <code>inputStream</code> context entry as UTF-8,
     * parses it as XML, extracts the BSR verb/noun and the sender identification from the control
     * area, and schedules the matching <code>oagisReceive*</code> service asynchronously. No
     * sub-service results are returned; the sub-services do their own error processing.</p>
     *
     * <p>If an OagisMessageInfo record already exists for the sender identification, the message is
     * treated as a retry when <code>isErrorRetry</code> is TRUE or the stored processingStatusId is
     * OAGMP_SYS_ERROR. Otherwise a Confirm BOD carrying a "MessageAlreadyReceived" error is
     * scheduled and a success result containing the explanation is returned.</p>
     *
     * @param ctx dispatch context supplying the delegator and dispatcher
     * @param context service context; reads "inputStream" (InputStream) and the optional
     *        "isErrorRetry" (Boolean)
     * @return a success map with "contentType" set to "text/plain", or an error map when the
     *         message cannot be parsed, the verb or noun is empty, the verb/noun combination is
     *         not supported, the Acknowledge Delivery kind cannot be determined, or the
     *         processing service could not be scheduled
     */
    public static Map oagisMessageHandler(DispatchContext ctx, Map context) {
        GenericDelegator delegator = ctx.getDelegator();
        LocalDispatcher dispatcher = ctx.getDispatcher();
        InputStream in = (InputStream) context.get("inputStream");
        List errorList = FastList.newInstance();
        Boolean isErrorRetry = (Boolean) context.get("isErrorRetry");

        // Best effort: a lookup failure is only logged and userLogin stays null.
        GenericValue userLogin = null;
        try {
            userLogin = delegator.findByPrimaryKey("UserLogin", UtilMisc.toMap("userLoginId", "system"));
        } catch (GenericEntityException e) {
            String errMsg = "Error Getting UserLogin with userLoginId system: "+e.toString();
            Debug.logError(e, errMsg, module);
        }

        Document doc = null;
        String xmlText = null;
        try {
            // The text is buffered first so it can be logged and quoted in parse error messages.
            // The reader is intentionally not closed here: the stream was handed in by the caller.
            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            StringBuffer xmlTextBuf = new StringBuffer();
            String currentLine = null;
            while ((currentLine = br.readLine()) != null) {
                xmlTextBuf.append(currentLine);
                xmlTextBuf.append('\n');
            }
            xmlText = xmlTextBuf.toString();

            // DEJ20070804 adding this temporarily for debugging, should be changed to verbose at some point in the future
            Debug.logWarning("Received OAGIS XML message, here is the text: \n" + xmlText, module);

            ByteArrayInputStream bis = new ByteArrayInputStream(xmlText.getBytes("UTF-8"));
            doc = UtilXml.readXmlDocument(bis, true, "OagisMessage");
        } catch (SAXException e) {
            String errMsg = "XML Error parsing the Received Message [" + e.toString() +
                "]; The text received we could not parse is: [" + xmlText + "]";
            errorList.add(UtilMisc.toMap("description", errMsg, "reasonCode", "SAXException"));
            Debug.logError(e, errMsg, module);
        } catch (ParserConfigurationException e) {
            String errMsg = "Parser Configuration Error parsing the Received Message: " + e.toString();
            errorList.add(UtilMisc.toMap("description", errMsg, "reasonCode", "ParserConfigurationException"));
            Debug.logError(e, errMsg, module);
        } catch (IOException e) {
            String errMsg = "IO Error parsing the Received Message: " + e.toString();
            errorList.add(UtilMisc.toMap("description", errMsg, "reasonCode", "IOException"));
            Debug.logError(e, errMsg, module);
        }

        if (UtilValidate.isNotEmpty(errorList)) {
            return ServiceUtil.returnError("Unable to parse received message");
        }

        Element rootElement = doc.getDocumentElement();
        rootElement.normalize();
        Element controlAreaElement = UtilXml.firstChildElement(rootElement, "os:CNTROLAREA");
        Element bsrElement = UtilXml.firstChildElement(controlAreaElement, "os:BSR");
        String bsrVerb = UtilXml.childElementValue(bsrElement, "of:VERB");
        String bsrNoun = UtilXml.childElementValue(bsrElement, "of:NOUN");

        Element senderElement = UtilXml.firstChildElement(controlAreaElement, "os:SENDER");
        String logicalId = UtilXml.childElementValue(senderElement, "of:LOGICALID");
        String component = UtilXml.childElementValue(senderElement, "of:COMPONENT");
        String task = UtilXml.childElementValue(senderElement, "of:TASK");
        String referenceId = UtilXml.childElementValue(senderElement, "of:REFERENCEID");

        if (UtilValidate.isEmpty(bsrVerb) || UtilValidate.isEmpty(bsrNoun)) {
            return ServiceUtil.returnError("Was able to receive and parse the XML message, but BSR->NOUN [" + bsrNoun +
                "] and/or BSR->VERB [" + bsrVerb + "] are empty");
        }

        // Look for an earlier record of this message; a lookup failure is only logged and the
        // message is then handled as if it had not been seen before.
        GenericValue oagisMessageInfo = null;
        Map oagisMessageInfoKey = UtilMisc.toMap("logicalId", logicalId, "component", component, "task", task, "referenceId", referenceId);
        try {
            oagisMessageInfo = delegator.findByPrimaryKey("OagisMessageInfo", oagisMessageInfoKey);
        } catch (GenericEntityException e) {
            String errMsg = "Error Getting Entity OagisMessageInfo: " + e.toString();
            Debug.logError(e, errMsg, module);
        }

        Map messageProcessContext = UtilMisc.toMap("document", doc, "userLogin", userLogin);

        if (UtilValidate.isNotEmpty(oagisMessageInfo)) {
            if (Boolean.TRUE.equals(isErrorRetry) || "OAGMP_SYS_ERROR".equals(oagisMessageInfo.getString("processingStatusId"))) {
                // there was an error last time, tell the service this is a retry
                messageProcessContext.put("isErrorRetry", Boolean.TRUE);
            } else {
                String responseMsg = "Message already received with ID: " + oagisMessageInfoKey;
                Debug.logError(responseMsg, module);

                List errorMapList = UtilMisc.toList(UtilMisc.toMap("reasonCode", "MessageAlreadyReceived", "description", responseMsg));

                Map sendConfirmBodCtx = FastMap.newInstance();
                sendConfirmBodCtx.put("logicalId", logicalId);
                sendConfirmBodCtx.put("component", component);
                sendConfirmBodCtx.put("task", task);
                sendConfirmBodCtx.put("referenceId", referenceId);
                sendConfirmBodCtx.put("errorMapList", errorMapList);
                sendConfirmBodCtx.put("userLogin", userLogin);

                try {
                    // run async because this will send a message back to the other server and may take some time, and/or fail
                    dispatcher.runAsync("oagisSendConfirmBod", sendConfirmBodCtx, null, true, 60, true);
                } catch (GenericServiceException e) {
                    String errMsg = "Error sending Confirm BOD: " + e.toString();
                    Debug.logError(e, errMsg, module);
                }
                Map result = ServiceUtil.returnSuccess(responseMsg);
                result.put("contentType", "text/plain");
                return result;
            }
        }

        Debug.logInfo("Processing OAGIS message with verb [" + bsrVerb + "] and noun [" + bsrNoun + "] with context: " + messageProcessContext, module);

        // Route on the verb/noun combination; every target service is scheduled asynchronously.
        if (bsrVerb.equalsIgnoreCase("CONFIRM") && bsrNoun.equalsIgnoreCase("BOD")) {
            scheduleOagisMessageService(dispatcher, "oagisReceiveConfirmBod", messageProcessContext, errorList);
        } else if (bsrVerb.equalsIgnoreCase("SHOW") && bsrNoun.equalsIgnoreCase("SHIPMENT")) {
            scheduleOagisMessageService(dispatcher, "oagisReceiveShowShipment", messageProcessContext, errorList);
        } else if (bsrVerb.equalsIgnoreCase("SYNC") && bsrNoun.equalsIgnoreCase("INVENTORY")) {
            scheduleOagisMessageService(dispatcher, "oagisReceiveSyncInventory", messageProcessContext, errorList);
        } else if (bsrVerb.equalsIgnoreCase("ACKNOWLEDGE") && bsrNoun.equalsIgnoreCase("DELIVERY")) {
            // Acknowledge Delivery is further split by the referenced document type, or by the
            // disposition when no document type is given.
            Element dataAreaElement = UtilXml.firstChildElement(rootElement, "ns:DATAAREA");
            Element ackDeliveryElement = UtilXml.firstChildElement(dataAreaElement, "ns:ACKNOWLEDGE_DELIVERY");
            Element receiptlnElement = UtilXml.firstChildElement(ackDeliveryElement, "ns:RECEIPTLN");
            Element docRefElement = UtilXml.firstChildElement(receiptlnElement, "os:DOCUMNTREF");
            String docType = docRefElement != null ? UtilXml.childElementValue(docRefElement, "of:DOCTYPE") : null;
            String disposition = UtilXml.childElementValue(receiptlnElement, "of:DISPOSITN");

            if ("PO".equals(docType)) {
                scheduleOagisMessageService(dispatcher, "oagisReceiveAcknowledgeDeliveryPo", messageProcessContext, errorList);
            } else if ("RMA".equals(docType)) {
                scheduleOagisMessageService(dispatcher, "oagisReceiveAcknowledgeDeliveryRma", messageProcessContext, errorList);
            } else if (UtilValidate.isEmpty(docType) && ("NotAvailableTOAvailable".equals(disposition) || "AvailableTONotAvailable".equals(disposition))) {
                scheduleOagisMessageService(dispatcher, "oagisReceiveAcknowledgeDeliveryStatus", messageProcessContext, errorList);
            } else {
                return ServiceUtil.returnError("For Acknowledge Delivery message could not determine if it is for a PO or RMA or Status Change. DOCTYPE from message is [" + docType + "], DISPOSITN is [" + disposition + "]");
            }
        } else {
            String errMsg = "Unknown Message Type Received, verb/noun combination not supported: verb=[" + bsrVerb + "], noun=[" + bsrNoun + "]";
            Debug.logError(errMsg, module);
            return ServiceUtil.returnError(errMsg);
        }

        // If the processing service could not even be scheduled, nothing will ever handle this
        // message. Report the failure instead of success so the sender does not consider the
        // message delivered.
        if (UtilValidate.isNotEmpty(errorList)) {
            return ServiceUtil.returnError("Unable to schedule processing of received message with verb [" + bsrVerb +
                "] and noun [" + bsrNoun + "]: " + errorList);
        }

        // No sub-service error processing is done here; it is all handled in the sub-services.
        Map result = ServiceUtil.returnSuccess();
        result.put("contentType", "text/plain");
        return result;
    }

    /**
     * Schedules one OAGIS message processing service; a scheduling failure is logged and recorded
     * in errorList (with reasonCode "GenericServiceException") rather than thrown.
     */
    private static void scheduleOagisMessageService(LocalDispatcher dispatcher, String serviceName, Map serviceContext, List errorList) {
        try {
            // DEJ20070808 changed to run asynchronously and persisted so that if it fails it will retry; for transaction deadlock and other reasons
            dispatcher.runAsync(serviceName, serviceContext, true);
        } catch (GenericServiceException e) {
            String errMsg = "Error running service " + serviceName + ": " + e.toString();
            errorList.add(UtilMisc.toMap("description", errMsg, "reasonCode", "GenericServiceException"));
            Debug.logError(e, errMsg, module);
        }
    }

Configuring the controller entry

  1. Be sure the ServiceStreamHandler is set as one of the event handlers.
  2. Configure your events (mount points) to use this handler.
controller.xml
   <!-- Step 1: register ServiceStreamHandler as the request event handler named "stream" -->
   <handler name="stream" type="request" class="org.ofbiz.webapp.event.ServiceStreamHandler"/>
   ...
   <!-- Step 2: mount point; the event below uses the "stream" handler type and invokes oagisMessageHandler -->
   <request-map uri="oagisMessageHandler">
        <!-- NOTE(review): presumably requires HTTPS and a client certificate while skipping login (auth="false"); confirm against the controller documentation -->
        <security https="true" auth="false" cert="true"/>
        <event type="stream" invoke="oagisMessageHandler"/>
        <!-- type="none": no view is configured for the success response -->
        <response name="success" type="none"/>
    </request-map>
  • No labels