📄 socketcollectionmanager.java
字号:
* Initiates a liveness test on the given address, if the remote node does not * respond, it is declared dead. * * @param path DESCRIBE THE PARAMETER */ protected void checkLiveness(SourceRoute path) { if (path.getLastHop().equals(localAddress)) { return; } if (!resigned) { if (logger.level <= Logger.FINE) { logger.log("CHECK DEAD: " + localAddress + " CHECKING DEATH OF PATH " + path); } DeadChecker checker = new DeadChecker(path, NUM_PING_TRIES);// ((SocketPastryNode) pastryNode).getTimer().scheduleAtFixedRate(checker, PING_DELAY + random.nextInt(PING_JITTER), PING_DELAY + random.nextInt(PING_JITTER)); int delay = manager.proximity(path) * 2; if (delay > PING_DELAY) { delay = PING_DELAY; } ((SocketPastryNode) pastryNode).getTimer().schedule(checker, delay); pingManager.ping(path, checker); } } /** * Method which should be called when a remote node is declared dead. This * method will close any outstanding sockets, and will reroute any pending * messages * * @param address The address which was declared dead */ public void declaredDead(EpochInetSocketAddress address) { SourceRoute[] routes = (SourceRoute[]) sockets.keySet().toArray(new SourceRoute[0]); for (int i = 0; i < routes.length; i++) { if (routes[i].getLastHop().equals(address)) { if (logger.level <= Logger.FINE) { logger.log("WRITE_TIMER::Closing active socket to " + routes[i]); } ((SocketManager) sockets.get(routes[i])).close(); } } } /** * ----- INTERNAL METHODS ----- * * @param path DESCRIBE THE PARAMETER * @param message DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ /** * Method which sends a message across the wire. * * @param message The message to send * @param path The path to send the message along * @return DESCRIBE THE RETURN VALUE */ protected boolean sendInternal(SourceRoute path, SocketBuffer message) { if (!resigned) { synchronized (sockets) { if (!sockets.containsKey(path)) { if (logger.level <= Logger.FINE) { logger.log("(SCM) No connection open to path " + path + " - opening one"); } openSocket(path, false); } if (sockets.containsKey(path)) { if (logger.level <= Logger.FINE) { logger.log("(SCM) Found connection open to path " + path + " - sending now"); } ((SocketManager) sockets.get(path)).send(message); socketUpdated(path); return true; } else { if (logger.level <= Logger.WARNING) { logger.log("(SCM) ERROR: Could not connect to remote address " + path + " delaying " + message); } return false; } } } else { return true; } } /** * Specified by the SelectionKeyHandler interface. Is called whenever a key * has become acceptable, representing an incoming connection. This method * will accept the connection, and attach a SocketConnector in order to read * the greeting off of the channel. Once the greeting has been read, the * connector will hand the channel off to the appropriate node handle. * * @param key The key which is acceptable. */ public void accept(SelectionKey key) { try { new SocketAccepter(key); } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.log("ERROR (accepting connection): " + e); } } } /** * Method which opens a socket to a given remote node handle, and updates the * bookkeeping to keep track of this socket * * @param path DESCRIBE THE PARAMETER * @param bootstrap DESCRIBE THE PARAMETER */ protected void openSocket(SourceRoute path, boolean bootstrap) { try { synchronized (sockets) { if ((!sockets.containsKey(path)) && ((sockets.size() < MAX_OPEN_SOCKETS) || (getSocketToClose() != null))) { socketOpened(path, new SocketManager(this, path, bootstrap)); } } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e); } closeSocket(path); manager.markDead(path); } } /** * Method which opens a socket to a given remote node handle, and updates the * bookkeeping to keep track of this socket * * @param path DESCRIBE THE PARAMETER * @param appId DESCRIBE THE PARAMETER * @param connector DESCRIBE THE PARAMETER * @param timeout DESCRIBE THE PARAMETER */ protected void openAppSocket(SourceRoute path, int appId, AppSocketReceiver connector, int timeout) { try { synchronized (sockets) { // all of these changes are synchronized on the same data structure appSocketOpened(new SocketAppSocket(this, path, appId, connector, timeout)); } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e); } manager.markDead(path); } } /** * Method which cloeses a socket to a given remote node handle, and updates * the bookkeeping to keep track of this closing. Note that this method does * not completely close the socket, rather, it simply calls shutdown(), which * starts the shutdown process. * * @param path DESCRIBE THE PARAMETER */ protected void closeSocket(SourceRoute path) { synchronized (sockets) { if (sockets.containsKey(path)) { ((SocketManager) sockets.get(path)).shutdown(); } else { if (logger.level <= Logger.SEVERE) { logger.log("(SCM) SERIOUS ERROR: Request to close socket to non-open handle to path " + path); } } } } /** * Method which is designed to be called by node handles when they wish to * open a socket to their remote node. This method will determine if another * node handle needs to disconnect, and will disconnect the ejected node * handle if necessary. * * @param manager The manager for the remote address * @param path DESCRIBE THE PARAMETER */ protected void socketOpened(SourceRoute path, SocketManager manager) { synchronized (sockets) { if (!sockets.containsKey(path)) { unIdentifiedSM.remove(manager); sockets.put(path, manager); socketQueue.addFirst(path); if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded opening of socket to path " + path); } if ((sockets.size() + appSockets.size()) > MAX_OPEN_SOCKETS) { //SourceRoute toClose = (SourceRoute) socketQueue.removeLast(); closeOneSocket(); } } else { if (logger.level <= Logger.FINE) { logger.logException("(SCM) Request to record path opening for already-open path " + path, new Exception("stack trace")); } String local = "" + localAddress.getAddress().getAddress().getHostAddress() + ":" + localAddress.getAddress().getPort(); String remote = "" + path.getLastHop().getAddress().getAddress().getHostAddress() + ":" + path.getLastHop().getAddress().getPort(); if (logger.level <= Logger.FINE) { logger.log("(SCM) RESOLVE: Comparing paths " + local + " and " + remote); } if (remote.compareTo(local) < 0) { if (logger.level <= Logger.FINE) { logger.log("(SCM) RESOLVE: Cancelling existing connection to " + path); } SocketManager toClose = (SocketManager) sockets.get(path); socketClosed(path, toClose); socketOpened(path, manager); toClose.close(); } else { if (logger.level <= Logger.FINE) { logger.log("(SCM) RESOLVE: Implicitly cancelling new connection to path " + path); } } } } } /** * DESCRIBE THE METHOD * * @param sas DESCRIBE THE PARAMETER */ protected void appSocketOpened(SocketAppSocket sas) { synchronized (sockets) { if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded opening of app socket " + sas); } appSockets.addFirst(manager); if ((sockets.size() + appSockets.size()) > MAX_OPEN_SOCKETS) { //SourceRoute toClose = (SourceRoute) socketQueue.removeLast(); closeOneSocket(); } } } /** * TODO: Add also checking the top of the AppSocketQueue */ protected void closeOneSocket() { SourceRoute toClose = getSocketToClose(); socketQueue.remove(toClose); if (logger.level <= Logger.FINE) { logger.log("(SCM) Too many sockets open - closing currently unused socket to path " + toClose); } closeSocket(toClose); } /** * Method which is designed to be called *ONCE THE SOCKET HAS BEEN CLOSED*. * This method simply updates the bookeeping, but does not actually close the * socket. * * @param manager The manager for the remote address * @param path DESCRIBE THE PARAMETER */ protected void socketClosed(SourceRoute path, SocketManager manager) { synchronized (sockets) { if (sockets.containsKey(path)) { if (sockets.get(path) == manager) { if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded closing of socket to " + path); } socketQueue.remove(path); sockets.remove(path); } else { if (logger.level <= Logger.FINE) { logger.log("(SCM) SocketClosed called with corrent address, but incorrect manager - not a big deal."); } } } else { if (logger.level <= Logger.FINE) { logger.log("(SCM) SocketClosed called with socket not in the list: path:" + path + " manager:" + manager); } } } } /** * Method which is designed to be called *ONCE THE SOCKET HAS BEEN CLOSED*. * This method simply updates the bookeeping, but does not actually close the * socket. * * @param sas DESCRIBE THE PARAMETER */ protected void appSocketClosed(SocketAppSocket sas) { synchronized (sockets) { if (appSockets.contains(sas)) { if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded closing of app socket to " + sas); } appSockets.remove(sas); } else { if (logger.level <= Logger.FINE) { logger.log("(SCM) appSocketClosed called with socket not in the list: path:" + sas); } } } } /** * Method which is designed to be called whenever a node has network activity. * This is used to determine which nodes should be disconnected, should it be * necessary (implementation of a LRU stack). * * @param path DESCRIBE THE PARAMETER */ protected void socketUpdated(SourceRoute path) { synchronized (sockets) { if (sockets.containsKey(path)) { socketQueue.remove(path); socketQueue.addFirst(path); } else { if (logger.level <= Logger.SEVERE) { logger.log("(SCM) SERIOUS ERROR: Request to record update for non-existant socket to " + path); } } } } /** * Method which is designed to be called when a new source route manager is * created. This method will close another source route, if there are too many * source routes already open through this node. * * @param manager The manager for the remote address */ protected void sourceRouteOpened(SourceRouteManager manager) { if (!sourceRouteQueue.contains(manager)) { sourceRouteQueue.addFirst(manager); if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded opening of source route manager " + manager); } if (sourceRouteQueue.size() > MAX_OPEN_SOURCE_ROUTES) { SourceRouteManager toClose = (SourceRouteManager) sourceRouteQueue.removeLast(); if (logger.level <= Logger.FINE) { logger.log("(SCM) Too many source routes open - closing source route manager " + toClose); } toClose.close(); sourceRouteClosed(toClose); } } else { if (logger.level <= Logger.FINE) { logger.log("(SCM) ERROR: Request to record source route opening for already-open manager " + manager); } sourceRouteUpdated(manager); } } /** * Method which is designed to be called *ONCE THE SOURCE ROUTE MANAGER HAS * BEEN CLOSED*. This method simply updates the bookeeping, but does not * actually close the source route. * * @param manager The manager for the remote address */ protected void sourceRouteClosed(SourceRouteManager manager) { if (sourceRouteQueue.contains(manager)) { sourceRouteQueue.remove(manager); if (logger.level <= Logger.FINE) { logger.log("(SCM) Recorded closing of source route manager " + manager); } } else { if (logger.level <= Logger.WARNING) { logger.log("(SCM) ERROR: Request to record source route closing for unknown manager " + manager); } } } /** * Method which is designed to be called whenever a source route has network * activity. This is used to determine which source routes should be * disconnected, should it be necessary (implementation of a LRU stack). * * @param manager The manager with activity */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -