⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchconnectionshandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      DispatchConnectionsHandler.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Holding messages waiting on client callback.Author:    xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.AddressBase;import java.util.ArrayList;/** * Holding all necessary infos to establish a remote * connection and invoke update()/updateOneway()/ping().  * <p> * This instance is a 'logical connection' hiding multiple * 'physical' connections (called DispatchConnection). * </p> * <p> * One instance of this is used for each DispatchManager (one logical connection). * </p> * <pre> *    State chart of the 'logical connection': * *      +<-----------------initialize()--------------+ *      |                                            | *      |   +<--toAlive()----+    +<-initialize()+   |           *      |   |   initialize() |    |              |   |   *      |   |                |    |              |   |   *    #########            ##########         ########## *   #         #          #          #       #          # *   #  ALIVE  #          # POLLING  #       #  DEAD    # *   #         #          #          #       #          # *    #########            ##########         ########## *      |   |                |    |             |    | *      |   +--toPolling()-->+    +--toDead()-->+    | *      |      initialize()          initialize()    | *      |                                            | *      +------------------toDead()----------------->+ *                         initialize() * </pre> * <p> * Note: Recovery from dead state is only possible if *       new callback addresses are passed with initialize() * </p> * <p> * Note: toAlive(), toPolling() and toDead() are called *       by a single DispatchConnection only, telling its state change. * </p> * @author xmlBlaster@marcelruff.info */abstract public class DispatchConnectionsHandler{   public final String ME;   protected final Global glob;   private static Logger log = Logger.getLogger(DispatchConnectionsHandler.class.getName());   protected final DispatchManager dispatchManager;   protected final DispatchStatistic statistic;   protected I_PostSendListener postSendListener;   /** holds all DispatchConnection instances */   private ArrayList conList = new ArrayList();   private ConnectionStateEnum state = ConnectionStateEnum.UNDEF;      /**    * You need to call initialize() after construction.     * @param dispatchManager The message queue witch i belong to    * @param cbAddr The addresses i shall connect to    */   public DispatchConnectionsHandler(Global glob, DispatchManager dispatchManager) throws XmlBlasterException {      this.ME = dispatchManager.getQueue().getStorageId().toString();      this.glob = glob;      this.dispatchManager = dispatchManager;      this.statistic = new DispatchStatistic();   }   public final DispatchManager getDispatchManager() {      return this.dispatchManager;   }   /**    * Access the listener for send messages.     * @return Returns the postSendListener or null if none is registered    */   public final I_PostSendListener getPostSendListener() {      return this.postSendListener;   }   /**    * Register a listener to get notifications when a messages is successfully send.     * Max one can be registered, any old one will be overwritten     * @param postSendListener The postSendListener to set.    */   public final void registerPostSendListener(I_PostSendListener postSendListener) {      this.postSendListener = postSendListener;   }   /**    * Overwrite existing connections with new configuration    */   public final void initialize(AddressBase[] cbAddr) throws XmlBlasterException {      int oldConSize = getCountDispatchConnection();//conList.size();      DispatchConnection reconfiguredCon = null;      if (log.isLoggable(Level.FINER)) log.finer(ME+": Initialize old connections=" + oldConSize +                                 " new connections=" + ((cbAddr==null)?0:cbAddr.length));      ArrayList toShutdown = new ArrayList();      try {         synchronized (this.dispatchManager) {                        DispatchConnection[] tmpList = getDispatchConnectionArr();            clearDispatchConnectionList(); // conList.clear();                        if (cbAddr == null || cbAddr.length==0) {               for (int ii=0; ii<tmpList.length; ii++) {                  if (tmpList[ii] != null)                     tmpList[ii].shutdown();               }               updateState(null);               return;            }            // shutdown callbacks not in use any more ...            for (int ii=0; ii<tmpList.length; ii++) {               boolean found = false;               DispatchConnection  tmpConn = tmpList[ii];               if (tmpConn == null) continue;               for (int jj=0; jj<cbAddr.length; jj++) {                  Object obj = cbAddr[jj].getCallbackDriver();                  if (obj != null && obj != tmpConn.getAddress().getCallbackDriver()) {                     continue;                  }                  if (tmpConn.getAddress().isSameAddress(cbAddr[jj])) {                     found = true;                     break;                  }               }               if (!found) {                  log.info(ME+": Shutting down callback connection '" + tmpConn.getName() + "' because of new configuration.");                  toShutdown.add(tmpConn);                  tmpList[ii] = null;                  //con.shutdown();               }            }            // keep existing addresses, add the new ones ...            for (int ii=0; ii<cbAddr.length; ii++) {               boolean found = false;               for (int jj=0; jj<tmpList.length; jj++) {                  DispatchConnection tmpCon = tmpList[jj];                  if (tmpCon == null) continue;                  if (cbAddr[ii].isSameAddress((tmpCon).getAddress())) {                     found = true;                     tmpCon.setAddress(cbAddr[ii]);                     addDispatchConnection(tmpCon); // reuse                     reconfiguredCon = tmpCon;                     tmpCon.registerProgressListener(this.statistic); // presistent SOCKET cb after restart                     break;                  }               }               if (!found) {                  try {  // This creates a client or cb instance with its plugin                     DispatchConnection con = createDispatchConnection(cbAddr[ii]);                     if (log.isLoggable(Level.FINE)) log.fine(ME+": Create new DispatchConnection, retries=" + cbAddr[ii].getRetries() + " :" + cbAddr[ii].toXml());                     try {                        addDispatchConnection(con);                        con.initialize();                        con.registerProgressListener(this.statistic);                     }                     catch (XmlBlasterException e) {                        if (e.isCommunication()) { // Initial POLLING ?                           this.dispatchManager.toPolling(this.state);                           if (log.isLoggable(Level.FINE)) log.fine(ME+": Load " + cbAddr[ii].toString() + ": " + e.getMessage());                        }                        else {                           log.severe(ME+": Can't load " + cbAddr[ii].toString() + ": " + e.getMessage());                           toShutdown.add(con);                           //con.shutdown();                           synchronized (conList) {                              conList.remove(con);                           }                        }                     }                  }                  catch (XmlBlasterException e) {                     log.warning(ME+": Can't load " + cbAddr[ii].toString() + ": " + e.getMessage());                     throw e;                  }                  catch (Throwable e) {                     log.severe(ME+": Can't load " + cbAddr[ii].toXml() + ": " + e.toString());                     throw XmlBlasterException.convert(glob, ME, "", e);                  }                  // TODO: cleanup if exception is thrown by createDispatchConnection()               }            }                        tmpList = null;         } // synchronized      }      finally {         // We had the case where         //   java.net.PlainSocketImpl.socketClose0(Native Method)         // blocked for 20 min in LAST_ACK, so we shutdown outside of the synchronized now:          for (int i=0;  i<toShutdown.size(); i++) {            DispatchConnection con = (DispatchConnection)toShutdown.get(i);            try {               con.shutdown();            }            catch (XmlBlasterException ex) {               log.severe(ME+"initialize(): Could not shutdown properly. " + ex.getMessage());            }         }         updateState(null);  // Redundant??         if (log.isLoggable(Level.FINE)) log.fine(ME+": Reached state = " + state.toString());         if (reconfiguredCon != null && /*reconfiguredCon.*/isPolling() && oldConSize > 0) {            this.glob.getPingTimer().addTimeoutListener(reconfiguredCon, 0L, "poll");  // force a reconnect try         }      }   }   /**    * Create a DispatchConnection instance and load the protocol plugin.     * You should call initialie() later.    */   abstract public DispatchConnection createDispatchConnection(AddressBase address) throws XmlBlasterException;   /** @return a currently available callback connection (with any state) or null */   public final DispatchConnection getCurrentDispatchConnection() {      DispatchConnection d = getAliveDispatchConnection();      if (d == null)         d = getPollingDispatchConnection();      if (d == null)         d = getDeadDispatchConnection();      return d;   }   /** @return a currently alive callback connection or null */   public final DispatchConnection getAliveDispatchConnection() {      DispatchConnection[] arr = getDispatchConnectionArr();      for (int ii=0; ii<arr.length; ii++) {         if (arr[ii].isAlive())            return arr[ii];      }      return null;   }   /** @return a currently polling callback connection or null */   public final DispatchConnection getPollingDispatchConnection() {      DispatchConnection[] arr = getDispatchConnectionArr();      for (int ii=0; ii<arr.length; ii++) {         if (arr[ii].isPolling())            return arr[ii];      }      return null;   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -