📄 correlatedputstrategy.java
字号:
/* * Copyright (c) 2005, John Mettraux, OpenWFE.org * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * . Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * . Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * . Neither the name of the "OpenWFE" nor the names of its contributors may be * used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * $Id: CorrelatedPutStrategy.java,v 1.8 2005/07/28 14:58:10 jmettraux Exp $ *///// DefaultPutStrategy.java//// john.mettraux@openwfe.org//// generated with // jtmpl 1.1.01 2004/05/19 (john.mettraux@openwfe.org)//package openwfe.org.worklist.impl.store;import openwfe.org.MapUtils;import openwfe.org.ApplicationContext;import openwfe.org.xml.XmlCoder;import openwfe.org.engine.workitem.InFlowWorkItem;import openwfe.org.engine.workitem.Attribute;import openwfe.org.engine.workitem.StringAttribute;import openwfe.org.engine.workitem.StringMapAttribute;import openwfe.org.engine.expressions.FlowExpressionId;import openwfe.org.engine.participants.Participant;import openwfe.org.worklist.store.Lock;import openwfe.org.worklist.store.PutStrategy;import openwfe.org.worklist.store.WorkItemStore;import openwfe.org.worklist.store.StoreException;/** * This implementation takes care of correlating workitems : * * <p><font size=2>CVS Info : * <br>$Author: jmettraux $ * <br>$Id: CorrelatedPutStrategy.java,v 1.8 2005/07/28 14:58:10 jmettraux Exp $ </font> * * @author john.mettraux@openwfe.org */public class CorrelatedPutStrategy extends AbstractStoreStrategy implements PutStrategy{ private final static org.apache.log4j.Logger log = org.apache.log4j.Logger .getLogger(CorrelatedPutStrategy.class.getName()); // // CONSTANTS & co /** * The parameter 'cacheFileName' indicates to this strategy where it should * save its state. */ public final static String P_CACHE_FILE_NAME = "correlationCacheFileName"; private final static String DEFAULT_CACHE_FILE_NAME = openwfe.org.Utils.getTempDir()+"/"+ CorrelatedPutStrategy.class.getName()+"__cache.xml"; /** * The parameter 'correlationSlaveStoreName' tells the strategy where * to store the pending slave workitems (the ones waiting to get * inserted in their master workitem). * If this parameter is not specified, the slave store name will be the * regular storename suffixed with "_cslaves". */ public final static String P_SLAVE_STORE_NAME = "correlationSlaveStoreName"; /** * '__correlation_id__' : a workitem with such an attribute is said to be * correlated and belongs to the correlation class as other workitems * with the same value (a StringAttribute) as correlation id. */ public final static String CORRELATION_ID = "__correlation_id__"; /** * '__correlation_field__' : a correlated workitem with this attribute set * is said to be a 'slave correlated workitem', a correlated workitem * without this attribute is a 'master'. There can be only one master * per correlation class. */ public final static String CORRELATION_FIELD = "__correlation_field__"; /** * If the slave workitem has its attribute '__correlation_bounce__' set * to a string [attribute] value of 'true', this strategy will immediately * reply (forward) the slave workitem (after having kept a copy of it). * Else, the reply will occur only if (or when) the master workitem has * arrived / arrives. */ public final static String CORRELATION_BOUNCE = "__correlation_bounce__"; // // FIELDS private String cacheFileName = null; /* keeping track of the master workitems' FlowExpressionId */ private java.util.Map masters = null; /* storing the slaves whose master has not yet arrived */ private java.util.Map slaves = null; private String slaveStoreName = null; // // CONSTRUCTORS /** * Performs the initialization of the strategy (give it enough * info to do its task). */ public void init (final ApplicationContext context, final WorkItemStore store, final java.util.Map storeParams, final String storageName) throws StoreException { super.init(context, store, storeParams, storageName); this.cacheFileName = MapUtils.getAsString (storeParams, P_CACHE_FILE_NAME, DEFAULT_CACHE_FILE_NAME); log.debug("init() using cache at "+this.cacheFileName); this.slaveStoreName = MapUtils.getAsString (storeParams, P_SLAVE_STORE_NAME, store.getName()+"_cslaves"); log.debug("init() slaveStoreName set to "+this.slaveStoreName); loadState(); } // // METHODS private void loadState () { final java.io.File f = new java.io.File(this.cacheFileName); if ( ! f.exists()) { log.debug ("loadState() nothing to load, no file at "+this.cacheFileName); this.masters = new java.util.HashMap(); this.slaves = new java.util.HashMap(); return; } try { final java.util.Map state = (java.util.Map)XmlCoder.load(this.cacheFileName); this.masters = (java.util.Map)state.get("masters"); this.slaves = (java.util.Map)state.get("slaves"); log.debug("loadState() done."); } catch (final Exception e) { log.warn("loadState() failure with file "+this.cacheFileName, e); } } private void saveState () { log.debug("saveState()"); final java.util.Map state = new java.util.HashMap(2); state.put("masters", this.masters); state.put("slaves", this.slaves); try { XmlCoder.save(this.cacheFileName, state); } catch (final Exception e) { log.warn ("saveState() failed to save state. "+ "Will lose it at system shutdown.", e); } } // // METHODS from PutStrategy /** * Checks wether the incoming workitem is correlated (slave or master) or * not correlated at all, then takes the appropriate action. */ public void put (final InFlowWorkItem wi) throws StoreException { final String cId = wi.getAttributes().sget(CORRELATION_ID); if (cId != null) { final String cField = wi.getAttributes().sget(CORRELATION_FIELD); //synchronized (this) //{ if (cField != null) receiveSlave(cId, cField, wi); else receiveMaster(cId, wi); //} return; } // // no correlation, behaving like a regular put strategy... getStorage().storeWorkItem(getStoreName(), wi); } private void receiveSlave (final String cId, final String cField, final InFlowWorkItem wi) throws StoreException { log.debug("receiveSlave() for '"+cId+"' (field : '"+cField+"')"); final FlowExpressionId masterId = (FlowExpressionId)this.masters.get(cId); InFlowWorkItem masterWi = null; if (masterId != null) { try { masterWi = getStore().getAndLock(null, masterId); } catch (final StoreException se) { this.masters.remove(cId); log.debug("receiveSlave() did not find master"); } } if (masterId == null || masterWi == null) // // master hasn't arrived yet { log.debug("receiveSlave() master hasn't arrived yet."); getStorage().storeWorkItem(this.slaveStoreName, wi); putSlave(cId, cField, wi.getLastExpressionId()); saveState(); if (shouldBounceDirectly(wi)) asyncBounce(wi); return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -