odeservice.java
来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 389 行 · 第 1/2 页
JAVA
389 行
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jbi;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.iapi.Endpoint;import org.apache.ode.bpel.iapi.Message;import org.apache.ode.bpel.iapi.MessageExchange.Status;import org.apache.ode.bpel.iapi.MyRoleMessageExchange;import org.apache.ode.jbi.msgmap.Mapper;import org.apache.ode.jbi.msgmap.MessageTranslationException;import org.w3c.dom.Element;import javax.jbi.JBIException;import javax.jbi.messaging.ExchangeStatus;import javax.jbi.messaging.Fault;import javax.jbi.messaging.InOnly;import javax.jbi.messaging.InOut;import javax.jbi.messaging.MessagingException;import javax.jbi.messaging.NormalizedMessage;import javax.jbi.servicedesc.ServiceEndpoint;import javax.xml.namespace.QName;import java.util.HashMap;import java.util.Map;/** * Bridge JBI (consumer) to ODE (provider). */public class OdeService extends ServiceBridge implements JbiMessageExchangeProcessor { private static final Log __log = LogFactory.getLog(OdeService.class); /** utility for tracking outstanding JBI message exchanges. */ private final JbiMexTracker _jbiMexTracker = new JbiMexTracker(); /** JBI-Generated Endpoint */ private ServiceEndpoint _internal; /** External endpoint. */ private ServiceEndpoint _external; private OdeContext _ode; private Element _serviceref; private Endpoint _endpoint; public OdeService(OdeContext odeContext, Endpoint endpoint) throws Exception { _ode = odeContext; _endpoint = endpoint; } /** * Do the JBI endpoint activation. * * @throws JBIException */ public void activate() throws JBIException { if (_serviceref == null) { ServiceEndpoint[] candidates = _ode.getContext().getExternalEndpointsForService(_endpoint.serviceName); if (candidates.length != 0) { _external = candidates[0]; } } _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, _endpoint.portName); if (__log.isDebugEnabled()) { __log.debug("Activated endpoint " + _endpoint); } // TODO: Is there a race situation here? } /** * Deactivate endpoints in JBI. */ public void deactivate() throws JBIException { _ode.getContext().deactivateEndpoint(_internal); __log.debug("Dectivated endpoint " + _endpoint); } public ServiceEndpoint getInternalServiceEndpoint() { return _internal; } public ServiceEndpoint getExternalServiceEndpoint() { return _external; } public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException { if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) { String errmsg = "Message exchange is not in PROVIDER role as expected: " + jbiMex.getExchangeId(); __log.fatal(errmsg); throw new IllegalArgumentException(errmsg); } if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) { // We can forget about the exchange. __log.debug("Consuming MEX tracker " + jbiMex.getExchangeId()); _jbiMexTracker.consume(jbiMex.getExchangeId()); return; } if (jbiMex.getOperation() == null) { throw new IllegalArgumentException("Null operation in JBI message exchange id=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint); } if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) { boolean success = false; Exception err = null; try { invokeOde(jbiMex, ((InOnly) jbiMex).getInMessage()); success = true; } catch (Exception ex) { __log.error("Error invoking ODE.", ex); err = ex; } finally { if (!success) { jbiMex.setStatus(ExchangeStatus.ERROR); if (err != null && jbiMex.getError() == null) jbiMex.setError(err); } else { if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) jbiMex.setStatus(ExchangeStatus.DONE); } _ode.getChannel().send(jbiMex); } } else if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) { boolean success = false; Exception err = null; try { invokeOde(jbiMex, ((InOut) jbiMex).getInMessage()); success = true; } catch (Exception ex) { __log.error("Error invoking ODE.", ex); err = ex; } catch (Throwable t) { __log.error("Unexpected error invoking ODE.", t); err = new RuntimeException(t); } finally { // If we got an error that wasn't sent. if (jbiMex.getStatus() == ExchangeStatus.ACTIVE && !success) { if (err != null && jbiMex.getError() == null) { jbiMex.setError(err); } jbiMex.setStatus(ExchangeStatus.ERROR); _ode.getChannel().send(jbiMex); } } } else { __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern()); jbiMex.setStatus(ExchangeStatus.ERROR); jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern())); } } /** * Called from * {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)} * * @param mex * message exchange */ public void onResponse(MyRoleMessageExchange mex) { __log.debug("Consuming MEX tracker " + mex.getClientId()); javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId()); if (jbiMex == null) { __log.warn("Ignoring unknown async reply: " + mex); return; } switch (mex.getStatus()) { case FAULT: outResponseFault(mex, jbiMex); break; case RESPONSE: outResponse(mex, jbiMex);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?