⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketcollectionmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
   * Initiates a liveness test on the given address, if the remote node does not   * respond, it is declared dead.   *   * @param path DESCRIBE THE PARAMETER   */  protected void checkLiveness(SourceRoute path) {    if (path.getLastHop().equals(localAddress)) {      return;    }    if (!resigned) {      if (logger.level <= Logger.FINE) {        logger.log("CHECK DEAD: " + localAddress + " CHECKING DEATH OF PATH " + path);      }      DeadChecker checker = new DeadChecker(path, NUM_PING_TRIES);//      ((SocketPastryNode) pastryNode).getTimer().scheduleAtFixedRate(checker, PING_DELAY + random.nextInt(PING_JITTER), PING_DELAY + random.nextInt(PING_JITTER));      int delay = manager.proximity(path) * 2;      if (delay > PING_DELAY) {        delay = PING_DELAY;      }      ((SocketPastryNode) pastryNode).getTimer().schedule(checker, delay);      pingManager.ping(path, checker);    }  }  /**   * Method which should be called when a remote node is declared dead. This   * method will close any outstanding sockets, and will reroute any pending   * messages   *   * @param address The address which was declared dead   */  public void declaredDead(EpochInetSocketAddress address) {    SourceRoute[] routes = (SourceRoute[]) sockets.keySet().toArray(new SourceRoute[0]);    for (int i = 0; i < routes.length; i++) {      if (routes[i].getLastHop().equals(address)) {        if (logger.level <= Logger.FINE) {          logger.log("WRITE_TIMER::Closing active socket to " + routes[i]);        }        ((SocketManager) sockets.get(routes[i])).close();      }    }  }  /**   * ----- INTERNAL METHODS -----   *   * @param path DESCRIBE THE PARAMETER   * @param message DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   */  /**   * Method which sends a message across the wire.   *   * @param message The message to send   * @param path The path to send the message along   * @return DESCRIBE THE RETURN VALUE   */  protected boolean sendInternal(SourceRoute path, SocketBuffer message) {    if (!resigned) {      synchronized (sockets) {        if (!sockets.containsKey(path)) {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) No connection open to path " + path + " - opening one");          }          openSocket(path, false);        }        if (sockets.containsKey(path)) {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) Found connection open to path " + path + " - sending now");          }          ((SocketManager) sockets.get(path)).send(message);          socketUpdated(path);          return true;        } else {          if (logger.level <= Logger.WARNING) {            logger.log("(SCM) ERROR: Could not connect to remote address " + path + " delaying " + message);          }          return false;        }      }    } else {      return true;    }  }  /**   * Specified by the SelectionKeyHandler interface. Is called whenever a key   * has become acceptable, representing an incoming connection. This method   * will accept the connection, and attach a SocketConnector in order to read   * the greeting off of the channel. Once the greeting has been read, the   * connector will hand the channel off to the appropriate node handle.   *   * @param key The key which is acceptable.   */  public void accept(SelectionKey key) {    try {      new SocketAccepter(key);    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        logger.log("ERROR (accepting connection): " + e);      }    }  }  /**   * Method which opens a socket to a given remote node handle, and updates the   * bookkeeping to keep track of this socket   *   * @param path DESCRIBE THE PARAMETER   * @param bootstrap DESCRIBE THE PARAMETER   */  protected void openSocket(SourceRoute path, boolean bootstrap) {    try {      synchronized (sockets) {        if ((!sockets.containsKey(path)) &&          ((sockets.size() < MAX_OPEN_SOCKETS) || (getSocketToClose() != null))) {          socketOpened(path, new SocketManager(this, path, bootstrap));        }      }    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e);      }      closeSocket(path);      manager.markDead(path);    }  }  /**   * Method which opens a socket to a given remote node handle, and updates the   * bookkeeping to keep track of this socket   *   * @param path DESCRIBE THE PARAMETER   * @param appId DESCRIBE THE PARAMETER   * @param connector DESCRIBE THE PARAMETER   * @param timeout DESCRIBE THE PARAMETER   */  protected void openAppSocket(SourceRoute path, int appId, AppSocketReceiver connector, int timeout) {    try {      synchronized (sockets) {        // all of these changes are synchronized on the same data structure        appSocketOpened(new SocketAppSocket(this, path, appId, connector, timeout));      }    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e);      }      manager.markDead(path);    }  }  /**   * Method which cloeses a socket to a given remote node handle, and updates   * the bookkeeping to keep track of this closing. Note that this method does   * not completely close the socket, rather, it simply calls shutdown(), which   * starts the shutdown process.   *   * @param path DESCRIBE THE PARAMETER   */  protected void closeSocket(SourceRoute path) {    synchronized (sockets) {      if (sockets.containsKey(path)) {        ((SocketManager) sockets.get(path)).shutdown();      } else {        if (logger.level <= Logger.SEVERE) {          logger.log("(SCM) SERIOUS ERROR: Request to close socket to non-open handle to path " + path);        }      }    }  }  /**   * Method which is designed to be called by node handles when they wish to   * open a socket to their remote node. This method will determine if another   * node handle needs to disconnect, and will disconnect the ejected node   * handle if necessary.   *   * @param manager The manager for the remote address   * @param path DESCRIBE THE PARAMETER   */  protected void socketOpened(SourceRoute path, SocketManager manager) {    synchronized (sockets) {      if (!sockets.containsKey(path)) {        unIdentifiedSM.remove(manager);        sockets.put(path, manager);        socketQueue.addFirst(path);        if (logger.level <= Logger.FINE) {          logger.log("(SCM) Recorded opening of socket to path " + path);        }        if ((sockets.size() + appSockets.size()) > MAX_OPEN_SOCKETS) {          //SourceRoute toClose = (SourceRoute) socketQueue.removeLast();          closeOneSocket();        }      } else {        if (logger.level <= Logger.FINE) {          logger.logException("(SCM) Request to record path opening for already-open path " + path, new Exception("stack trace"));        }        String local = "" + localAddress.getAddress().getAddress().getHostAddress() + ":" + localAddress.getAddress().getPort();        String remote = "" + path.getLastHop().getAddress().getAddress().getHostAddress() + ":" + path.getLastHop().getAddress().getPort();        if (logger.level <= Logger.FINE) {          logger.log("(SCM) RESOLVE: Comparing paths " + local + " and " + remote);        }        if (remote.compareTo(local) < 0) {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) RESOLVE: Cancelling existing connection to " + path);          }          SocketManager toClose = (SocketManager) sockets.get(path);          socketClosed(path, toClose);          socketOpened(path, manager);          toClose.close();        } else {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) RESOLVE: Implicitly cancelling new connection to path " + path);          }        }      }    }  }  /**   * DESCRIBE THE METHOD   *   * @param sas DESCRIBE THE PARAMETER   */  protected void appSocketOpened(SocketAppSocket sas) {    synchronized (sockets) {      if (logger.level <= Logger.FINE) {        logger.log("(SCM) Recorded opening of app socket " + sas);      }      appSockets.addFirst(manager);      if ((sockets.size() + appSockets.size()) > MAX_OPEN_SOCKETS) {        //SourceRoute toClose = (SourceRoute) socketQueue.removeLast();        closeOneSocket();      }    }  }  /**   * TODO: Add also checking the top of the AppSocketQueue   */  protected void closeOneSocket() {    SourceRoute toClose = getSocketToClose();    socketQueue.remove(toClose);    if (logger.level <= Logger.FINE) {      logger.log("(SCM) Too many sockets open - closing currently unused socket to path " + toClose);    }    closeSocket(toClose);  }  /**   * Method which is designed to be called *ONCE THE SOCKET HAS BEEN CLOSED*.   * This method simply updates the bookeeping, but does not actually close the   * socket.   *   * @param manager The manager for the remote address   * @param path DESCRIBE THE PARAMETER   */  protected void socketClosed(SourceRoute path, SocketManager manager) {    synchronized (sockets) {      if (sockets.containsKey(path)) {        if (sockets.get(path) == manager) {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) Recorded closing of socket to " + path);          }          socketQueue.remove(path);          sockets.remove(path);        } else {          if (logger.level <= Logger.FINE) {            logger.log("(SCM) SocketClosed called with corrent address, but incorrect manager - not a big deal.");          }        }      } else {        if (logger.level <= Logger.FINE) {          logger.log("(SCM) SocketClosed called with socket not in the list: path:" + path + " manager:" + manager);        }      }    }  }  /**   * Method which is designed to be called *ONCE THE SOCKET HAS BEEN CLOSED*.   * This method simply updates the bookeeping, but does not actually close the   * socket.   *   * @param sas DESCRIBE THE PARAMETER   */  protected void appSocketClosed(SocketAppSocket sas) {    synchronized (sockets) {      if (appSockets.contains(sas)) {        if (logger.level <= Logger.FINE) {          logger.log("(SCM) Recorded closing of app socket to " + sas);        }        appSockets.remove(sas);      } else {        if (logger.level <= Logger.FINE) {          logger.log("(SCM) appSocketClosed called with socket not in the list: path:" + sas);        }      }    }  }  /**   * Method which is designed to be called whenever a node has network activity.   * This is used to determine which nodes should be disconnected, should it be   * necessary (implementation of a LRU stack).   *   * @param path DESCRIBE THE PARAMETER   */  protected void socketUpdated(SourceRoute path) {    synchronized (sockets) {      if (sockets.containsKey(path)) {        socketQueue.remove(path);        socketQueue.addFirst(path);      } else {        if (logger.level <= Logger.SEVERE) {          logger.log("(SCM) SERIOUS ERROR: Request to record update for non-existant socket to " + path);        }      }    }  }  /**   * Method which is designed to be called when a new source route manager is   * created. This method will close another source route, if there are too many   * source routes already open through this node.   *   * @param manager The manager for the remote address   */  protected void sourceRouteOpened(SourceRouteManager manager) {    if (!sourceRouteQueue.contains(manager)) {      sourceRouteQueue.addFirst(manager);      if (logger.level <= Logger.FINE) {        logger.log("(SCM) Recorded opening of source route manager " + manager);      }      if (sourceRouteQueue.size() > MAX_OPEN_SOURCE_ROUTES) {        SourceRouteManager toClose = (SourceRouteManager) sourceRouteQueue.removeLast();        if (logger.level <= Logger.FINE) {          logger.log("(SCM) Too many source routes open - closing source route manager " + toClose);        }        toClose.close();        sourceRouteClosed(toClose);      }    } else {      if (logger.level <= Logger.FINE) {        logger.log("(SCM) ERROR: Request to record source route opening for already-open manager " + manager);      }      sourceRouteUpdated(manager);    }  }  /**   * Method which is designed to be called *ONCE THE SOURCE ROUTE MANAGER HAS   * BEEN CLOSED*. This method simply updates the bookeeping, but does not   * actually close the source route.   *   * @param manager The manager for the remote address   */  protected void sourceRouteClosed(SourceRouteManager manager) {    if (sourceRouteQueue.contains(manager)) {      sourceRouteQueue.remove(manager);      if (logger.level <= Logger.FINE) {        logger.log("(SCM) Recorded closing of source route manager " + manager);      }    } else {      if (logger.level <= Logger.WARNING) {        logger.log("(SCM) ERROR: Request to record source route closing for unknown manager " + manager);      }    }  }  /**   * Method which is designed to be called whenever a source route has network   * activity. This is used to determine which source routes should be   * disconnected, should it be necessary (implementation of a LRU stack).   *   * @param manager The manager with activity   */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -