📄 simpleexpressionpool.java
字号:
return; } if (fe.getParent() == GoneParentId.GONE_PARENT_ID) { //log.debug("replyToParent() gone parent at "+fe.getId()); log(fe.getId(), wi, History.EVT_GONE_PARENT, ""); this.removeExpression(fe); return; } // // end of a subflow if ( ! fe.getId().getWorkflowInstanceId() .equals(fe.getParent().getWorkflowInstanceId())) { //log.debug("replyToParent() end of subflow at "+fe.getId()); log(fe.getId(), wi, History.EVT_FLOW_END, "sub"); } // // just replying to the parent //log.debug // ("replyToParent() \n"+fe.getId()+ // "\n replies to\n"+fe.getParent()); if (fe.getId().getEngineId().equals(fe.getParent().getEngineId())) reply(fe.getParent(), wi); else replyToRemoteParent(fe.getParent(), wi); // // remove from pool this.removeExpression(fe); } /** * This method is called when the parent expression is located in another * engine. It's the key to distributed workflows with OpenWFE. */ protected void replyToRemoteParent (final FlowExpressionId fei, final InFlowWorkItem wi) throws ReplyException { log(fei, wi, History.EVT_DEBUG, "replying (to remote parent)"); final ParticipantMap pMap = Definitions.getParticipantMap(getContext()); final Participant parentEngine = pMap.get(fei.getEngineId()); if (parentEngine == null) { //log.warn // ("replyToRemoteParent() no remote engine named '"+ // fei.getEngineId()+"'"); throw new ReplyException ("No remote engine named '"+ fei.getEngineId()+"'"); } try { parentEngine.dispatch(wi); } catch (final DispatchingException de) { throw new ReplyException ("Failed to reply to remote engine '"+fei.getEngineId()+"'"); } } /** * Replies to an expression. * This method should only be called by listeners */ public void reply (final FlowExpressionId fei, final InFlowWorkItem wi) throws ReplyException { log(fei, wi, History.EVT_DEBUG, "replying"); final FlowExpression fe = fetch(fei); if (fe == null) { throw new ReplyException ("Cannot reply : expression not found "+fei); } //fe.reply(wi); getState(fe).reply(wi); fe.tag(wi); } /** * Releases a flow expression. * This method is called by the replyToFather method, as the * expression tree gets diminished. */ public void removeExpression (final FlowExpression fe) { log.debug("removeExpression() "+fe.getId()); try { if (getStore() != null) getStore().unstoreExpression(fe); } catch (PoolException pe) { log.warn ("Failed to unstore expression", pe); } } /** * Releases a flow expression indicated by its id. */ public void removeExpression (FlowExpressionId fei) { log.debug("removeExpression(fei)"); final FlowExpression fe = this.fetch(fei); this.removeExpression(fe); } /** * Will return true if there is still a FlowExpression stored for the * given FlowExpressionId. * Might get useful for a UI that has to determine if a workitem belongs * to an active or a dead branch of a workflow instance. */ public boolean isExpressionActive (final FlowExpressionId fei) { // // is it in store ? return getStore().isExpressionActive(fei); } /** * (leaf cancel) Cancels an expression (simply calls its cancel method and * removes it from the pool). * Will return a workitem if the cancelled expression was handling one. */ public InFlowWorkItem childCancel (final FlowExpressionId fei) throws ApplyException { log.debug("childCancel() "+fei); final FlowExpression fe = fetch(fei); if (fe == null) return null; final InFlowWorkItem wi = getState(fe).cancel(); if (fe.getPrevious() != null) childCancel(fe.getPrevious()); this.removeExpression(fe); return wi; } /** * (root cancel) Cancels a whole branch, starting with its top expression * fei. */ public void cancel (final FlowExpressionId fei) { log.debug("cancel() "+fei); final FlowExpression fe = fetch(fei); if (fe == null) return; log(fei, null, History.EVT_CANCEL_EXPRESSION, "cancelling exp"); try { // // rebinding if (fe.getPrevious() != null) setNext(fe.getPrevious(), fe.getNext()); if (fe.getNext() != null) setPrevious(fe.getNext(), fe.getPrevious()); // // cancelling //final InFlowWorkItem wi = leafCancel(fei); final InFlowWorkItem wi = getState(fe).cancel(); if (wi != null) // // taking care of resuming the flow { this.replyToParent(fe, wi); } this.removeExpression(fe); } catch (final Throwable t) { log.warn ("cancel() Failed to cancel an expression, "+ "removing it from pool anyway.", t); } } /** * Forgets an expression (it stays in the pool, but its reference to its * parent expression is replaced by a 'gone-parent' tag, so that its * execution continues but its results will get forgotten (never reaching * the parent flow)). */ public void forget (final FlowExpressionId fei) { final FlowExpression fe = fetch(fei); if (fe == null) return; fe.setParent(GoneParentId.GONE_PARENT_ID); fe.storeItself(); } /** * This implementation simply delegates the work to the underlying * store's contentIterator() method. */ public java.util.Iterator contentIterator (final Class assignClass) { return getStore().contentIterator(assignClass); } // // purge and when methods /** * This method checks every expressions stored and reply to those * that are ParticipantExpression instances and whose timeout has come. */ protected synchronized void purge () { //synchronized (this) //{ final long startTime = System.currentTimeMillis(); final java.util.Set timedOutExpressions = new java.util.HashSet(); log.info("purge() starting..."); java.util.Iterator it = contentIterator(ExpressionWithTimeOut.class); while (it.hasNext()) { FlowExpression fe = (FlowExpression)it.next(); if (fe.getApplyTime() == null) continue; ExpressionWithTimeOut toe = (ExpressionWithTimeOut)fe; // // some expressions may have their timeout disabled if ( ! toe.isTimeOutActivated()) continue; // // gather participant expression who timed out if (fe.getTimeSinceApplied() > toe.determineTimeOut()) { timedOutExpressions.add(toe); } } // // trigger their replies (outside of sync scope) it = timedOutExpressions.iterator(); while (it.hasNext()) { Object o = it.next(); ExpressionWithTimeOut toe = (ExpressionWithTimeOut)o; FlowExpression fe = (FlowExpression)o; try { log.debug("purge() Timeout reply for "+fe.getId()); toe.timeOutReply(); } catch (ReplyException re) { log.warn("purge() Failed to reply to "+fe.getId(), re); } } long elapsedTime = ((System.currentTimeMillis() - startTime) / 1000L); log.info("purge() Purge ends. It took "+elapsedTime+"s."); //} } /** * iterate through when expressions and poll them */ protected synchronized void checkExpressionsWithTimer () { log.debug("checkExpressionsWithTimer()"); //synchronized (this) //{ final java.util.Iterator it = contentIterator(ExpressionWithTimer.class); while (it.hasNext()) { FlowExpression fe = null; try { //final ExpressionWithTimer ewt = (ExpressionWithTimer)it.next(); fe = (FlowExpression)it.next(); if (fe instanceof RawExpression) continue; //log.debug // ("checkExpressionsWithTimer() checking "+ // ((FlowExpression)ewt).getId()); ((ExpressionWithTimer)fe).check(); } catch (final Throwable t) { log.warn ("checkExpressionsWithTimer() "+ "check failure on "+fe.getId(), t); } } //log.debug("checkExpressionsWithTimer() about to quit..."); //} //log.debug("checkExpressionsWithTimer() quitting."); } /** * Sets the 'next' field of the 'subject' expression. */ public void setNext (final FlowExpressionId subject, final FlowExpressionId next) throws PoolException { final FlowExpression fe = fetch(subject); fe.setNext(next); } /** * Sets the 'previous' field of the 'subject' expression. */ public void setPrevious (final FlowExpressionId subject, final FlowExpressionId previous) throws PoolException { final FlowExpression fe = fetch(subject); fe.setPrevious(previous); } // // CONTROL METHODS private boolean isSecurityManagerPresent () { try
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -