⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketcollectionmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
          }          channel2.close();          channel2 = null;        }        sourceRouteClosed(this);      } catch (IOException e) {        if (logger.level <= Logger.WARNING) {          logger.log("ERROR: Recevied exception " + e + " while closing intermediateSourceRoute!");        }      }    }    /**     * Specified by the SelectionKeyHandler interface - calling this tells this     * socket manager that the connection has completed and we can now     * read/write.     *     * @param key The key which is connectable.     */    public void connect(SelectionKey key) {      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + " connecting to key " + (key.channel() == channel1 ? "1" : "2"));      }      try {        // deregister interest in connecting to this socket        if (((SocketChannel) key.channel()).finishConnect()) {          removeInterestOp(key.channel(), SelectionKey.OP_CONNECT);        }        if (logger.level <= Logger.FINE) {          logger.log("(SRM) Found connectable source route channel - completed connection");        }      } catch (IOException e) {        if (logger.level <= Logger.WARNING) {          logger.log("(SRM) Got exception " + e + " on connect - killing off source route");        }        close();      }    }    /**     * Reads from the socket attached to this connector.     *     * @param key The selection key for this manager     */    public void read(SelectionKey key) {      String k = (key.channel() == channel1 ? "1" : "2");      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + " reading from key " + k + " " + key.interestOps());      }      try {        try {          if (repeater.read((SocketChannel) key.channel())) {            addInterestOp(otherChannel(key.channel()), SelectionKey.OP_WRITE);            removeInterestOp(key.channel(), SelectionKey.OP_READ);          }          if (logger.level <= Logger.FINE) {            logger.log("(SRM) " + this + " done reading from key " + k);          }        } catch (ClosedChannelException e) {          if (logger.level <= Logger.FINE) {            logger.log("(SRM) " + this + " reading from key " + k + " returned -1 - processing shutdown");          }          // then determine if the sockets are now completely shut down,          // or if only half is now closed          if (otherChannel(key.channel()).socket().isInputShutdown()) {            if (logger.level <= Logger.FINE) {              logger.log("(SRM) " + this + " other key is shut down - closing");            }            close();          } else {            // first, deregister in reading and writing to the appropriate sockets            ((SocketChannel) key.channel()).socket().shutdownInput();            removeInterestOp(key.channel(), SelectionKey.OP_READ);            removeInterestOp(otherChannel(key.channel()), SelectionKey.OP_WRITE);            if (logger.level <= Logger.FINE) {              logger.log("(SRM) " + this + " other key not yet closed - shutting it down");            }            shutdown(otherChannel(key.channel()));          }        }      } catch (IOException e) {        if (logger.level <= Logger.FINE) {          logger.logException(            "(SRM) ERROR " + e + " reading source route - cancelling.", e);        }        close();      }    }    /**     * Writes to the socket attached to this socket manager.     *     * @param key The selection key for this manager     */    public synchronized void write(SelectionKey key) {      String k = (key.channel() == channel1 ? "1" : "2");      if (logger.level <= Logger.FINER) {        logger.log("(SRM) " + this + " writing to key " + k + " " + key.interestOps());      }      try {        if (repeater.write((SocketChannel) key.channel())) {          addInterestOp(otherChannel(key.channel()), SelectionKey.OP_READ);          removeInterestOp(key.channel(), SelectionKey.OP_WRITE);        }        if (logger.level <= Logger.FINER) {          logger.log("(SRM) " + this + " done writing to key " + k);        }      } catch (IOException e) {        if (logger.level <= Logger.WARNING) {          logger.log("ERROR " + e + " writing source route - cancelling.");        }        close();      }    }    /**     * Accepts a new connection on the given key     *     * @param key DESCRIBE THE PARAMETER     * @exception IOException DESCRIBE THE EXCEPTION     */    protected void acceptConnection(SelectionKey key) throws IOException {      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + " accepted connection for key 1 as " + ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());      }      if (logger.level <= Logger.FINE) {        logger.log("(SRM) Accepted source route connection from " + ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());      }      pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, SelectionKey.OP_READ);      this.channel1 = (SocketChannel) key.channel();    }    /**     * Creates the outgoing socket to the remote handle     *     * @param address The accress to connect to     * @exception IOException DESCRIBE THE EXCEPTION     */    protected void createConnection(final EpochInetSocketAddress address) throws IOException {      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + " creating connection for key 2 as " + address.getAddress());      }      channel2 = SocketChannel.open();      channel2.socket().setSendBufferSize(SOCKET_BUFFER_SIZE);      channel2.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE);      channel2.configureBlocking(false);      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + "Initiating source route connection to " + address);      }      pastryNode.broadcastChannelOpened(address.address, NetworkListener.REASON_SR);      boolean done = channel2.connect(address.getAddress());      if (done) {        pastryNode.getEnvironment().getSelectorManager().register(channel2, this, SelectionKey.OP_READ);      } else {        pastryNode.getEnvironment().getSelectorManager().register(channel2, this, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);      }      if (logger.level <= Logger.FINE) {        logger.log("(SRM) " + this + "   setting initial ops to " + SelectionKey.OP_READ + " for key 2");      }    }    /**     * DESCRIBE THE METHOD     *     * @return DESCRIBE THE RETURN VALUE     */    public String toString() {      String s1 = null;      if (channel1 != null) {        if (channel1.socket() != null) {          if (channel1.socket().getRemoteSocketAddress() != null) {            s1 = channel1.socket().getRemoteSocketAddress().toString();          } else {            s1 = channel1.socket().toString();          }        } else {          s1 = channel1.toString();        }      }      String s2 = null;      if (channel2 != null) {        if (channel2.socket() != null) {          if (channel2.socket().getRemoteSocketAddress() != null) {            s2 = channel2.socket().getRemoteSocketAddress().toString();          } else {            s2 = channel2.socket().toString();          }        } else {          s2 = channel2.toString();        }      }      return "SourceRouteManager " + s1 + " to " + s2;    }  }  /**   * Internal class which reads the greeting message off of a newly-accepted   * socket. This class determines whether this is a normal connection or a   * source-route request, and then hands the connection off to the appropriate   * handler (SocketManager or SourceRouteManager).   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  protected class SocketAccepter extends SelectionKeyHandler {    // the key to read from    private SelectionKey key;    // the buffer used to read the header    private ByteBuffer buffer;    /**     * Private method which is designed to examine the newly read buffer and     * handoff the connection to the approriate handler     *     * @exception IOException DESCRIBE THE EXCEPTION     */    ByteBuffer appTypeBuffer = null;    byte[] array = null;    /**     * Constructor which accepts an incoming connection, represented by the     * selection key. This constructor builds a new     * IntermediateSourceRouteManager, and waits until the greeting message is     * read from the other end. Once the greeting is received, the manager makes     * sure that a socket for this handle is not already open, and then proceeds     * as normal.     *     * @param key The server accepting key for the channel     * @exception IOException DESCRIBE THE EXCEPTION     */    public SocketAccepter(SelectionKey key) throws IOException {      this.buffer = ByteBuffer.allocateDirect(TOTAL_HEADER_SIZE);      acceptConnection(key);    }    /**     * Method which closes down this socket manager, by closing the socket,     * cancelling the key and setting the key to be interested in nothing     */    public void close() {      try {        if (key != null) {          key.channel().close();          key.cancel();          key.attach(null);          key = null;        }      } catch (IOException e) {        if (logger.level <= Logger.WARNING) {          logger.log("(SA) " + "ERROR: Recevied exception " + e + " while closing just accepted socket!");        }      }    }    /**     * Accepts a new connection on the given key     *     * @param serverKey The server socket key     * @exception IOException DESCRIBE THE EXCEPTION     */    protected void acceptConnection(SelectionKey serverKey) throws IOException {      final SocketChannel channel = (SocketChannel) ((ServerSocketChannel) serverKey.channel()).accept();      channel.socket().setSendBufferSize(SOCKET_BUFFER_SIZE);      channel.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE);      channel.configureBlocking(false);      if (logger.level <= Logger.FINE) {        logger.log("(SA) " + "Accepted incoming connection from " + channel.socket().getRemoteSocketAddress());      }      pastryNode.broadcastChannelOpened((InetSocketAddress) channel.socket().getRemoteSocketAddress(), NetworkListener.REASON_ACC_NORMAL);      key = pastryNode.getEnvironment().getSelectorManager().register(channel, this, SelectionKey.OP_READ);    }    /**     * Reads from the socket attached to this connector.     *     * @param key The selection key for this manager     */    public void read(SelectionKey key) {      try {        int read = ((SocketChannel) key.channel()).read(buffer);        if (logger.level <= Logger.FINE) {          logger.log("(SA)1 Read " + read + " bytes from newly accepted connection.");        }        // implies that the channel is closed        if (read == -1) {          throw new IOException("Error on read - the channel has been closed.");        }        // this could be a problem if a socket is opened and nothing, or not enough is being written        if (buffer.remaining() == 0) {          processBuffer();        }      } catch (IOException e) {        if (logger.level <= Logger.FINE) {          logger.log("(SA) ERROR " + e + " reading source route - cancelling.");        }        close();      }    }    /**     * DESCRIBE THE METHOD     *     * @exception IOException DESCRIBE THE EXCEPTION     */    private void processBuffer() throws IOException {      // NOTE: this is kind of a funky hack, the reason is that it is possible to      // read the header without reading the appId bytes.  So, this code makes      // read/processBuffer just keep being called until both arrive      // we don't want to touch buffer once we construct appTypeBuffer, and we      // return until it finishes reading      if (appTypeBuffer == null) {        // flip the buffer        buffer.flip();        array = new byte[HEADER_SIZE];        buffer.get(array, 0, HEADER_SIZE);        if (!Arrays.equals(array, PASTRY_MAGIC_NUMBER)) {          throw new IOException("Not a pastry socket:" + array[0] + "," + array[1] + "," + array[2] + "," + array[3]);        }        buffer.get(array, 0, HEADER_SIZE);        int version = MathUtils.byteArrayToInt(array);        if (!(version == 0)) {          throw new IOException("Unknown Version:" + version);        }        // allocate space for the header        buffer.get(array, 0, HEADER_SIZE);        appTypeBuffer = ByteBuffer.allocateDirect(4);      }      // verify the buffer      if (Arrays.equals(array, HEADER_DIRECT)) {        int read = ((SocketChannel) key.channel()).read(appTypeBuffer);        if (logger.level <= Logger.FINE) {          logger.log("(SA)2 Read " + read + " bytes from newly accepted connection.");        }        if (appTypeBuffer.hasRemaining()) {          return;        }        appTypeBuffer.flip();        byte[] appIDbytes = new byte[4];        appTypeBuffer.get(appIDbytes, 0, 4);        int appId = MathUtils.byteArrayToInt(appIDbytes);//        if (logger.level <= Logger.FINE) logger.log("Found connection with AppId "+appId);        // TODO: make this level FINE when done        if (appId == 0) {          unIdentifiedSM.add(new SocketManager(SocketCollectionManager.this, key));        } else {          if (logger.level <= Logger.FINE) {            logger.log("Found connection with AppId " + appId);          }          appSockets.add(new SocketAppSocket(SocketCollectionManager.this, key, appId));        }      } else if (Arrays.equals(array, HEADER_SOURCE_ROUTE)) {        new SourceRouteManager(key);      } else {        if (logger.level <= Logger.WARNING) {          logger.log("ERROR: Improperly formatted header received accepted connection - ignoring.");        }        if (logger.level <= Logger.WARNING) {          logger.log("READ " + array[0] + " " + array[1] + " " + array[2] + " " + array[3]);        }        throw new IOException("Improperly formatted header received - unknown header.");      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -