📄 bpelserverimpl.java
字号:
_engine = null; _state = State.SHUTDOWN; } finally { _mngmtLock.writeLock().unlock(); } } public BpelEngine getEngine() { boolean registered = false; _mngmtLock.readLock().lock(); try { _contexts.scheduler.registerSynchronizer(new Synchronizer() { public void afterCompletion(boolean success) { _mngmtLock.readLock().unlock(); } public void beforeCompletion() { } }); registered = true; } finally { // If we failed to register the synchro,then there was an ex/throwable; we need to unlock now. if (!registered) _mngmtLock.readLock().unlock(); } return _engine; } public void register(ProcessConf conf) { if (conf == null) throw new NullPointerException("must specify non-null process configuration."); __log.debug("register: " + conf.getProcessId()); // Ok, IO out of the way, we will mod the server state, so need to get a // lock. try { _mngmtLock.writeLock().lockInterruptibly(); } catch (InterruptedException ie) { __log.debug("register(...) interrupted.", ie); throw new BpelEngineException(__msgs.msgOperationInterrupted()); } try { // If the process is already active, do nothing. if (_engine.isProcessRegistered(conf.getProcessId())) { __log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered"); return; } __log.debug("Registering process " + conf.getProcessId() + " with server."); BpelProcess process = new BpelProcess(conf); _engine.registerProcess(process); _registeredProcesses.add(process); if (_dehydrationPolicy == null) process.hydrate(); __log.info(__msgs.msgProcessRegistered(conf.getProcessId())); } finally { _mngmtLock.writeLock().unlock(); } } public void unregister(QName pid) throws BpelEngineException { if (__log.isTraceEnabled()) __log.trace("unregister: " + pid); try { _mngmtLock.writeLock().lockInterruptibly(); } catch (InterruptedException ie) { __log.debug("unregister() interrupted.", ie); throw new BpelEngineException(__msgs.msgOperationInterrupted()); } try { BpelProcess p = null; if (_engine != null) { p = _engine.unregisterProcess(pid); if (p != null) { _registeredProcesses.remove(p); __log.info(__msgs.msgProcessUnregistered(pid)); } } } catch (Exception ex) { __log.error(__msgs.msgProcessUnregisterFailed(pid), ex); throw new BpelEngineException(ex); } finally { _mngmtLock.writeLock().unlock(); } } /** * Register a global message exchange interceptor. * @param interceptor message-exchange interceptor */ public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) { // NOTE: do not synchronize, globalInterceptors is copy-on-write. _contexts.globalIntereceptors.add(interceptor); } /** * Unregister a global message exchange interceptor. * @param interceptor message-exchange interceptor */ public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) { // NOTE: do not synchronize, globalInterceptors is copy-on-write. _contexts.globalIntereceptors.remove(interceptor); } /** * Check a state transition from state "i" to state "j". */ private final boolean checkState(State i, State j) { if (_state == i) return true; if (_state == j) return false; return false;// throw new IllegalStateException("Unexpected state: " + i); } /* TODO: We need to have a method of cleaning up old deployment data. */ private boolean deleteProcessDAO(final QName pid) { try { // Delete it from the database. return _db.exec(new BpelDatabase.Callable<Boolean>() { public Boolean run(BpelDAOConnection conn) throws Exception { ProcessDAO proc = conn.getProcess(pid); if (proc != null) { proc.delete(); return true; } return false; } }); } catch (Exception ex) { String errmsg = "DbError"; __log.error(errmsg, ex); throw new BpelEngineException(errmsg, ex); } } public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException { getEngine().onScheduledJob(jobInfo); } private class ProcessDefReaper implements Runnable { public void run() { __log.debug("Starting process definition reaper thread."); long pollingTime = 10000; try { while (true) { Thread.sleep(pollingTime); _mngmtLock.writeLock().lockInterruptibly(); try { __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount); // Copying the runnning process list to avoid synchronization // problems and a potential mess if a policy modifies the list List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses); CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() { public boolean isMember(BpelProcess o) { return !o.hintIsHydrated(); } }); // And the happy winners are... List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates); // Bye bye for (BpelProcess process : ripped) { __log.debug("Dehydrating process " + process.getPID()); process.dehydrate(); } } finally { _mngmtLock.writeLock().unlock(); } } } catch (InterruptedException e) { __log.info(e); } } } public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) { _dehydrationPolicy = dehydrationPolicy; } public void setConfigProperties(Properties configProperties) { _configProperties = configProperties; } public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException { _contexts.mexContext = mexContext; } public void setScheduler(Scheduler scheduler) throws BpelEngineException { _contexts.scheduler = scheduler; } public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException { _contexts.eprContext = eprContext; } /** * Set the DAO connection factory. The DAO is used by the BPEL engine to * persist information about active processes. * * @param daoCF * {@link BpelDAOConnectionFactory} implementation. */ public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException { _contexts.dao = daoCF; } public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) { _contexts.inMemDao = daoCF; } public void setBindingContext(BindingContext bc) { _contexts.bindingContext = bc; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -