📄 bpelprocess.java
字号:
} // closes the method that precedes this excerpt (its beginning is not shown here)

    /**
     * Returns the identifier of this process.
     *
     * @return the value of the {@code _pid} field
     */
    QName getPID() {
        return _pid;
    }

    /**
     * Looks up the partner-role channel associated with a partner link.
     * The process is latched into state 1 (the state whose transition runs
     * {@code doHydrate}) for the duration of the lookup.
     *
     * @param partnerLink the partner link whose channel is wanted
     * @return the {@code _channel} of the matching partner role (may be whatever was stored there)
     * @throws IllegalStateException if no partner role is registered for {@code partnerLink}
     */
    PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) {
        try {
            _hydrationLatch.latch(1);
            PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
            if (prole == null)
                throw new IllegalStateException("Unknown partner link " + partnerLink);
            return prole._channel;
        } finally {
            _hydrationLatch.release(1);
        }
    }

    /**
     * Records a process-instance event, refreshing the last-used timestamp first.
     * When no instance DAO is supplied the event is dropped and a debug message is logged.
     *
     * @param event       the event to persist
     * @param instanceDao the instance the event belongs to, or {@code null} if none was found
     */
    public void saveEvent(ProcessInstanceEvent event, ProcessInstanceDAO instanceDao) {
        markused();
        if (instanceDao != null)
            saveInstanceEvent(event, instanceDao);
        else
            __log.debug("Couldn't find instance to save event, no event generated!");
    }

    /**
     * Persists an event by delegating to {@code instanceDao.insertBpelEvent}.
     *
     * @param event       the event to persist
     * @param instanceDao the instance DAO receiving the event; must not be {@code null}
     */
    void saveInstanceEvent(ProcessInstanceEvent event, ProcessInstanceDAO instanceDao) {
        instanceDao.insertBpelEvent(event);
    }

    /**
     * Ask the process to dehydrate.
     */
    void dehydrate() {
        try {
            _hydrationLatch.latch(0);
            // We don't actually need to do anything, the latch will run the doDehydrate method
            // when necessary..
        } finally {
            _hydrationLatch.release(0);
        }
    }

    /**
     * Ask the process to hydrate.
     */
    void hydrate() {
        try {
            _hydrationLatch.latch(1);
            // We don't actually need to do anything, the latch will run the doHydrate method
            // when necessary..
        } finally {
            _hydrationLatch.release(1);
        }
    }

    /**
     * Returns the compiled process model, latching state 1 (hydrated) while reading it.
     *
     * @return the current value of {@code _oprocess}
     */
    OProcess getOProcess() {
        try {
            _hydrationLatch.latch(1);
            return _oprocess;
        } finally {
            _hydrationLatch.release(1);
        }
    }

    /**
     * Returns the {@code _endpointToMyRoleMap} field, latching state 1 (hydrated) while reading it.
     *
     * @return the map held in {@code _endpointToMyRoleMap}
     */
    private Map<PartnerLinkMyRoleImpl,Endpoint> getEndpointToMyRoleMap() {
        try {
            _hydrationLatch.latch(1);
            return _endpointToMyRoleMap;
        } finally {
            _hydrationLatch.release(1);
        }
    }

    /**
     * Returns the replacement map for the given process name. When the name equals this
     * process's id the cached map is returned; otherwise the compiled process is fetched from
     * the engine, its expression languages are registered, and a fresh map is built for it.
     *
     * @param processName name of the process (version) whose replacement map is wanted
     * @return the replacement map for {@code processName}
     * @throws BpelEngineException if the other process cannot be loaded or registered
     */
    public ReplacementMap getReplacementMap(QName processName) {
        try {
            _hydrationLatch.latch(1);

            if (processName.equals(_pid))
                return _replacementMap;
            else
                try {
                    // We're asked for an older version of this process, fetching it
                    OProcess oprocess = _engine.getOProcess(processName);
                    // Older versions may eventually need more expression languages
                    registerExprLang(oprocess);

                    return new ReplacementMapImpl(oprocess);
                } catch (Exception e) {
                    // Name the process that actually failed to load (processName, not _pid) and
                    // keep the cause in the log, like the other error handlers in this class.
                    String errmsg = "Error reloading compiled process " + processName
                            + "; the file appears to be corrupted.";
                    __log.error(errmsg, e);
                    throw new BpelEngineException(errmsg, e);
                }
        } finally {
            _hydrationLatch.release(1);
        }
    }

    /**
     * Returns the engine this process belongs to.
     *
     * @return the value of the {@code _engine} field
     */
    BpelEngineImpl getEngine() {
        return _engine;
    }

    /**
     * Tells whether the process configuration marks this process as transient.
     *
     * @return the result of {@code _pconf.isTransient()}
     */
    public boolean isInMemory() {
        return _pconf.isTransient();
    }

    /**
     * Returns the timestamp recorded by the most recent {@code markused()} call.
     *
     * @return the value of {@code _lastUsed} (milliseconds, as produced by {@code System.currentTimeMillis()})
     */
    public long getLastUsed() {
        return _lastUsed;
    }

    /**
     * Get a hint as to whether this process is hydrated. Note this is only a hint, since things could change.
     *
     * @return {@code true} if {@code _oprocess} is currently non-null
     */
    public boolean hintIsHydrated() {
        return _oprocess != null;
    }

    /** Keep track of the time the process was last used. */
    private final void markused() {
        _lastUsed = System.currentTimeMillis();
    }

    /**
     * Create a version-appropriate runtime context.
     *
     * @param dao                          the process instance DAO
     * @param template                     the process template handed to the runtime context
     * @param instantiatingMessageExchange the message exchange that instantiated the instance
     * @return a new {@code BpelRuntimeContextImpl} bound to this process
     */
    BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
            MyRoleMessageExchangeImpl instantiatingMessageExchange) {
        return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
    }

    /**
     * If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
     * exists and matches the GUID.
     *
     * <p>An existing DAO is kept when the compiled process has no GUID or when the GUIDs are
     * equal; otherwise the existing DAO is deleted and a new one is created together with the
     * process's correlators.
     *
     * @param conn     connection used to look up / create the process DAO
     * @param pid      process identifier
     * @param version  process version (narrowed to {@code int} when the DAO is created)
     * @param oprocess compiled process supplying the GUID, QName and correlators
     * @throws BpelEngineException rethrown as-is, or wrapping any other exception as "DbError"
     */
    private void bounceProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
        __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
        try {
            boolean create = true;
            ProcessDAO old = conn.getProcess(pid);
            if (old != null) {
                __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
                if (oprocess.guid == null) {
                    // No guid, old version assume its good
                    create = false;
                } else {
                    if (old.getGuid().equals(oprocess.guid)) {
                        // Guids match, no need to create
                        create = false;
                    } else {
                        // GUIDS dont match, delete and create new
                        String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid + "; replacing.";
                        __log.debug(errmsg);
                        old.delete();
                    }
                }
            }

            if (create) {
                ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, (int) version);
                for (String correlator : oprocess.getCorrelators()) {
                    newDao.addCorrelator(correlator);
                }
            }
        } catch (BpelEngineException ex) {
            throw ex;
        } catch (Exception dce) {
            __log.error("DbError", dce);
            throw new BpelEngineException("DbError", dce);
        }
    }

    /**
     * Two-state latch for this process: transition 0 runs {@link #doDehydrate()} and
     * transition 1 runs {@link #doHydrate()}. The latching semantics themselves come from
     * {@code NStateLatch}, which is defined outside this excerpt.
     */
    private class HydrationLatch extends NStateLatch {

        HydrationLatch() {
            super(new Runnable[2]);
            _transitions[0] = new Runnable() {
                public void run() {
                    doDehydrate();
                }
            };
            _transitions[1] = new Runnable() {
                public void run() {
                    doHydrate();
                }
            };
        }

        /** Drops every reference that {@link #doHydrate()} rebuilds. */
        private void doDehydrate() {
            _oprocess = null;
            _partnerRoles = null;
            _myRoles = null;
            _endpointToMyRoleMap = null;
            _replacementMap = null;
            _expLangRuntimeRegistry = null;
        }

        /**
         * Reloads the compiled process and rebuilds the state derived from it: replacement map,
         * expression-language registry, roles, external variables, partner channels / EPRs, and
         * finally the process DAO.
         *
         * @throws BpelEngineException if the compiled process cannot be read or the DAO update fails
         */
        private void doHydrate() {
            markused();
            __log.debug("Rehydrating process " + _pconf.getProcessId());
            try {
                // Close the compiled-process stream once it has been read so that repeated
                // hydrate/dehydrate cycles do not leak it.
                java.io.InputStream cbpStream = _pconf.getCBPInputStream();
                try {
                    _oprocess = deserializeCompiledProcess(cbpStream);
                } finally {
                    if (cbpStream != null)
                        cbpStream.close();
                }
            } catch (Exception e) {
                String errmsg = "Error reloading compiled process " + _pid + "; the file appears to be corrupted.";
                __log.error(errmsg, e);
                throw new BpelEngineException(errmsg, e);
            }

            _replacementMap = new ReplacementMapImpl(_oprocess);

            // Create an expression language registry for this process
            _expLangRuntimeRegistry = new ExpressionLanguageRuntimeRegistry();
            registerExprLang(_oprocess);

            setRoles(_oprocess);
            initExternalVariables();

            // Partner-role channels are created only on the very first hydration; later
            // hydrations re-attach the cached channels and EPRs below.
            if (!_hydratedOnce) {
                for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
                    // Null for initializePartnerRole = false
                    if (prole._initialPartner != null) {
                        PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
                                prole._plinkDef.partnerRolePortType, prole._initialPartner);
                        prole._channel = channel;
                        _partnerChannels.put(prole._initialPartner, prole._channel);
                        EndpointReference epr = channel.getInitialEndpointReference();
                        if (epr != null) {
                            prole._initialEPR = epr;
                            _partnerEprs.put(prole._initialPartner, epr);
                        }
                        __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
                                + prole._initialEPR);
                    }
                }
                _hydratedOnce = true;
            }

            for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) {
                myrole._initialEPR = _myEprs.get(myrole._endpoint);
            }

            for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
                prole._channel = _partnerChannels.get(prole._initialPartner);
                if (_partnerEprs.get(prole._initialPartner) != null) {
                    prole._initialEPR = _partnerEprs.get(prole._initialPartner);
                }
            }

            if (isInMemory()) {
                bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
            } else if (_engine._contexts.scheduler.isTransacted()) {
                // If we have a transaction, we do this in the current transaction.
                bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
            } else {
                // If we do not have a transaction we need to create one.
                try {
                    _engine._contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
                        public Object call() throws Exception {
                            bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
                            return null;
                        }
                    }).get(); // needs to be synchronous
                } catch (Exception ex) {
                    String errmsg = "DbError";
                    __log.error(errmsg, ex);
                    throw new BpelEngineException(errmsg, ex);
                }
            }
        }

    }

    /**
     * Registers every expression language declared by the given compiled process with
     * {@code _expLangRuntimeRegistry}.
     *
     * @param oprocess compiled process whose {@code expressionLanguages} are registered
     * @throws BpelEngineException if a language runtime cannot be registered
     */
    private void registerExprLang(OProcess oprocess) {
        for (OExpressionLanguage elang : oprocess.expressionLanguages) {
            try {
                _expLangRuntimeRegistry.registerRuntime(elang);
            } catch (ConfigurationException e) {
                String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
                __log.error(msg, e);
                throw new BpelEngineException(msg, e);
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -