⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaulttcptransportmapping.java

📁 snmp4j
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            entry.getSocket().close();
          }
          logger.info("Socket to "+entry.getPeerAddress()+
                      " closed due to timeout");
        }
        catch (IOException ex) {
          logger.error(ex);
        }
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Scheduling " +
                       ((entry.getLastUse() + connectionTimeout) - now));
        }
        socketCleaner.schedule(new  SocketTimeout(entry),
                               (entry.getLastUse() + connectionTimeout) - now);
      }
    }
  }

  class ServerThread extends Thread {

    private static final int MINIMUM_HEADER_LENGTH = 6;

    private byte[] buf;
    private volatile boolean stop = false;
    private Throwable lastError = null;
    private ServerSocketChannel ssc;
    private Selector selector;

    private LinkedList pending = new LinkedList();

    public ServerThread() throws IOException {
      buf = new byte[getMaxInboundMessageSize()];
      // Selector for incoming requests
      selector = Selector.open();

      if (serverEnabled) {
        // Create a new server socket and set to non blocking mode
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // Bind the server socket
        InetSocketAddress isa = new InetSocketAddress(tcpAddress.getInetAddress(),
            tcpAddress.getPort());
        ssc.socket().bind(isa);
        // Register accepts on the server socket with the selector. This
        // step tells the selector that the socket wants to be put on the
        // ready list when accept operations occur, so allowing multiplexed
        // non-blocking I/O to take place.
        ssc.register(selector, SelectionKey.OP_ACCEPT);
      }
    }

    private void processPending() {
      synchronized (pending) {
        while (pending.size() > 0) {
          SocketEntry entry = (SocketEntry)pending.removeFirst();
          try {
            // Register the channel with the selector, indicating
            // interest in connection completion and attaching the
            // target object so that we can get the target back
            // after the key is added to the selector's
            // selected-key set
            if (entry.getSocket().isConnected()) {
              entry.getSocket().getChannel().register(selector,
                  SelectionKey.OP_WRITE,
                  entry);
            }
            else {
              entry.getSocket().getChannel().register(selector,
                                                      SelectionKey.OP_CONNECT,
                                                      entry);
            }

          }
          catch (IOException iox) {
            logger.error(iox);
            // Something went wrong, so close the channel and
            // record the failure
            try {
              entry.getSocket().getChannel().close();
            }
            catch (IOException ex) {
              logger.error(ex);
            }
            lastError = iox;
          }
        }
      }
    }

    public Throwable getLastError() {
      return lastError;
    }

    public void sendMessage(Address address, byte[] message)
        throws java.io.IOException
    {
      Socket s = null;
      SocketEntry entry = (SocketEntry) sockets.get(address);
      if (entry != null) {
        s = entry.getSocket();
      }
      if ((s == null) || (s.isClosed())) {
        SocketChannel sc = null;
        try {
            // Open the channel, set it to non-blocking, initiate connect
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
                                             ((TcpAddress)address).getPort()));
            s = sc.socket();
            entry = new SocketEntry((TcpAddress)address, s);
            entry.addMessage(message);
            sockets.put(address, entry);

            synchronized (pending) {
              pending.add(entry);
            }

            selector.wakeup();
            logger.debug("Trying to connect to "+address);
        }
        catch (IOException iox) {
          logger.error(iox);
          throw iox;
        }
      }
      else {
        entry.addMessage(message);
        synchronized (pending) {
          pending.add(entry);
        }
        selector.wakeup();
      }
    }


    public void run() {
      // Here's where everything happens. The select method will
      // return when any operations registered above have occurred, the
      // thread has been interrupted, etc.
      try {
        while (!stop) {
          if (selector.select() > 0) {
            if (stop) {
              break;
            }
            // Someone is ready for I/O, get the ready keys
            Set readyKeys = selector.selectedKeys();
            Iterator it = readyKeys.iterator();

            // Walk through the ready keys collection and process date requests.
            while (it.hasNext()) {
              SelectionKey sk = (SelectionKey) it.next();
              it.remove();
              SocketChannel readChannel = null;
              TcpAddress incomingAddress = null;
              if (sk.isAcceptable()) {
                // The key indexes into the selector so you
                // can retrieve the socket that's ready for I/O
                ServerSocketChannel nextReady =
                    (ServerSocketChannel) sk.channel();
                // Accept the date request and send back the date string
                Socket s = nextReady.accept().socket();
                readChannel = s.getChannel();
                readChannel.register(selector, SelectionKey.OP_READ);

                incomingAddress = new TcpAddress(s.getInetAddress(), s.getPort());
                SocketEntry entry = new SocketEntry(incomingAddress, s);
                sockets.put(incomingAddress, entry);
                timeoutSocket(entry);
              }
              else if (sk.isReadable()) {
                readChannel = (SocketChannel) sk.channel();
                incomingAddress =
                    new TcpAddress(readChannel.socket().getInetAddress(),
                                   readChannel.socket().getPort());
              }
              else if (sk.isConnectable()) {
                try {
                  SocketEntry entry = (SocketEntry) sk.attachment();
                  SocketChannel sc = (SocketChannel) sk.channel();
                  if (sc.finishConnect()) {
                    logger.debug("Connected to " + entry.getPeerAddress());
                    // make sure conncetion is closed if not used for timeout
                    // micro seconds
                    timeoutSocket(entry);
                    sc.register(selector, SelectionKey.OP_WRITE, entry);
                  }
                }
                catch (IOException iox) {
                  logger.warn(iox);
                }
              }
              else if (sk.isWritable()) {
                try {
                  SocketEntry entry = (SocketEntry) sk.attachment();
                  SocketChannel sc = (SocketChannel) sk.channel();
                  if (entry != null) {
                    writeMessage(entry, sc);
                  }
                }
                catch (IOException iox) {
                  logger.warn(iox);
                }
              }

              if (readChannel != null) {
                try {
                  readMessage(sk, readChannel, incomingAddress);
                }
                catch (IOException iox) {
                  logger.warn(iox);
                }
              }
            }
          }
          processPending();
        }
        if (ssc != null) {
          ssc.close();
        }
      }
      catch (IOException iox) {
        logger.error(iox);
        lastError = iox;
      }
    }

    private void readMessage(SelectionKey sk, SocketChannel readChannel,
                             TcpAddress incomingAddress) throws IOException {
      // note that socket has been used
      SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
      if (entry != null) {
        entry.used();
        ByteBuffer readBuffer = entry.getReadBuffer();
        if (readBuffer != null) {
          readChannel.read(readBuffer);
          if (readBuffer.hasRemaining()) {
            readChannel.register(selector, SelectionKey.OP_READ, entry);
          }
          else {
            dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
          }
          return;
        }
      }
      ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
      byteBuffer.limit(MINIMUM_HEADER_LENGTH);
      long bytesRead = readChannel.read(byteBuffer);
      if (logger.isDebugEnabled()) {
        logger.debug("Reading header "+bytesRead+" bytes from " +
                     incomingAddress);
      }
      int messageLength = Integer.MIN_VALUE;
      int headerLength = 0;
      if (bytesRead == MINIMUM_HEADER_LENGTH) {
        MutableByte type = new MutableByte();
        BERInputStream is = new BERInputStream(ByteBuffer.wrap(buf));
        messageLength = BER.decodeHeader(is, type);
        if ((messageLength > getMaxInboundMessageSize()) || (messageLength <= 0)) {
          logger.error("Received message length "+messageLength+
                       " is greater than inboundBufferSize "+
                       getMaxInboundMessageSize());
          synchronized(entry) {
            entry.getSocket().close();
            logger.info("Socket to "+entry.getPeerAddress()+
                        " closed due to an error");
          }
        }
        else {
          headerLength = (int) is.getPosition();
          byteBuffer.limit(messageLength + headerLength);
          bytesRead += readChannel.read(byteBuffer);
          if (bytesRead == messageLength + headerLength) {
            dispatchMessage(incomingAddress, byteBuffer, bytesRead);
          }
          else {
            byte[] message = new byte[byteBuffer.limit()];
            byteBuffer.flip();
            byteBuffer.get(message, 0,
                           byteBuffer.limit() - byteBuffer.remaining());
            entry.setReadBuffer(ByteBuffer.wrap(message));
          }
          readChannel.register(selector, SelectionKey.OP_READ, entry);
        }
      }
      else if (bytesRead < 0) {
        logger.debug("Socket closed remotely");
        sk.cancel();
        readChannel.close();
      }
    }

    private void dispatchMessage(TcpAddress incomingAddress,
                                 ByteBuffer byteBuffer, long bytesRead) {
      byteBuffer.flip();
      if (logger.isDebugEnabled()) {
        logger.debug("Received message from " + incomingAddress +
                     " with length " + bytesRead + ": " +
                     new OctetString(byteBuffer.array(), 0,
                                     (int)bytesRead).toHexString());
      }
      for (int i = 0; i < messageDispatcher.size(); i++) {
        MessageDispatcher dispatcher;
        synchronized (DefaultTcpTransportMapping.this) {
          dispatcher = (MessageDispatcher) messageDispatcher.get(i);
        }
        ByteBuffer bis;
        if (isAsyncMsgProcessingSupported()) {
          byte[] bytes = new byte[(int)bytesRead];
          System.arraycopy(byteBuffer.array(), 0, bytes, 0, (int)bytesRead);
          bis = ByteBuffer.wrap(bytes);
        }
        else {
          bis = ByteBuffer.wrap(byteBuffer.array(),
                                0, (int) bytesRead);
        }
        dispatcher.processMessage(DefaultTcpTransportMapping.this,
                                  incomingAddress,
                                  new BERInputStream(bis));
      }
    }

    private void writeMessage(SocketEntry entry, SocketChannel sc) throws
        IOException {
      byte[] message = entry.nextMessage();
      ByteBuffer buffer = ByteBuffer.wrap(message);
      sc.write(buffer);
      if (logger.isDebugEnabled()) {
        logger.debug("Send message with length " +
                     message.length + " to " +
                     entry.getPeerAddress() + ": " +
                     new OctetString(message).toHexString());
      }
      sc.register(selector, SelectionKey.OP_READ);
    }

    public void close() {
      stop = true;
      server.interrupt();
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -