⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchmanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      DispatchManager.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.error.MsgErrorInfo;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.plugin.PluginManagerBase;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;import org.xmlBlaster.util.property.PropString;import java.util.ArrayList;import java.util.HashSet;/** * Manages the sending of messages and commands and does error recovery * further we communicate with the dispatcher plugin if one is configured. * <p /> * There is one instance of this class per queue and remote connection. * @author xmlBlaster@marcelruff.info */public final class DispatchManager implements I_Timeout, I_QueuePutListener{   public final String ME;   private final Global glob;   private static Logger log = Logger.getLogger(DispatchManager.class.getName());   private final I_Queue msgQueue;   private final DispatchConnectionsHandler dispatchConnectionsHandler;   private final I_MsgErrorHandler failureListener;   private final I_MsgSecurityInterceptor securityInterceptor;   private final I_MsgDispatchInterceptor msgInterceptor;   private HashSet connectionStatusListeners;   private final String typeVersion;   /** If > 0 does burst mode */   private long collectTime = -1L;   private long toAliveTime = 0;   private long toPollingTime = 0;   private boolean dispatchWorkerIsActive = false;   /** The worker for synchronous invocations */   private DispatchWorker syncDispatchWorker;   private Timestamp timerKey = null;   private int notifyCounter = 0;   private boolean isShutdown = false;   private boolean isSyncMode = false;   private boolean trySyncMode = false; // true: client side queue embedding, false: server side callback queue   private boolean inAliveTransition = false;   private final Object ALIVE_TRANSITION_MONITOR = new Object();   private int burstModeMaxEntries = -1;   private long burstModeMaxBytes = -1L;   /** async delivery is activated only when this flag is 'true'. Used to temporarly inhibit dispatch of messages */   private boolean dispatcherActive = true;   private SessionName sessionName;   /**    * @param msgQueue The message queue which i use (!!! TODO: this changes, we should pass it on every method where needed)    * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java), or null    * @param addrArr The addresses i shall connect to    */   public DispatchManager(Global glob, I_MsgErrorHandler failureListener,                          I_MsgSecurityInterceptor securityInterceptor,                          I_Queue msgQueue, I_ConnectionStatusListener connectionStatusListener,                          AddressBase[] addrArr, SessionName sessionName) throws XmlBlasterException {      if (failureListener == null || msgQueue == null)         throw new IllegalArgumentException("DispatchManager failureListener=" + failureListener + " msgQueue=" + msgQueue);      this.ME = msgQueue.getStorageId().getId();      this.glob = glob;      this.sessionName = sessionName;      if (log.isLoggable(Level.FINE)) log.fine(ME+": Loading DispatchManager ...");      this.msgQueue = msgQueue;      this.failureListener = failureListener;      this.securityInterceptor = securityInterceptor;      this.dispatchConnectionsHandler = this.glob.createDispatchConnectionsHandler(this);      this.connectionStatusListeners = new HashSet();      if (connectionStatusListener != null) this.connectionStatusListeners.add(connectionStatusListener);      initDispatcherActive(addrArr);      /*       * Check i a plugin is configured ("DispatchPlugin/defaultPlugin")       * If configured, the plugin instance is searched in the Global scope       * and if none is found one is created (see DispatcherPluginManager)       * Default server setting is to use no dispatcher plugin       */      PropString propString = new PropString(PluginManagerBase.NO_PLUGIN_TYPE); // "undef";      if (addrArr != null && addrArr.length > 0) // Check if client wishes a specific plugin         propString.setValue(addrArr[0].getDispatchPlugin());      this.typeVersion = propString.getValue();      this.msgInterceptor = glob.getDispatchPluginManager().getPlugin(this.typeVersion); // usually from cache      if (log.isLoggable(Level.FINE)) log.fine(ME+": DispatchPlugin/defaultPlugin=" + propString.getValue() + " this.msgInterceptor="  + this.msgInterceptor);      if (this.msgInterceptor != null) {         this.msgInterceptor.addDispatchManager(this);         if (log.isLoggable(Level.FINE)) log.fine(ME+": Activated dispatcher plugin '" + this.typeVersion + "'");      }      this.msgQueue.addPutListener(this); // to get putPre() and putPost() events      this.dispatchConnectionsHandler.initialize(addrArr);   }   /**    * @return Never null    */   public SessionName getSessionName() {      return this.sessionName;   }   public boolean isSyncMode() {      return this.isSyncMode;   }   /**    * Set behavior of dispatch framework.    * @param trySyncMode true: client side queue embedding, false: server side callback queue    * defaults to false    */   public void trySyncMode(boolean trySyncMode) {      this.trySyncMode = trySyncMode;      switchToSyncMode();   }   /**    * Reconfigure dispatcher with given properties.    *    * Note that only a limited re-configuration is supported    * @param addressArr The new configuration    */   public final void updateProperty(CallbackAddress[] addressArr) throws XmlBlasterException {      initDispatcherActive(addressArr);      this.dispatchConnectionsHandler.initialize(addressArr);   }   public void finalize() {      removeBurstModeTimer();      if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected");   }   public I_Queue getQueue() {      return this.msgQueue;   }   /*    * Register yourself if you want to be informed about the remote connection status.    * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java)    * @return true if we did not already contain the specified element.    */   public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {      return this.connectionStatusListeners.add(connectionStatusListener);   }   public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener, boolean fireInitial) {      if (connectionStatusListener == null) return true;      boolean ret = this.connectionStatusListeners.add(connectionStatusListener);      if (fireInitial) {         if (isDead())            connectionStatusListener.toAlive(this, ConnectionStateEnum.DEAD);         else if (isPolling())            connectionStatusListener.toAlive(this, ConnectionStateEnum.POLLING);         else            connectionStatusListener.toAlive(this, ConnectionStateEnum.ALIVE);      }      return ret;   }   /**    * Remove the given listener    * @param connectionStatusListener    * @return true if it was removed    */   public synchronized boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {      return this.connectionStatusListeners.remove(connectionStatusListener);   }   public synchronized I_ConnectionStatusListener[] getConnectionStatusListeners() {      if (this.connectionStatusListeners.size() == 0)         return new I_ConnectionStatusListener[0];      return (I_ConnectionStatusListener[])this.connectionStatusListeners.toArray(new I_ConnectionStatusListener[this.connectionStatusListeners.size()]);   }   /**    * The name in the configuration file for the plugin    * @return e.g. "Priority,1.0"    */   public String getTypeVersion() {      return this.typeVersion;   }   /**    * @return The import/export encrypt handle or null if created by a SubjectInfo (no session info available)    */   public I_MsgSecurityInterceptor getMsgSecurityInterceptor() {      return this.securityInterceptor;   }   /**    * @return The handler of all callback plugins, is never null    */   public final DispatchConnectionsHandler getDispatchConnectionsHandler() {      return this.dispatchConnectionsHandler;   }   /**    * How many messages maximum shall the callback thread take in one bulk out of the    * callback queue and deliver in one bulk.    */   public final int getBurstModeMaxEntries() {      return this.burstModeMaxEntries;   }   /**    * How many bytes maximum shall the callback thread take in one bulk out of the    * callback queue and deliver in one bulk.    */   public final long getBurstModeMaxBytes() {      return this.burstModeMaxBytes;   }   /**    * Get timestamp when we went to ALIVE state.    * @return millis timestamp    */   public final long getAliveSinceTime() {      return this.toAliveTime;   }   /**    * Get timestamp when we went to POLLING state.    * @return millis timestamp    */   public final long getPollingSinceTime() {      return this.toPollingTime;   }   /**    * Call by DispatchConnectionsHandler on state transition    * NOTE: toAlive is called initially when a protocol plugin is successfully loaded    * but we don't know yet if it ever is able to connect    */   void toAlive(ConnectionStateEnum oldState) {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to ALIVE");      // Remember the current collectTime      AddressBase addr = this.dispatchConnectionsHandler.getAliveAddress();      if (addr == null) {         log.severe(ME+": toAlive action has no alive address");         return;      }      try {         this.inAliveTransition = true;         if (this.toAliveTime <= this.toPollingTime) {            this.toAliveTime = System.currentTimeMillis();         }         this.burstModeMaxEntries = addr.getBurstModeMaxEntries();         this.burstModeMaxBytes = addr.getBurstModeMaxBytes();         synchronized (this.ALIVE_TRANSITION_MONITOR) {            // 1. We allow a client to intercept and for example destroy all entries in the queue            I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();            for (int i=0; i<listeners.length; i++) {               listeners[i].toAlive(this, oldState);            }            // 2. If a dispatch plugin is registered it may do its work            if (this.msgInterceptor != null)               this.msgInterceptor.toAlive(this, oldState);         }      }      finally {         this.inAliveTransition = false;      }      collectTime = addr.getCollectTime(); // burst mode if > 0L      // 3. Deliver. Will be delayed if burst mode timer is activated, will switch to sync mode if necessary      activateDispatchWorker();   }   /** Call by DispatchConnectionsHandler on state transition */   void toPolling(ConnectionStateEnum oldState) {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to POLLING");      if (this.toPollingTime <= this.toAliveTime) {         this.toPollingTime = System.currentTimeMillis();      }      switchToASyncMode();      // 1. We allow a client to intercept and for example destroy all entries in the queue      I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();      for (int i=0; i<listeners.length; i++) {         listeners[i].toPolling(this, oldState);      }      // 2. If a dispatch plugin is registered it may do its work      if (this.msgInterceptor != null)         this.msgInterceptor.toPolling(this, oldState);   }   /**    *     * @param ex    */   public void toDead(XmlBlasterException ex) {      shutdownFomAnyState(ConnectionStateEnum.UNDEF, ex);   }   /** Call by DispatchConnectionsHandler on state transition */   void shutdownFomAnyState(ConnectionStateEnum oldState, XmlBlasterException ex) {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to DEAD");      if (oldState == ConnectionStateEnum.DEAD) return;      if (this.isShutdown) return;      if (ex != null) { // Very dangerous code! The caller ends up with changed Exception type         ex.changeErrorCode(ErrorCode.COMMUNICATION_NOCONNECTION_DEAD);      }      else {         ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME,                  "Switch from " + oldState + " to DEAD, reason is not known");      }      // 1. We allow a client to intercept and for example destroy all entries in the queue      I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();      for (int i=0; i<listeners.length; i++) {         try {            // Only pass original ex.getMessage() - not the changed errorCode            listeners[i].toDead(this, oldState, ex.getMessage());         }         catch (Throwable e) {            e.printStackTrace();         }      }      // 2. If a dispatch plugin is registered it may do its work      if (this.msgInterceptor != null)         this.msgInterceptor.toDead(this, oldState, ex.getMessage());      if (oldState != ConnectionStateEnum.UNDEF)         givingUpDelivery(ex);   }   private void givingUpDelivery(XmlBlasterException ex) {      if (log.isLoggable(Level.FINE)) log.fine(ME+": Entering givingUpDelivery(), state is " + this.dispatchConnectionsHandler.getState());      removeBurstModeTimer();      // The error handler flushed the queue and does error handling with them

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -