📄 dispatchconnectionshandler.java
字号:
/*------------------------------------------------------------------------------Name: DispatchConnectionsHandler.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Holding messages waiting on client callback.Author: xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.AddressBase;import java.util.ArrayList;/** * Holding all necessary infos to establish a remote * connection and invoke update()/updateOneway()/ping(). * <p> * This instance is a 'logical connection' hiding multiple * 'physical' connections (called DispatchConnection). * </p> * <p> * One instance of this is used for each DispatchManager (one logical connection). * </p> * <pre> * State chart of the 'logical connection': * * +<-----------------initialize()--------------+ * | | * | +<--toAlive()----+ +<-initialize()+ | * | | initialize() | | | | * | | | | | | * ######### ########## ########## * # # # # # # * # ALIVE # # POLLING # # DEAD # * # # # # # # * ######### ########## ########## * | | | | | | * | +--toPolling()-->+ +--toDead()-->+ | * | initialize() initialize() | * | | * +------------------toDead()----------------->+ * initialize() * </pre> * <p> * Note: Recovery from dead state is only possible if * new callback addresses are passed with initialize() * </p> * <p> * Note: toAlive(), toPolling() and toDead() are called * by a single DispatchConnection only, telling its state change. * </p> * @author xmlBlaster@marcelruff.info */abstract public class DispatchConnectionsHandler{ public final String ME; protected final Global glob; private static Logger log = Logger.getLogger(DispatchConnectionsHandler.class.getName()); protected final DispatchManager dispatchManager; protected final DispatchStatistic statistic; protected I_PostSendListener postSendListener; /** holds all DispatchConnection instances */ private ArrayList conList = new ArrayList(); private ConnectionStateEnum state = ConnectionStateEnum.UNDEF; /** * You need to call initialize() after construction. * @param dispatchManager The message queue witch i belong to * @param cbAddr The addresses i shall connect to */ public DispatchConnectionsHandler(Global glob, DispatchManager dispatchManager) throws XmlBlasterException { this.ME = dispatchManager.getQueue().getStorageId().toString(); this.glob = glob; this.dispatchManager = dispatchManager; this.statistic = new DispatchStatistic(); } public final DispatchManager getDispatchManager() { return this.dispatchManager; } /** * Access the listener for send messages. * @return Returns the postSendListener or null if none is registered */ public final I_PostSendListener getPostSendListener() { return this.postSendListener; } /** * Register a listener to get notifications when a messages is successfully send. * Max one can be registered, any old one will be overwritten * @param postSendListener The postSendListener to set. */ public final void registerPostSendListener(I_PostSendListener postSendListener) { this.postSendListener = postSendListener; } /** * Overwrite existing connections with new configuration */ public final void initialize(AddressBase[] cbAddr) throws XmlBlasterException { int oldConSize = getCountDispatchConnection();//conList.size(); DispatchConnection reconfiguredCon = null; if (log.isLoggable(Level.FINER)) log.finer(ME+": Initialize old connections=" + oldConSize + " new connections=" + ((cbAddr==null)?0:cbAddr.length)); ArrayList toShutdown = new ArrayList(); try { synchronized (this.dispatchManager) { DispatchConnection[] tmpList = getDispatchConnectionArr(); clearDispatchConnectionList(); // conList.clear(); if (cbAddr == null || cbAddr.length==0) { for (int ii=0; ii<tmpList.length; ii++) { if (tmpList[ii] != null) tmpList[ii].shutdown(); } updateState(null); return; } // shutdown callbacks not in use any more ... for (int ii=0; ii<tmpList.length; ii++) { boolean found = false; DispatchConnection tmpConn = tmpList[ii]; if (tmpConn == null) continue; for (int jj=0; jj<cbAddr.length; jj++) { Object obj = cbAddr[jj].getCallbackDriver(); if (obj != null && obj != tmpConn.getAddress().getCallbackDriver()) { continue; } if (tmpConn.getAddress().isSameAddress(cbAddr[jj])) { found = true; break; } } if (!found) { log.info(ME+": Shutting down callback connection '" + tmpConn.getName() + "' because of new configuration."); toShutdown.add(tmpConn); tmpList[ii] = null; //con.shutdown(); } } // keep existing addresses, add the new ones ... for (int ii=0; ii<cbAddr.length; ii++) { boolean found = false; for (int jj=0; jj<tmpList.length; jj++) { DispatchConnection tmpCon = tmpList[jj]; if (tmpCon == null) continue; if (cbAddr[ii].isSameAddress((tmpCon).getAddress())) { found = true; tmpCon.setAddress(cbAddr[ii]); addDispatchConnection(tmpCon); // reuse reconfiguredCon = tmpCon; tmpCon.registerProgressListener(this.statistic); // presistent SOCKET cb after restart break; } } if (!found) { try { // This creates a client or cb instance with its plugin DispatchConnection con = createDispatchConnection(cbAddr[ii]); if (log.isLoggable(Level.FINE)) log.fine(ME+": Create new DispatchConnection, retries=" + cbAddr[ii].getRetries() + " :" + cbAddr[ii].toXml()); try { addDispatchConnection(con); con.initialize(); con.registerProgressListener(this.statistic); } catch (XmlBlasterException e) { if (e.isCommunication()) { // Initial POLLING ? this.dispatchManager.toPolling(this.state); if (log.isLoggable(Level.FINE)) log.fine(ME+": Load " + cbAddr[ii].toString() + ": " + e.getMessage()); } else { log.severe(ME+": Can't load " + cbAddr[ii].toString() + ": " + e.getMessage()); toShutdown.add(con); //con.shutdown(); synchronized (conList) { conList.remove(con); } } } } catch (XmlBlasterException e) { log.warning(ME+": Can't load " + cbAddr[ii].toString() + ": " + e.getMessage()); throw e; } catch (Throwable e) { log.severe(ME+": Can't load " + cbAddr[ii].toXml() + ": " + e.toString()); throw XmlBlasterException.convert(glob, ME, "", e); } // TODO: cleanup if exception is thrown by createDispatchConnection() } } tmpList = null; } // synchronized } finally { // We had the case where // java.net.PlainSocketImpl.socketClose0(Native Method) // blocked for 20 min in LAST_ACK, so we shutdown outside of the synchronized now: for (int i=0; i<toShutdown.size(); i++) { DispatchConnection con = (DispatchConnection)toShutdown.get(i); try { con.shutdown(); } catch (XmlBlasterException ex) { log.severe(ME+"initialize(): Could not shutdown properly. " + ex.getMessage()); } } updateState(null); // Redundant?? if (log.isLoggable(Level.FINE)) log.fine(ME+": Reached state = " + state.toString()); if (reconfiguredCon != null && /*reconfiguredCon.*/isPolling() && oldConSize > 0) { this.glob.getPingTimer().addTimeoutListener(reconfiguredCon, 0L, "poll"); // force a reconnect try } } } /** * Create a DispatchConnection instance and load the protocol plugin. * You should call initialie() later. */ abstract public DispatchConnection createDispatchConnection(AddressBase address) throws XmlBlasterException; /** @return a currently available callback connection (with any state) or null */ public final DispatchConnection getCurrentDispatchConnection() { DispatchConnection d = getAliveDispatchConnection(); if (d == null) d = getPollingDispatchConnection(); if (d == null) d = getDeadDispatchConnection(); return d; } /** @return a currently alive callback connection or null */ public final DispatchConnection getAliveDispatchConnection() { DispatchConnection[] arr = getDispatchConnectionArr(); for (int ii=0; ii<arr.length; ii++) { if (arr[ii].isAlive()) return arr[ii]; } return null; } /** @return a currently polling callback connection or null */ public final DispatchConnection getPollingDispatchConnection() { DispatchConnection[] arr = getDispatchConnectionArr(); for (int ii=0; ii<arr.length; ii++) { if (arr[ii].isPolling()) return arr[ii]; } return null; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -