⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketcollectionmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*************************************************************************

"FreePastry" Peer-to-Peer Application Development Substrate

Copyright 2002, Rice University. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

- Neither  the name  of Rice  University (RICE) nor  the names  of its
contributors may be  used to endorse or promote  products derived from
this software without specific prior written permission.

This software is provided by RICE and the contributors on an "as is"
basis, without any representations or warranties of any kind, express
or implied including, but not limited to, representations or
warranties of non-infringement, merchantability or fitness for a
particular purpose.
In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.pastry.socket;import java.io.*;import java.net.*;import java.nio.*;import java.nio.channels.*;import java.util.*;import rice.environment.logging.Logger;import rice.environment.params.Parameters;import rice.environment.random.RandomSource;import rice.p2p.commonapi.appsocket.AppSocketReceiver;import rice.p2p.commonapi.rawserialization.MessageDeserializer;import rice.p2p.util.MathUtils;import rice.pastry.*;import rice.pastry.messaging.*;import rice.pastry.routing.*;import rice.pastry.socket.SocketSourceRouteManager.AddressManager;import rice.pastry.socket.messaging.*;import rice.selector.*;/** * Class which maintains all outgoing open sockets. It is responsible for * keeping only MAX_OPEN_SOCKETS number of client sockets open at once. It also * binds a ServerSocketChannel to the specified port and listens for incoming * connections. Once a connections is established, it uses the interal * SocketConnector to read the greeting message (HelloMessage) off of the * stream, and hands the connection off to the appropriate node handle. 
* * @version $Id: SocketCollectionManager.java,v 1.3 2004/03/08 19:53:57 amislove *      Exp $ * @author Alan Mislove */public class SocketCollectionManager extends SelectionKeyHandler {  // the number of sockets where we start closing other sockets  /**   * DESCRIBE THE FIELD   */  public final int MAX_OPEN_SOCKETS;  // the number of source routes through this node (note, each has 2 sockets)  /**   * DESCRIBE THE FIELD   */  public final int MAX_OPEN_SOURCE_ROUTES;  // the size of the buffers for the socket  /**   * DESCRIBE THE FIELD   */  public final int SOCKET_BUFFER_SIZE;  // how long to wait for a ping response to come back before declaring lost  /**   * DESCRIBE THE FIELD   */  public final int PING_DELAY;  // factor of jitter to adjust to the ping waits - we may wait up to this time before giving up  /**   * DESCRIBE THE FIELD   */  public final float PING_JITTER;  // how many tries to ping before giving up  /**   * DESCRIBE THE FIELD   */  public final int NUM_PING_TRIES;  // the maximal amount of time to wait for write to be called before checking liveness  /**   * DESCRIBE THE FIELD   */  public final int WRITE_WAIT_TIME;  // the initial timeout for exponential backoff  /**   * DESCRIBE THE FIELD   */  public final long BACKOFF_INITIAL;  // the limit on the number of times for exponential backoff  /**   * DESCRIBE THE FIELD   */  public final int BACKOFF_LIMIT;  // the pastry node which this manager serves  SocketPastryNode pastryNode;  // the local address of the node  EpochInetSocketAddress localAddress;  // the linked list of open sockets  private LinkedList socketQueue;  // maps a SelectionKey -> SocketConnector  /**   * DESCRIBE THE FIELD   */  public Hashtable sockets;  // maps a SelectionKey -> SocketConnector  /**   * DESCRIBE THE FIELD   */  public LinkedList appSockets;  /**   * used to fix a memory leak caused by a hanging SM who never was put into the   * sockets collection put() called when SM is constructed remove() called when   * added 
to socekts on socketOpened emptied() in SCM.destroy()   */  HashSet unIdentifiedSM = new HashSet();  // the linked list of open source routes  private LinkedList sourceRouteQueue;  // ServerSocketChannel for accepting incoming connections  private SelectionKey key;  // the ping manager for doing udp stuff  private PingManager pingManager;  // the source route manager, which keeps track of routes  SocketSourceRouteManager manager;  // whether or not we've resigned  private boolean resigned;  /**   * DESCRIBE THE FIELD   */  protected Logger logger;  /**   * DESCRIBE THE FIELD   */  protected RandomSource random;  MessageDeserializer defaultDeserializer;  // the header which signifies a normal socket  /**   * DESCRIBE THE FIELD   */  protected static byte[] HEADER_DIRECT = new byte[]{0x06, 0x1B, 0x49, 0x74};  // the header which signifies a normal socket  /**   * DESCRIBE THE FIELD   */  protected static byte[] HEADER_SOURCE_ROUTE = new byte[]{0x19, 0x53, 0x13, 0x00};  // the length of the socket header  /**   * DESCRIBE THE FIELD   */  public static int HEADER_SIZE = HEADER_DIRECT.length;  /**   * DESCRIBE THE FIELD   */  protected static byte[] PASTRY_MAGIC_NUMBER = new byte[]{0x27, 0x40, 0x75, 0x3A};  // this is for historical purposes, and can definately be renamed  // this got added in FP 2.0 when we added a magic number and version number  /**   * DESCRIBE THE FIELD   */  public static int TOTAL_HEADER_SIZE = PASTRY_MAGIC_NUMBER.length + 4 + HEADER_SIZE;  /**   * Constructs a new SocketManager.   
*   * @param node The pastry node this manager is serving   * @param manager DESCRIBE THE PARAMETER   * @param bindAddress DESCRIBE THE PARAMETER   * @param proxyAddress DESCRIBE THE PARAMETER   * @param random DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public SocketCollectionManager(SocketPastryNode node, SocketSourceRouteManager manager, EpochInetSocketAddress bindAddress, EpochInetSocketAddress proxyAddress, RandomSource random) throws IOException {    this.pastryNode = node;    defaultDeserializer = new JavaSerializedDeserializer(node);    this.manager = manager;    this.localAddress = proxyAddress;    this.pingManager = new PingManager(node, manager, bindAddress, proxyAddress);    this.socketQueue = new LinkedList();    this.appSockets = new LinkedList();    this.sockets = new Hashtable();    this.sourceRouteQueue = new LinkedList();    this.resigned = false;    this.logger = node.getEnvironment().getLogManager().getLogger(SocketChannelWriter.class, null);    this.random = random;    if (random == null) {      this.random = node.getEnvironment().getRandomSource();    }    Parameters p = pastryNode.getEnvironment().getParameters();    MAX_OPEN_SOCKETS = p.getInt("pastry_socket_scm_max_open_sockets");    MAX_OPEN_SOURCE_ROUTES = p.getInt("pastry_socket_scm_max_open_source_routes");    SOCKET_BUFFER_SIZE = p.getInt("pastry_socket_scm_socket_buffer_size");    PING_DELAY = p.getInt("pastry_socket_scm_ping_delay");    PING_JITTER = p.getFloat("pastry_socket_scm_ping_jitter");    NUM_PING_TRIES = p.getInt("pastry_socket_scm_num_ping_tries");    WRITE_WAIT_TIME = p.getInt("pastry_socket_scm_write_wait_time");    BACKOFF_INITIAL = p.getInt("pastry_socket_scm_backoff_initial");    BACKOFF_LIMIT = p.getInt("pastry_socket_scm_backoff_limit");    if (logger.level <= Logger.FINE) {      logger.log("BINDING TO ADDRESS " + bindAddress + " AND CLAIMING " + localAddress);    }    ServerSocketChannel temp = null;    // just to clean up after 
the exception    try {      // bind to port      final ServerSocketChannel channel = ServerSocketChannel.open();      temp = channel;      channel.configureBlocking(false);      channel.socket().setReuseAddress(true);      channel.socket().bind(bindAddress.getAddress());      this.key = pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0);      this.key.interestOps(SelectionKey.OP_ACCEPT);    } catch (IOException e) {//      if (logger.level <= Logger.WARNING) logger.logException("ERROR creating server socket channel ",e);      try {        if (temp != null) {          temp.close();        }      } catch (IOException e2) {      }      pingManager.resign();      throw e;    }  }  /**   * Returns whether or not a socket is currently open to the given route   *   * @param route The route   * @return Whether or not a socket is currently open to that route   */  public boolean isOpen(SourceRoute route) {    return sockets.containsKey(route);  }  /**   * Internal method which returns the next socket to be closed   *   * @return The next socket to be closed   */  protected SourceRoute getSocketToClose() {    for (int i = socketQueue.size() - 1; i >= 0; i--) {      if (((SocketManager) sockets.get(socketQueue.get(i))).writer.isEmpty()) {        return (SourceRoute) socketQueue.get(i);      }    }    return null;  }  /**   * Gets the NumSourceRoutes attribute of the SocketCollectionManager object   *   * @return The NumSourceRoutes value   */  public int getNumSourceRoutes() {    return sourceRouteQueue.size();  }  /**   * Gets the NumSockets attribute of the SocketCollectionManager object   *   * @return The NumSockets value   */  public int getNumSockets() {    return socketQueue.size();  }  /**   * Method which returns the internal PingManager   *   * @return The PingManager value   */  public PingManager getPingManager() {    return pingManager;  }  /**   * ----- EXTERNAL METHODS -----   *   * @param path DESCRIBE THE PARAMETER   * @param message 
DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  /**   * Method which sends bootstraps a node by sending message across the wire,   * using a fake IP address in the header so that the local node is not marked   * alive, and then closes the connection.   *   * @param message The message to send   * @param path DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public void bootstrap(SourceRoute path, Message message) throws IOException {    if (!resigned) {      synchronized (sockets) {        openSocket(path, true);        ((SocketManager) sockets.get(path)).send(message);      }    }  }  /**   * Method which sends a message across the wire.   *   * @param message The message to send   * @param path DESCRIBE THE PARAMETER   * @param am DESCRIBE THE PARAMETER   */  public void send(SourceRoute path, SocketBuffer message, AddressManager am) {    if (!sendInternal(path, message)) {      new MessageRetry(path, message, am);    }  }  /**   * Method which sends a message across the wire.   *   * @param path DESCRIBE THE PARAMETER   * @param appId DESCRIBE THE PARAMETER   * @param receiver DESCRIBE THE PARAMETER   * @param timeout DESCRIBE THE PARAMETER   */  public void connect(SourceRoute path, int appId, AppSocketReceiver receiver, int timeout) {    openAppSocket(path, appId, receiver, timeout);  }  /**   * Method which suggests a ping to the remote node.   *   * @param route The route to use   */  public void ping(SourceRoute route) {    if (!resigned) {      pingManager.ping(route, null);    }  }  /**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -