⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectiontablenio.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        }
        start_port++;
        continue;
      }
      catch (SocketException bind_ex)
      {
        if (start_port == end_port)
        {
          throw (BindException) ( (new BindException(
              "No available port to bind to")).initCause(bind_ex));
        }
        start_port++;
        continue;
      }
      catch (IOException io_ex)
      {
        if (LOG.isErrorEnabled())
        {
          LOG.error("Attempt to bind serversocket failed, port=" + start_port +
                    ", bind addr=" + bind_addr, io_ex);
        }
        throw io_ex;
      }
      srv_port = start_port;
      break;
    }
    m_serverSocketChannel.register(this.m_acceptSelector,
                                   SelectionKey.OP_ACCEPT);
    return m_serverSocketChannel.socket();
  }

  protected void runRequest(IpAddress addr, ByteBuffer buf)
      throws
      InterruptedException
  {
    //    m_requestProcessors.execute(new ExecuteTask(addr, buf));
  }

  // Represents shutdown
  private static class Shutdown
  {
  }

  // ReadHandler has selector to deal with read, it runs in seperated thread
  private static class ReadHandler
      implements Runnable
  {
    private final Selector SELECTOR = initHandler();
    private final LinkedQueue QUEUE = new LinkedQueue();
    private final ConnectionTableNIO connectTable;

    ReadHandler(ConnectionTableNIO ct)
    {
      connectTable = ct;
    }

    public Selector initHandler()
    {
      // Open the selector
      try
      {
        return Selector.open();
      }
      catch (IOException e)
      {
        if (LOG.isErrorEnabled())
        {
          LOG.error(e);
        }
        throw new IllegalStateException(e.getMessage());
      }

    }

    /**
     * create instances of ReadHandler threads for receiving data.
     *
     * @param workerThreads is the number of threads to create.
     */
    private static ReadHandler[] create(int workerThreads,
                                        ConnectionTableNIO ct)
    {
      ReadHandler[] handlers = new ReadHandler[workerThreads];
      for (int looper = 0; looper < workerThreads; looper++)
      {
        handlers[looper] = new ReadHandler(ct);

        Thread thread = new Thread(handlers[looper], "nioReadHandlerThread");
        thread.setDaemon(true);
        thread.start();
      }
      return handlers;
    }

    private void add(Object conn)
        throws InterruptedException
    {
      QUEUE.insert(conn);
      wakeup();
    }

    private void wakeup()
    {
      SELECTOR.wakeup();
    }

    public void run()
    {
      while (true)
      { // m_s can be closed by the management thread
        int events;
        try
        {
          events = SELECTOR.select();
        }
        catch (IOException e)
        {
          if (LOG.isWarnEnabled())
          {
            LOG.warn("Select operation on socket failed", e);
          }
          continue; // Give up this time
        }
        catch (ClosedSelectorException e)
        {
          if (LOG.isWarnEnabled())
          {
            LOG.warn("Select operation on socket failed", e);
          }
          return; // Selector gets closed, thread stops
        }

        if (events > 0)
        { // there are read-ready channels
          Set readyKeys = SELECTOR.selectedKeys();
          for (Iterator i = readyKeys.iterator(); i.hasNext(); )
          {
            SelectionKey key = (SelectionKey) i.next();
            i.remove();
            // Do partial read and handle call back
            Connection conn = (Connection) key.attachment();
            try
            {
              if (conn.getSocketChannel().isOpen())
              {
                readOnce(conn);
              }
              else
              { // socket connection is already closed, clean up connection state
                conn.closed();
              }
            }
            catch (IOException e)
            {
              if (LOG.isWarnEnabled())
              {
                LOG.warn("Read operation on socket failed", e);
                // The connection must be bad, cancel the key, close socket, then
                // remove it from table!
              }
              key.cancel();
              conn.destroy();
              conn.closed();
            }
          }
        }

        // Now we look at the connection queue to get any new connections added
        Object o;
        // try {
        try
        {
          o = QUEUE.poll(0); // get a connection
          if (null == o)
          {
            continue;
          }

        }
        catch (InterruptedException ex)
        {
          o = null;
          ex.printStackTrace();
        }
        //   }
//        catch (InterruptedException e) {
//          if (LOG.isInfoEnabled()) {
//            LOG.info("Thread (" + Thread.currentThread().getName() +
//                     ") was interrupted while polling queue", e);
//            // We must give up
//          }
        //     continue;
        //   }

        if (o instanceof Shutdown)
        { // shutdown command?
          try
          {
            SELECTOR.close();
          }
          catch (IOException e)
          {
            if (LOG.isInfoEnabled())
            {
              LOG.info("Read selector close operation failed", e);
            }
          }
          return; // stop reading
        }
        Connection conn = (Connection) o; // must be a new connection
        SocketChannel sc = conn.getSocketChannel();
        try
        {
          sc.register(SELECTOR, SelectionKey.OP_READ, conn);
        }
        catch (ClosedChannelException e)
        {
          if (LOG.isInfoEnabled())
          {
            LOG.info(
                "Socket channel was closed while we were trying to register it to selector",
                e);
            // Channel becomes bad. The connection must be bad,
            // close socket, then remove it from table!
          }
          conn.destroy();
          conn.closed();
        }
      } // end of the while true loop
    }

    private void readOnce(Connection conn)
        throws IOException
    {
      ConnectionReadState readState = conn.getReadState();
      if (!readState.isHeadFinished())
      { // a brand new message coming or header is not completed
        // Begin or continue to read header
        int size = readHeader(conn);
        if (0 == size)
        { // header is not completed
          return;
        }
      }
      // Begin or continue to read body
      if (readBody(conn) > 0)
      { // not finish yet
        return;
      }
      IpAddress src_addr = conn.getPeerAddress();
      ByteBuffer body_buf = readState.getReadBodyBuffer();
      ByteBuffer header_buf = readState.getReadHeadBuffer();
      header_buf.position(conn.HEADER_SIZE);
      header_buf.flip();
      int len = header_buf.capacity() + body_buf.capacity();
      byte[] msg_buf = new byte[len];
      header_buf.get(msg_buf, 0, conn.HEADER_SIZE);
      body_buf.get(msg_buf, conn.HEADER_SIZE, body_buf.capacity());
      Message msg = new Message(connectTable.local_addr, src_addr, msg_buf);
      Event evt = new Event(Event.UP_REQUEST, msg);
      connectTable.up(evt);
      // Clear status
      readState.bodyFinished();
      // Assign worker thread to execute call back

    }

    private int readHeader(Connection conn)
        throws IOException
    {
      ConnectionReadState readState = conn.getReadState();
      ByteBuffer headBuf = readState.getReadHeadBuffer();

      SocketChannel sc = conn.getSocketChannel();
      while (headBuf.remaining() > 0)
      {
        int num = sc.read(headBuf);
        if ( -1 == num)
        { // EOS
          throw new IOException("Peer closed socket");
        }
        if (0 == num)
        { // no more data
          return 0;
        }
      }
      // OK, now we get the whole header, change the status and return message size
      return readState.headFinished();
    }

    private int readBody(Connection conn)
        throws IOException
    {
      ByteBuffer bodyBuf = conn.getReadState().getReadBodyBuffer();

      SocketChannel sc = conn.getSocketChannel();
      while (bodyBuf.remaining() > 0)
      {
        int num = sc.read(bodyBuf);
        if ( -1 == num)
        { // EOS
          throw new IOException(
              "Couldn't read from socket as peer closed the socket");
        }
        if (0 == num)
        { // no more data
          return bodyBuf.remaining();
        }
      }
      // OK, we finished reading the whole message! Flip it (not necessary though)
      bodyBuf.flip();
      return 0;
    }
  }

  private class ExecuteTask
      implements Runnable
  {
    IpAddress m_addr = null;
    ByteBuffer m_buf = null;

    public ExecuteTask(IpAddress addr, ByteBuffer buf)
    {
      m_addr = addr;
      m_buf = buf;
    }

    public void run()
    {
      // receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf.limit());
    }
  }

  private class ConnectionReadState
  {
    private final Connection m_conn;

    // Status for receiving message
    private boolean m_headFinished = false;
    private ByteBuffer m_readBodyBuf = null;
    private final ByteBuffer m_readHeadBuf = ByteBuffer.allocate(Connection.
        HEADER_SIZE);

    public ConnectionReadState(Connection conn)
    {
      m_conn = conn;
    }

    ByteBuffer getReadBodyBuffer()
    {
      return m_readBodyBuf;
    }

    ByteBuffer getReadHeadBuffer()
    {
      return m_readHeadBuf;
    }

    void bodyFinished()
    {
      m_headFinished = false;
      m_readHeadBuf.clear();
      m_readBodyBuf = null;
      m_conn.updateLastAccessed();
    }

    /**
     * Status change for finishing reading the message header (data already in buffer)
     *
     * @return message size
     */
    int headFinished()
    {
      m_headFinished = true;
      m_readHeadBuf.flip();
      int messageSize = m_readHeadBuf.getInt();
      m_readBodyBuf = ByteBuffer.allocate(messageSize - Connection.HEADER_SIZE);
      m_conn.updateLastAccessed();
      return messageSize;

    }

    boolean isHeadFinished()
    {
      return m_headFinished;
    }
  }

  class Connection
  {
    Socket sock = null; // socket to/from peer (result of srv_sock.accept() or new Socket())
    String sock_addr = null; // used for Thread.getName()
    DataOutputStream out = null; // for sending messages
    DataInputStream in = null; // for receiving messages
    Thread receiverThread = null; // thread for receiving messages
    IpAddress peer_addr = null; // address of the 'other end' of the connection
    final Object send_mutex = new Object(); // serialize sends
    long last_access = System.currentTimeMillis(); // last time a message was sent or received

    private SocketChannel sock_ch = null;
    private WriteHandler m_writeHandler;
    private SelectorWriteHandler m_selectorWriteHandler;
    private final ConnectionReadState m_readState;

    private static final int HEADER_SIZE = 12;
    final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);

    Connection(SocketChannel s, IpAddress peer_addr)
    {

      sock_ch = s;
      this.sock = s.socket();
      this.peer_addr = peer_addr;
      m_readState = new ConnectionReadState(this);
    }

    void updateLastAccessed()
    {
      last_access = System.currentTimeMillis();
    }

    private ConnectionReadState getReadState()
    {
      return m_readState;
    }

    private void setupWriteHandler(WriteHandler hdlr)
    {
      m_writeHandler = hdlr;
      m_selectorWriteHandler = hdlr.add(sock_ch);
    }

    void destroy()
    {
      closeSocket();
    }

    void doSend(byte[] buffie, int offset, int length)
        throws Exception
    {
      FutureResult result = new FutureResult();
      m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie, offset, length),
                           result, m_selectorWriteHandler);
      Exception ex = result.getException();
      if (ex != null)
      {
        if (LOG.isErrorEnabled())
        {
          LOG.error("failed sending message", ex);
        }
        if (ex.getCause() instanceof IOException)
        {
          throw (IOException) ex.getCause();
        }
        throw ex;
      }
      result.get();
    }

    SocketChannel getSocketChannel()
    {
      return sock_ch;
    }

    void closeSocket()
    {

      if (sock_ch != null)
      {
        try
        {
          if (sock_ch.isConnected() && sock_ch.isOpen())
          {
            sock_ch.close();
          }
        }
        catch (Exception e)
        {
          log.error("error closing socket connection", e);
        }
        sock_ch = null;
      }
    }

    IpAddress getPeerAddress()
    {
      return peer_addr;
    }

    void closed()
    {
      IpAddress peerAddr = getPeerAddress();
      synchronized (conns)
      {
        conns.remove(peerAddr);
      }
      // notifyConnectionClosed(peerAddr);
    }
  }

  /**
   * Handle writing to non-blocking NIO connection.
   */
  private static class WriteHandler
      implements Runnable
  {
    // Create a queue for write requests
    private final LinkedQueue QUEUE = new LinkedQueue();

    private final Selector SELECTOR = initSelector();
    private int m_pendingChannels; // count of the number of channels that have pending writes

    // allocate and reuse the header for all buffer write operations
    private ByteBuffer m_headerBuffer = ByteBuffer.allocate(Connection.
        HEADER_SIZE);

    Selector initSelector()
    {
      try
      {
        return SelectorProvider.provider().openSelector();
      }
      catch (IOException e)
      {
        if (LOG.isErrorEnabled())
        {
          LOG.error(e);
        }
        throw new IllegalStateException(e.getMessage());
      }
    }

    /**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -