⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketpastrynodefactory.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
   * generated NodeId.   *   * @param bootstrap Node handle to bootstrap from.   * @param nodeId DESCRIBE THE PARAMETER   * @return A node with a random ID and next port number.   */  public PastryNode newNode(final NodeHandle bootstrap, Id nodeId) {    return newNode(bootstrap, nodeId, null);  }  /**   * Method which creates a Pastry node from the next port with a randomly   * generated NodeId.   *   * @param bootstrap Node handle to bootstrap from.   * @param proxy DESCRIBE THE PARAMETER   * @return A node with a random ID and next port number.   */  public PastryNode newNode(NodeHandle bootstrap, InetSocketAddress proxy) {    return newNode(bootstrap, nidFactory.generateNodeId(), proxy);  }  /**   * Method which creates a Pastry node from the next port with a randomly   * generated NodeId.   *   * @param bootstrap Node handle to bootstrap from.   * @param nodeId DESCRIBE THE PARAMETER   * @param pAddress DESCRIBE THE PARAMETER   * @return A node with a random ID and next port number.   */  public PastryNode newNode(NodeHandle bootstrap, Id nodeId,                            InetSocketAddress pAddress) {    try {      return newNode(bootstrap, nodeId, pAddress, true);      // fix the method just below if you change this    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        logger.log("Warning: " + e);      }      if (environment.getParameters().getBoolean("pastry_socket_increment_port_after_construction")) {        port++;        try {          return newNode(bootstrap, nodeId, pAddress);          // recursion, this will prevent from things getting too out of hand in          // case the node can't bind to anything, expect a StackOverflowException        } catch (StackOverflowError soe) {          if (logger.level <= Logger.SEVERE) {            logger.log("SEVERE: SocketPastryNodeFactory: Could not bind on any ports!" + soe);          }          throw soe;        }      } else {        throw new RuntimeException(e);      }    }  }  /**   * DESCRIBE THE METHOD   *   * @param bootstrap DESCRIBE THE PARAMETER   * @param nodeId DESCRIBE THE PARAMETER   * @param pAddress DESCRIBE THE PARAMETER   * @param throwException DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   * @exception IOException DESCRIBE THE EXCEPTION   */  public PastryNode newNode(NodeHandle bootstrap, Id nodeId,                            InetSocketAddress pAddress, boolean throwException) throws IOException {    if (!throwException) {      return newNode(bootstrap, nodeId, pAddress);    }    // yes, this is sort of bizarre    // the idea is that we can't throw an exception by default because it will break reverse compatibility    // so this method gets called twice if throwException is false.  But the second time,    // it will be called with true, but will be    // wrapped with the above function which will catch the exception.    // -Jeff May 12, 2006    if (bootstrap == null) {      if (logger.level <= Logger.WARNING) {        logger.log("No bootstrap node provided, starting a new ring binding to address "          + localAddress + ":" + port + "...");      }    }    // this code builds a different environment for each PastryNode    Environment environment = this.environment;    if (this.environment.getParameters().getBoolean(      "pastry_factory_multipleNodes")) {      if (this.environment.getLogManager() instanceof CloneableLogManager) {        LogManager lman = ((CloneableLogManager) this.environment          .getLogManager()).clone("0x" + nodeId.toStringBare());        SelectorManager sman = this.environment.getSelectorManager();        Processor proc = this.environment.getProcessor();        if (this.environment.getParameters().getBoolean(          "pastry_factory_selectorPerNode")) {          sman = new SelectorManager(nodeId.toString() + " Selector",            this.environment.getTimeSource(), lman);        }        if (this.environment.getParameters().getBoolean(          "pastry_factory_processorPerNode")) {          proc = new SimpleProcessor(nodeId.toString() + " Processor");        }        environment = new Environment(sman, proc, this.environment          .getRandomSource(), this.environment.getTimeSource(), lman,          this.environment.getParameters());        this.environment.addDestructable(environment);      }    }    try {      final SocketPastryNode pn = new SocketPastryNode(nodeId, environment);      SocketSourceRouteManager srManager = null;      EpochInetSocketAddress localAddress = null;      EpochInetSocketAddress proxyAddress = null;      // NOTE: We _don't_ want to use the environment RandomSource because this      // will cause      // problems if we run the same node twice quickly with the same seed. Epochs      // should really      // be different every time.      long epoch = random.nextLong();      synchronized (this) {        localAddress = getEpochAddress(port, epoch);        if (pAddress == null) {          proxyAddress = localAddress;        } else {          proxyAddress = new EpochInetSocketAddress(pAddress, epoch);        }        srManager = new SocketSourceRouteManager(pn, localAddress, proxyAddress,          random);        if (environment.getParameters().getBoolean("pastry_socket_increment_port_after_construction")) {          port++;        }      }      pn.setSocketSourceRouteManager(srManager);      SocketNodeHandle localhandle = new SocketNodeHandle(proxyAddress, nodeId);      localhandle = (SocketNodeHandle) pn.coalesce(localhandle);      MessageDispatch msgDisp = new MessageDispatch(pn);      RoutingTable routeTable = new RoutingTable(localhandle, rtMax, rtBase,        environment);      LeafSet leafSet = new LeafSet(localhandle, lSetSize);      StandardRouter router = new StandardRouter(pn);      StandardRouteSetProtocol rsProtocol = new StandardRouteSetProtocol(        pn, routeTable, environment);      pn.setElements(localhandle, msgDisp, leafSet, routeTable);      router.register();      rsProtocol.register();      pn.setSocketElements(proxyAddress, leafSetMaintFreq, routeSetMaintFreq);      PeriodicLeafSetProtocol lsProtocol = new PeriodicLeafSetProtocol(pn,        localhandle, leafSet, routeTable);      lsProtocol.register();      // msgDisp.registerReceiver(lsProtocol.getAddress(), lsProtocol);      ConsistentJoinProtocol jProtocol = new ConsistentJoinProtocol(pn,        localhandle, routeTable, leafSet);      jProtocol.register();      if (bootstrap != null) {        bootstrap = (SocketNodeHandle) pn.coalesce(bootstrap);      }      try {        Thread.sleep(1000);      } catch (InterruptedException e) {      }      pn.doneNode(getNearest(localhandle, bootstrap));      // pn.doneNode(bootstrap);      return pn;    } catch (IOException ioe) {      // this will useually be a bind exception      // clean up Environment      if (this.environment.getParameters().getBoolean(        "pastry_factory_multipleNodes")) {        environment.destroy();      }      throw ioe;    }  }  /**   * Method which can be used to test the connectivity contstrains of the local   * node. This (optional) method is designed to be called by applications to   * ensure that the local node is able to connect through the network - checks   * can be done to check TCP/UDP connectivity, firewall setup, etc... If the   * method works, then nothing should be done and the method should return. If   * an error condition is detected, an exception should be thrown.   *   * @param timeout DESCRIBE THE PARAMETER   * @param local DESCRIBE THE PARAMETER   * @param existing DESCRIBE THE PARAMETER   * @param env DESCRIBE THE PARAMETER   * @param logger DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   * @exception IOException DESCRIBE THE EXCEPTION   */  public static InetSocketAddress verifyConnection(int timeout,                                                   InetSocketAddress local, InetSocketAddress[] existing, Environment env,                                                   Logger logger) throws IOException {    if (logger.level <= Logger.INFO) {      logger.log("Verifying connection of local node " + local + " using "        + existing[0] + " and " + existing.length + " more");    }    DatagramSocket socket = null;    try {      socket = new DatagramSocket(local);      socket.setSoTimeout(timeout);      for (int i = 0; i < existing.length; i++) {//        byte[] buf = PingManager//        .addHeader(SourceRoute//            .build(new EpochInetSocketAddress(existing[i])),//            new IPAddressRequestMessage(env.getTimeSource()//                .currentTimeMillis()), new EpochInetSocketAddress(local),//            env, logger);        SocketBuffer sb = new SocketBuffer(new EpochInetSocketAddress(local),          SourceRoute.build(new EpochInetSocketAddress(existing[i])),          new IPAddressRequestMessage(env.getTimeSource()          .currentTimeMillis()));        DatagramPacket send = new DatagramPacket(sb.getBuffer().array(), sb.getBuffer().limit(), existing[i]);        socket.send(send);      }      DatagramPacket receive = new DatagramPacket(new byte[10000], 10000);      socket.receive(receive);      byte[] data = new byte[receive.getLength() - 38];      System.arraycopy(receive.getData(), 38, data, 0, data.length);      return ((IPAddressResponseMessage) new SocketBuffer(data).deserialize(new PingManager.PMDeserializer(logger))).getAddress();//      return ((IPAddressResponseMessage) PingManager.deserialize(data, env,//          null, logger)).getAddress();    } finally {      if (socket != null) {        socket.close();      }    }  }  /**   * DESCRIBE THE CLASS   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  class TimerContinuation implements Continuation {    /**     * DESCRIBE THE FIELD     */    public Object ret = null;    /**     * DESCRIBE THE METHOD     *     * @param result DESCRIBE THE PARAMETER     */    public void receiveResult(Object result) {      ret = result;      synchronized (this) {        this.notify();      }    }    /**     * DESCRIBE THE METHOD     *     * @param result DESCRIBE THE PARAMETER     */    public void receiveException(Exception result) {      synchronized (this) {        this.notify();      }    }  }  /**   * DESCRIBE THE CLASS   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  class SPNFDeserializer implements MessageDeserializer {    /**     * DESCRIBE THE METHOD     *     * @param buf DESCRIBE THE PARAMETER     * @param type DESCRIBE THE PARAMETER     * @param priority DESCRIBE THE PARAMETER     * @param sender DESCRIBE THE PARAMETER     * @return DESCRIBE THE RETURN VALUE     * @exception IOException DESCRIBE THE EXCEPTION     */    public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority, rice.p2p.commonapi.NodeHandle sender) throws IOException {      switch (type) {        case NodeIdResponseMessage.TYPE:          return new NodeIdResponseMessage(buf);        case LeafSetResponseMessage.TYPE:          return new LeafSetResponseMessage(buf, nhf);        case RoutesResponseMessage.TYPE:          return new RoutesResponseMessage(buf);        case RouteRowResponseMessage.TYPE:          return new RouteRowResponseMessage(buf, nhf);        default:          if (logger.level <= Logger.SEVERE) {            logger.log("SERIOUS ERROR: Received unknown message address: " + 0 + "type:" + type);          }          return null;      }    }  }  static {    TOTAL_HEADER = new byte[SocketCollectionManager.TOTAL_HEADER_SIZE + 4];    // plus the appId    System.arraycopy(SocketCollectionManager.PASTRY_MAGIC_NUMBER, 0, TOTAL_HEADER, 0, SocketCollectionManager.PASTRY_MAGIC_NUMBER.length);    //System.arraycopy(new byte[4],0,TOTAL_HEADER,4,4); // version 0 // can just leave blank zero, cause java zeros everything automatically    System.arraycopy(SocketCollectionManager.HEADER_DIRECT, 0, TOTAL_HEADER, 8, SocketCollectionManager.HEADER_SIZE);    //System.arraycopy(new byte[4],0,TOTAL_HEADER,12,4); // appId 0 // can just leave blank zero, cause java zeros everything automatically  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -