📄 dispatchconnectionshandler.java
字号:
/** @return a currently alive callback connection or null */ public final AddressBase getAliveAddress() { DispatchConnection con = getAliveDispatchConnection(); return (con == null) ? null : con.getAddress(); } /** @return a currently dead callback connection or null */ public final AddressBase getDeadAddress() { DispatchConnection con = getDeadDispatchConnection(); return (con == null) ? null : con.getAddress(); } /** @return a dead callback connection or null */ public final DispatchConnection getDeadDispatchConnection() { DispatchConnection[] arr = getDispatchConnectionArr(); for (int ii=0; ii<arr.length; ii++) { if (arr[ii].isDead()) return arr[ii]; } return null; } /** @return a copy snapshot of the current connections */ public DispatchConnection[] getDispatchConnectionArr() { synchronized (conList) { return (DispatchConnection[])conList.toArray(new DispatchConnection[conList.size()]); } } /** @return Number of established callback connections */ public int getCountDispatchConnection() { synchronized (conList) { return conList.size(); } } public void addDispatchConnection(DispatchConnection con) { synchronized (conList) { conList.add(con); } } public void clearDispatchConnectionList() { synchronized (conList) { conList.clear(); } } /** Call by DispatchConnection on state transition */ final void toAlive(DispatchConnection con) { con.registerProgressListener(this.statistic); updateState(null); } /** Call by DispatchConnection on state transition */ final void toPolling(DispatchConnection con) { con.registerProgressListener(null); updateState(null); } /** Call by DispatchConnection on state transition */ final void toDead(DispatchConnection con, XmlBlasterException ex) { con.registerProgressListener(null); removeDispatchConnection(con); // does updateState() updateState(ex); } /** * Handles the state transition * @param XmlBlasterException can be null */ private final void updateState(XmlBlasterException ex) { ConnectionStateEnum oldState = this.state; ConnectionStateEnum tmp = ConnectionStateEnum.DEAD; if (log.isLoggable(Level.FINE)) log.fine(ME+": updateState() oldState="+oldState+" conList.size="+ getCountDispatchConnection()); DispatchConnection[] arr = getDispatchConnectionArr(); for (int ii=0; ii<arr.length; ii++) { if (arr[ii].isAlive()) { this.state = ConnectionStateEnum.ALIVE; if (oldState != this.state) dispatchManager.toAlive(oldState); return; } else if (arr[ii].isPolling()) { tmp = ConnectionStateEnum.POLLING; } } if (tmp == ConnectionStateEnum.POLLING) { this.state = ConnectionStateEnum.POLLING; if (oldState != this.state) dispatchManager.toPolling(oldState); } else if (tmp == ConnectionStateEnum.DEAD) { this.state = ConnectionStateEnum.DEAD; if (oldState != this.state) dispatchManager.shutdownFomAnyState(oldState, ex); } else { this.state = tmp; log.severe(ME+": Internal error in updateState(oldState="+oldState+","+this.state+") " + toXml("")); Thread.dumpStack(); } } /** * @return true if not initialized yet. */ public final boolean isUndef() { return this.state == ConnectionStateEnum.UNDEF; } /** * @return true if at least one connection is alive */ public final boolean isAlive() { return this.state == ConnectionStateEnum.ALIVE; } /** * @return true if no connection alive but at least one is still polling */ public final boolean isPolling() { return this.state == ConnectionStateEnum.POLLING; } /** * @return true if all connections are lost (polling is given up) */ public final boolean isDead() { return this.state == ConnectionStateEnum.DEAD; } public ConnectionStateEnum getState() { return this.state; } private final void removeDispatchConnection(DispatchConnection con) { if (log.isLoggable(Level.FINER)) log.finer(ME+": removeDispatchConnection(" + con.getName() + ") ..."); synchronized (conList) { conList.remove(con); } try { con.shutdown(); } catch (XmlBlasterException ex) { log.severe(ME+": Could not shutdown properly. " + ex.getMessage()); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Destroyed one callback connection, " + getCountDispatchConnection() + " remain."); } /** * If no connection is available but the message is for example save queued, * we can generate here valid return objects * @param state e.g. Constants.STATE_OK */ abstract public void createFakedReturnObjects(I_QueueEntry[] entries, String state, String stateInfo) throws XmlBlasterException; /** * Send the messages back to the client. * If there are more fallback addresses, these will be used if the * first fails. * <p> * The RETURN value is transferred in the msgArr[i].getReturnObj(), for oneway updates it is null * </p> */ public void send(MsgQueueEntry[] msgArr) throws Throwable, XmlBlasterException { if (isDead()) // if (conList.size() < 1) throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "Callback of " + msgArr.length + " messages '" + msgArr[0].getKeyOid() + "' from [" + msgArr[0].getSender() + "] failed, no callback connection is available"); Throwable ex = null; // to remember exception // Try to find a connection which delivers the message ... // PtP messages from the subject Queue are delivered to all reachable sessions of this user ... DispatchConnection[] cons = getDispatchConnectionArr(); // take a snapshot for (int ii=0; ii<cons.length; ii++) { DispatchConnection con = cons[ii]; if (log.isLoggable(Level.FINE)) log.fine(ME+": Trying cb# " + ii + " state=" + con.getState().toString() + " ..."); if (con.isAlive()) { try { con.send(msgArr); return; } catch(Throwable e) { ex = e; if (ii<(cons.length-1)) log.warning(ME+": Callback failed, trying other addresses: " + e.toString()); } } } // error - no success sending message: if (ex == null) { if (cons.length == 0) { ex = new XmlBlasterException(glob, isDead() ? ErrorCode.COMMUNICATION_NOCONNECTION_DEAD : ErrorCode.COMMUNICATION_NOCONNECTION, ME, "Callback of " + msgArr.length + " messages '" + msgArr[0].getKeyOid() + "' from sender [" + msgArr[0].getSender() + "] failed, no callback connection is alive"); } else { ex = new XmlBlasterException(glob, isDead() ? ErrorCode.COMMUNICATION_NOCONNECTION_DEAD : ErrorCode.COMMUNICATION_NOCONNECTION, ME, "Callback of " + msgArr.length + " messages '" + msgArr[0].getKeyOid() + "' to client [" + cons[0].getAddress().getSecretSessionId() + "] from [" + msgArr[0].getSender() + "] failed, no callback connection is alive"); } } throw ex; } /** * @return A container holding some statistical delivery information, is never null */ public final DispatchStatistic getDispatchStatistic() { return this.statistic; } /** * Stop all callback drivers of this client. */ public final void shutdown() { if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering shutdown ..."); DispatchConnection[] arr=getDispatchConnectionArr(); clearDispatchConnectionList(); for (int ii=0; ii<arr.length; ii++) { try { arr[ii].shutdown(); } catch (XmlBlasterException ex) { log.severe(ME+": Could not shutdown properly. " + ex.getMessage()); } } } /** * Dump state of this object into a XML ASCII string. * <br> * @param extraOffset indenting of tags for nice output * @return internal state of SessionInfo as a XML ASCII string */ public final String toXml(String extraOffset) { StringBuffer sb = new StringBuffer(1000); if (extraOffset == null) extraOffset = ""; String offset = Constants.OFFSET + extraOffset; sb.append(offset).append("<DispatchConnectionsHandler state='").append(this.state.toString()).append("'>"); DispatchConnection[] arr = getDispatchConnectionArr(); if (arr.length < 1) sb.append(offset).append(" <noDispatchConnection/>"); else { for (int ii=0; ii<arr.length; ii++) { sb.append(offset).append(" <connection type='" + arr[ii].getDriverName() + "' state='" + arr[ii].getState() + "'/>"); } } sb.append(offset).append("</DispatchConnectionsHandler>"); return sb.toString(); } public abstract ArrayList filterDistributorEntries(ArrayList entries, Throwable ex); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -