⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 processstoreimpl.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.store;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.compiler.api.CompilationException;import org.apache.ode.bpel.dd.DeployDocument;import org.apache.ode.bpel.dd.TDeployment;import org.apache.ode.bpel.iapi.ContextException;import org.apache.ode.bpel.iapi.ProcessConf;import org.apache.ode.bpel.iapi.ProcessState;import org.apache.ode.bpel.iapi.ProcessStore;import org.apache.ode.bpel.iapi.ProcessStoreEvent;import org.apache.ode.bpel.iapi.ProcessStoreListener;import org.apache.ode.bpel.iapi.EndpointReferenceContext;import org.apache.ode.store.DeploymentUnitDir.CBPInfo;import org.apache.ode.utils.DOMUtils;import org.apache.ode.utils.GUID;import org.apache.ode.utils.msg.MessageBundle;import org.apache.ode.il.config.OdeConfigProperties;import org.hsqldb.jdbc.jdbcDataSource;import org.w3c.dom.Document;import org.w3c.dom.Element;import org.w3c.dom.Node;import javax.sql.DataSource;import javax.xml.namespace.QName;import java.io.File;import java.io.IOException;import java.sql.SQLException;import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ThreadFactory;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * <p> * JDBC-based implementation of a process store. Also provides an "in-memory" store by way of HSQL database. * </p> * * <p> * The philsophy here is to keep things simple. Process store operations are relatively infrequent. Performance of the public * methods is not a concern. However, note that the {@link org.apache.ode.bpel.iapi.ProcessConf} objects returned by the class are * going to be used from within the engine runtime, and hence their performance needs to be very good. Similarly, these objects * should be immutable so as not to confuse the engine. * * Note the way that the database is used in this class, it is more akin to a recovery log, this is intentional: we want to start * up, load stuff from the database and then pretty much forget about it when it comes to reads. * * @author Maciej Szefler <mszefler at gmail dot com> * @author mriou <mriou at apache dot org> */public class ProcessStoreImpl implements ProcessStore {    private static final Log __log = LogFactory.getLog(ProcessStoreImpl.class);    private static final Messages __msgs = MessageBundle.getMessages(Messages.class);    private final CopyOnWriteArrayList<ProcessStoreListener> _listeners = new CopyOnWriteArrayList<ProcessStoreListener>();    private Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();    private Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();    /** Guards access to the _processes and _deploymentUnits */    private final ReadWriteLock _rw = new ReentrantReadWriteLock();    /** GUID used to create a unique in-memory db. */    private String _guid = new GUID().toString();    private ConfStoreConnectionFactory _cf;    private EndpointReferenceContext eprContext;    protected File _deployDir;    /**     * Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed a     * time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't want     * to get confused ii) we're already serializing all the operations with a read/write lock. iii) we don't care about     * performance, these are infrequent operations.     */    private ExecutorService _executor = Executors.newSingleThreadExecutor(new SimpleThreadFactory());    /**     * In-memory DataSource, or <code>null</code> if we are using a real DS. We need this to shutdown the DB.     */    private DataSource _inMemDs;    public ProcessStoreImpl() {        this(null, null, "", new OdeConfigProperties(new Properties(), ""), true);    }    public ProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean auto) {        this.eprContext = eprContext;        if (ds != null) {            // ugly hack            if (persistenceType.toLowerCase().indexOf("hib") != -1)                _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(ds, props.getProperties(), auto);            else                _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(ds, auto);        } else {            // If the datasource is not provided, then we create a HSQL-based in-memory            // database. Makes testing a bit simpler.            DataSource hsqlds = createInternalDS(_guid);            if ("hibernate".equalsIgnoreCase(persistenceType))                _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(hsqlds, props.getProperties(), auto);            else                _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(hsqlds, auto);            _inMemDs = hsqlds;        }    }    public void shutdown() {        if (_inMemDs != null) {            shutdownInternalDB(_inMemDs);            _inMemDs = null;        }    }    @Override    protected void finalize() throws Throwable {        // force a shutdown so that HSQL cleans up its mess.        try {            shutdown();        } catch (Throwable t) {            ; // we tried, no worries.        }        super.finalize();    }    /**     * Deploys a process.     */    public Collection<QName> deploy(final File deploymentUnitDirectory) {        __log.info(__msgs.msgDeployStarting(deploymentUnitDirectory));        final Date deployDate = new Date();        // Create the DU and compile/scan it before acquiring lock.        final DeploymentUnitDir du = new DeploymentUnitDir(deploymentUnitDirectory);        try {            du.compile();        } catch (CompilationException ce) {            String errmsg = __msgs.msgDeployFailCompileErrors(ce);            __log.error(errmsg, ce);            throw new ContextException(errmsg, ce);        }        du.scan();        final DeployDocument dd = du.getDeploymentDescriptor();        final ArrayList<ProcessConfImpl> processes = new ArrayList<ProcessConfImpl>();        Collection<QName> deployed;        _rw.writeLock().lock();        // Process and DU use a monotonically increased single version number.        long version = exec(new Callable<Long>() {            public Long call(ConfStoreConnection conn) {                return conn.getNextVersion();            }        });        try {            if (_deploymentUnits.containsKey(du.getName())) {                String errmsg = __msgs.msgDeployFailDuplicateDU(du.getName());                __log.error(errmsg);                throw new ContextException(errmsg);            }            du.setVersion(version);            for (TDeployment.Process processDD : dd.getDeploy().getProcessList()) {                QName pid = toPid(processDD.getName(), version);                // Retires older version if we can find one                DeploymentUnitDir oldDU = findOldDU(du.getName());                if (oldDU != null)                    setRetiredPackage(oldDU.getName(), true);                if (_processes.containsKey(pid)) {                    String errmsg = __msgs.msgDeployFailDuplicatePID(processDD.getName(), du.getName());                    __log.error(errmsg);                    throw new ContextException(errmsg);                }                QName type = processDD.getType() != null ? processDD.getType() : processDD.getName();                CBPInfo cbpInfo = du.getCBPInfo(type);                if (cbpInfo == null) {                    String errmsg = __msgs.msgDeployFailedProcessNotFound(processDD.getName(), du.getName());                    __log.error(errmsg);                    throw new ContextException(errmsg);                }                // final OProcess oprocess = loadCBP(cbpInfo.cbp);                ProcessConfImpl pconf = new ProcessConfImpl(pid, processDD.getName(), version, du, processDD, deployDate,                        calcInitialProperties(processDD), calcInitialState(processDD), eprContext);                processes.add(pconf);            }            _deploymentUnits.put(du.getName(), du);            for (ProcessConfImpl process : processes) {                __log.info(__msgs.msgProcessDeployed(du.getDeployDir(), process.getProcessId()));                _processes.put(process.getProcessId(), process);            }        } finally {            _rw.writeLock().unlock();        }        // Do the deployment in the DB. We need this so that we remember deployments across system shutdowns.        // We don't fail if there is a DB error, simply print some errors.        deployed = exec(new Callable<Collection<QName>>() {            public Collection<QName> call(ConfStoreConnection conn) {                // Check that this deployment unit is not deployed.                DeploymentUnitDAO dudao = conn.getDeploymentUnit(du.getName());                if (dudao != null) {                    String errmsg = "Database out of synch for DU " + du.getName();                    __log.warn(errmsg);                    dudao.delete();                }                dudao = conn.createDeploymentUnit(du.getName());                try {                    dudao.setDeploymentUnitDir(deploymentUnitDirectory.getCanonicalPath());                } catch (IOException e1) {                    String errmsg = "Error getting canonical path for " + du.getName()                            + "; deployment unit will not be available after restart!";                    __log.error(errmsg);                }                ArrayList<QName> deployed = new ArrayList<QName>();                // Going trough each process declared in the dd                for (ProcessConfImpl pc : processes) {                    try {                        ProcessConfDAO newDao = dudao.createProcess(pc.getProcessId(), pc.getType(), pc.getVersion());                        newDao.setState(pc.getState());                        for (Map.Entry<QName, Node> prop : pc.getProcessProperties().entrySet()) {                            newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue()));                        }                        deployed.add(pc.getProcessId());                        conn.setVersion(pc.getVersion());                    } catch (Throwable e) {                        String errmsg = "Error persisting deployment record for " + pc.getProcessId()                                + "; process will not be available after restart!";                        __log.error(errmsg, e);                    }                }                return deployed;            }        });        // We want the events to be fired outside of the bounds of the writelock.        try {            for (ProcessConfImpl process : processes) {                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DEPLOYED, process.getProcessId(), process.getDeploymentUnit()                        .getName()));                fireStateChange(process.getProcessId(), process.getState(), process.getDeploymentUnit().getName());            }        } catch (Exception e) {            // A problem at that point means that engine deployment failed, we don't want the store to keep the du            __log.warn("Deployment failed within the engine, store undeploying process.");            undeploy(deploymentUnitDirectory);            if (e instanceof ContextException) throw (ContextException) e;            else throw new ContextException("Deployment failed within the engine.", e);        }        return deployed;    }    public Collection<QName> undeploy(final File dir) {        try {            exec(new Callable<Collection<QName>>() {                public Collection<QName> call(ConfStoreConnection conn) {                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(dir.getName());                    if (dudao != null)                        dudao.delete();                    return null;                }            });        } catch (Exception ex) {            __log.error("Error synchronizing with data store; " + dir.getName() + " may be reappear after restart!");        }        Collection<QName> undeployed = Collections.emptyList();        DeploymentUnitDir du;        _rw.writeLock().lock();        try {            du = _deploymentUnits.remove(dir.getName());            if (du != null) {                undeployed = toPids(du.getProcessNames(), du.getVersion());                _processes.keySet().removeAll(undeployed);            }        } finally {            _rw.writeLock().unlock();        }        for (QName pn : undeployed) {            fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));            __log.info(__msgs.msgProcessUndeployed(pn));        }        return undeployed;    }    public Collection<String> getPackages() {        _rw.readLock().lock();        try {            return new ArrayList<String>(_deploymentUnits.keySet());        } finally {            _rw.readLock().unlock();        }    }    public List<QName> listProcesses(String packageName) {        _rw.readLock().lock();        try {            DeploymentUnitDir du = _deploymentUnits.get(packageName);            if (du == null)                return null;            return toPids(du.getProcessNames(), du.getVersion());        } finally {            _rw.readLock().unlock();        }    }    public void setState(final QName pid, final ProcessState state) {        __log.debug("Changing process state for " + pid + " to " + state);        final ProcessConfImpl pconf;        _rw.readLock().lock();        try {            pconf = _processes.get(pid);            if (pconf == null) {                String msg = __msgs.msgProcessNotFound(pid);                __log.info(msg);                throw new ContextException(msg);            }        } finally {            _rw.readLock().unlock();        }        final DeploymentUnitDir dudir = pconf.getDeploymentUnit();        // Update in the database.        ProcessState old = exec(new Callable<ProcessState>() {            public ProcessState call(ConfStoreConnection conn) {                DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName());                if (dudao == null) {                    String errmsg = __msgs.msgProcessNotFound(pid);                    __log.error(errmsg);                    throw new ContextException(errmsg);                }                ProcessConfDAO dao = dudao.getProcess(pid);                if (dao == null) {                    String errmsg = __msgs.msgProcessNotFound(pid);                    __log.error(errmsg);                    throw new ContextException(errmsg);                }                ProcessState old = dao.getState();                dao.setState(state);                pconf.setState(state);                return old;            }        });

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -