📄 priorizeddispatchplugin.java
字号:
/*------------------------------------------------------------------------------Name: PriorizedDispatchPlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch.plugins.prio;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.property.I_PropertyChangeListener;import org.xmlBlaster.util.property.PropertyChangeEvent;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.plugin.I_Plugin;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.error.MsgErrorInfo;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import java.util.ArrayList;import java.util.Map;import java.util.HashMap;import java.util.Iterator;/** * This dispatcher plugin allows to control how messages are sent to the remote side. * <p> * We subscribe to a status message which describes the current connection to the remote side. * Depending on a status message we pick messages with specific priorities and send only these. * </p> * <p> * This allows for example to send all messages if a 2MBit connection is up, and send * only high priority messages when the line drops to 64kBit. * </p> * <p> * The class ConfigurationParser Javadoc has an xml example of the configuration * </p> * <p> * This plugin class has only one instance per typeVersion for each Global scope. * The xmlBlaster client used to subscribe to the status messages is a singleton (in Global.instance() scope). * </p> * <p> * Note that two status sources exist: * </p> * <ol> * <li>The state of the connection of the dispatcher framework, it may for example be <i>POLLING</i> * for a remote connection or have an <i>ALIVE</i> state or even be in <i>DEAD</i> state. * If a configuration is found for this state it has highest precedence. * </li> * <li> * The status of a status message from outside. This is freely configurable and is for example <i>2M</i> * or <i>BACKUP</i>. This status message has lower precedence. * </li> * </ol> * @see org.xmlBlaster.util.dispatch.plugins.prio.ConfigurationParser * @see org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html" target="others">the dispatch.control.plugin requirement</a> * @author xmlBlaster@marcelruff.info */public final class PriorizedDispatchPlugin implements I_MsgDispatchInterceptor, I_Plugin, I_PropertyChangeListener, I_Notify{ private String ME = "PriorizedDispatchPlugin"; private Global glob; private static Logger log = Logger.getLogger(PriorizedDispatchPlugin.class.getName()); private ConfigurationParser parser = null; public static final String CONFIG_PROPERTY_KEY = "PriorizedDispatchPlugin/config"; private String specificConfigPropertyKey = null; private boolean hasSpecificConf = false; /** This is the configuration for the current status of the last received status message: */ private StatusConfiguration currMsgStatusConfiguration; private String currMsgStatus; private boolean hasDefaultActionOnly = true; // cache for performance private XmlBlasterNativeClient xmlBlasterClient; private Map dispatchManagerEntryMap = new HashMap(); private boolean isShutdown = false; public DispatchAction QUEUE_ACTION; /** * Is called by DispatchPluginManager after the instance is created. * @see I_MsgDispatchInterceptor#initialize(Global, String) */ public void initialize(Global glob, String typeVersion) throws XmlBlasterException { this.glob = glob; String sessionId = null; // !!!! In future needed for native access? synchronized(this) { // We only have one status client in the Global scope Object obj = glob.getObjectEntry("PriorizedDispatchPlugin.xmlBlasterAccess"); if (obj == null) { obj = new XmlBlasterNativeClient(glob, this, sessionId); glob.addObjectEntry("PriorizedDispatchPlugin.xmlBlasterAccess", obj); } xmlBlasterClient = (XmlBlasterNativeClient)obj; } this.QUEUE_ACTION = new DispatchAction(glob, DispatchAction.QUEUE); // Initialize this constant for later usage // Subscribe for configuration properties // "PriorizedDispatchPlugin/config[Priority,1.0]" has precedence over "PriorizedDispatchPlugin/config" // Note: This fires an initial event to statusChanged("startup") this.specificConfigPropertyKey = CONFIG_PROPERTY_KEY + "[" + typeVersion + "]"; this.glob.getProperty().addPropertyChangeListener(CONFIG_PROPERTY_KEY, "startup", this); this.glob.getProperty().addPropertyChangeListener(this.specificConfigPropertyKey, "startup", this); log.info("Succefully initialized"); } /** * This is called once for each dispatch manager using this plugin. */ public void addDispatchManager(DispatchManager dispatchManager) { DispatchManagerEntry managerEntry = new DispatchManagerEntry(dispatchManager); synchronized (this) { this.dispatchManagerEntryMap.put(dispatchManager, managerEntry); changeManagerState(dispatchManager, dispatchManager.getDispatchConnectionsHandler().getState(), false); } //flushHoldbackQueue(managerEntry); log.info("Stored dispatchManager=" + dispatchManager.getId() + ", dispatchManagerEntryMap.size()=" + dispatchManagerEntryMap.size()); } /** * Invoked when the configuration <i>PriorizedDispatchPlugin/config</i> has changed. * Supports changing configuration in hot operation. */ public void propertyChanged(PropertyChangeEvent ev) { if (log.isLoggable(Level.FINE)) log.fine("propertyChanged event: " + ev.toString()); String newConfig = ev.getNewValue(); if (newConfig == null || newConfig.equals("startup")) { // && ev.getOldValue() == null) if (this.parser != null) return; // Ignore startup events without any setting // We need to initialize this.parser so we proceed with default setting newConfig = "<msgDispatch/>"; } if (this.specificConfigPropertyKey.equals(ev.getKey())) hasSpecificConf = true; if (hasSpecificConf && CONFIG_PROPERTY_KEY.equals(ev.getKey())) return; // Ignore unspecific configuration synchronized (this) { ConfigurationParser oldParser = this.parser; StatusConfiguration oldConf = this.currMsgStatusConfiguration; boolean oldDef = this.hasDefaultActionOnly; try { // Parse and set the new configuration ... this.parser = new ConfigurationParser(this.glob, newConfig); } catch (XmlBlasterException e) { log.severe("The new property '" + ev.toString() + " is ignored: " + e.getMessage()); return; } try { // Now subscribe to messages according to new configuration ... subscribeStatusMessages(); // Activate ... statusChanged(this.currMsgStatus); log.info("Reconfigured priorized dispatch plugin with '" + ev.getKey() + "', currMsgStatus=" + this.currMsgStatus); } catch (XmlBlasterException e) { log.severe("The new property '" + ev.toString() + " is ignored: " + e.getMessage()); // rollback ... this.parser = oldParser; this.currMsgStatusConfiguration = oldConf; this.hasDefaultActionOnly = oldDef; if (this.parser != null) { try { subscribeStatusMessages(); } catch (XmlBlasterException e2) { log.severe("Rollback to old configuration failed: "+ e2.getMessage()); } statusChanged(this.currMsgStatus); } } } } /** * Subscribe to messages according to the current configuration. */ private void subscribeStatusMessages() throws XmlBlasterException { this.xmlBlasterClient.unSubscribeStatusMessages(this); // cleanup first Iterator it = this.parser.getStatusConfigurationMap().values().iterator(); while (it.hasNext()) { StatusConfiguration conf = (StatusConfiguration)it.next(); this.xmlBlasterClient.subscribeToStatusMessage(conf.getOid(), this); } } /** * Enforced by I_Plugin * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, PluginInfo) */ public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) { //java.util.Properties props = pluginInfo.getParameters(); } /** * Enforced by I_Plugin * @return "Priority" */ public String getType() { return "Priority"; } /** * Enforced by I_Plugin * @return "1.0" */ public final String getVersion() { return "1.0"; } /** * Changing the status of the dispatch strategy. * <p> * Enforced by I_Notify * </p> * On initialize: addPropertyChangeListener(this.CONFIG_PROPERTY_KEY, "startup", this); * an initial event is fired an calls this method to initialize all attributes here */ public final void statusChanged(String status) { if (log.isLoggable(Level.FINE)) log.fine("statusChanged(status=" + status + ")"); synchronized (this) { String oldStatus = this.currMsgStatus; this.currMsgStatus = status; this.currMsgStatusConfiguration = parser.getStatusConfiguration(currMsgStatus); this.hasDefaultActionOnly = this.currMsgStatusConfiguration.defaultActionOnly(); // cache for performance log.info("Changed priorized dispatch from old status=" + oldStatus + " to new status=" + this.currMsgStatus); if ((oldStatus==null&&this.currMsgStatus!=null) || (oldStatus!=null && !oldStatus.equals(this.currMsgStatus))) { Iterator it = this.dispatchManagerEntryMap.values().iterator(); while (it.hasNext()) { DispatchManagerEntry managerEntry = (DispatchManagerEntry)it.next(); managerEntry.setCurrConnectionStateConfiguration(parser.getStatusConfiguration(managerEntry.getCurrConnectionState())); flushHoldbackQueue(managerEntry); } } } //dispatchManager.activateDispatchWorker(); } /** * Lookup the corresponding DispatchAction object this message priority. */ private final DispatchAction getDispatchAction(DispatchManagerEntry managerEntry, MsgQueueEntry entry) { if (managerEntry.getCurrConnectionStateConfiguration() != null) { // Dispatcher state has precedence return managerEntry.getCurrConnectionStateConfiguration().getDispatchAction(entry.getPriorityEnum()); } return this.currMsgStatusConfiguration.getDispatchAction(entry.getPriorityEnum()); } /** * Called when new messages are available. * @see I_MsgDispatchInterceptor#doActivate(DispatchManager) */ public final boolean doActivate(DispatchManager dispatchManager) { return true; // The DispatchManager knows what and why it does it /* if (dispatchManager.getNotifyCounter() > 0 && dispatchManager.getQueue().getNumOfEntries() > 0) { if (log.isLoggable(Level.FINE)) log.trace(ME, "doAvtivate -> true: notifyCounter=" + dispatchManager.getNotifyCounter() + " currEntries=" + dispatchManager.getQueue().getNumOfEntries()); return true; } return false; */ } /** * Enforced by I_MsgDispatchInterceptor. * <p> * NOTE: When copying entries from one queue to another one we have * to take care that the reference counter in msgUnitStore is not temporary zero (and is * garbage collected). This is avoided by a peek() and later remove() - which is * necessary for persistent messages anyhow to ensure 100% crash safety. * </p> * @see I_MsgDispatchInterceptor#handleNextMessages(DispatchManager, ArrayList) */ public final ArrayList handleNextMessages(DispatchManager dispatchManager, ArrayList entries) throws XmlBlasterException { // take messages from queue (none blocking) ... ArrayList entryList = dispatchManager.getQueue().peekSamePriority(-1, -1L); // filter expired entries etc. ... // you should always call this method after taking messages from queue entryList = dispatchManager.prepareMsgsFromQueue(entryList); DispatchManagerEntry managerEntry = getDispatchManagerEntry(dispatchManager); if (managerEntry == null) { String text = "Internal error: can't queue " + ((entries==null) ? 0 : entries.size()) + " messages, dispatchManager=" + dispatchManager + " is unknown, dispatchManagerEntryMap.size()=" + ((dispatchManagerEntryMap==null) ? 0 : dispatchManagerEntryMap.size()); log.severe(text);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -