📄 bpelprocess.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.bpel.engine;import java.io.InputStream;import java.net.URI;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.Callable;import javax.xml.namespace.QName;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.common.FaultException;import org.apache.ode.bpel.dao.BpelDAOConnection;import org.apache.ode.bpel.dao.ProcessDAO;import org.apache.ode.bpel.dao.ProcessInstanceDAO;import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;import org.apache.ode.bpel.evt.ProcessInstanceEvent;import org.apache.ode.bpel.explang.ConfigurationException;import org.apache.ode.bpel.explang.EvaluationException;import org.apache.ode.bpel.iapi.BpelEngineException;import org.apache.ode.bpel.iapi.Endpoint;import org.apache.ode.bpel.iapi.EndpointReference;import org.apache.ode.bpel.iapi.MessageExchange;import org.apache.ode.bpel.iapi.PartnerRoleChannel;import org.apache.ode.bpel.iapi.ProcessConf;import org.apache.ode.bpel.intercept.InterceptorInvoker;import 
org.apache.ode.bpel.intercept.MessageExchangeInterceptor;import org.apache.ode.bpel.o.OElementVarType;import org.apache.ode.bpel.o.OExpressionLanguage;import org.apache.ode.bpel.o.OMessageVarType;import org.apache.ode.bpel.o.OPartnerLink;import org.apache.ode.bpel.o.OProcess;import org.apache.ode.bpel.o.Serializer;import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;import org.apache.ode.bpel.runtime.PROCESS;import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;import org.apache.ode.bpel.runtime.channels.FaultData;import org.apache.ode.jacob.soup.ReplacementMap;import org.apache.ode.utils.ObjectPrinter;import org.apache.ode.utils.msg.MessageBundle;import org.w3c.dom.Element;import org.w3c.dom.Node;import org.w3c.dom.NodeList;import org.w3c.dom.Text;/** * Entry point into the runtime of a BPEL process. * * @author mszefler * @author Matthieu Riou <mriou at apache dot org> */public class BpelProcess { static final Log __log = LogFactory.getLog(BpelProcess.class); private static final Messages __msgs = MessageBundle.getMessages(Messages.class); private volatile Map<OPartnerLink, PartnerLinkPartnerRoleImpl> _partnerRoles; private volatile Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles; /** Mapping from a myrole to a {"Service Name" (QNAME) / port}. It's actually more a tuple than a map as * it's important to note that the same process with the same endpoint can have 2 different myroles. */ private volatile Map<PartnerLinkMyRoleImpl, Endpoint> _endpointToMyRoleMap; // Backup hashmaps to keep initial endpoints handy after dehydration private Map<Endpoint, EndpointReference> _myEprs = new HashMap<Endpoint, EndpointReference>(); private Map<Endpoint, EndpointReference> _partnerEprs = new HashMap<Endpoint, EndpointReference>(); private Map<Endpoint, PartnerRoleChannel> _partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>(); final QName _pid; private volatile OProcess _oprocess; // Has the process already been hydrated before? 
private boolean _hydratedOnce = false;

/** Last time the process was used. */
private volatile long _lastUsed;

BpelEngineImpl _engine;

DebuggerSupport _debugger;

ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;

private ReplacementMap _replacementMap;

final ProcessConf _pconf;

/** {@link MessageExchangeInterceptor}s registered for this process. */
private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();

/** Latch-like thing to control hydration/dehydration. */
private HydrationLatch _hydrationLatch;

/** Deploy-time configuration for external variables. */
private ExternalVariableConf _extVarConf;

private ExternalVariableManager _evm;

/**
 * Creates the process wrapper for the given deployment configuration.
 *
 * @param conf process configuration; its process id becomes {@code _pid}
 */
public BpelProcess(ProcessConf conf) {
    _pid = conf.getProcessId();
    _pconf = conf;
    _hydrationLatch = new HydrationLatch();
}

/**
 * Retrieves the base URI to use for local resource resolution.
 *
 * @return URI - instance representing the absolute file path to the physical location of the process definition folder.
 */
public URI getBaseResourceURI() {
    return this._pconf.getBaseURI();
}

/**
 * Initialize the external variable configuration/engine manager. This is called from hydration logic, so it
 * is possible to change the external variable configuration at runtime.
 */
void initExternalVariables() {
    List<Element> conf = _pconf.getExtensionElement(ExternalVariableConf.EXTVARCONF_ELEMENT);
    _extVarConf = new ExternalVariableConf(conf);
    _evm = new ExternalVariableManager(_pid, _extVarConf, _engine._contexts.externalVariableEngines, _oprocess);
}

/**
 * @return a short label of the form {@code BpelProcess[<process id>]}
 */
@Override
public String toString() {
    return "BpelProcess[" + _pid + "]";
}

/**
 * @return the external variable manager set up by {@link #initExternalVariables()}, or
 *         {@code null} if that method has not assigned one yet
 */
public ExternalVariableManager getEVM() {
    return _evm;
}

/**
 * Recovers an activity of the given process instance. Marks the process as used, creates a
 * runtime context for the instance and delegates the recovery to it.
 *
 * @param instanceDAO instance whose activity is recovered
 * @param channel     passed through to the runtime context
 * @param activityId  passed through to the runtime context
 * @param action      recovery action; also written to the debug log
 * @param fault       passed through to the runtime context
 */
public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action,
        FaultData fault) {
    if (__log.isDebugEnabled()) {
        __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
    }
    markused();
    BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
    processInstance.recoverActivity(channel, activityId, action, fault);
}

/**
 * Builds a message exchange identifier of the form {@code partnerlinkName.operationName}.
 *
 * @param partnerlinkName name of the partner link (must not be {@code null})
 * @param operationName   name of the operation
 * @return the two names joined by a dot
 */
static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
    // The buffer never leaves this method, so the unsynchronized StringBuilder is sufficient.
    StringBuilder sb = new StringBuilder(partnerlinkName);
    sb.append('.');
    sb.append(operationName);
    return sb.toString();
}

/**
 * Entry point for message exchanges aimed at the my role.
 *
 * <p>Holds the hydration latch while the exchange is routed. The exchange is failed with
 * {@code UNKNOWN_ENDPOINT} when no myrole serves the requested service, and processing is
 * abandoned when the interceptors veto it. Otherwise the first myrole that either has a
 * matching route or may create a new instance handles the exchange; if none does, the
 * exchange is handed to the last myrole via {@code noRoutingMatch}.
 *
 * <p>Note that the two early returns leave the method before the final one-way release.
 *
 * @param mex the my-role message exchange to process
 */
void invokeProcess(MyRoleMessageExchangeImpl mex) {
    try {
        _hydrationLatch.latch(1);
        List<PartnerLinkMyRoleImpl> targets = getMyRolesForService(mex.getServiceName());
        if (targets.isEmpty()) {
            String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
            __log.error(errmsg);
            mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null);
            return;
        }

        mex.getDAO().setProcess(getProcessDAO());

        if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
            if (__log.isDebugEnabled()) {
                __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
            }
            return;
        }

        markused();

        // Ideally, if Java supported closure, the routing code would return null or the appropriate
        // closure to handle the route.
        PartnerLinkMyRoleImpl.RoutingInfo routing = null;
        boolean routed = false;
        for (PartnerLinkMyRoleImpl target : targets) {
            routing = target.findRoute(mex);
            boolean createInstance = target.isCreateInstance(mex);

            if (mex.getStatus() != MessageExchange.Status.FAILURE) {
                if (routing.messageRoute == null && createInstance) {
                    // No route but we can create a new instance
                    target.invokeNewInstance(mex, routing);
                    routed = true;
                    break;
                } else if (routing.messageRoute != null) {
                    // Found a route, hitting it
                    target.invokeInstance(mex, routing);
                    routed = true;
                    break;
                }
            }
        }

        // Nothing found, saving for later
        if (!routed) {
            // TODO this is kind of hackish when no match and more than one myrole is selected.
            // we save the routing on the last myrole
            // actually the message queue should be attached to the instance instead of the correlator
            targets.get(targets.size() - 1).noRoutingMatch(mex, routing);
        }

        // Now we have to update our message exchange status. If the <reply> was not hit during the
        // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
        // or a two-way that needs to deliver the reply asynchronously.
        if (mex.getStatus() == MessageExchange.Status.REQUEST) {
            mex.setStatus(MessageExchange.Status.ASYNC);
        }

        markused();
    } finally {
        _hydrationLatch.release(1);
    }

    // For a one way, once the engine is done, the mex can be safely released.
    if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
        mex.release();
    }
}

/**
 * Several myroles can use the same service in a given process.
 *
 * @param serviceName service name to look up
 * @return every myrole whose endpoint carries that service name; empty if there is none
 */
private List<PartnerLinkMyRoleImpl> getMyRolesForService(QName serviceName) {
    List<PartnerLinkMyRoleImpl> myRoles = new ArrayList<PartnerLinkMyRoleImpl>(5);
    for (Map.Entry<PartnerLinkMyRoleImpl, Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
        if (e.getValue().serviceName.equals(serviceName)) {
            myRoles.add(e.getKey());
        }
    }
    return myRoles;
}

/**
 * Sets the port type and operation on a my-role message exchange, using the first myrole
 * whose endpoint matches the exchange's service name. Logs a warning and leaves the exchange
 * unchanged when no such myrole exists.
 *
 * @param mex the my-role message exchange to initialize
 */
void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
    markused();

    PartnerLinkMyRoleImpl target = null;
    for (Map.Entry<PartnerLinkMyRoleImpl, Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
        if (e.getValue().serviceName.equals(mex.getServiceName())) {
            // First one is fine as we're only interested in the portType and operation here,
            // even if a process has 2 myrole partner links
            target = e.getKey();
            break;
        }
    }

    if (target != null) {
        mex.setPortOp(target._plinkDef.myRolePortType,
                target._plinkDef.getMyRoleOperation(mex.getOperationName()));
    } else {
        __log.warn("Couldn't find endpoint from service " + mex.getServiceName()
                + " when initializing a myRole mex.");
    }
}

/**
 * Extract the value of a BPEL property from a BPEL message variable.
 *
 * @param msgData
 *            message variable data
 * @param alias
 *            alias to apply
 * @param target
 *            description of the data (for error logging only)
 * @return value of the property
 * @throws FaultException
 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -