📄 correlatedputstrategy.java
字号:
log.debug("receiveSlave() master has already arrived"); // // master already found merge(cField, masterWi, wi); try { getStorage().storeWorkItem(getStoreName(), masterWi); } catch (final StoreException se) { throw new StoreException ("Failed to re-store master after insertion of "+ "correlated slave's attributes", se); } // // release master workitem getStore().release(null, masterId); // // bounce asynchronously asyncBounce(wi); } private void putSlave (final String correlId, final String correlField, final FlowExpressionId slaveWorkitemId) { java.util.List sList = (java.util.List)this.slaves.get(correlId); if (sList == null) { sList = new java.util.ArrayList(7); this.slaves.put(correlId, sList); } sList.add(new SlaveEntry(correlField, slaveWorkitemId)); } /* private InFlowWorkItem getAndLockMaster (final FlowExpressionId masterId) throws StoreException { final Lock masterLock = getStore().getLock(masterId); if (masterLock == null) { //log.debug("getAndLockMaster() no need to wait, master locked."); return getStore().getAndLock(null, masterId); } synchronized (masterLock) { try { log.debug("getAndLockMaster() waiting on the lock to open"); masterLock.wait(); } catch (final InterruptedException ie) { log.warn("getAndLockMaster() caught interruped exception", ie); } log.debug("getAndLockMaster() getting and locking master..."); return getStore().getAndLock(null, masterId); } } */ /* private void releaseMaster (final FlowExpressionId masterId) throws StoreException { getStore().release(null, masterId); } */ private void receiveMaster (final String cId, final InFlowWorkItem wi) throws StoreException { log.debug("receiveMaster() for '"+cId+"'"); //if (this.masters.get(cId) != null) //{ // log.warn // ("receiveMaster() there is already a master for id '"+cId+ // "'. Not registering later one as master."); // return; //} this.masters.put(cId, wi.getLastExpressionId()); // // maybe there are pending slaves final java.util.List slaveIds = (java.util.List)this.slaves.get(cId); if (slaveIds != null) { final java.util.Iterator it = slaveIds.iterator(); while (it.hasNext()) { final SlaveEntry se = (SlaveEntry)it.next(); final InFlowWorkItem slaveWi = getStorage() .retrieveWorkItem(this.slaveStoreName, se.workitemId); merge(se.cField, wi, slaveWi); getStorage() .removeWorkItem(this.slaveStoreName, se.workitemId); // // reply with slaveWi //if ( ! shouldBounceDirectly(slaveWi)) bounce(slaveWi); if ( ! shouldBounceDirectly(slaveWi)) asyncBounce(slaveWi); } this.slaves.remove(cId); saveState(); } // // storing master getStorage().storeWorkItem(getStoreName(), wi); } private void merge (final String correlationField, final InFlowWorkItem masterWi, final InFlowWorkItem slaveWi) { final StringMapAttribute result = new StringMapAttribute(); final java.util.Iterator it = slaveWi.getAttributes().keySet().iterator(); while (it.hasNext()) { final StringAttribute key = (StringAttribute)it.next(); final String skey = key.toString(); if (skey.equals(CORRELATION_ID) || skey.equals(CORRELATION_FIELD) || skey.equals(CORRELATION_BOUNCE)) { continue; } result.put(key, (Attribute)slaveWi.getAttributes().get(key)); } masterWi.getAttributes().put(correlationField, result); } /* * Should this slave workitem be bounced back just after its arrival ? */ private boolean shouldBounceDirectly (final InFlowWorkItem wi) { final String sBounce = wi.getAttributes().sget(CORRELATION_BOUNCE); if (sBounce == null) return false; return (sBounce.equalsIgnoreCase("true")); } /* * Forwards a workitem back to its emitting engine (participant expression) */ private void bounce (final InFlowWorkItem wi) throws StoreException { log.debug("bounce() boucing back "+wi.getLastExpressionId()); final String engineId = wi.getLastExpressionId().getEngineId(); final Participant pEngine = openwfe.org.engine.Definitions .getParticipantMap(((openwfe.org.Service)getStore()).getContext()) .get(engineId); try { pEngine.dispatch(wi); } catch (final Exception e) { throw new StoreException ("Failed bounce slave correlated workitem", e); } log.debug("bounce() bounced "+wi.getLastExpressionId()); } /* * bounce() but in its own thread */ private void asyncBounce (final InFlowWorkItem wi) { (new Thread() { public void run () { try { bounce(wi); } catch (final StoreException se) { log.warn ("Failed to bounce wi back "+wi.getLastExpressionId(), se); } } }).start(); } /** * Removes a workitem from the underlying storage. */ public void remove (final FlowExpressionId id) throws StoreException { if (this.masters.containsValue(id)) { final java.util.Iterator it = this.masters.keySet().iterator(); while (it.hasNext()) { final String correlationId = (String)it.next(); final FlowExpressionId masterId = (FlowExpressionId)this.masters.get(correlationId); if (masterId.equals(id)) { this.masters.remove(correlationId); log.debug ("remove() removed master for correlationId '"+ correlationId+"'"); break; } } } getStorage().removeWorkItem(getStoreName(), id); } // // STATIC METHODS // // INNER CLASSES protected static class SlaveEntry { /* correlation field */ private String cField = null; /* slave workitem id */ private FlowExpressionId workitemId = null; public SlaveEntry (final String cField, final FlowExpressionId workitemId) { this.cField = cField; this.workitemId = workitemId; } public String getCorrelationField () { return this.cField; } public FlowExpressionId getWorkitemId () { return this.workitemId; } public void setCorrelationField (final String cField) { this.cField = cField; } public void setWorkitemId (final FlowExpressionId fei) { this.workitemId = fei; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -