📄 dispatchmanager.java
字号:
/*------------------------------------------------------------------------------Name: DispatchManager.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.error.MsgErrorInfo;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.plugin.PluginManagerBase;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;import org.xmlBlaster.util.property.PropString;import java.util.ArrayList;import java.util.HashSet;/** * Manages the sending of messages and commands and does error recovery * further we communicate with the dispatcher plugin if one is configured. * <p /> * There is one instance of this class per queue and remote connection. * @author xmlBlaster@marcelruff.info */public final class DispatchManager implements I_Timeout, I_QueuePutListener{ public final String ME; private final Global glob; private static Logger log = Logger.getLogger(DispatchManager.class.getName()); private final I_Queue msgQueue; private final DispatchConnectionsHandler dispatchConnectionsHandler; private final I_MsgErrorHandler failureListener; private final I_MsgSecurityInterceptor securityInterceptor; private final I_MsgDispatchInterceptor msgInterceptor; private HashSet connectionStatusListeners; private final String typeVersion; /** If > 0 does burst mode */ private long collectTime = -1L; private long toAliveTime = 0; private long toPollingTime = 0; private boolean dispatchWorkerIsActive = false; /** The worker for synchronous invocations */ private DispatchWorker syncDispatchWorker; private Timestamp timerKey = null; private int notifyCounter = 0; private boolean isShutdown = false; private boolean isSyncMode = false; private boolean trySyncMode = false; // true: client side queue embedding, false: server side callback queue private boolean inAliveTransition = false; private final Object ALIVE_TRANSITION_MONITOR = new Object(); private int burstModeMaxEntries = -1; private long burstModeMaxBytes = -1L; /** async delivery is activated only when this flag is 'true'. Used to temporarly inhibit dispatch of messages */ private boolean dispatcherActive = true; private SessionName sessionName; /** * @param msgQueue The message queue which i use (!!! TODO: this changes, we should pass it on every method where needed) * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java), or null * @param addrArr The addresses i shall connect to */ public DispatchManager(Global glob, I_MsgErrorHandler failureListener, I_MsgSecurityInterceptor securityInterceptor, I_Queue msgQueue, I_ConnectionStatusListener connectionStatusListener, AddressBase[] addrArr, SessionName sessionName) throws XmlBlasterException { if (failureListener == null || msgQueue == null) throw new IllegalArgumentException("DispatchManager failureListener=" + failureListener + " msgQueue=" + msgQueue); this.ME = msgQueue.getStorageId().getId(); this.glob = glob; this.sessionName = sessionName; if (log.isLoggable(Level.FINE)) log.fine(ME+": Loading DispatchManager ..."); this.msgQueue = msgQueue; this.failureListener = failureListener; this.securityInterceptor = securityInterceptor; this.dispatchConnectionsHandler = this.glob.createDispatchConnectionsHandler(this); this.connectionStatusListeners = new HashSet(); if (connectionStatusListener != null) this.connectionStatusListeners.add(connectionStatusListener); initDispatcherActive(addrArr); /* * Check i a plugin is configured ("DispatchPlugin/defaultPlugin") * If configured, the plugin instance is searched in the Global scope * and if none is found one is created (see DispatcherPluginManager) * Default server setting is to use no dispatcher plugin */ PropString propString = new PropString(PluginManagerBase.NO_PLUGIN_TYPE); // "undef"; if (addrArr != null && addrArr.length > 0) // Check if client wishes a specific plugin propString.setValue(addrArr[0].getDispatchPlugin()); this.typeVersion = propString.getValue(); this.msgInterceptor = glob.getDispatchPluginManager().getPlugin(this.typeVersion); // usually from cache if (log.isLoggable(Level.FINE)) log.fine(ME+": DispatchPlugin/defaultPlugin=" + propString.getValue() + " this.msgInterceptor=" + this.msgInterceptor); if (this.msgInterceptor != null) { this.msgInterceptor.addDispatchManager(this); if (log.isLoggable(Level.FINE)) log.fine(ME+": Activated dispatcher plugin '" + this.typeVersion + "'"); } this.msgQueue.addPutListener(this); // to get putPre() and putPost() events this.dispatchConnectionsHandler.initialize(addrArr); } /** * @return Never null */ public SessionName getSessionName() { return this.sessionName; } public boolean isSyncMode() { return this.isSyncMode; } /** * Set behavior of dispatch framework. * @param trySyncMode true: client side queue embedding, false: server side callback queue * defaults to false */ public void trySyncMode(boolean trySyncMode) { this.trySyncMode = trySyncMode; switchToSyncMode(); } /** * Reconfigure dispatcher with given properties. * * Note that only a limited re-configuration is supported * @param addressArr The new configuration */ public final void updateProperty(CallbackAddress[] addressArr) throws XmlBlasterException { initDispatcherActive(addressArr); this.dispatchConnectionsHandler.initialize(addressArr); } public void finalize() { removeBurstModeTimer(); if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected"); } public I_Queue getQueue() { return this.msgQueue; } /* * Register yourself if you want to be informed about the remote connection status. * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java) * @return true if we did not already contain the specified element. */ public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) { return this.connectionStatusListeners.add(connectionStatusListener); } public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener, boolean fireInitial) { if (connectionStatusListener == null) return true; boolean ret = this.connectionStatusListeners.add(connectionStatusListener); if (fireInitial) { if (isDead()) connectionStatusListener.toAlive(this, ConnectionStateEnum.DEAD); else if (isPolling()) connectionStatusListener.toAlive(this, ConnectionStateEnum.POLLING); else connectionStatusListener.toAlive(this, ConnectionStateEnum.ALIVE); } return ret; } /** * Remove the given listener * @param connectionStatusListener * @return true if it was removed */ public synchronized boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) { return this.connectionStatusListeners.remove(connectionStatusListener); } public synchronized I_ConnectionStatusListener[] getConnectionStatusListeners() { if (this.connectionStatusListeners.size() == 0) return new I_ConnectionStatusListener[0]; return (I_ConnectionStatusListener[])this.connectionStatusListeners.toArray(new I_ConnectionStatusListener[this.connectionStatusListeners.size()]); } /** * The name in the configuration file for the plugin * @return e.g. "Priority,1.0" */ public String getTypeVersion() { return this.typeVersion; } /** * @return The import/export encrypt handle or null if created by a SubjectInfo (no session info available) */ public I_MsgSecurityInterceptor getMsgSecurityInterceptor() { return this.securityInterceptor; } /** * @return The handler of all callback plugins, is never null */ public final DispatchConnectionsHandler getDispatchConnectionsHandler() { return this.dispatchConnectionsHandler; } /** * How many messages maximum shall the callback thread take in one bulk out of the * callback queue and deliver in one bulk. */ public final int getBurstModeMaxEntries() { return this.burstModeMaxEntries; } /** * How many bytes maximum shall the callback thread take in one bulk out of the * callback queue and deliver in one bulk. */ public final long getBurstModeMaxBytes() { return this.burstModeMaxBytes; } /** * Get timestamp when we went to ALIVE state. * @return millis timestamp */ public final long getAliveSinceTime() { return this.toAliveTime; } /** * Get timestamp when we went to POLLING state. * @return millis timestamp */ public final long getPollingSinceTime() { return this.toPollingTime; } /** * Call by DispatchConnectionsHandler on state transition * NOTE: toAlive is called initially when a protocol plugin is successfully loaded * but we don't know yet if it ever is able to connect */ void toAlive(ConnectionStateEnum oldState) { if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to ALIVE"); // Remember the current collectTime AddressBase addr = this.dispatchConnectionsHandler.getAliveAddress(); if (addr == null) { log.severe(ME+": toAlive action has no alive address"); return; } try { this.inAliveTransition = true; if (this.toAliveTime <= this.toPollingTime) { this.toAliveTime = System.currentTimeMillis(); } this.burstModeMaxEntries = addr.getBurstModeMaxEntries(); this.burstModeMaxBytes = addr.getBurstModeMaxBytes(); synchronized (this.ALIVE_TRANSITION_MONITOR) { // 1. We allow a client to intercept and for example destroy all entries in the queue I_ConnectionStatusListener[] listeners = getConnectionStatusListeners(); for (int i=0; i<listeners.length; i++) { listeners[i].toAlive(this, oldState); } // 2. If a dispatch plugin is registered it may do its work if (this.msgInterceptor != null) this.msgInterceptor.toAlive(this, oldState); } } finally { this.inAliveTransition = false; } collectTime = addr.getCollectTime(); // burst mode if > 0L // 3. Deliver. Will be delayed if burst mode timer is activated, will switch to sync mode if necessary activateDispatchWorker(); } /** Call by DispatchConnectionsHandler on state transition */ void toPolling(ConnectionStateEnum oldState) { if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to POLLING"); if (this.toPollingTime <= this.toAliveTime) { this.toPollingTime = System.currentTimeMillis(); } switchToASyncMode(); // 1. We allow a client to intercept and for example destroy all entries in the queue I_ConnectionStatusListener[] listeners = getConnectionStatusListeners(); for (int i=0; i<listeners.length; i++) { listeners[i].toPolling(this, oldState); } // 2. If a dispatch plugin is registered it may do its work if (this.msgInterceptor != null) this.msgInterceptor.toPolling(this, oldState); } /** * * @param ex */ public void toDead(XmlBlasterException ex) { shutdownFomAnyState(ConnectionStateEnum.UNDEF, ex); } /** Call by DispatchConnectionsHandler on state transition */ void shutdownFomAnyState(ConnectionStateEnum oldState, XmlBlasterException ex) { if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to DEAD"); if (oldState == ConnectionStateEnum.DEAD) return; if (this.isShutdown) return; if (ex != null) { // Very dangerous code! The caller ends up with changed Exception type ex.changeErrorCode(ErrorCode.COMMUNICATION_NOCONNECTION_DEAD); } else { ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "Switch from " + oldState + " to DEAD, reason is not known"); } // 1. We allow a client to intercept and for example destroy all entries in the queue I_ConnectionStatusListener[] listeners = getConnectionStatusListeners(); for (int i=0; i<listeners.length; i++) { try { // Only pass original ex.getMessage() - not the changed errorCode listeners[i].toDead(this, oldState, ex.getMessage()); } catch (Throwable e) { e.printStackTrace(); } } // 2. If a dispatch plugin is registered it may do its work if (this.msgInterceptor != null) this.msgInterceptor.toDead(this, oldState, ex.getMessage()); if (oldState != ConnectionStateEnum.UNDEF) givingUpDelivery(ex); } private void givingUpDelivery(XmlBlasterException ex) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Entering givingUpDelivery(), state is " + this.dispatchConnectionsHandler.getState()); removeBurstModeTimer(); // The error handler flushed the queue and does error handling with them
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -