📄 processstoreimpl.java
字号:
pconf.setState(state); if (old != null && old != state) fireStateChange(pid, state, pconf.getDeploymentUnit().getName()); } public void setRetiredPackage(String packageName, boolean retired) { DeploymentUnitDir duDir = _deploymentUnits.get(packageName); if (duDir == null) throw new ContextException("Could not find package " + packageName); for (QName processName : duDir.getProcessNames()) { setState(toPid(processName, duDir.getVersion()), retired ? ProcessState.RETIRED : ProcessState.ACTIVE); } } public ProcessConf getProcessConfiguration(final QName processId) { _rw.readLock().lock(); try { return _processes.get(processId); } finally { _rw.readLock().unlock(); } } public void setProperty(final QName pid, final QName propName, final Node value) { setProperty(pid, propName, DOMUtils.domToStringLevel2(value)); } public void setProperty(final QName pid, final QName propName, final String value) { if (__log.isDebugEnabled()) __log.debug("Setting property " + propName + " on process " + pid); ProcessConfImpl pconf = _processes.get(pid); if (pconf == null) { String msg = __msgs.msgProcessNotFound(pid); __log.info(msg); throw new ContextException(msg); } final DeploymentUnitDir dudir = pconf.getDeploymentUnit(); exec(new ProcessStoreImpl.Callable<Object>() { public Object call(ConfStoreConnection conn) { DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName()); if (dudao == null) return null; ProcessConfDAO proc = dudao.getProcess(pid); if (proc == null) return null; proc.setProperty(propName, value); return null; } }); fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.PROPERTY_CHANGED, pid, dudir.getName())); } /** * Load all the deployment units out of the store. Called on start-up. * */ public void loadAll() { final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>(); exec(new Callable<Object>() { public Object call(ConfStoreConnection conn) { Collection<DeploymentUnitDAO> dus = conn.getDeploymentUnits(); for (DeploymentUnitDAO du : dus) try { loaded.addAll(load(du)); } catch (Exception ex) { __log.error("Error loading DU from store: " + du.getName(), ex); } return null; } }); for (ProcessConfImpl p : loaded) { try { fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName()); } catch (Exception except) { __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except); } } } public List<QName> getProcesses() { _rw.readLock().lock(); try { return new ArrayList<QName>(_processes.keySet()); } finally { _rw.readLock().unlock(); } } public long getCurrentVersion() { long version = exec(new Callable<Long>() { public Long call(ConfStoreConnection conn) { return conn.getNextVersion(); } }); return version; } protected void fireEvent(ProcessStoreEvent pse) { __log.debug("firing event: " + pse); for (ProcessStoreListener psl : _listeners) psl.onProcessStoreEvent(pse); } private void fireStateChange(QName processId, ProcessState state, String duname) { switch (state) { case ACTIVE: fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname)); break; case DISABLED: fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DISABLED, processId, duname)); break; case RETIRED: fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.RETIRED, processId, duname)); break; } } public void registerListener(ProcessStoreListener psl) { __log.debug("Registering listener " + psl); _listeners.add(psl); } public void unregisterListener(ProcessStoreListener psl) { __log.debug("Unregistering listener " + psl); _listeners.remove(psl); } /** * Execute database transactions in an isolated context. * * @param <T> * return type * @param callable * transaction * @return */ synchronized <T> T exec(Callable<T> callable) { // We want to submit db jobs to an executor to isolate // them from the current thread, Future<T> future = _executor.submit(callable); try { return future.get(); } catch (Exception e) { throw new ContextException("DbError", e); } } private ConfStoreConnection getConnection() { return _cf.getConnection(); } /** * Create a property mapping based on the initial values in the deployment descriptor. * * @param dd * @return */ public static Map<QName, Node> calcInitialProperties(TDeployment.Process dd) { HashMap<QName, Node> ret = new HashMap<QName, Node>(); if (dd.getPropertyList().size() > 0) { for (TDeployment.Process.Property property : dd.getPropertyList()) { Element elmtContent = DOMUtils.getElementContent(property.getDomNode()); if (elmtContent != null) { // We'll need DOM Level 3 Document doc = DOMUtils.newDocument(); doc.appendChild(doc.importNode(elmtContent, true)); ret.put(property.getName(), doc.getDocumentElement()); } else ret.put(property.getName(), property.getDomNode().getFirstChild()); } } return ret; } /** * Figure out the initial process state from the state in the deployment descriptor. * * @param dd * deployment descriptor * @return */ private static ProcessState calcInitialState(TDeployment.Process dd) { ProcessState state = ProcessState.ACTIVE; if (dd.isSetActive() && dd.getActive() == false) state = ProcessState.DISABLED; if (dd.isSetRetired()) state = ProcessState.RETIRED; return state; } /** * Load a deployment unit record stored in the db into memory. * * @param dudao */ protected List<ProcessConfImpl> load(DeploymentUnitDAO dudao) { __log.debug("Loading deployment unit record from db: " + dudao.getName()); File dudir = findDeployDir(dudao); if (dudir == null || !dudir.exists()) throw new ContextException("Deployed directory " + dudir + " no longer there!"); DeploymentUnitDir dud = new DeploymentUnitDir(dudir); dud.scan(); ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>(); _rw.writeLock().lock(); try { _deploymentUnits.put(dud.getName(), dud); long version = 0; for (ProcessConfDAO p : dudao.getProcesses()) { TDeployment.Process pinfo = dud.getProcessDeployInfo(p.getType()); if (pinfo == null) { __log.warn("Cannot load " + p.getPID() + "; cannot find descriptor."); continue; } Map<QName, Node> props = calcInitialProperties(pinfo); // TODO: update the props based on the values in the DB. ProcessConfImpl pconf = new ProcessConfImpl(p.getPID(), p.getType(), p.getVersion(), dud, pinfo, dudao .getDeployDate(), props, p.getState(), eprContext); version = p.getVersion(); _processes.put(pconf.getProcessId(), pconf); loaded.add(pconf); } // All processes and the DU have the same version dud.setVersion(version); } finally { _rw.writeLock().unlock(); } return loaded; } protected File findDeployDir(DeploymentUnitDAO dudao) { File f = new File(dudao.getDeploymentUnitDir()); if (f.exists()) return f; f = new File(_deployDir, dudao.getName()); if (f.exists()) return f; return null; } /** * Make sure that the deployment unit is loaded. * * @param duName * deployment unit name */ protected boolean load(final String duName) { _rw.writeLock().lock(); try { if (_deploymentUnits.containsKey(duName)) return true; } finally { _rw.writeLock().unlock(); } try { return exec(new Callable<Boolean>() { public Boolean call(ConfStoreConnection conn) { DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName); if (dudao == null) return false; load(dudao); return true; } }); } catch (Exception ex) { __log.error("Error loading deployment unit: " + duName); return false; } } /** * Wrapper for database transactions. * * @author Maciej Szefler * * @param <V> * return type */ abstract class Callable<V> implements java.util.concurrent.Callable<V> { public V call() { boolean success = false; ConfStoreConnection conn = getConnection(); try { conn.begin(); V r = call(conn); conn.commit(); success = true; return r; } finally { if (!success) try { conn.rollback(); } catch (Exception ex) { __log.error("DbError", ex); } try { conn.close(); } catch (Exception ex) { __log.error("DbError", ex); } } } abstract V call(ConfStoreConnection conn); } public void setDeployDir(File depDir) { _deployDir = depDir; } public File getDeployDir() { return _deployDir; } public static DataSource createInternalDS(String guid) { jdbcDataSource hsqlds = new jdbcDataSource(); hsqlds.setDatabase("jdbc:hsqldb:mem:" + guid); hsqlds.setUser("sa"); hsqlds.setPassword(""); return hsqlds; } public static void shutdownInternalDB(DataSource ds) { try { ds.getConnection().createStatement().execute("SHUTDOWN;"); } catch (SQLException e) { __log.error("Error shutting down.", e); } } private List<QName> toPids(Collection<QName> processTypes, long version) { ArrayList<QName> result = new ArrayList<QName>(); for (QName pqName : processTypes) { result.add(toPid(pqName, version)); } return result; } private QName toPid(QName processType, long version) { return new QName(processType.getNamespaceURI(), processType.getLocalPart() + "-" + version); } private DeploymentUnitDir findOldDU(String newName) { DeploymentUnitDir old = null; int dashIdx = newName.lastIndexOf("-"); if (dashIdx > 0 && dashIdx + 1 < newName.length()) { String radical = newName.substring(0, dashIdx); int newVersion = -1; try { newVersion = Integer.parseInt(newName.substring(newName.lastIndexOf("-") + 1)); } catch (NumberFormatException e) { // Swallowing, if we can't parse then we just can't find an old version } while (old == null && newVersion >= 0) old = _deploymentUnits.get(radical + "-" + (newVersion--)); } return old; } private class SimpleThreadFactory implements ThreadFactory { int threadNumber = 0; public Thread newThread(Runnable r) { threadNumber += 1; Thread t = new Thread(r, "ProcessStoreImpl-"+threadNumber); t.setDaemon(true); return t; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -