📄 dispatchconnection.java
字号:
/*------------------------------------------------------------------------------Name: DispatchConnection.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.qos.StatusQosData;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.xbformat.I_ProgressListener;/** * Holding all necessary infos to establish a remote connection * and check this connection. * <pre> * State chart of a single connection: * * +<--toAlive()----+ * | | * ######### ########## ########## * # # # # # # * # ALIVE # # POLLING # # DEAD # * # # # # # # * ######### ########## ########## * | | | | | | * | +--toPolling()-->+ +--toDead()-->+ | * | | * +-------------toDead()---------------------->+ * * </pre> * <p> * Note that DispatchConnection can't recover from DEAD state * you need to create a new instance if desired * </p> * @author xmlBlaster@marcelruff.info * @author michele@laghi.eu */abstract public class DispatchConnection implements I_Timeout{ public final String ME; protected final Global glob; private static Logger log = Logger.getLogger(DispatchConnection.class.getName()); protected DispatchConnectionsHandler connectionsHandler = null; /** For logging only */ protected final String myId; protected AddressBase address; private Timestamp timerKey; /** Protects timerKey refresh */ private final Object PING_TIMER_MONITOR = new Object(); protected ConnectionStateEnum state = ConnectionStateEnum.UNDEF; protected int retryCounter = 0; private final long logEveryMillis; // 60000: every minute a log private int logInterval = 10; /** * Flag if the remote server is reachable but is not willing to process our requests (standby mode). * This flag is only evaluated in POLLING state */ protected boolean serverAcceptsRequests = false; protected boolean physicalConnectionOk = false; protected long previousBytesWritten; protected long previousBytesRead; private boolean bypassPingOnActivity = true; /** * Our loadPlugin() and initialize() needs to be called next. * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to * @param address The address i shall connect to */ public DispatchConnection(Global glob, DispatchConnectionsHandler connectionsHandler, AddressBase address) { if (address == null) throw new IllegalArgumentException("DispatchConnection expects an address!=null"); this.ME = "DispatchConnection-" + connectionsHandler.getDispatchManager().getQueue().getStorageId() + " "; this.glob = glob; this.logEveryMillis = glob.getProperty().get("dispatch/logRetryEveryMillis", 60000L); // every minute a log if (log.isLoggable(Level.FINE)) log.fine("dispatch/logRetryEveryMillis=" + this.logEveryMillis); this.connectionsHandler = connectionsHandler; this.myId = connectionsHandler.getDispatchManager().getQueue().getStorageId().getId() + " "; this.address = address; } public void setAddress(AddressBase address) throws XmlBlasterException { if (log.isLoggable(Level.FINE)) log.fine(ME + "setAddress: configuration has changed (with same url) set to new address object"); this.address = address; } /** * @return A nice name for logging */ abstract public String getName(); /** * Connects on protocol level to the server and tries a ping. * Needs to be called after construction * <p> * Calls connectLowLevel() which needs to be implemented by derived classes * loadPlugin() needs to be called before. * </p> * Called by ClientDispatchConnectionsHandler or CbDispatchConnectionsHandler */ public final void initialize() throws XmlBlasterException { this.retryCounter = 0; if (this.logEveryMillis <= 0) { logInterval = -1; // no logging } else if (address.getDelay() < 1 || address.getDelay() > this.logEveryMillis) // millisec logInterval = 1; else logInterval = (int)(this.logEveryMillis / address.getDelay()); try { connectLowlevel(); handleTransition(true, null); } catch (XmlBlasterException e) { if (ErrorCode.COMMUNICATION_FORCEASYNC.equals(e.getErrorCode())) { if (log.isLoggable(Level.FINE)) log.fine(ME + "initialize:" + e.getMessage()); } else log.warning(ME + "initialize:" + e.getMessage()); if (retry(e)) { // all types of ErrorCode.COMMUNICATION* handleTransition(true, e); // never returns (only if DEAD) - throws exception } else { connectionsHandler.toDead(this, e); throw e; } } catch (Throwable throwable) { throwable.printStackTrace(); XmlBlasterException e = (throwable instanceof XmlBlasterException) ? (XmlBlasterException)throwable : new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "", throwable); if (log.isLoggable(Level.FINE)) log.fine(ME + e.toString()); connectionsHandler.toDead(this, e); throw e; } if (log.isLoggable(Level.FINE)) log.fine(ME + "Created driver for protocol '" + this.address.getType() + "'"); } private boolean retry(XmlBlasterException e) { if (e.isCommunication()) // all types of ErrorCode.COMMUNICATION* return true; //if (e.isErrorCode(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED)) // return true; // If the client was killed in the server and tries to reconnect with old sessionId return false; } public void finalize() { if (this.timerKey != null) { this.glob.getPingTimer().removeTimeoutListener(this.timerKey); this.timerKey = null; } if (log.isLoggable(Level.FINE)) log.fine(ME + "finalize - garbage collected"); } public final AddressBase getAddress() { return this.address; } /** Called on COMMUNICATION errors, reset protocol driver for reconnect polling */ abstract public void resetConnection(); /** * The derived class should create an instance of the protocol driver. */ abstract public void loadPlugin() throws XmlBlasterException; /** * Connect on protocol layer and try an initial low level ping. * @exception XmlBlasterException with ErrorCode.COMMUNICATION* if server is not reachable * or other exceptions on other errors */ abstract public void connectLowlevel() throws XmlBlasterException; /** A human readable name of the protocol plugin */ abstract public String getDriverName(); /** * Send the messages. * @param msgArr Should be a copy of the original, since we export it which changes/encrypts the content. * msgArr[i].getReturnObj() transports the returned string array from the client which is decrypted * if necessary, for oneway updates it is null */ abstract public void doSend(MsgQueueEntry[] msgArr_) throws XmlBlasterException; /** * Should be overwritten by extending classes. */ abstract public I_ProgressListener registerProgressListener(I_ProgressListener listener); /** * Send the messages back to the client. * @param msgArr Should be a copy of the original, since we export it which changes/encrypts the content * @return The returned string from the client which is decrypted if necessary, for oneway updates it is null */ public void send(MsgQueueEntry[] msgArr) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME + "send(msgArr.length=" + msgArr.length + ")"); if (msgArr == null || msgArr.length == 0) return; // assert if (isDead()) { // assert log.severe(ME + "Connection to " + this.address.toString() + " is in state DEAD, msgArr.length=" + msgArr.length + " messages are lost"); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "Internal problem: Connection to " + this.address.toString() + " is in state DEAD, msgArr.length=" + msgArr.length + " messages are lost"); } // Send the message ... try { long now = System.currentTimeMillis(); doSend(msgArr); this.connectionsHandler.getDispatchStatistic().setRoundTripDelay(System.currentTimeMillis() - now); handleTransition(true, null); return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -