⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 processstoreimpl.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        pconf.setState(state);        if (old != null && old != state)            fireStateChange(pid, state, pconf.getDeploymentUnit().getName());    }    public void setRetiredPackage(String packageName, boolean retired) {        DeploymentUnitDir duDir = _deploymentUnits.get(packageName);        if (duDir == null) throw new ContextException("Could not find package " + packageName);        for (QName processName : duDir.getProcessNames()) {            setState(toPid(processName, duDir.getVersion()), retired ? ProcessState.RETIRED : ProcessState.ACTIVE);        }    }    public ProcessConf getProcessConfiguration(final QName processId) {        _rw.readLock().lock();        try {            return _processes.get(processId);        } finally {            _rw.readLock().unlock();        }    }    public void setProperty(final QName pid, final QName propName, final Node value) {        setProperty(pid, propName, DOMUtils.domToStringLevel2(value));    }    public void setProperty(final QName pid, final QName propName, final String value) {        if (__log.isDebugEnabled())            __log.debug("Setting property " + propName + " on process " + pid);        ProcessConfImpl pconf = _processes.get(pid);        if (pconf == null) {            String msg = __msgs.msgProcessNotFound(pid);            __log.info(msg);            throw new ContextException(msg);        }        final DeploymentUnitDir dudir = pconf.getDeploymentUnit();        exec(new ProcessStoreImpl.Callable<Object>() {            public Object call(ConfStoreConnection conn) {                DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName());                if (dudao == null)                    return null;                ProcessConfDAO proc = dudao.getProcess(pid);                if (proc == null)                    return null;                proc.setProperty(propName, value);                return null;            }        });        fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.PROPERTY_CHANGED, pid, dudir.getName()));    }    /**     * Load all the deployment units out of the store. Called on start-up.     *     */    public void loadAll() {        final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();        exec(new Callable<Object>() {            public Object call(ConfStoreConnection conn) {                Collection<DeploymentUnitDAO> dus = conn.getDeploymentUnits();                for (DeploymentUnitDAO du : dus)                    try {                        loaded.addAll(load(du));                    } catch (Exception ex) {                        __log.error("Error loading DU from store: " + du.getName(), ex);                    }                return null;            }        });        for (ProcessConfImpl p : loaded) {            try {                fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());            } catch (Exception except) {                __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except);            }        }    }    public List<QName> getProcesses() {        _rw.readLock().lock();        try {            return new ArrayList<QName>(_processes.keySet());        } finally {            _rw.readLock().unlock();        }    }    public long getCurrentVersion() {        long version = exec(new Callable<Long>() {            public Long call(ConfStoreConnection conn) {                return conn.getNextVersion();            }        });        return version;    }    protected void fireEvent(ProcessStoreEvent pse) {        __log.debug("firing event: " + pse);        for (ProcessStoreListener psl : _listeners)            psl.onProcessStoreEvent(pse);    }    private void fireStateChange(QName processId, ProcessState state, String duname) {        switch (state) {        case ACTIVE:            fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname));            break;        case DISABLED:            fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DISABLED, processId, duname));            break;        case RETIRED:            fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.RETIRED, processId, duname));            break;        }    }    public void registerListener(ProcessStoreListener psl) {        __log.debug("Registering listener " + psl);        _listeners.add(psl);    }    public void unregisterListener(ProcessStoreListener psl) {        __log.debug("Unregistering listener " + psl);        _listeners.remove(psl);    }    /**     * Execute database transactions in an isolated context.     *     * @param <T>     *            return type     * @param callable     *            transaction     * @return     */    synchronized <T> T exec(Callable<T> callable) {        // We want to submit db jobs to an executor to isolate        // them from the current thread,        Future<T> future = _executor.submit(callable);        try {            return future.get();        } catch (Exception e) {            throw new ContextException("DbError", e);        }    }    private ConfStoreConnection getConnection() {        return _cf.getConnection();    }    /**     * Create a property mapping based on the initial values in the deployment descriptor.     *     * @param dd     * @return     */    public static Map<QName, Node> calcInitialProperties(TDeployment.Process dd) {        HashMap<QName, Node> ret = new HashMap<QName, Node>();        if (dd.getPropertyList().size() > 0) {            for (TDeployment.Process.Property property : dd.getPropertyList()) {                Element elmtContent = DOMUtils.getElementContent(property.getDomNode());                if (elmtContent != null) {                    // We'll need DOM Level 3                    Document doc = DOMUtils.newDocument();                    doc.appendChild(doc.importNode(elmtContent, true));                    ret.put(property.getName(), doc.getDocumentElement());                } else                    ret.put(property.getName(), property.getDomNode().getFirstChild());            }        }        return ret;    }    /**     * Figure out the initial process state from the state in the deployment descriptor.     *     * @param dd     *            deployment descriptor     * @return     */    private static ProcessState calcInitialState(TDeployment.Process dd) {        ProcessState state = ProcessState.ACTIVE;        if (dd.isSetActive() && dd.getActive() == false)            state = ProcessState.DISABLED;        if (dd.isSetRetired())            state = ProcessState.RETIRED;        return state;    }    /**     * Load a deployment unit record stored in the db into memory.     *     * @param dudao     */    protected List<ProcessConfImpl> load(DeploymentUnitDAO dudao) {        __log.debug("Loading deployment unit record from db: " + dudao.getName());        File dudir = findDeployDir(dudao);        if (dudir == null || !dudir.exists())            throw new ContextException("Deployed directory " + dudir + " no longer there!");        DeploymentUnitDir dud = new DeploymentUnitDir(dudir);        dud.scan();        ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();        _rw.writeLock().lock();        try {            _deploymentUnits.put(dud.getName(), dud);            long version = 0;            for (ProcessConfDAO p : dudao.getProcesses()) {                TDeployment.Process pinfo = dud.getProcessDeployInfo(p.getType());                if (pinfo == null) {                    __log.warn("Cannot load " + p.getPID() + "; cannot find descriptor.");                    continue;                }                Map<QName, Node> props = calcInitialProperties(pinfo);                // TODO: update the props based on the values in the DB.                ProcessConfImpl pconf = new ProcessConfImpl(p.getPID(), p.getType(), p.getVersion(), dud, pinfo, dudao                        .getDeployDate(), props, p.getState(), eprContext);                version = p.getVersion();                _processes.put(pconf.getProcessId(), pconf);                loaded.add(pconf);            }            // All processes and the DU have the same version            dud.setVersion(version);        } finally {            _rw.writeLock().unlock();        }        return loaded;    }    protected File findDeployDir(DeploymentUnitDAO dudao) {        File f = new File(dudao.getDeploymentUnitDir());        if (f.exists())            return f;        f = new File(_deployDir, dudao.getName());        if (f.exists())            return f;        return null;    }    /**     * Make sure that the deployment unit is loaded.     *     * @param duName     *            deployment unit name     */    protected boolean load(final String duName) {        _rw.writeLock().lock();        try {            if (_deploymentUnits.containsKey(duName))                return true;        } finally {            _rw.writeLock().unlock();        }        try {            return exec(new Callable<Boolean>() {                public Boolean call(ConfStoreConnection conn) {                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);                    if (dudao == null)                        return false;                    load(dudao);                    return true;                }            });        } catch (Exception ex) {            __log.error("Error loading deployment unit: " + duName);            return false;        }    }    /**     * Wrapper for database transactions.     *     * @author Maciej Szefler     *     * @param <V>     *            return type     */    abstract class Callable<V> implements java.util.concurrent.Callable<V> {        public V call() {            boolean success = false;            ConfStoreConnection conn = getConnection();            try {                conn.begin();                V r = call(conn);                conn.commit();                success = true;                return r;            } finally {                if (!success)                    try {                        conn.rollback();                    } catch (Exception ex) {                        __log.error("DbError", ex);                    }                try {                    conn.close();                } catch (Exception ex) {                    __log.error("DbError", ex);                }            }        }        abstract V call(ConfStoreConnection conn);    }    public void setDeployDir(File depDir) {        _deployDir = depDir;    }    public File getDeployDir() {        return _deployDir;    }    public static DataSource createInternalDS(String guid) {        jdbcDataSource hsqlds = new jdbcDataSource();        hsqlds.setDatabase("jdbc:hsqldb:mem:" + guid);        hsqlds.setUser("sa");        hsqlds.setPassword("");        return hsqlds;    }    public static void shutdownInternalDB(DataSource ds) {        try {            ds.getConnection().createStatement().execute("SHUTDOWN;");        } catch (SQLException e) {            __log.error("Error shutting down.", e);        }    }    private List<QName> toPids(Collection<QName> processTypes, long version) {        ArrayList<QName> result = new ArrayList<QName>();        for (QName pqName : processTypes) {            result.add(toPid(pqName, version));        }        return result;    }    private QName toPid(QName processType, long version) {        return new QName(processType.getNamespaceURI(), processType.getLocalPart() + "-" + version);    }    private DeploymentUnitDir findOldDU(String newName) {        DeploymentUnitDir old = null;        int dashIdx = newName.lastIndexOf("-");        if (dashIdx > 0 && dashIdx + 1 < newName.length()) {            String radical = newName.substring(0, dashIdx);            int newVersion = -1;            try {                newVersion = Integer.parseInt(newName.substring(newName.lastIndexOf("-") + 1));            } catch (NumberFormatException e) {                // Swallowing, if we can't parse then we just can't find an old version            }            while (old == null && newVersion >= 0)                old = _deploymentUnits.get(radical + "-" + (newVersion--));        }        return old;    }    private class SimpleThreadFactory implements ThreadFactory {        int threadNumber = 0;        public Thread newThread(Runnable r) {            threadNumber += 1;            Thread t = new Thread(r, "ProcessStoreImpl-"+threadNumber);            t.setDaemon(true);            return t;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -