📄 dispatchconnection.java
字号:
} catch (XmlBlasterException e) {
         // NOTE(review): this catch block is the tail of a method whose beginning lies above
         // the visible section; only what can be read here is commented.
         if (isPolling() && log.isLoggable(Level.FINE)) log.fine(ME + "Exception from update(), retryCounter=" + retryCounter + ", state=" + this.state.toString());
         // Every message of this delivery attempt gets its redeliver counter incremented
         for (int i=0; i<msgArr.length; i++) msgArr[i].incrRedeliverCounter();
         // DEAD is checked both before and after handleTransition(); in either case the exception is rethrown
         if (isDead()) throw e;
         handleTransition(true, e);
         if (isDead()) throw e;
         if (retry(e)) {
            log.severe(ME + "Exception from update(), retryCounter=" + retryCounter + ", state=" + this.state.toString() + ": " + e.getMessage());
         } else {
            throw e; // forward server side exception to the client
         }
      }
      // According to the message text below this point is not expected to be reached;
      // the current stack is dumped before the INTERNAL_UNKNOWN exception is thrown.
      Thread.dumpStack();
      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "This exception is never reached" + toXml(""));
   }

   /**
    * Does the real ping to the remote server instance.
    * <p />
    * Implemented by the concrete subclass; it is invoked by
    * {@code ping(String, boolean)} with a non-null {@code data} argument.
    * @param data QoS, never null
    * @return ping return QoS, never null
    * @throws XmlBlasterException declared for implementations that need to report a failure
    * @see org.xmlBlaster.protocol.I_XmlBlaster#ping(String)
    */
   abstract public String doPing(String data) throws XmlBlasterException;

   /**
    * Ping the remote server instance (callback of the client or xmlBlaster itself).
    * <p />
    * Convenience overload: delegates to {@code ping(data, true)}, i.e. with
    * {@code byDispatchConnectionsHandler} set to true.
    * @param data the ping QoS, handed through unchanged
    * @return whatever {@code ping(data, true)} returns
    * @throws XmlBlasterException if {@code ping(data, true)} throws
    */
   public final String ping(String data) throws XmlBlasterException {
      return ping(data, true);
   }

   /**
    * Ping the xmlBlaster server or callback server of the client.
    * Sets serverAcceptsRequests to false if the protocol reaches the remote server but this
    * is in standby mode.
    * @param byDispatchConnectionsHandler true if invoked by DispatchConnectionsHandler
    *        we can throw exceptions back.
* false: If invoked by our timer/ping thread, we need to notify the situation */ protected final String ping(String data, boolean byDispatchConnectionsHandler) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME + "ping(" + data + ")"); if (isDead()) { // assert log.severe(ME + "Protocol driver is in state DEAD, ping failed"); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "Protocol driver is in state DEAD, ping failed"); } data = (data==null)?"":data; boolean stalled = false; DispatchStatistic stats = this.connectionsHandler.getDispatchStatistic(); try { if (log.isLoggable(Level.FINE)) log.fine(ME + stats.toXml("")); // check if an ongoing write operation on the same client if ((stats.ongoingWrite() || stats.ongoingRead()) && this.bypassPingOnActivity) { if (log.isLoggable(Level.FINE)) log.fine(ME + "AN ONGOING WRITE- OR READ OPERATION WHILE PINGING"); long currentBytesWritten = stats.getOverallBytesWritten() + stats.getCurrBytesWritten(); long currentBytesRead = stats.getOverallBytesRead() + stats.getCurrBytesRead(); if (this.previousBytesWritten != currentBytesWritten || this.previousBytesRead != currentBytesRead) { if (log.isLoggable(Level.FINE)) log.fine(ME + "there was an activity since last ping, previousWritten='" + this.previousBytesWritten + "' and currentWritten='" + currentBytesWritten + "' previousRead='" + this.previousBytesRead + "' currentRead='" + currentBytesRead + "'"); this.previousBytesWritten = currentBytesWritten; this.previousBytesRead = currentBytesRead; handleTransition(false, null); stalled = false; return Constants.RET_OK; } else { if (forcePingFailure()) { if (log.isLoggable(Level.FINE)) { String debug = " currentBytesWritten='" + currentBytesWritten + "' currentBytesRead='" + currentBytesRead + "'"; log.fine(ME + "there was NO activity since last ping, throwing an exception" + debug); } throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, glob.getId(), 
ME, (String)null, "Ping result: The dispatcher was busy sending data but there was no activity since last ping", (String)null, (Timestamp)null, (String)null, (String)null, (String)null, true); /* We need to set serverSide==true ! */ } else { String debug = " currentBytesWritten='" + currentBytesWritten + "' currentBytesRead='" + currentBytesRead + "'"; log.fine(ME + "there was NO activity since last ping, will set the connection to stalled. " + debug); stalled = true; return Constants.RET_OK; } } } stalled = false; this.previousBytesWritten = stats.getOverallBytesWritten() + stats.getCurrBytesWritten(); this.previousBytesRead = stats.getOverallBytesRead() + stats.getCurrBytesRead(); long now = System.currentTimeMillis(); String returnVal = doPing(data); this.connectionsHandler.getDispatchStatistic().setPingRoundTripDelay(System.currentTimeMillis() - now); // Ignore "" returns as this was specified to always return in older xmlBlaster versions if (returnVal.length() > 0 && returnVal.indexOf("OK") == -1) { // Fake a server standby exception: ping() is not specified to transport a remote XmlBlasterException but carries standby information in the state id. StatusQosData qos = glob.getStatusQosFactory().readObject(returnVal); if (!Constants.STATE_OK.equals(qos.getState())) { throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_SERVERDENY, glob.getId(), ME, (String)null, "Ping result: The server is in run level " + qos.getState() + " and not ready for requests", (String)null, (Timestamp)null, (String)null, (String)null, (String)null, true); /* We need to set serverSide==true ! 
*/ } } if (log.isLoggable(Level.FINE) && isAlive()) log.fine(ME + "Success for ping('" + data + "'), return='" + returnVal + "'"); handleTransition(byDispatchConnectionsHandler, null); return returnVal; } catch (Throwable e) { // the remote ping does not throw any XmlBlasterException, see xmlBlaster.idl if (isAlive() && log.isLoggable(Level.FINE)) log.fine(ME + "Exception from remote ping(), retryCounter=" + retryCounter + ", state=" + this.state.toString() + ": " + e.toString()); e = processResponseTimeoutException(stats, e); if (e == null) stalled = true; handleTransition(byDispatchConnectionsHandler, e); return ""; // Only reached if from timeout } finally { if (stats != null) stats.setStalled(stalled); } } /** * Returns true if it is an XmlBlasterException with error code COMMUNICATION_RESPONSETIMEOUT, false otherwise * @param e * @return */ private final static boolean isResponseTimeout(Throwable e) { if (e == null) return false; if (e instanceof XmlBlasterException) { XmlBlasterException ex = (XmlBlasterException)e; if (ex.getErrorCode().equals(ErrorCode.COMMUNICATION_RESPONSETIMEOUT)) return true; return isResponseTimeout(ex.getEmbeddedException()); } return false; } /** * * @param e the original Communication Exception * @return the original communication exception or null if it wants to force a 'stalled' in case a ping timeout occured */ private final Throwable processResponseTimeoutException(DispatchStatistic stats, Throwable e) { if (this.address == null || !this.address.isStallOnPingTimeout()) return e; if (isResponseTimeout(e)) { return null; } return e; } /** On reconnect polling try to establish the connection */ abstract protected void reconnect() throws XmlBlasterException; /** * We are notified to do the next ping or reconnect polling. 
* <p /> * When connected, we ping<br /> * When connection is lost, we do reconnect polling * @param userData You get bounced back your userData which you passed * with Timeout.addTimeoutListener() * here it is "poll" or null */ public final void timeout(Object userData) { this.timerKey = null; if (isDead()) return; boolean isPing = (userData == null); if (isPing) { if (log.isLoggable(Level.FINE)) log.fine(ME + " timeout -> Going to ping remote server, physicalConnectionOk=" + this.physicalConnectionOk + ", serverAcceptsRequests=" + this.serverAcceptsRequests + " ..."); try { /*String result = */ping("", false); } catch (XmlBlasterException e) { if (isDead()) { if (log.isLoggable(Level.FINE)) log.fine(ME + "We are shutdown already: " + e.toString()); } else { e.printStackTrace(); log.severe(ME + "PANIC: " + e.toString()); } } // is handled in ping() itself } else { // reconnect polling try { if (log.isLoggable(Level.FINE)) log.fine(ME + "timeout -> Going to check if remote server is available again, physicalConnectionOk=" + this.physicalConnectionOk + ", serverAcceptsRequests=" + this.serverAcceptsRequests + " ..."); reconnect(); // The ClientDispatchConnection may choose to ping only try { /*String result = */ping("", false); } catch (XmlBlasterException e) { if (isDead()) { if (log.isLoggable(Level.FINE)) log.fine(ME + "We are shutdown already: " + e.toString()); } else { e.printStackTrace(); log.severe(ME + "PANIC: " + e.toString()); // is handled in ping() itself } } } catch (Throwable e) { if (log.isLoggable(Level.FINE)) log.fine("Polling for remote connection failed:" + e.getMessage()); if (e instanceof NullPointerException) e.printStackTrace(); if (logInterval > 0 && (retryCounter % logInterval) == 0) log.warning("No connection established, " + address.getLogId() + " still seems to be down after " + (retryCounter+1) + " connection retries."); try { handleTransition(false, e); } catch(XmlBlasterException e2) { e.printStackTrace(); log.severe("PANIC: " + 
e.toString()); } } } } protected void spanPingTimer(long timeout, boolean isPing) throws XmlBlasterException { String userData = (isPing) ? null : "poll"; synchronized (this.PING_TIMER_MONITOR) { this.timerKey = this.glob.getPingTimer().addOrRefreshTimeoutListener(this, timeout, userData, this.timerKey); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -