⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaulttcptransportmapping.java

📁 snmp4j 1.8.2版 The org.snmp4j classes are capable of creating, sending, and receiving SNMPv1/v2c/v3
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                break;
              }
              // Someone is ready for I/O, get the ready keys
              Set readyKeys = selector.selectedKeys();
              Iterator it = readyKeys.iterator();

              // Walk through the ready keys collection and process date requests.
              while (it.hasNext()) {
                SelectionKey sk = (SelectionKey) it.next();
                it.remove();
                SocketChannel readChannel = null;
                TcpAddress incomingAddress = null;
                if (sk.isAcceptable()) {
                  // The key indexes into the selector so you
                  // can retrieve the socket that's ready for I/O
                  ServerSocketChannel nextReady =
                      (ServerSocketChannel) sk.channel();
                  Socket s = nextReady.accept().socket();
                  readChannel = s.getChannel();
                  readChannel.configureBlocking(false);
                  readChannel.register(selector,
                                       SelectionKey.OP_READ);

                  incomingAddress = new TcpAddress(s.getInetAddress(),
                      s.getPort());
                  SocketEntry entry = new SocketEntry(incomingAddress, s);
                  sockets.put(incomingAddress, entry);
                  timeoutSocket(entry);
                  TransportStateEvent e =
                      new TransportStateEvent(DefaultTcpTransportMapping.this,
                                              incomingAddress,
                                              TransportStateEvent.
                                              STATE_CONNECTED,
                                              null);
                  fireConnectionStateChanged(e);
                  if (e.isCancelled()) {
                    logger.warn("Incoming connection cancelled");
                    s.close();
                    sockets.remove(incomingAddress);
                    readChannel = null;
                  }
                }
                else if (sk.isReadable()) {
                  readChannel = (SocketChannel) sk.channel();
                  incomingAddress =
                      new TcpAddress(readChannel.socket().getInetAddress(),
                                     readChannel.socket().getPort());
                }
                else if (sk.isWritable()) {
                  try {
                    SocketChannel sc = (SocketChannel) sk.channel();
                    SocketEntry entry;
                    synchronized (pending) {
                      try {
                        entry = (SocketEntry) pending.removeFirst();
                      }
                      catch (NoSuchElementException nsex) {
                        // ignore
                        entry = null;
                      }
                    }
                    if (entry != null) {
                      writeMessage(entry, sc);
                    }
                  }
                  catch (IOException iox) {
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    TransportStateEvent e =
                        new TransportStateEvent(DefaultTcpTransportMapping.this,
                                                incomingAddress,
                                                TransportStateEvent.
                                                STATE_DISCONNECTED_REMOTELY,
                                                iox);
                    fireConnectionStateChanged(e);
                    sk.cancel();
                  }
                }
                else if (sk.isConnectable()) {
                  try {
                    SocketEntry entry;
                    synchronized (pending) {
                      try {
                        entry = (SocketEntry) pending.getFirst();
                        if (entry != null) {
                          SocketChannel sc = (SocketChannel) sk.channel();
                          if ((!sc.isConnected()) && (sc.finishConnect())) {
                            sc.configureBlocking(false);
                            logger.debug("Connected to " + entry.getPeerAddress());
                            // make sure conncetion is closed if not used for timeout
                            // micro seconds
                            timeoutSocket(entry);
                            sc.register(selector,
                                        SelectionKey.OP_WRITE);
                          }
                        }
                      }
                      catch (NoSuchElementException nsex) {
                        // ignore
                        entry = null;
                      }
                    }
                    if (entry != null) {
                      TransportStateEvent e =
                          new TransportStateEvent(DefaultTcpTransportMapping.this,
                                                  incomingAddress,
                                                  TransportStateEvent.
                                                  STATE_CONNECTED,
                                                  null);
                      fireConnectionStateChanged(e);
                    }
                    else {
                      logger.warn("Message not found on finish connection");
                    }
                  }
                  catch (IOException iox) {
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    sk.cancel();
                  }
                }

                if (readChannel != null) {
                  try {
                    readMessage(sk, readChannel, incomingAddress);
                  }
                  catch (IOException iox) {
                    // IO exception -> channel closed remotely
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    sk.cancel();
                    readChannel.close();
                    TransportStateEvent e =
                        new TransportStateEvent(DefaultTcpTransportMapping.this,
                                                incomingAddress,
                                                TransportStateEvent.
                                                STATE_DISCONNECTED_REMOTELY,
                                                iox);
                    fireConnectionStateChanged(e);
                  }
                }
              }
            }
          }
          catch (NullPointerException npex) {
            // There seems to happen a NullPointerException within the select()
            npex.printStackTrace();
            logger.warn("NullPointerException within select()?");
          }
          processPending();
        }
        if (ssc != null) {
          ssc.close();
        }
        if (selector != null) {
          selector.close();
        }
      }
      catch (IOException iox) {
        logger.error(iox);
        lastError = iox;
      }
      if (!stop) {
        stop = true;
        synchronized (DefaultTcpTransportMapping.this) {
          server = null;
        }
      }
    }

    private void readMessage(SelectionKey sk, SocketChannel readChannel,
                             TcpAddress incomingAddress) throws IOException {
      // note that socket has been used
      SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
      if (entry != null) {
        entry.used();
        ByteBuffer readBuffer = entry.getReadBuffer();
        if (readBuffer != null) {
          readChannel.read(readBuffer);
          if (readBuffer.hasRemaining()) {
            readChannel.register(selector,
                                 SelectionKey.OP_READ,
                                 entry);
          }
          else {
            dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
          }
          return;
        }
      }
      ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
      byteBuffer.limit(messageLengthDecoder.getMinHeaderLength());
      long bytesRead = readChannel.read(byteBuffer);
      if (logger.isDebugEnabled()) {
        logger.debug("Reading header "+bytesRead+" bytes from " +
                     incomingAddress);
      }
      MessageLength messageLength = new MessageLength(0, Integer.MIN_VALUE);
      if (bytesRead == messageLengthDecoder.getMinHeaderLength()) {
        messageLength =
            messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
        if (logger.isDebugEnabled()) {
          logger.debug("Message length is "+messageLength);
        }
        if ((messageLength.getMessageLength() > getMaxInboundMessageSize()) ||
            (messageLength.getMessageLength() <= 0)) {
          logger.error("Received message length "+messageLength+
                       " is greater than inboundBufferSize "+
                       getMaxInboundMessageSize());
          synchronized(entry) {
            entry.getSocket().close();
            logger.info("Socket to "+entry.getPeerAddress()+
                        " closed due to an error");
          }
        }
        else {
          byteBuffer.limit(messageLength.getMessageLength());
          bytesRead += readChannel.read(byteBuffer);
          if (bytesRead == messageLength.getMessageLength()) {
            dispatchMessage(incomingAddress, byteBuffer, bytesRead);
          }
          else {
            byte[] message = new byte[byteBuffer.limit()];
            byteBuffer.flip();
            byteBuffer.get(message, 0,
                           byteBuffer.limit() - byteBuffer.remaining());
            entry.setReadBuffer(ByteBuffer.wrap(message));
          }
          readChannel.register(selector,
                               SelectionKey.OP_READ,
                               entry);
        }
      }
      else if (bytesRead < 0) {
        logger.debug("Socket closed remotely");
        sk.cancel();
        readChannel.close();
        TransportStateEvent e =
            new TransportStateEvent(DefaultTcpTransportMapping.this,
                                    incomingAddress,
                                    TransportStateEvent.
                                    STATE_DISCONNECTED_REMOTELY,
                                    null);
        fireConnectionStateChanged(e);
      }
    }

    private void dispatchMessage(TcpAddress incomingAddress,
                                 ByteBuffer byteBuffer, long bytesRead) {
      byteBuffer.flip();
      if (logger.isDebugEnabled()) {
        logger.debug("Received message from " + incomingAddress +
                     " with length " + bytesRead + ": " +
                     new OctetString(byteBuffer.array(), 0,
                                     (int)bytesRead).toHexString());
      }
      ByteBuffer bis;
      if (isAsyncMsgProcessingSupported()) {
        byte[] bytes = new byte[(int)bytesRead];
        System.arraycopy(byteBuffer.array(), 0, bytes, 0, (int)bytesRead);
        bis = ByteBuffer.wrap(bytes);
      }
      else {
        bis = ByteBuffer.wrap(byteBuffer.array(),
                              0, (int) bytesRead);
      }
      fireProcessMessage(incomingAddress, bis);
    }

    private void writeMessage(SocketEntry entry, SocketChannel sc) throws
        IOException {
      byte[] message = entry.nextMessage();
      if (message != null) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        sc.write(buffer);
        if (logger.isDebugEnabled()) {
          logger.debug("Send message with length " +
                       message.length + " to " +
                       entry.getPeerAddress() + ": " +
                       new OctetString(message).toHexString());
        }
        sc.register(selector, SelectionKey.OP_READ);
      }
    }

    public void close() {
      stop = true;
      ServerThread st = server;
      if (st != null) {
        st.interrupt();
      }
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -