⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectiontablenio.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
package com.huawei.comm.smap;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import sun.rmi.runtime.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Vector;
import java.util.HashMap;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.util.Map;

public class ConnectionTableNIO
    implements Runnable
{
  final HashMap conns = new HashMap(); // keys: Addresses (peer address), values: Connection

  //Receiver            receiver=null;
  boolean use_send_queues = true;
  InetAddress bind_addr = null;
  IpAddress local_addr = null; // bind_addr + port of srv_sock
  int srv_port = 7800;
  int recv_buf_size = 120000;
  int send_buf_size = 60000;
  final Vector conn_listeners = new Vector(); // listeners to be notified when a conn is established/torn down
  final Object recv_mutex = new Object(); // to serialize simultaneous access to receive() from multiple Connections

  Reaper reaper = null; // closes conns that have been idle for more than n secs
  long reaper_interval = 60000; // reap unused conns once a minute
  long conn_expire_time = 300000; // connections can be idle for 5 minutes before they are reaped
  int sock_conn_timeout = 1000; // max time in millis to wait for Socket.connect() to return
  ThreadGroup thread_group = null;
  protected final Log log = LogFactory.getLog(getClass());

  boolean use_reaper = false; // by default we don't reap idle conns
  static final int backlog = 20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
  ServerSocket srv_sock = null;
  boolean reuse_addr = false;

  InetAddress external_addr = null;
  int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
  Thread acceptor = null;

  private ServerSocketChannel m_serverSocketChannel;
  private Selector m_acceptSelector;
  protected final static Log LOG = LogFactory.getLog(ConnectionTableNIO.class);

  private WriteHandler[] m_writeHandlers;
  private int m_nextWriteHandler = 0;
  private final Object m_lockNextWriteHandler = new Object();

  private ReadHandler[] m_readHandlers;
  private int m_nextReadHandler = 0;
  private final Object m_lockNextReadHandler = new Object();

  // int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
  //InetAddress external_addr = null;
  private Protocol prot;
  private volatile boolean serverStopping = false;

  /**
   * @param srv_port
   * @throws Exception
   */
  public ConnectionTableNIO(Protocol prot, int srv_port)
      throws Exception
  {
    System.out.println("------------------------");
    this.srv_port = srv_port;
    this.prot = prot;

  }

  public void up(Event evt)
  {
    prot.up(evt);
  }

  public void down(Event evt)
  {
    // prot.down(evt);
  }

  /**
   * @param srv_port
   * @param reaper_interval
   * @param conn_expire_time
   * @throws Exception
   */
  public ConnectionTableNIO(int srv_port, long reaper_interval,
                            long conn_expire_time)
      throws Exception
  {
    this.srv_port = srv_port;
    this.reaper_interval = reaper_interval;
    this.conn_expire_time = conn_expire_time;

  }

  /**
   * Try to obtain correct Connection (or create one if not yet existent)
   */
  Connection getConnection(IpAddress dest)
      throws Exception
  {
    Connection conn;
    SocketChannel sock_ch;

    synchronized (conns)
    {
      conn = (Connection) conns.get(dest);
      if (conn == null)
      {
        InetSocketAddress destAddress = new InetSocketAddress( ( (IpAddress)
            dest).getIpAddress(),
            ( (IpAddress) dest).getPort());
        sock_ch = SocketChannel.open(destAddress);
        conn = new Connection(sock_ch, dest);

        //   conn.sendLocalAddress(local_addr);
        // This outbound connection is ready

        sock_ch.configureBlocking(false);

        try
        {
          if (LOG.isTraceEnabled())
          {
            LOG.trace("About to change new connection send buff size from " +
                      sock_ch.socket().getSendBufferSize() + " bytes");
          }
          sock_ch.socket().setSendBufferSize(send_buf_size);
          if (LOG.isTraceEnabled())
          {
            LOG.trace("Changed new connection send buff size to " +
                      sock_ch.socket().getSendBufferSize() + " bytes");
          }
        }
        catch (IllegalArgumentException ex)
        {
          if (log.isErrorEnabled())
          {
            log.error("exception setting send buffer size to " +
                      send_buf_size + " bytes: " + ex);
          }
        }
        try
        {
          if (LOG.isTraceEnabled())
          {
            LOG.trace("About to change new connection receive buff size from " +
                      sock_ch.socket().getReceiveBufferSize() + " bytes");
          }
          sock_ch.socket().setReceiveBufferSize(recv_buf_size);
          if (LOG.isTraceEnabled())
          {
            LOG.trace("Changed new connection receive buff size to " +
                      sock_ch.socket().getReceiveBufferSize() + " bytes");
          }
        }
        catch (IllegalArgumentException ex)
        {
          if (log.isErrorEnabled())
          {
            log.error("exception setting receive buffer size to " +
                      send_buf_size + " bytes: " + ex);
          }
        }

        int idx;
        synchronized (m_lockNextWriteHandler)
        {
          idx = m_nextWriteHandler = (m_nextWriteHandler + 1) %
              m_writeHandlers.length;
        }
        conn.setupWriteHandler(m_writeHandlers[idx]);

        // Put the new connection to the queue
        try
        {
          synchronized (m_lockNextReadHandler)
          {
            idx = m_nextReadHandler = (m_nextReadHandler + 1) %
                m_readHandlers.length;
          }
          m_readHandlers[idx].add(conn);

        }
        catch (InterruptedException e)
        {
          if (LOG.isWarnEnabled())
          {
            LOG.warn("Thread (" + Thread.currentThread().getName() +
                     ") was interrupted, closing connection", e);
            // What can we do? Remove it from table then.
          }
          conn.destroy();
          throw e;
        }

        // Add connection to table
        addConnection(dest, conn);

        // notifyConnectionOpened(dest);
        if (LOG.isInfoEnabled())
        {
          LOG.info("created socket to " + dest);
        }
      }
      return conn;
    }
  }

  void addConnection(IpAddress peer, Connection c)
  {
    conns.put(peer, c);
    if (reaper != null && !reaper.isRunning())
    {
      reaper.start();
    }
  }

  public final void start()
      throws Exception
  {
    init();
    srv_sock = createServerSocket(srv_port, max_port);

    if (external_addr != null)
    {
      local_addr = new IpAddress(external_addr, srv_sock.getLocalPort());
    }
    else if (bind_addr != null)
    {
      local_addr = new IpAddress(bind_addr, srv_sock.getLocalPort());
    }
    else
    {
      local_addr = new IpAddress(srv_sock.getLocalPort());

    }
    if (log.isInfoEnabled())
    {
      log.info("server socket created on " + local_addr);

    }
    thread_group = new ThreadGroup(new ThreadGroup("JGroups threads"),
                                   "ConnectionTableGroup");

    acceptor = new Thread(thread_group, this, "ConnectionTable.AcceptorThread");
    acceptor.setDaemon(true);
    acceptor.start();
    use_reaper = true;
    // start the connection reaper - will periodically remove unused connections
//    if (use_reaper && reaper == null)
//    {
//      reaper = new Reaper();
//      reaper.start();
//
//    }
  }

  protected void init()
      throws Exception
  {

    m_writeHandlers = WriteHandler.create(3);
    m_readHandlers = ReadHandler.create(3, this);
  }

  /**
   * Closes all open sockets, the server socket and all threads waiting for incoming messages
   */
  public void stop()
  {

    serverStopping = true;
    // Stop the main selector
    m_acceptSelector.wakeup();

    // Stop selector threads
    for (int i = 0; i < m_readHandlers.length; i++)
    {
      try
      {
        m_readHandlers[i].add(new Shutdown());
      }
      catch (InterruptedException e)
      {
        LOG.error("Thread (" + Thread.currentThread().getName() +
                  ") was interrupted, failed to shutdown selector", e);
      }
    }
    for (int i = 0; i < m_writeHandlers.length; i++)
    {

      //  m_writeHandlers[i].QUEUE.put(new Shutdown());
      m_writeHandlers[i].SELECTOR.wakeup();

    }

    // then close the connections
    synchronized (conns)
    {
      Iterator it = conns.values().iterator();
      while (it.hasNext())
      {
        Connection conn = (Connection) it.next();
        conn.destroy();
      }
      conns.clear();
    }

  }

  /**
   * Acceptor thread. Continuously accept new connections and assign readhandler/writehandler
   * to them.
   */
  public void run()
  {
    Connection conn;

    while (m_serverSocketChannel.isOpen() && !serverStopping)
    {
      int num;
      try
      {
        num = m_acceptSelector.select();
        LOG.warn("Select operation :" + num);

      }
      catch (IOException e)
      {
        if (LOG.isWarnEnabled())
        {
          LOG.warn("Select operation on listening socket failed", e);
        }
        continue; // Give up this time
      }

      if (num > 0)
      {
        Set readyKeys = m_acceptSelector.selectedKeys();
        for (Iterator i = readyKeys.iterator(); i.hasNext(); )
        {
          SelectionKey key = (SelectionKey) i.next();
          i.remove();
          // We only deal with new incoming connections

          ServerSocketChannel readyChannel = (ServerSocketChannel) key.channel();
          SocketChannel client_sock_ch;
          try
          {
            client_sock_ch = readyChannel.accept();
            client_sock_ch.configureBlocking(false);
          }
          catch (IOException e)
          {
            if (LOG.isWarnEnabled())
            {
              LOG.warn(
                  "Attempt to accept new connection from listening socket failed",
                  e);
              // Give up this connection
            }
            continue;
          }

          if (LOG.isInfoEnabled())
          {
            LOG.info("accepted connection, client_sock=" +
                     client_sock_ch.socket());

          }
          try
          {

            if (LOG.isTraceEnabled())
            {
              LOG.trace("About to change new connection send buff size from " +
                        client_sock_ch.socket().getSendBufferSize() + " bytes");
            }
            client_sock_ch.socket().setSendBufferSize(send_buf_size);
            if (LOG.isTraceEnabled())
            {
              LOG.trace("Changed new connection send buff size to " +
                        client_sock_ch.socket().getSendBufferSize() + " bytes");
            }
          }
          catch (IllegalArgumentException ex)
          {
            if (log.isErrorEnabled())
            {
              log.error("exception setting send buffer size to " +
                        send_buf_size + " bytes: ", ex);
            }
          }
          catch (SocketException e)
          {
            if (log.isErrorEnabled())
            {
              log.error("exception setting send buffer size to " +
                        send_buf_size + " bytes: ", e);
            }
          }

          try
          {
            if (LOG.isTraceEnabled())
            {
              LOG.trace(
                  "About to change new connection receive buff size from " +
                  client_sock_ch.socket().getReceiveBufferSize() + " bytes");
            }
            client_sock_ch.socket().setReceiveBufferSize(recv_buf_size);
            if (LOG.isTraceEnabled())
            {
              LOG.trace("Changed new connection receive buff size to " +
                        client_sock_ch.socket().getReceiveBufferSize() +
                        " bytes");
            }
          }
          catch (IllegalArgumentException ex)
          {
            if (log.isErrorEnabled())
            {
              log.error("exception setting receive buffer size to " +
                        send_buf_size + " bytes: ", ex);
            }
          }
          catch (SocketException e)
          {
            if (log.isErrorEnabled())
            {
              log.error("exception setting receive buffer size to " +
                        recv_buf_size + " bytes: ", e);
            }
          }

          conn = new Connection(client_sock_ch, null);
          addConnection(conn.getPeerAddress(), conn);

          int idx;
          synchronized (m_lockNextWriteHandler)
          {
            idx = m_nextWriteHandler = (m_nextWriteHandler + 1) %
                m_writeHandlers.length;
          }
          conn.setupWriteHandler(m_writeHandlers[idx]);

          try
          {
            synchronized (m_lockNextReadHandler)
            {
              idx = m_nextReadHandler = (m_nextReadHandler + 1) %
                  m_readHandlers.length;
            }
            m_readHandlers[idx].add(conn);

          }
          catch (InterruptedException e)
          {
            if (LOG.isWarnEnabled())
            {
              LOG.warn(
                  "Attempt to configure read handler for accepted connection failed",
                  e);
              // close connection
            }
            conn.destroy();
          }
        } // end of iteration
      } // end of selected key > 0
    } // end of thread

    if (m_serverSocketChannel.isOpen())
    {
      try
      {
        m_serverSocketChannel.close();
      }
      catch (Exception e)
      {
        log.error("exception closing server listening socket", e);
      }
    }
    if (LOG.isTraceEnabled())
    {
      LOG.trace("acceptor thread terminated");

    }
  }

  /**
   * Finds first available port starting at start_port and returns server socket. Sets srv_port
   */
  protected ServerSocket createServerSocket(int start_port, int end_port)
      throws
      Exception
  {
    this.m_acceptSelector = Selector.open();
    m_serverSocketChannel = ServerSocketChannel.open();
    m_serverSocketChannel.configureBlocking(false);
    while (true)
    {
      try
      {
        SocketAddress sockAddr;
        if (bind_addr == null)
        {
          sockAddr = new InetSocketAddress(start_port);
          m_serverSocketChannel.socket().bind(sockAddr);
        }
        else
        {
          sockAddr = new InetSocketAddress(bind_addr, start_port);
          m_serverSocketChannel.socket().bind(sockAddr, backlog);
        }
      }
      catch (BindException bind_ex)
      {
        if (start_port == end_port)
        {
          throw (BindException) ( (new BindException(
              "No available port to bind to")).initCause(bind_ex));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -