⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  /**   * DESCRIBE THE METHOD   *   * @param msg DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public void send(Message msg) throws IOException {    PRawMessage rm;    if (msg instanceof PRawMessage) {      rm = (PRawMessage) msg;    } else {      rm = new PJavaSerializedMessage(msg);    }    // todo, pool    final SocketBuffer buffer = new SocketBuffer(manager.defaultDeserializer, manager.pastryNode);    buffer.serialize(rm, true);    send(buffer);  }  /**   * The entry point for outgoing messages - messages from here are   * ensocketQueued for transport to the remote node   *   * @param message DESCRIBE THE PARAMETER   */  public void send(final SocketBuffer message) {    writer.enqueue(message);    if (key != null) {      manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);    }  }  /**   * Method which should change the interestOps of the handler's key. This   * method should *ONLY* be called by the selection thread in the context of a   * select().   *   * @param key The key in question   */  public synchronized void modifyKey(SelectionKey key) {    if (channel.socket().isOutputShutdown()) {      key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);      clearTimer();    } else if ((!writer.isEmpty()) && ((key.interestOps() & SelectionKey.OP_WRITE) == 0)) {      key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);      setTimer();    }  }  /**   * Specified by the SelectionKeyHandler interface - calling this tells this   * socket manager that the connection has completed and we can now read/write.   *   * @param key The key which is connectable.   
*/  public void connect(SelectionKey key) {    try {      // deregister interest in connecting to this socket      if (channel.finishConnect()) {        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);      }      manager.manager.markAlive(path);      if (manager.logger.level <= Logger.FINE) {        manager.logger.log("(SM) Found connectable channel - completed connection");      }    } catch (Exception e) {      if (manager.logger.level <= Logger.FINE) {        manager.logger.logException(          "(SM) Unable to connect to path " + path + " (" + e + ") marking as dead.", e);      }      manager.manager.markDead(path);      close();    }  }  /**   * Reads from the socket attached to this connector.   *   * @param key The selection key for this manager   */  public void read(SelectionKey key) {    try {      SocketBuffer o = reader.read(channel);      if (o != null) {        if (manager.logger.level <= Logger.FINE) {          manager.logger.log("(SM) Read message " + o + " from socket.");        }        receive(o);      }    } catch (IOException e) {      if (manager.logger.level <= Logger.FINE) {        manager.logger.log("(SM) WARNING " + e + " reading - cancelling.");      }      // if it's not a bootstrap path, and we didn't close this socket's output,      // then check to see if the remote address is dead or just closing a socket      if ((path != null) &&        (!((SocketChannel) key.channel()).socket().isOutputShutdown())) {        manager.checkLiveness(path);      }      close();    }  }  /**   * Writes to the socket attached to this socket manager.   
*
   * Clears the timer, then lets the writer write to the channel. When the
   * writer returns true (presumably: nothing left to write - confirm against
   * the writer implementation) write interest is removed from the key and, for
   * a bootstrap socket, the socket is closed. Otherwise the timer is set
   * again. An IOException while writing closes the socket.
   *
   * @param key The selection key for this manager
   */
  public synchronized void write(SelectionKey key) {
    try {
      clearTimer();

      if (writer.write(channel)) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);

        if (bootstrap) {
          close();
        }
      } else {
        setTimer();
      }
    } catch (IOException e) {
      if (manager.logger.level <= Logger.WARNING) {
        manager.logger.log("(SM) ERROR " + e + " writing - cancelling.");
      }

      close();
    }
  }

  /**
   * Accepts a new connection on the given key. The key's channel becomes this
   * manager's channel; it is registered with the selector manager with no
   * interest ops and then switched to read interest.
   *
   * @param key The key whose channel is the accepted SocketChannel
   * @exception IOException Propagated from registering the channel
   */
  protected void acceptConnection(SelectionKey key) throws IOException {
    this.channel = (SocketChannel) key.channel();
    this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 0);
    this.key.interestOps(SelectionKey.OP_READ);

    if (manager.logger.level <= Logger.FINE) {
      manager.logger.log(
        "(SM) Accepted connection from " +
        channel.socket().getRemoteSocketAddress());
    }
  }

  /**
   * Creates the outgoing socket to the remote handle. Opens a non-blocking
   * channel with the manager's socket buffer sizes, registers it with the
   * selector manager, announces the opened channel to the pastry node and
   * starts connecting to the first hop of the path.
   *
   * @param path The source route to connect along
   * @exception IOException Propagated from opening, configuring, registering
   *      or connecting the channel
   */
  protected void createConnection(final SourceRoute path) throws IOException {
    this.path = path;
    this.channel = SocketChannel.open();
    this.channel.socket().setSendBufferSize(manager.SOCKET_BUFFER_SIZE);
    this.channel.socket().setReceiveBufferSize(manager.SOCKET_BUFFER_SIZE);
    this.channel.configureBlocking(false);
    this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0);

    if (manager.logger.level <= Logger.FINE) {
      manager.logger.log("(SM) Initiating socket connection to path " + path);
    }

    manager.pastryNode.broadcastChannelOpened(path.getFirstHop().getAddress(), NetworkListener.REASON_NORMAL);

    if (this.channel.connect(path.getFirstHop().getAddress())) {
      // connected immediately: only reads are of interest
      this.key.interestOps(SelectionKey.OP_READ);
    } else {
      // connection still pending: also wait for it to become connectable
      this.key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
    }
  }

  /**
   * Method which is called once a message is received off of the wire. If it's
   * for us (address 0), it's handled here through the internal deserializer;
   * otherwise, it's passed to the pastry node.
   *
   * An IOException while deserializing an internal message is logged and not
   * rethrown.
   *
   * @param delivery The buffer that was read from the socket
   */
  protected void receive(SocketBuffer delivery) {
    if (delivery.getAddress() == 0) {
      // short circuit, these are the internal messages that Socket handles
      try {
        delivery.deserialize(deserializer);
      } catch (IOException ioe) {
        if (manager.logger.level <= Logger.SEVERE) {
          manager.logger.logException("Internal error while deserializing.", ioe);
        }
      }
    } else {
      // time the delivery so that it can be reported at FINER level
      long start = manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis();
      manager.pastryNode.receiveMessage(delivery);

      if (manager.logger.level <= Logger.FINER) {
        manager.logger.log("ST: " + (manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis() - start) + " deliver of " + delivery);
      }
    }
  }

  /**
   * Internal method which clears the internal timer: cancels it if one is
   * set and resets the field to null.
   */
  protected void clearTimer() {
    if (this.timer != null) {
      this.timer.cancel();
    }

    this.timer = null;
  }

  // short circuit the deserialization step
  /**
   * Deserializer for the internal messages handled directly by this socket
   * manager. It answers request messages immediately via send() and records
   * the path announced by the remote side.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  class SMDeserializer implements MessageDeserializer {

    /**
     * Handles one internal message. A SourceRoute assigns the path of this
     * socket (only if none is set yet; a second assignment is logged as an
     * error). The request types are answered with the matching response
     * message. Unknown types are logged.
     *
     * @param buf The buffer to read the message content from
     * @param type The message type, compared against the TYPE constants
     * @param priority Not used by this implementation
     * @param sender Not used by this implementation
     * @return Always null
     * @exception IOException If a request carries a version other than 0, or
     *      propagated from reading the buffer or sending the response
     */
    public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority, rice.p2p.commonapi.NodeHandle sender) throws IOException {
      switch (type) {
        case SourceRoute.TYPE:
          SourceRoute tempPath = SourceRoute.build(buf);

          if (path == null) {
            path = tempPath;
            manager.socketOpened(path, SocketManager.this);
            manager.manager.markAlive(path);
            writer.setPath(path);
            reader.setPath(path.reverse());

            if (manager.logger.level <= Logger.FINE) {
              manager.logger.log("Read open connection with path " + path);
            }
          } else {
            if (manager.logger.level <= Logger.SEVERE) {
              manager.logger.log("SERIOUS ERROR: Received duplicate path assignments: " + path + " and " + tempPath);
            }
          }
          return null;

        case NodeIdRequestMessage.TYPE:
          expectVersionZero(buf);
          send(new NodeIdResponseMessage(manager.pastryNode.getNodeId(), manager.localAddress.getEpoch()));
          return null;

        case LeafSetRequestMessage.TYPE:
          expectVersionZero(buf);
          send(new LeafSetResponseMessage(manager.pastryNode.getLeafSet()));
          return null;

        case RoutesRequestMessage.TYPE:
          expectVersionZero(buf);
          send(new RoutesResponseMessage((SourceRoute[]) manager.manager.getBest().values().toArray(new SourceRoute[0])));
          return null;

        case RouteRowRequestMessage.TYPE:
          expectVersionZero(buf);
          // the requested row index follows the version byte
          send(new RouteRowResponseMessage(manager.pastryNode.getRoutingTable().getRow(buf.readInt())));
          return null;

        default:
          if (manager.logger.level <= Logger.SEVERE) {
            manager.logger.log("SERIOUS ERROR: Received unknown message address: 0 type: " + type);
          }
      }
      return null;
    }

    /**
     * Reads the version byte of a request and rejects anything but version 0.
     *
     * @param buf The buffer positioned at the version byte
     * @exception IOException If the version is not 0, or propagated from
     *      reading the byte
     */
    private void expectVersionZero(InputBuffer buf) throws IOException {
      byte version = buf.readByte();

      if (version != 0) {
        throw new IOException("Unknown Version: " + version);
      }
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -