📄 socketcollectionmanager.java
字号:
protected void sourceRouteUpdated(SourceRouteManager manager) { if (sourceRouteQueue.contains(manager)) { sourceRouteQueue.remove(manager); sourceRouteQueue.addFirst(manager); } else { if (logger.level <= Logger.SEVERE) { logger.log("(SCM) SERIOUS ERROR: Request to record update for unknown source route " + manager); } } } /** * Makes this node resign from the network. Is designed to be used for * debugging and testing. * * @exception IOException DESCRIBE THE EXCEPTION */ public void destroy() throws IOException { resigned = true; pingManager.resign(); while (socketQueue.size() > 0) { ((SocketManager) sockets.get(socketQueue.getFirst())).close(); } while (sourceRouteQueue.size() > 0) { ((SourceRouteManager) sourceRouteQueue.getFirst()).close(); } // anything somehow left in sockets? while (sockets.size() > 0) { ((SocketManager) sockets.values().iterator().next()).close(); } // any left in un while (unIdentifiedSM.size() > 0) { ((SocketManager) unIdentifiedSM.iterator().next()).close(); } key.channel().close(); key.cancel(); } /** * Internal testing method which simulates a stall. DO NOT USE!!!!! */ public void stall() { key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT); Iterator i = sockets.keySet().iterator(); while (i.hasNext()) { SelectionKey key = ((SocketManager) sockets.get(i.next())).key; key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); } pingManager.stall(); } /** * Internal class which represents a message which is currently delayed, * waiting for an open socket. The message will be tried using exponential * backoff up to BACKOFF_LIMIT times before being dropped. * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ protected class MessageRetry extends rice.selector.TimerTask { // The number of tries that have occurred so far /** * DESCRIBE THE FIELD */ protected int tries = 0; // the current timeout /** * DESCRIBE THE FIELD */ protected long timeout = BACKOFF_INITIAL; // The destination route /** * DESCRIBE THE FIELD */ protected SourceRoute route; // The message /** * DESCRIBE THE FIELD */ protected SocketBuffer message; // This is to keep a hard link to the AM, so it isn't collected /** * DESCRIBE THE FIELD */ protected AddressManager am; /** * Constructor, taking a message and the route * * @param message The message * @param route The route * @param am DESCRIBE THE PARAMETER */ public MessageRetry(SourceRoute route, SocketBuffer message, AddressManager am) { this.am = am; this.message = message; this.route = route; this.timeout = (long) (timeout * (0.8 + (0.4 * random.nextDouble()))); pastryNode.getTimer().schedule(this, timeout); } /** * Main processing method for the DeadChecker object */ public void run() { if (!sendInternal(route, message)) { if (logger.level <= Logger.FINE) { logger.log("BACKOFF: Could not send message " + message + " after " + tries + " timeout " + timeout + " retries - retrying."); } if (tries < BACKOFF_LIMIT) { tries++; timeout = (long) ((2 * timeout) * (0.8 + (0.4 * random.nextDouble()))); pastryNode.getTimer().schedule(this, timeout); } else { if (logger.level <= Logger.WARNING) { logger.log("WARNING: Could not send message " + message + " after " + tries + " retries. Dropping on the floor."); } } } else { if (logger.level <= Logger.FINE) { logger.log("BACKOFF: Was able to send message " + message + " after " + tries + " timeout " + timeout + " retries."); } } } } /** * DESCRIBE THE CLASS * * @version $Id: SocketCollectionManager.java 3274 2006-05-15 16:17:47Z jeffh * $ * @author jeffh */ protected class DeadChecker extends rice.selector.TimerTask implements PingResponseListener { // The number of tries that have occurred so far /** * DESCRIBE THE FIELD */ protected int tries = 1; // the total number of tries before declaring death /** * DESCRIBE THE FIELD */ protected int numTries; // the path to check /** * DESCRIBE THE FIELD */ protected SourceRoute path; /** * Constructor for DeadChecker. * * @param numTries DESCRIBE THE PARAMETER * @param path DESCRIBE THE PARAMETER */ public DeadChecker(SourceRoute path, int numTries) { if (logger.level <= Logger.FINE) { logger.log("DeadChecker(" + path + ") started."); } this.path = path; this.numTries = numTries; } /** * DESCRIBE THE METHOD * * @param RTT DESCRIBE THE PARAMETER * @param timeHeardFrom DESCRIBE THE PARAMETER * @param path DESCRIBE THE PARAMETER */ public void pingResponse(SourceRoute path, long RTT, long timeHeardFrom) { if (logger.level <= Logger.FINE) { logger.log("Terminated DeadChecker(" + path + ") due to ping."); } manager.markAlive(path); cancel(); } /** * Main processing method for the DeadChecker object value of tries before * run() is called:the time since ping was called:the time since deadchecker * was started 1:500:500 2:1000:1500 3:2000:3500 4:4000:7500 5:8000:15500 // * ~15 seconds to find 1 path faulty, using source routes gives us 30 * seconds to find a node faulty */ public void run() { if (tries < numTries) { tries++; if (manager.getLiveness(path.getLastHop()) == SocketNodeHandle.LIVENESS_ALIVE) { manager.markSuspected(path); } pingManager.ping(path, this); int absPD = (int) (PING_DELAY * Math.pow(2, tries - 1)); int jitterAmt = (int) (((float) absPD) * PING_JITTER); int scheduledTime = absPD - jitterAmt + random.nextInt(jitterAmt * 2); ((SocketPastryNode) pastryNode).getTimer().schedule(this, scheduledTime); } else { if (logger.level <= Logger.FINE) { logger.log("DeadChecker(" + path + ") expired - marking as dead."); } manager.markDead(path); cancel(); } } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public boolean cancel() { pingManager.removePingResponseListener(path, this); return super.cancel(); } } /** * Private class which is tasked with maintaining a source route which goes * through this node. This class maintains to sockets, and transfers the data * between them. It also is responsible for performing the initial handshake * and sending the data across the wire. * * @version $Id: SocketCollectionManager.java 3274 2006-05-15 16:17:47Z jeffh * $ * @author jeffh */ protected class SourceRouteManager extends SelectionKeyHandler { // the first channel private SocketChannel channel1; // the second channel private SocketChannel channel2; // the repeater, which does the actual byte moving from socket to socket private SocketChannelRepeater repeater; /** * Constructor which accepts an incoming connection, represented by the * selection key. This constructor builds a new * IntermediateSourceRouteManager, and waits until the greeting message is * read from the other end. Once the greeting is received, the manager makes * sure that a socket for this handle is not already open, and then proceeds * as normal. * * @param key The server accepting key for the channel * @exception IOException DESCRIBE THE EXCEPTION */ public SourceRouteManager(SelectionKey key) throws IOException { this.repeater = new SocketChannelRepeater(pastryNode, this); sourceRouteOpened(this); acceptConnection(key); } /** * Internal method which returns the other key * * @param channel DESCRIBE THE PARAMETER * @return The right key */ SocketChannel otherChannel(SelectableChannel channel) { return (channel == channel1 ? channel2 : channel1); } /** * Internal method which adds an interest op to the given channel's interest * set. One should note that if the passed in key is null, it will determine * which channel this is the key for, and then rebuild a key for that * channel. * * @param channel The channel * @param op The operation to add to the key's interest set * @exception IOException DESCRIBE THE EXCEPTION */ protected void addInterestOp(SelectableChannel channel, int op) throws IOException { String k = (channel == channel1 ? "1" : "2"); if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " adding interest op " + op + " to key " + k); } if (pastryNode.getEnvironment().getSelectorManager().getKey(channel) == null) { if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " key " + k + " is null - reregistering with ops " + op); } pastryNode.getEnvironment().getSelectorManager().register(channel, this, op); } else { pastryNode.getEnvironment().getSelectorManager().register(channel, this, pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps() | op); if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " interest ops for key " + k + " are now " + pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps()); } } } /** * Internal method which removes an interest op to the given key's interest * set. One should note that if the passed in key no longer has any interest * ops, it is cancelled, removed from the selector's key set, and the * corresponding key is set to null in this class. * * @param channel The channel * @param op The operation to remove from the key's interest set * @exception IOException DESCRIBE THE EXCEPTION */ protected void removeInterestOp(SelectableChannel channel, int op) throws IOException { String k = (channel == channel1 ? "1" : "2"); if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " removing interest op " + op + " from key " + k); } SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel); if (key != null) { key.interestOps(key.interestOps() & ~op); if (key.interestOps() == 0) { if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " key " + k + " has no interest ops - cancelling"); } pastryNode.getEnvironment().getSelectorManager().cancel(key); } } } /** * Method which initiates a shutdown of this socket by calling * shutdownOutput(). This has the effect of removing the manager from the * open list. * * @param channel DESCRIBE THE PARAMETER */ public void shutdown(SocketChannel channel) { try { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " shutting down output to key " + (channel == channel1 ? "1" : "2")); } channel.socket().shutdownOutput(); sourceRouteClosed(this); } catch (IOException e) { if (logger.level <= Logger.SEVERE) { logger.log("ERROR: Received exception " + e + " while shutting down SR output."); } close(); } } /** * Method which closes down this socket manager, by closing the socket, * cancelling the key and setting the key to be interested in nothing */ public void close() { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " closing source route"); } try { if (channel1 != null) { SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel1); if (key != null) { key.cancel(); } channel1.close(); channel1 = null; } if (channel2 != null) { SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel2); if (key != null) { key.cancel();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -