⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectiontablenio.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
     * create instances of WriteHandler threads for sending data.
     *
     * @param workerThreads is the number of threads to create.
     */
    private static WriteHandler[] create(int workerThreads)
    {
      final WriteHandler[] pool = new WriteHandler[workerThreads];
      int slot = 0;
      while (slot < pool.length)
      {
        // Each handler is a Runnable that gets its own dedicated thread.
        WriteHandler handler = new WriteHandler();
        pool[slot++] = handler;

        // Daemon threads, so the writers never keep the JVM alive on their own.
        Thread worker = new Thread(handler, "nioWriteHandlerThread");
        worker.setDaemon(true);
        worker.start();
      }
      return pool;
    }

    /**
     * Creates the per-channel write wrapper for a new channel to be handled.
     * The wrapper is bound to this handler's selector and shares this handler's
     * header buffer; nothing is registered with the selector here.
     *
     * @param channel the socket channel that write requests will be sent on.
     * @return a new SelectorWriteHandler for the given channel.
     */
    private SelectorWriteHandler add(SocketChannel channel)
    {
      SelectorWriteHandler wrapper =
          new SelectorWriteHandler(channel, SELECTOR, m_headerBuffer);
      return wrapper;
    }

    /**
     * Packages the arguments into a WriteRequest and places it on this handler's
     * request queue, from which run() picks it up.
     *
     * @param channel      the channel the data is destined for.
     * @param buffer       the payload to send.
     * @param notification callback handed to the request; may be null (the
     *                     notify methods of SelectorWriteHandler check for that).
     * @param hdlr         the per-channel wrapper that will own the request.
     * @throws InterruptedException propagated from the queue insert.
     */
    private void write(SocketChannel channel, ByteBuffer buffer,
                       FutureResult notification, SelectorWriteHandler hdlr)
        throws
        InterruptedException
    {
      WriteRequest request = new WriteRequest(channel, buffer, notification, hdlr);
      QUEUE.insert(request);
    }

    /**
     * Stops servicing the given channel wrapper by cancelling its selection key.
     *
     * @param entry the per-channel wrapper to shut down.
     */
    private void close(SelectorWriteHandler entry)
    {
      entry.cancel();
    }

    /**
     * Flushes every request queued on a failed channel and then closes the wrapper.
     * Each queued request has its callback failed with the given error (when the
     * error is non-null) before it is discarded.
     *
     * @param entry the per-channel wrapper whose channel failed; it must hold at
     *              least one request, because the current request is always
     *              visited before the queue is checked.
     * @param error the failure to report to the callbacks, or null to discard the
     *              requests without notification.
     */
    private void handleChannelError(SelectorWriteHandler entry, Throwable error)
    {
      boolean moreRequests = true;
      while (moreRequests)
      {
        if (null != error)
        {
          entry.notifyError(error);
        }
        // next() drops the request just handled and reports whether another one remains.
        moreRequests = entry.next();
      }
      close(entry);
    }

    /**
     * Services every channel the selector reported as ready for writing.
     * For each selected key the attached SelectorWriteHandler writes as much as
     * it can. When the current request has been written completely, its callback
     * receives the byte count and the handler advances to its next request.
     * m_pendingChannels is decremented whenever a channel runs out of requests
     * or fails with an IOException.
     *
     * @param selector the selector whose selected-key set is processed and then cleared.
     */
    private void processWrite(Selector selector)
    {
      Set selected = selector.selectedKeys();
      // Work on a snapshot; the live set is only emptied once all keys were handled.
      Object[] snapshot = selected.toArray();
      for (int i = 0; i < snapshot.length; i++)
      {
        SelectionKey readyKey = (SelectionKey) snapshot[i];
        SelectorWriteHandler handler = (SelectorWriteHandler) readyKey.attachment();
        boolean channelDone = false;
        try
        {
          if (handler.write() != 0)
          { // bytes are still outstanding for the current request, wait for the next select
            continue;
          }
          // the buffer was written completely, report the number of bytes to the caller
          handler.notifyObject(new Integer(handler.getBytesWritten()));
          // move on to the next buffer; false means the channel has nothing left to send
          channelDone = !handler.next();
        }
        catch (IOException e)
        {
          channelDone = true;
          // connection must have closed
          handleChannelError(handler, e);
        }
        finally
        {
          if (channelDone)
          {
            m_pendingChannels--;
          }
        }
      }
      selected.clear();
    }

    /**
     * Main loop of the write handler thread; it runs for as long as the selector is open.
     *
     * Each pass first drains the request queue without blocking: every request is handed
     * to its channel handler and any channels that are ready are written to. Once the
     * queue is empty the thread blocks, on the queue when no channel has pending data
     * (m_pendingChannels == 0) and on the selector otherwise. A Shutdown entry taken from
     * the queue closes the selector and ends the thread.
     */
    public void run()
    {
      while (SELECTOR.isOpen())
      {
        Object o;

        // When there are no more commands in the Queue, we will hit the blocking code after this loop.
        try
        {
          while (null != (o = QUEUE.poll(0)))
          {
            if (o instanceof Shutdown)
            { // Stop the thread
              closeWriteSelector();
              return;
            }
            registerWriteRequest((WriteRequest) o);

            try
            {
              // process any connections ready to be written to.
              if (SELECTOR.selectNow() > 0)
              {
                processWrite(SELECTOR);
              }
            }
            catch (IOException e)
            { // need to understand what causes this error so we can handle it properly
              if (LOG.isErrorEnabled())
              {
                LOG.error("SelectNow operation on write selector failed, didn't expect this to occur, please report this",
                          e);
              }
              return; // if select fails, give up so we don't go into a busy loop.
            }
          }
        }
        catch (InterruptedException ex)
        {
          // Previously swallowed without a trace. The interrupt flag is deliberately not
          // restored: this loop only terminates on a Shutdown entry, and a flag that stays
          // set would presumably make the blocking calls below return or fail immediately.
          if (LOG.isInfoEnabled())
          {
            LOG.info("Interrupted while polling the write queue", ex);
          }
        }

        // if there isn't any pending work to do, block on queue to get next request.
        if (m_pendingChannels == 0)
        {
          try
          {
            o = QUEUE.take();
          }
          catch (InterruptedException ex1)
          {
            // Report through the logger instead of printStackTrace(), then retry the loop.
            if (LOG.isErrorEnabled())
            {
              LOG.error("Interrupted while waiting for a write request", ex1);
            }
            continue;
          }
          if (o instanceof Shutdown)
          { // Stop the thread
            closeWriteSelector();
            return;
          }
          registerWriteRequest((WriteRequest) o);
        }
        // otherwise do a blocking wait select operation.
        else
        {
          try
          {
            if (SELECTOR.select() > 0)
            {
              processWrite(SELECTOR);
            }
          }
          catch (IOException e)
          { // need to understand what causes this error
            // NOTE(review): unlike the selectNow() failure above, this path keeps looping;
            // confirm that a persistent select() failure cannot turn into a busy loop.
            if (LOG.isErrorEnabled())
            {
              LOG.error("Failure while writing to socket", e);
            }
          }
        }
      }
    }

    // Closes the write selector in response to a Shutdown entry; a failure is only logged.
    private void closeWriteSelector()
    {
      try
      {
        SELECTOR.close();
      }
      catch (IOException e)
      {
        if (LOG.isInfoEnabled())
        {
          LOG.info("Write selector close operation failed", e);
        }
      }
    }

    // Hands a request to its channel handler and counts the channel as pending if that newly enabled it.
    private void registerWriteRequest(WriteRequest request)
    {
      if (request.getHandler().add(request))
      {
        m_pendingChannels++;
      }
    }
  }

  /**
   * Wrapper class for passing write requests. There will be an instance of this class for
   * each socketChannel mapped to a Selector.
   *
   * It keeps a FIFO list of WriteRequests for one channel, registers the channel with the
   * selector on first use, and switches OP_WRITE interest on while requests are queued and
   * off when the list runs empty. Every request goes out as a 4-byte length header followed
   * by the request's buffer.
   *
   * The class does no locking of its own; the comment in enable() states that a single
   * thread uses the selection key.
   */
  public static class SelectorWriteHandler
  {

    private final LinkedList m_writeRequests = new LinkedList(); // Collection of writeRequests, head is the one in progress
    private boolean m_headerSent = false; // true once the length header of the current request went out
    private final SocketChannel m_channel;
    private SelectionKey m_key; // null until the channel was registered successfully
    private final Selector m_selector;
    private int m_bytesWritten = 0; // payload bytes written so far for the current request
    private boolean m_enabled = false; // true while OP_WRITE interest is set on m_key
    private final ByteBuffer m_headerBuffer; // scratch buffer for the length header, supplied by the creator

    /**
     * @param channel      the channel this handler writes to.
     * @param selector     the selector the channel is registered with on first use.
     * @param headerBuffer buffer used to build the length header; it must have room for an int.
     */
    SelectorWriteHandler(SocketChannel channel, Selector selector,
                         ByteBuffer headerBuffer)
    {
      m_channel = channel;
      m_selector = selector;
      m_headerBuffer = headerBuffer;
    }

    /**
     * Registers the channel with the selector, with this handler as the key attachment.
     *
     * @throws ClosedChannelException if the channel is already closed.
     */
    private void register(Selector selector, SocketChannel channel)
        throws
        ClosedChannelException
    {
      // register the channel but don't enable OP_WRITE until we have a write request.
      m_key = channel.register(selector, 0, this);
    }

    /**
     * Makes sure the channel is registered and that OP_WRITE interest is set.
     *
     * @return true if selection key is enabled when it wasn't previous to call; false if
     *         it was already enabled, or if the channel was closed or the key cancelled.
     */
    private boolean enable()
    {
      boolean rc = false;

      try
      {
        if (m_key == null)
        { // register the socket on first access,
          // we are the only thread using this variable, so no sync needed.
          register(m_selector, m_channel);
        }
      }
      catch (ClosedChannelException e)
      {
        // NOTE(review): the request that triggered this call stays in m_writeRequests and
        // its callback is not notified on this path; confirm the caller detects the
        // closed channel some other way.
        return rc;
      }

      if (!m_enabled)
      {
        rc = true;
        try
        {
          m_key.interestOps(SelectionKey.OP_WRITE);
        }
        catch (CancelledKeyException e)
        { // channel must have closed
          return false;
        }
        m_enabled = true;
      }
      return rc;
    }

    /**
     * Clears the interest set of the key if OP_WRITE interest is currently on.
     */
    private void disable()
    {
      if (m_enabled)
      {
        try
        {
          m_key.interestOps(0); // pass zero which means that we are not interested in being
          // notified of anything for this channel.
        }
        catch (CancelledKeyException eat)
        { // we probably don't need to throw this exception (if they try to write
          // again, we will then throw an exception).
        }
        m_enabled = false;
      }
    }

    /**
     * Cancels the selection key, if there is one. The key is still null when the channel
     * was never registered or registration failed, so that case is a no-op and no longer
     * raises a NullPointerException.
     */
    private void cancel()
    {
      if (m_key != null)
      {
        m_key.cancel();
      }
    }

    /**
     * Appends a request to the list and makes sure write interest is on.
     *
     * @return the result of enable(): true only if this call switched write interest on.
     */
    boolean add(WriteRequest entry)
    {
      m_writeRequests.add(entry);
      return enable();
    }

    /**
     * @return the request at the head of the list; LinkedList.getFirst() throws
     *         NoSuchElementException if the list is empty.
     */
    WriteRequest getCurrentRequest()
    {
      return (WriteRequest) m_writeRequests.getFirst();
    }

    /** @return the channel this handler writes to. */
    SocketChannel getChannel()
    {
      return m_channel;
    }

    /** @return the buffer of the current request. */
    ByteBuffer getBuffer()
    {
      return getCurrentRequest().getBuffer();
    }

    /** @return the callback of the current request, possibly null. */
    FutureResult getCallback()
    {
      return getCurrentRequest().getCallback();
    }

    /** @return payload bytes written so far for the current request. */
    int getBytesWritten()
    {
      return m_bytesWritten;
    }

    /**
     * Fails the current request's callback with the given error; does nothing if the
     * request has no callback.
     */
    void notifyError(Throwable error)
    {
      FutureResult callback = getCallback();
      if (callback != null)
      {
        callback.setException(error);
      }
    }

    /**
     * Completes the current request's callback with the given result; does nothing if the
     * request has no callback.
     */
    void notifyObject(Object result)
    {
      FutureResult callback = getCallback();
      if (callback != null)
      {
        callback.set(result);
      }
    }

    /**
     * Drops the current request and resets the per-request state (header flag and byte count).
     *
     * @return true if another request is waiting; false if the list is now empty, in which
     *         case write interest has been switched off.
     */
    boolean next()
    {
      m_headerSent = false;
      m_bytesWritten = 0;

      m_writeRequests.removeFirst(); // remove current entry
      boolean rc = !m_writeRequests.isEmpty();
      if (!rc)
      { // disable select for this channel if no more entries
        disable();
      }
      return rc;
    }

    /**
     * Writes as much of the current request as the channel accepts. On the first call for
     * a request, the number of bytes remaining in its buffer is sent first as an int header.
     *
     * @return the number of bytes of the current buffer still to be written; 0 means the
     *         request is complete.
     * @throws IOException if writing to the channel fails.
     */
    int write()
        throws IOException
    {

      if (!m_headerSent)
      {
        m_headerSent = true;
        m_headerBuffer.clear();
        m_headerBuffer.putInt(getBuffer().remaining());
        m_headerBuffer.flip();
        do
        {
          getChannel().write(m_headerBuffer);
        } // we should be able to handle writing the header in one action but just in case, just do a busy loop
        while (m_headerBuffer.remaining() > 0);

      }

      m_bytesWritten += (getChannel().write(getBuffer()));

      return getBuffer().remaining();
    }

  }

  /**
   * Immutable record of one pending write: the destination channel, the data buffer, the
   * callback to notify, and the per-channel handler that will carry the write out.
   */
  public static class WriteRequest
  {
    private final SelectorWriteHandler m_hdlr;
    private final SocketChannel m_channel;
    private final ByteBuffer m_buffer;
    private final FutureResult m_callback;

    /**
     * @param channel  the channel the data is destined for.
     * @param buffer   the data to write.
     * @param callback the callback to notify; stored as given, it may be null.
     * @param hdlr     the handler that owns this request.
     */
    WriteRequest(SocketChannel channel, ByteBuffer buffer,
                 FutureResult callback, SelectorWriteHandler hdlr)
    {
      this.m_hdlr = hdlr;
      this.m_channel = channel;
      this.m_buffer = buffer;
      this.m_callback = callback;
    }

    /** @return the channel given at construction. */
    SocketChannel getChannel()
    {
      return this.m_channel;
    }

    /** @return the data buffer given at construction. */
    ByteBuffer getBuffer()
    {
      return this.m_buffer;
    }

    /** @return the callback given at construction. */
    FutureResult getCallback()
    {
      return this.m_callback;
    }

    /** @return the handler given at construction. */
    SelectorWriteHandler getHandler()
    {
      return this.m_hdlr;
    }

  }

  /**
   * Background task that destroys and removes connections whose last access is older
   * than conn_expire_time. It sweeps the conns map once per reaper_interval and stops
   * when the map is empty or when stop() has been called.
   */
  class Reaper
      implements Runnable
  {
    // Written by the threads calling start()/stop() and read by the reaper thread itself,
    // so it is volatile to make those writes visible.
    volatile Thread t = null;

    Reaper()
    {
    }

    /**
     * Starts the reaper thread unless one is already alive.
     */
    public void start()
    {
      Thread existing = t;
      if (existing != null && !existing.isAlive())
      {
        t = null;
      }
      if (t == null)
      {
        //RKU 7.4.2003, put in threadgroup
        Thread reaper = new Thread(thread_group, this, "ConnectionTable.ReaperThread");
        //  reaper.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
        // Publish the thread before starting it: run() only loops while t is the running thread.
        t = reaper;
        reaper.start();
      }
    }

    /**
     * Asks the reaper to stop; the running thread notices that it is no longer the
     * current reaper at its next loop check and terminates.
     */
    public void stop()
    {
      t = null;
    }

    /**
     * @return true while a reaper thread reference is held.
     */
    public boolean isRunning()
    {
      return t != null;
    }

    public void run()
    {
      Connection value;
      Map.Entry entry;
      long curr_time;

      if (log.isInfoEnabled())
      {
        log.info("connection reaper thread was started. Number of connections=" +
                 conns.size() + ", reaper_interval=" + reaper_interval +
                 ", conn_expire_time=" +
                 conn_expire_time);

      }
      // currentThread().equals(t) is null-safe, so a concurrent stop() cannot cause a
      // NullPointerException between a null check and the comparison.
      while (conns.size() > 0 && Thread.currentThread().equals(t))
      {
        // Pause between sweeps. Without the pause this loop spins at full speed and keeps
        // re-acquiring the conns lock, starving the threads that use the table.
        try
        {
          Thread.sleep(Math.max(0L, reaper_interval));
        }
        catch (InterruptedException e)
        {
          // Treat an interrupt as a request to terminate and keep the flag for the caller.
          Thread.currentThread().interrupt();
          break;
        }
        if (!Thread.currentThread().equals(t))
        { // stopped or replaced while sleeping
          break;
        }

        synchronized (conns)
        {
          curr_time = System.currentTimeMillis();
          for (Iterator it = conns.entrySet().iterator(); it.hasNext(); )
          {
            entry = (Map.Entry) it.next();
            value = (Connection) entry.getValue();
            if (value.last_access + conn_expire_time < curr_time)
            {
              if (log.isInfoEnabled())
              {
                log.info("connection " + value +
                         " has been idle for too long (conn_expire_time=" +
                         conn_expire_time +
                         "), will be removed");
              }
              value.destroy();
              it.remove(); // remove through the iterator, the map is being iterated
            }
          }
        }
      }
      if (log.isInfoEnabled())
      {
        log.info("reaper terminated");
      }
      // Clear the reference only if it still points at this thread; a stop()/start() pair
      // may already have installed a newer reaper that must not be wiped out.
      if (Thread.currentThread().equals(t))
      {
        t = null;
      }
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -