odeservice.java

来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 389 行 · 第 1/2 页

JAVA
389
字号
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jbi;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.iapi.Endpoint;import org.apache.ode.bpel.iapi.Message;import org.apache.ode.bpel.iapi.MessageExchange.Status;import org.apache.ode.bpel.iapi.MyRoleMessageExchange;import org.apache.ode.jbi.msgmap.Mapper;import org.apache.ode.jbi.msgmap.MessageTranslationException;import org.w3c.dom.Element;import javax.jbi.JBIException;import javax.jbi.messaging.ExchangeStatus;import javax.jbi.messaging.Fault;import javax.jbi.messaging.InOnly;import javax.jbi.messaging.InOut;import javax.jbi.messaging.MessagingException;import javax.jbi.messaging.NormalizedMessage;import javax.jbi.servicedesc.ServiceEndpoint;import javax.xml.namespace.QName;import java.util.HashMap;import java.util.Map;/** * Bridge JBI (consumer) to ODE (provider). */public class OdeService extends ServiceBridge implements JbiMessageExchangeProcessor {    private static final Log __log = LogFactory.getLog(OdeService.class);    /** utility for tracking outstanding JBI message exchanges. 
*/    private final JbiMexTracker _jbiMexTracker = new JbiMexTracker();    /** JBI-Generated Endpoint */    private ServiceEndpoint _internal;    /** External endpoint. */    private ServiceEndpoint _external;    private OdeContext _ode;    private Element _serviceref;    private Endpoint _endpoint;    public OdeService(OdeContext odeContext, Endpoint endpoint) throws Exception {        _ode = odeContext;        _endpoint = endpoint;    }    /**     * Do the JBI endpoint activation.     *      * @throws JBIException     */    public void activate() throws JBIException {        if (_serviceref == null) {            ServiceEndpoint[] candidates = _ode.getContext().getExternalEndpointsForService(_endpoint.serviceName);            if (candidates.length != 0) {                _external = candidates[0];            }        }        _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, _endpoint.portName);        if (__log.isDebugEnabled()) {            __log.debug("Activated endpoint " + _endpoint);        }        // TODO: Is there a race situation here?    }    /**     * Deactivate endpoints in JBI.     */    public void deactivate() throws JBIException {        _ode.getContext().deactivateEndpoint(_internal);        __log.debug("Dectivated endpoint " + _endpoint);    }    public ServiceEndpoint getInternalServiceEndpoint() {        return _internal;    }    public ServiceEndpoint getExternalServiceEndpoint() {        return _external;    }    public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {        if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) {            String errmsg = "Message exchange is not in PROVIDER role as expected: " + jbiMex.getExchangeId();            __log.fatal(errmsg);            throw new IllegalArgumentException(errmsg);        }        if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) {            // We can forget about the exchange.            
__log.debug("Consuming MEX tracker " + jbiMex.getExchangeId());            _jbiMexTracker.consume(jbiMex.getExchangeId());            return;        }        if (jbiMex.getOperation() == null) {            throw new IllegalArgumentException("Null operation in JBI message exchange id=" + jbiMex.getExchangeId()                                                + " endpoint=" + _endpoint);        }        if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) {            boolean success = false;            Exception err = null;            try {                invokeOde(jbiMex, ((InOnly) jbiMex).getInMessage());                success = true;            } catch (Exception ex) {                __log.error("Error invoking ODE.", ex);                err = ex;            } finally {                if (!success) {                    jbiMex.setStatus(ExchangeStatus.ERROR);                    if (err != null && jbiMex.getError() == null)                        jbiMex.setError(err);                } else {                    if (jbiMex.getStatus() == ExchangeStatus.ACTIVE)                        jbiMex.setStatus(ExchangeStatus.DONE);                }                _ode.getChannel().send(jbiMex);            }        } else if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) {            boolean success = false;            Exception err = null;            try {                invokeOde(jbiMex, ((InOut) jbiMex).getInMessage());                success = true;            } catch (Exception ex) {                __log.error("Error invoking ODE.", ex);                err = ex;            } catch (Throwable t) {                __log.error("Unexpected error invoking ODE.", t);                err = new RuntimeException(t);            } finally {                // If we got an error that wasn't sent.                  
if (jbiMex.getStatus() == ExchangeStatus.ACTIVE && !success) {                    if (err != null && jbiMex.getError() == null)  {                        jbiMex.setError(err);                    }                    jbiMex.setStatus(ExchangeStatus.ERROR);                         _ode.getChannel().send(jbiMex);                         }                   }        } else {            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern "                    + jbiMex.getPattern());            jbiMex.setStatus(ExchangeStatus.ERROR);            jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern()));        }    }    /**     * Called from     * {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}     *      * @param mex     *            message exchange     */    public void onResponse(MyRoleMessageExchange mex) {        __log.debug("Consuming MEX tracker " + mex.getClientId());        javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId());        if (jbiMex == null) {            __log.warn("Ignoring unknown async reply: " + mex);            return;        }        switch (mex.getStatus()) {        case FAULT:            outResponseFault(mex, jbiMex);            break;        case RESPONSE:            outResponse(mex, jbiMex);

⌨️ 快捷键说明

复制代码：Ctrl + C
搜索代码：Ctrl + F
全屏模式：F11
增大字号：Ctrl + =
减小字号：Ctrl + -
显示快捷键：?