📄 bpelengineimpl.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.bpel.engine;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.dao.MessageExchangeDAO;import org.apache.ode.bpel.dao.ProcessDAO;import org.apache.ode.bpel.dao.ProcessInstanceDAO;import org.apache.ode.bpel.evt.BpelEvent;import org.apache.ode.bpel.iapi.BpelEngine;import org.apache.ode.bpel.iapi.BpelEngineException;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.Endpoint;import org.apache.ode.bpel.iapi.Message;import org.apache.ode.bpel.iapi.MessageExchange;import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;import org.apache.ode.bpel.iapi.MessageExchange.Status;import org.apache.ode.bpel.iapi.MyRoleMessageExchange;import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;import org.apache.ode.bpel.iapi.Scheduler;import org.apache.ode.bpel.iapi.Scheduler.JobInfo;import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;import org.apache.ode.bpel.o.OPartnerLink;import org.apache.ode.bpel.o.OProcess;import org.apache.ode.utils.msg.MessageBundle;import javax.wsdl.Operation;import javax.wsdl.PortType;import javax.xml.namespace.QName;import java.io.File;import java.io.FileOutputStream;import java.io.ObjectOutputStream;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.TimeUnit;/** * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a * transaction. * * @author mszefler * @author Matthieu Riou <mriou at apache dot org> */public class BpelEngineImpl implements BpelEngine { private static final Log __log = LogFactory.getLog(BpelEngineImpl.class); /** RNG, for delays */ private Random _random = new Random(System.currentTimeMillis()); private static double _delayMean = 0; static { try { String delay = System.getenv("ODE_DEBUG_TX_DELAY"); if (delay != null && delay.length() > 0) { _delayMean = Double.valueOf(delay); __log.info("Stochastic debugging delay activated. Delay (Mean)=" + _delayMean + "ms."); } } catch (Throwable t) { if (__log.isDebugEnabled()) { __log.debug("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay", t); } else { __log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay"); } } } private static final Messages __msgs = MessageBundle.getMessages(Messages.class); /** Maximum number of tries for async jobs. */ private static final int MAX_RETRIES = 3; /** Active processes, keyed by process id. */ final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>(); /** Mapping from myrole endpoint name to active process. */ private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>(); /** Manage instance-level locks. */ private final InstanceLockManager _instanceLockManager = new InstanceLockManager(); final Contexts _contexts; public BpelEngineImpl(Contexts contexts) { _contexts = contexts; } public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation, String pipedMexId) throws BpelEngineException { BpelProcess target = route(targetService, null); MessageExchangeDAO dao; if (target == null || target.isInMemory()) { dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); } else { dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); } dao.setCorrelationId(clientKey); dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString()); dao.setPattern(MessageExchangePattern.UNKNOWN.toString()); dao.setCallee(targetService); dao.setStatus(Status.NEW.toString()); dao.setOperation(operation); dao.setPipedMessageExchangeId(pipedMexId); MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao); if (target != null) { target.initMyRoleMex(mex); } return mex; } public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) { return createMessageExchange(clientKey, targetService, operation, null); } public MessageExchange getMessageExchange(String mexId) throws BpelEngineException { MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId); if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId); if (mexdao == null) return null; ProcessDAO pdao = mexdao.getProcess(); BpelProcess process = pdao == null ? null : _activeProcesses.get(pdao.getProcessId()); MessageExchangeImpl mex; switch (mexdao.getDirection()) { case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE: if (process == null) { String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId()); __log.error(errmsg); // TODO: Perhaps we should define a checked exception for this // condition. throw new BpelEngineException(errmsg); } { OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId()); PortType ptype = plink.partnerRolePortType; Operation op = plink.getPartnerRoleOperation(mexdao.getOperation()); // TODO: recover Partner's EPR mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null, plink.hasMyRole() ? process .getInitialMyRoleEPR(plink) : null, process.getPartnerRoleChannel(plink)); } break; case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE: mex = new MyRoleMessageExchangeImpl(this, mexdao); if (process != null) { OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId()); PortType ptype = plink.myRolePortType; Operation op = plink.getMyRoleOperation(mexdao.getOperation()); mex.setPortOp(ptype, op); } break; default: String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId; __log.fatal(errmsg); throw new BpelEngineException(errmsg); } return mex; } BpelProcess unregisterProcess(QName process) { BpelProcess p = _activeProcesses.remove(process); if (p != null) { if (__log.isDebugEnabled()) __log.debug("Deactivating process " + p.getPID()); Endpoint processEndpoint = null; Iterator<Map.Entry<Endpoint,BpelProcess>> serviceIter = _serviceMap.entrySet().iterator(); while (serviceIter.hasNext()) { Map.Entry<Endpoint,BpelProcess> processEntry = serviceIter.next(); if (processEntry.getValue()._pid.equals(process)) { serviceIter.remove(); processEndpoint = processEntry.getKey(); } } // Only deactivating if no other process (version) need that endpoint anymore // We're only routing using an endpoint/process map for now which means that deploying // several versions of the same process using the same endpoint (which is the common // case) will override previous deployments endpoints. So checking the endpoint is not // enough, we also have to check other versions of the same process. // A bit clunky, the maps held here should be retought a bit. boolean otherVersions = false; for (BpelProcess bpelProcess : _activeProcesses.values()) { if (bpelProcess._pconf.getType().equals(p._pconf.getType())) otherVersions = true; } if (_serviceMap.get(processEndpoint) == null && !otherVersions) { p.deactivate(); } } return p; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -