⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketcollectionmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
  protected void sourceRouteUpdated(SourceRouteManager manager) {    if (sourceRouteQueue.contains(manager)) {      sourceRouteQueue.remove(manager);      sourceRouteQueue.addFirst(manager);    } else {      if (logger.level <= Logger.SEVERE) {        logger.log("(SCM) SERIOUS ERROR: Request to record update for unknown source route " + manager);      }    }  }  /**   * Makes this node resign from the network. Is designed to be used for   * debugging and testing.   *   * @exception IOException DESCRIBE THE EXCEPTION   */  public void destroy() throws IOException {    resigned = true;    pingManager.resign();    while (socketQueue.size() > 0) {      ((SocketManager) sockets.get(socketQueue.getFirst())).close();    }    while (sourceRouteQueue.size() > 0) {      ((SourceRouteManager) sourceRouteQueue.getFirst()).close();    }    // anything somehow left in sockets?    while (sockets.size() > 0) {      ((SocketManager) sockets.values().iterator().next()).close();    }    // any left in un    while (unIdentifiedSM.size() > 0) {      ((SocketManager) unIdentifiedSM.iterator().next()).close();    }    key.channel().close();    key.cancel();  }  /**   * Internal testing method which simulates a stall. DO NOT USE!!!!!   */  public void stall() {    key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);    Iterator i = sockets.keySet().iterator();    while (i.hasNext()) {      SelectionKey key = ((SocketManager) sockets.get(i.next())).key;      key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);    }    pingManager.stall();  }  /**   * Internal class which represents a message which is currently delayed,   * waiting for an open socket. The message will be tried using exponential   * backoff up to BACKOFF_LIMIT times before being dropped.   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  protected class MessageRetry extends rice.selector.TimerTask {    // The number of tries that have occurred so far    /**     * DESCRIBE THE FIELD     */    protected int tries = 0;    // the current timeout    /**     * DESCRIBE THE FIELD     */    protected long timeout = BACKOFF_INITIAL;    // The destination route    /**     * DESCRIBE THE FIELD     */    protected SourceRoute route;    // The message    /**     * DESCRIBE THE FIELD     */    protected SocketBuffer message;    // This is to keep a hard link to the AM, so it isn't collected    /**     * DESCRIBE THE FIELD     */    protected AddressManager am;    /**     * Constructor, taking a message and the route     *     * @param message The message     * @param route The route     * @param am DESCRIBE THE PARAMETER     */    public MessageRetry(SourceRoute route, SocketBuffer message, AddressManager am) {      this.am = am;      this.message = message;      this.route = route;      this.timeout = (long) (timeout * (0.8 + (0.4 * random.nextDouble())));      pastryNode.getTimer().schedule(this, timeout);    }    /**     * Main processing method for the DeadChecker object     */    public void run() {      if (!sendInternal(route, message)) {        if (logger.level <= Logger.FINE) {          logger.log("BACKOFF: Could not send message " + message + " after " + tries + " timeout " + timeout + " retries - retrying.");        }        if (tries < BACKOFF_LIMIT) {          tries++;          timeout = (long) ((2 * timeout) * (0.8 + (0.4 * random.nextDouble())));          pastryNode.getTimer().schedule(this, timeout);        } else {          if (logger.level <= Logger.WARNING) {            logger.log("WARNING: Could not send message " + message + " after " + tries + " retries.  Dropping on the floor.");          }        }      } else {        if (logger.level <= Logger.FINE) {          logger.log("BACKOFF: Was able to send message " + message + " after " + tries + " timeout " + timeout + " retries.");        }      }    }  }  /**   * DESCRIBE THE CLASS   *   * @version $Id: SocketCollectionManager.java 3274 2006-05-15 16:17:47Z jeffh   *      $   * @author jeffh   */  protected class DeadChecker extends rice.selector.TimerTask implements PingResponseListener {    // The number of tries that have occurred so far    /**     * DESCRIBE THE FIELD     */    protected int tries = 1;    // the total number of tries before declaring death    /**     * DESCRIBE THE FIELD     */    protected int numTries;    // the path to check    /**     * DESCRIBE THE FIELD     */    protected SourceRoute path;    /**     * Constructor for DeadChecker.     *     * @param numTries DESCRIBE THE PARAMETER     * @param path DESCRIBE THE PARAMETER     */    public DeadChecker(SourceRoute path, int numTries) {      if (logger.level <= Logger.FINE) {        logger.log("DeadChecker(" + path + ") started.");      }      this.path = path;      this.numTries = numTries;    }    /**     * DESCRIBE THE METHOD     *     * @param RTT DESCRIBE THE PARAMETER     * @param timeHeardFrom DESCRIBE THE PARAMETER     * @param path DESCRIBE THE PARAMETER     */    public void pingResponse(SourceRoute path, long RTT, long timeHeardFrom) {      if (logger.level <= Logger.FINE) {        logger.log("Terminated DeadChecker(" + path + ") due to ping.");      }      manager.markAlive(path);      cancel();    }    /**     * Main processing method for the DeadChecker object value of tries before     * run() is called:the time since ping was called:the time since deadchecker     * was started 1:500:500 2:1000:1500 3:2000:3500 4:4000:7500 5:8000:15500 //     * ~15 seconds to find 1 path faulty, using source routes gives us 30     * seconds to find a node faulty     */    public void run() {      if (tries < numTries) {        tries++;        if (manager.getLiveness(path.getLastHop()) == SocketNodeHandle.LIVENESS_ALIVE) {          manager.markSuspected(path);        }        pingManager.ping(path, this);        int absPD = (int) (PING_DELAY * Math.pow(2, tries - 1));        int jitterAmt = (int) (((float) absPD) * PING_JITTER);        int scheduledTime = absPD - jitterAmt + random.nextInt(jitterAmt * 2);        ((SocketPastryNode) pastryNode).getTimer().schedule(this, scheduledTime);      } else {        if (logger.level <= Logger.FINE) {          logger.log("DeadChecker(" + path + ") expired - marking as dead.");        }        manager.markDead(path);        cancel();      }    }    /**     * DESCRIBE THE METHOD     *     * @return DESCRIBE THE RETURN VALUE     */    public boolean cancel() {      pingManager.removePingResponseListener(path, this);      return super.cancel();    }  }  /**   * Private class which is tasked with maintaining a source route which goes   * through this node. This class maintains to sockets, and transfers the data   * between them. It also is responsible for performing the initial handshake   * and sending the data across the wire.   *   * @version $Id: SocketCollectionManager.java 3274 2006-05-15 16:17:47Z jeffh   *      $   * @author jeffh   */  protected class SourceRouteManager extends SelectionKeyHandler {    // the first channel    private SocketChannel channel1;    // the second channel    private SocketChannel channel2;    // the repeater, which does the actual byte moving from socket to socket    private SocketChannelRepeater repeater;    /**     * Constructor which accepts an incoming connection, represented by the     * selection key. This constructor builds a new     * IntermediateSourceRouteManager, and waits until the greeting message is     * read from the other end. Once the greeting is received, the manager makes     * sure that a socket for this handle is not already open, and then proceeds     * as normal.     *     * @param key The server accepting key for the channel     * @exception IOException DESCRIBE THE EXCEPTION     */    public SourceRouteManager(SelectionKey key) throws IOException {      this.repeater = new SocketChannelRepeater(pastryNode, this);      sourceRouteOpened(this);      acceptConnection(key);    }    /**     * Internal method which returns the other key     *     * @param channel DESCRIBE THE PARAMETER     * @return The right key     */    SocketChannel otherChannel(SelectableChannel channel) {      return (channel == channel1 ? channel2 : channel1);    }    /**     * Internal method which adds an interest op to the given channel's interest     * set. One should note that if the passed in key is null, it will determine     * which channel this is the key for, and then rebuild a key for that     * channel.     *     * @param channel The channel     * @param op The operation to add to the key's interest set     * @exception IOException DESCRIBE THE EXCEPTION     */    protected void addInterestOp(SelectableChannel channel, int op) throws IOException {      String k = (channel == channel1 ? "1" : "2");      if (logger.level <= Logger.FINER) {        logger.log("(SRM) " + this + "   adding interest op " + op + " to key " + k);      }      if (pastryNode.getEnvironment().getSelectorManager().getKey(channel) == null) {        if (logger.level <= Logger.FINER) {          logger.log("(SRM) " + this + "   key " + k + " is null - reregistering with ops " + op);        }        pastryNode.getEnvironment().getSelectorManager().register(channel, this, op);      } else {        pastryNode.getEnvironment().getSelectorManager().register(channel, this, pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps() | op);        if (logger.level <= Logger.FINER) {          logger.log("(SRM) " + this + "   interest ops for key " + k + " are now " + pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps());        }      }    }    /**     * Internal method which removes an interest op to the given key's interest     * set. One should note that if the passed in key no longer has any interest     * ops, it is cancelled, removed from the selector's key set, and the     * corresponding key is set to null in this class.     *     * @param channel The channel     * @param op The operation to remove from the key's interest set     * @exception IOException DESCRIBE THE EXCEPTION     */    protected void removeInterestOp(SelectableChannel channel, int op) throws IOException {      String k = (channel == channel1 ? "1" : "2");      if (logger.level <= Logger.FINER) {        logger.log("(SRM) " + this + "   removing interest op " + op + " from key " + k);      }      SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel);      if (key != null) {        key.interestOps(key.interestOps() & ~op);        if (key.interestOps() == 0) {          if (logger.level <= Logger.FINER) {            logger.log("(SRM) " + this + "   key " + k + " has no interest ops - cancelling");          }          pastryNode.getEnvironment().getSelectorManager().cancel(key);        }      }    }    /**     * Method which initiates a shutdown of this socket by calling     * shutdownOutput(). This has the effect of removing the manager from the     * open list.     *     * @param channel DESCRIBE THE PARAMETER     */    public void shutdown(SocketChannel channel) {      try {        if (logger.level <= Logger.FINE) {          logger.log("(SRM) " + this + " shutting down output to key " + (channel == channel1 ? "1" : "2"));        }        channel.socket().shutdownOutput();        sourceRouteClosed(this);      } catch (IOException e) {        if (logger.level <= Logger.SEVERE) {          logger.log("ERROR: Received exception " + e + " while shutting down SR output.");        }        close();      }    }    /**     * Method which closes down this socket manager, by closing the socket,     * cancelling the key and setting the key to be interested in nothing     */    public void close() {      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + " closing source route");      }      try {        if (channel1 != null) {          SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel1);          if (key != null) {            key.cancel();          }          channel1.close();          channel1 = null;        }        if (channel2 != null) {          SelectionKey key = pastryNode.getEnvironment().getSelectorManager().getKey(channel2);          if (key != null) {            key.cancel();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -