⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bpelengineimpl.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.bpel.engine;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.dao.MessageExchangeDAO;import org.apache.ode.bpel.dao.ProcessDAO;import org.apache.ode.bpel.dao.ProcessInstanceDAO;import org.apache.ode.bpel.evt.BpelEvent;import org.apache.ode.bpel.iapi.BpelEngine;import org.apache.ode.bpel.iapi.BpelEngineException;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.Endpoint;import org.apache.ode.bpel.iapi.Message;import org.apache.ode.bpel.iapi.MessageExchange;import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;import org.apache.ode.bpel.iapi.MessageExchange.Status;import org.apache.ode.bpel.iapi.MyRoleMessageExchange;import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;import org.apache.ode.bpel.iapi.Scheduler;import org.apache.ode.bpel.iapi.Scheduler.JobInfo;import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;import org.apache.ode.bpel.o.OPartnerLink;import org.apache.ode.bpel.o.OProcess;import org.apache.ode.utils.msg.MessageBundle;import javax.wsdl.Operation;import javax.wsdl.PortType;import javax.xml.namespace.QName;import java.io.File;import java.io.FileOutputStream;import java.io.ObjectOutputStream;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.TimeUnit;/** * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a * transaction. * * @author mszefler * @author Matthieu Riou <mriou at apache dot org> */public class BpelEngineImpl implements BpelEngine {    private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);    /** RNG, for delays */    private Random _random = new Random(System.currentTimeMillis());    private static double _delayMean = 0;    static {        try {            String delay = System.getenv("ODE_DEBUG_TX_DELAY");            if (delay != null && delay.length() > 0) {                _delayMean = Double.valueOf(delay);                __log.info("Stochastic debugging delay activated. Delay (Mean)=" + _delayMean + "ms.");            }        } catch (Throwable t) {            if (__log.isDebugEnabled()) {                __log.debug("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay", t);            } else {                __log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");            }        }    }    private static final Messages __msgs = MessageBundle.getMessages(Messages.class);    /** Maximum number of tries for async jobs. */    private static final int MAX_RETRIES = 3;    /** Active processes, keyed by process id. */    final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();    /** Mapping from myrole endpoint name to active process. */    private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();    /** Manage instance-level locks. */    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();    final Contexts _contexts;    public BpelEngineImpl(Contexts contexts) {        _contexts = contexts;    }    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService,                                                       String operation, String pipedMexId)            throws BpelEngineException {        BpelProcess target = route(targetService, null);        MessageExchangeDAO dao;        if (target == null || target.isInMemory()) {            dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);        } else {            dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);        }        dao.setCorrelationId(clientKey);        dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());        dao.setPattern(MessageExchangePattern.UNKNOWN.toString());        dao.setCallee(targetService);        dao.setStatus(Status.NEW.toString());        dao.setOperation(operation);        dao.setPipedMessageExchangeId(pipedMexId);        MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao);        if (target != null) {            target.initMyRoleMex(mex);        }        return mex;    }    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) {        return createMessageExchange(clientKey, targetService, operation, null);            }    public MessageExchange getMessageExchange(String mexId) throws BpelEngineException {        MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);        if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);        if (mexdao == null)            return null;        ProcessDAO pdao = mexdao.getProcess();        BpelProcess process = pdao == null ? null : _activeProcesses.get(pdao.getProcessId());        MessageExchangeImpl mex;        switch (mexdao.getDirection()) {        case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:            if (process == null) {                String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());                __log.error(errmsg);                // TODO: Perhaps we should define a checked exception for this                // condition.                throw new BpelEngineException(errmsg);            }            {                OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());                PortType ptype = plink.partnerRolePortType;                Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());                // TODO: recover Partner's EPR                mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null, plink.hasMyRole() ? process                        .getInitialMyRoleEPR(plink) : null, process.getPartnerRoleChannel(plink));            }            break;        case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:            mex = new MyRoleMessageExchangeImpl(this, mexdao);            if (process != null) {                OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());                PortType ptype = plink.myRolePortType;                Operation op = plink.getMyRoleOperation(mexdao.getOperation());                mex.setPortOp(ptype, op);            }            break;        default:            String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;            __log.fatal(errmsg);            throw new BpelEngineException(errmsg);        }        return mex;    }    BpelProcess unregisterProcess(QName process) {        BpelProcess p = _activeProcesses.remove(process);        if (p != null) {            if (__log.isDebugEnabled())                __log.debug("Deactivating process " + p.getPID());            Endpoint processEndpoint = null;            Iterator<Map.Entry<Endpoint,BpelProcess>> serviceIter = _serviceMap.entrySet().iterator();            while (serviceIter.hasNext()) {                Map.Entry<Endpoint,BpelProcess> processEntry = serviceIter.next();                if (processEntry.getValue()._pid.equals(process)) {                    serviceIter.remove();                    processEndpoint = processEntry.getKey();                }            }            // Only deactivating if no other process (version) need that endpoint anymore            // We're only routing using an endpoint/process map for now which means that deploying            // several versions of the same process using the same endpoint (which is the common            // case) will override previous deployments endpoints. So checking the endpoint is not            // enough, we also have to check other versions of the same process.            // A bit clunky, the maps held here should be retought a bit.            boolean otherVersions = false;            for (BpelProcess bpelProcess : _activeProcesses.values()) {                if (bpelProcess._pconf.getType().equals(p._pconf.getType()))                    otherVersions = true;            }            if (_serviceMap.get(processEndpoint) == null && !otherVersions) {                p.deactivate();            }        }        return p;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -