odeconsumer.java

来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 264 行

JAVA
264
字号
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jbi;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.Message;import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;import org.apache.ode.bpel.iapi.Scheduler;import org.apache.ode.bpel.iapi.MessageExchange.FailureType;import org.apache.ode.jbi.msgmap.Mapper;import org.apache.ode.jbi.msgmap.MessageTranslationException;import java.util.Collection;import java.util.Map;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import javax.jbi.messaging.*;import javax.jbi.servicedesc.ServiceEndpoint;import javax.xml.namespace.QName;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE * that is destined for other JBI providers.  */abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {    private static final Log __log = LogFactory.getLog(OdeConsumer.class);    private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout", 2 * 60 * 1000L);    protected OdeContext _ode;    protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;    protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String, PartnerRoleMessageExchange>();    OdeConsumer(OdeContext ode) {        _ode = ode;    }    /**     * This is where we handle invocation where the ODE BPEL engine is the <em>client</em> and some other JBI service is the     * <em>provider</em>.     */    public void invokePartner(final PartnerRoleMessageExchange odeMex) throws ContextException {        // Cast the EndpointReference to a JbiEndpointReference. This is the        // only type it can be (since we control the creation of these things).        JbiEndpointReference targetEndpoint = (JbiEndpointReference) odeMex.getEndpointReference();        if (targetEndpoint == null) {            String errmsg = "No endpoint for mex: " + odeMex;            __log.error(errmsg);            odeMex.replyWithFailure(FailureType.INVALID_ENDPOINT, errmsg, null);            return;        }        ServiceEndpoint se = targetEndpoint.getServiceEndpoint();        boolean isTwoWay = odeMex.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;        QName opname = new QName(se.getServiceName().getNamespaceURI(), odeMex.getOperation().getName());        MessageExchangeFactory mexf = _ode.getChannel().createExchangeFactory(se);        final MessageExchange jbiMex;        try {            jbiMex = mexf.createExchange(isTwoWay ? MessageExchangePattern.IN_OUT : MessageExchangePattern.IN_ONLY);            jbiMex.setEndpoint(se);            jbiMex.setService(se.getServiceName());            jbiMex.setOperation(opname);        } catch (MessagingException e) {            String errmsg = "Unable to create JBI message exchange for ODE message exchange " + odeMex;            __log.error(errmsg, e);            odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);            return;        }        Mapper mapper = _ode.getDefaultMapper();        odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());        try {            if (!isTwoWay) {                final InOnly inonly = ((InOnly) jbiMex);                NormalizedMessage nmsg = inonly.createMessage();                mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);                inonly.setInMessage(nmsg);                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {                    public void afterCompletion(boolean success) {                        if (success) {                            doSendOneWay(odeMex, inonly);                        }                    }                    public void beforeCompletion() {                    }                });                odeMex.replyOneWayOk();            } else {                final InOut inout = (InOut) jbiMex;                NormalizedMessage nmsg = inout.createMessage();                mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);                inout.setInMessage(nmsg);                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {                    public void afterCompletion(boolean success) {                        if (success) {                            doSendTwoWay(odeMex, inout);                        }                    }                    public void beforeCompletion() {                    }                });                odeMex.replyAsync();            }        } catch (MessagingException me) {            String errmsg = "JBI messaging error for ODE MEX " + odeMex;            __log.error(errmsg, me);            odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);        } catch (MessageTranslationException e) {            String errmsg = "Error converting ODE message to JBI format for mex " + odeMex;            __log.error(errmsg, e);            odeMex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);        }    }    protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);    protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);    protected abstract void inOutDone(InOut inout);    public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&            !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());            return;        }        if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {            if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {                inOutDone((InOut) jbiMex);                outResponse((InOut) jbiMex);            }            jbiMex.setStatus(ExchangeStatus.DONE);            _ode.getChannel().send(jbiMex);        } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {            inOutDone((InOut) jbiMex);            outFailure((InOut) jbiMex);        } else if (jbiMex.getStatus() == ExchangeStatus.DONE) {            _outstandingExchanges.remove(jbiMex.getExchangeId());        } else {            __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId());        }    }    private void outFailure(final InOut jbiMex) {        final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId());        if (pmex == null) {            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());            return;        }        try {            _ode._scheduler.execTransaction(new Callable<Boolean>() {                public Boolean call() throws Exception {                    pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);                    return null;                }            });        } catch (Exception ex) {            __log.error("error delivering failure: ", ex);        }    }    private void outResponse(final InOut jbiMex) {        final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId());        if (outstanding == null) {            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());            return;        }        try {            _ode._scheduler.execTransaction(new Callable<Boolean>() {                @SuppressWarnings("unchecked")                public Boolean call() throws Exception {                    // need to reload mex since we're in a different transaction                    PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId());                    if (pmex == null) {                        __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId());                        return Boolean.FALSE;                    }                    String mapperName = pmex.getProperty(Mapper.class.getName());                    Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);                    if (mapper == null) {                        String errmsg = "Mapper not found.";                        __log.error(errmsg);                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);                    } else {                        try {                            Fault jbiFlt = jbiMex.getFault();                            if (jbiFlt != null) {                                javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values());                                if (wsdlFlt == null) {                                    pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);                                } else {                                    if (wsdlFlt.getMessage() != null) {                                        Message faultResponse = pmex.createMessage(wsdlFlt.getMessage().getQName());                                        mapper.toODE(faultResponse, jbiFlt, wsdlFlt.getMessage());                                        pmex.replyWithFault(new QName(pmex.getPortType().getQName().getNamespaceURI(), wsdlFlt                                                .getName()), faultResponse);                                    } else {                                        // Can this even happen?                                        __log.fatal("Internal Error: fault found without a message type: " + wsdlFlt);                                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Fault has no message: "                                                + wsdlFlt.getName(), null);                                    }                                }                            } else {                                Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());                                mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());                                pmex.reply(response);                            }                        } catch (MessageTranslationException mte) {                            __log.error("Error translating message.", mte);                            pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);                        }                    }                    return null;                }            });        } catch (Exception ex) {            __log.error("error delivering RESPONSE: ", ex);        }    }    public void setResponseTimeout(long timeout) {    	_responseTimeout = timeout;    }    public long getResponseTimeout() {    	return _responseTimeout;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?