⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaulttcptransportmapping.java

📁 snmp4j 1.8.2版 The org.snmp4j classes are capable of creating, sending, and receiving SNMPv1/v2c/v3
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
  public void setServerEnabled(boolean serverEnabled) {
    this.serverEnabled = serverEnabled;
  }

  /**
   * Sets the message length decoder. Default message length decoder is the
   * {@link SnmpMesssageLengthDecoder}. The message length decoder must be
   * able to decode the total length of a message for this transport mapping
   * protocol(s).
   * @param messageLengthDecoder
   *    a <code>MessageLengthDecoder</code> instance.
   */
  public void setMessageLengthDecoder(MessageLengthDecoder messageLengthDecoder) {
    if (messageLengthDecoder == null) {
      throw new NullPointerException();
    }
    this.messageLengthDecoder = messageLengthDecoder;
  }

  /**
   * Gets the inbound buffer size for incoming requests. When SNMP packets are
   * received that are longer than this maximum size, the messages will be
   * silently dropped and the connection will be closed.
   * @return
   *    the maximum inbound buffer size in bytes.
   */
  public int getMaxInboundMessageSize() {
    return super.getMaxInboundMessageSize();
  }

  /**
   * Sets the maximum buffer size for incoming requests. When SNMP packets are
   * received that are longer than this maximum size, the messages will be
   * silently dropped and the connection will be closed.
   * @param maxInboundMessageSize
   *    the length of the inbound buffer in bytes.
   */
  public void setMaxInboundMessageSize(int maxInboundMessageSize) {
    this.maxInboundMessageSize = maxInboundMessageSize;
  }


  private synchronized void timeoutSocket(SocketEntry entry) {
    if (connectionTimeout > 0) {
      socketCleaner.schedule(new SocketTimeout(entry), connectionTimeout);
    }
  }

  public boolean isListening() {
    return (server != null);
  }

  class SocketEntry {
    private Socket socket;
    private TcpAddress peerAddress;
    private long lastUse;
    private LinkedList message = new LinkedList();
    private ByteBuffer readBuffer = null;

    public SocketEntry(TcpAddress address, Socket socket) {
      this.peerAddress = address;
      this.socket = socket;
      this.lastUse = System.currentTimeMillis();
    }

    public long getLastUse() {
      return lastUse;
    }

    public void used() {
      lastUse = System.currentTimeMillis();
    }

    public Socket getSocket() {
      return socket;
    }

    public TcpAddress getPeerAddress() {
      return peerAddress;
    }

    public synchronized void addMessage(byte[] message) {
      this.message.add(message);
    }

    public byte[] nextMessage() {
      if (this.message.size() > 0) {
        return (byte[])this.message.removeFirst();
      }
      return null;
    }

    public void setReadBuffer(ByteBuffer byteBuffer) {
      this.readBuffer = byteBuffer;
    }

    public ByteBuffer getReadBuffer() {
      return readBuffer;
    }

    public String toString() {
      return "SocketEntry[peerAddress="+peerAddress+
          ",socket="+socket+",lastUse="+new Date(lastUse)+"]";
    }
  }

  public static class SnmpMesssageLengthDecoder implements MessageLengthDecoder {
    public int getMinHeaderLength() {
      return MIN_SNMP_HEADER_LENGTH;
    }
    public MessageLength getMessageLength(ByteBuffer buf) throws IOException {
      MutableByte type = new MutableByte();
      BERInputStream is = new BERInputStream(buf);
      int ml = BER.decodeHeader(is, type);
      int hl = (int)is.getPosition();
      MessageLength messageLength = new MessageLength(hl, ml);
      return messageLength;
    }
  }

  class SocketTimeout extends TimerTask {
    private SocketEntry entry;

    public SocketTimeout(SocketEntry entry) {
      this.entry = entry;
    }

    /**
     * run
     */
    public void run() {
      long now = System.currentTimeMillis();
      if ((socketCleaner == null) ||
          (now - entry.getLastUse() >= connectionTimeout)) {
        if (logger.isDebugEnabled()) {
          logger.debug("Socket has not been used for "+
                       (now - entry.getLastUse())+
                       " micro seconds, closing it");
        }
        sockets.remove(entry.getPeerAddress());
        try {
          synchronized (entry) {
            entry.getSocket().close();
          }
          logger.info("Socket to "+entry.getPeerAddress()+
                      " closed due to timeout");
        }
        catch (IOException ex) {
          logger.error(ex);
        }
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Scheduling " +
                       ((entry.getLastUse() + connectionTimeout) - now));
        }
        socketCleaner.schedule(new  SocketTimeout(entry),
                               (entry.getLastUse() + connectionTimeout) - now);
      }
    }
  }

  class ServerThread extends Thread {
    private byte[] buf;
    private volatile boolean stop = false;
    private Throwable lastError = null;
    private ServerSocketChannel ssc;
    private Selector selector;

    private LinkedList pending = new LinkedList();

    public ServerThread() throws IOException {
      setName("DefaultTCPTransportMapping_"+getAddress());
      buf = new byte[getMaxInboundMessageSize()];
      // Selector for incoming requests
      selector = Selector.open();

      if (serverEnabled) {
        // Create a new server socket and set to non blocking mode
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // Bind the server socket
        InetSocketAddress isa = new InetSocketAddress(tcpAddress.getInetAddress(),
            tcpAddress.getPort());
        ssc.socket().bind(isa);
        // Register accepts on the server socket with the selector. This
        // step tells the selector that the socket wants to be put on the
        // ready list when accept operations occur, so allowing multiplexed
        // non-blocking I/O to take place.
        ssc.register(selector, SelectionKey.OP_ACCEPT);
      }
    }

    private void processPending() {
      synchronized (pending) {
        for (int i=0; i<pending.size(); i++) {
          SocketEntry entry = (SocketEntry)pending.getFirst();
          try {
            // Register the channel with the selector, indicating
            // interest in connection completion and attaching the
            // target object so that we can get the target back
            // after the key is added to the selector's
            // selected-key set
            if (entry.getSocket().isConnected()) {
              entry.getSocket().getChannel().register(selector,
                  SelectionKey.OP_WRITE);
            }
            else {
              entry.getSocket().getChannel().register(selector,
                                                      SelectionKey.OP_CONNECT);
            }

          }
          catch (IOException iox) {
            logger.error(iox);
            // Something went wrong, so close the channel and
            // record the failure
            try {
              entry.getSocket().getChannel().close();
              TransportStateEvent e =
                  new TransportStateEvent(DefaultTcpTransportMapping.this,
                                          entry.getPeerAddress(),
                                          TransportStateEvent.STATE_CLOSED,
                                          iox);
              fireConnectionStateChanged(e);
            }
            catch (IOException ex) {
              logger.error(ex);
            }
            lastError = iox;
            if (SNMP4JSettings.isFowardRuntimeExceptions()) {
              throw new RuntimeException(iox);
            }
          }
        }
      }
    }

    public Throwable getLastError() {
      return lastError;
    }

    public void sendMessage(Address address, byte[] message)
        throws java.io.IOException
    {
      Socket s = null;
      SocketEntry entry = (SocketEntry) sockets.get(address);
      if (logger.isDebugEnabled()) {
        logger.debug("Looking up connection for destination '"+address+
                     "' returned: "+entry);
        logger.debug(sockets.toString());
      }
      if (entry != null) {
        s = entry.getSocket();
      }
      if ((s == null) || (s.isClosed()) || (!s.isConnected())) {
        if (logger.isDebugEnabled()) {
          logger.debug("Socket for address '"+address+
                       "' is closed, opening it...");
        }
        SocketChannel sc = null;
        try {
            // Open the channel, set it to non-blocking, initiate connect
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
                                             ((TcpAddress)address).getPort()));
            s = sc.socket();
            entry = new SocketEntry((TcpAddress)address, s);
            entry.addMessage(message);
            sockets.put(address, entry);

            synchronized (pending) {
              pending.add(entry);
            }

            selector.wakeup();
            logger.debug("Trying to connect to "+address);
        }
        catch (IOException iox) {
          logger.error(iox);
          throw iox;
        }
      }
      else {
        entry.addMessage(message);
        synchronized (pending) {
          pending.add(entry);
        }
        selector.wakeup();
      }
    }


    public void run() {
      // Here's where everything happens. The select method will
      // return when any operations registered above have occurred, the
      // thread has been interrupted, etc.
      try {
        while (!stop) {
          try {
            if (selector.select() > 0) {
              if (stop) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -