⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaulttcptransportmapping.java.svn-base

📁 snmp hibernate 源码, 类似hibernate的映射.
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
          logger.debug("Socket has not been used for "+
                       (now - entry.getLastUse())+
                       " micro seconds, closing it");
        }
        sockets.remove(entry.getPeerAddress());
        try {
          synchronized (entry) {
            entry.getSocket().close();
          }
          logger.info("Socket to "+entry.getPeerAddress()+
                      " closed due to timeout");
        }
        catch (IOException ex) {
          logger.error(ex);
        }
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Scheduling " +
                       ((entry.getLastUse() + connectionTimeout) - now));
        }
        socketCleaner.schedule(new  SocketTimeout(entry),
                               (entry.getLastUse() + connectionTimeout) - now);
      }
    }
  }

  class ServerThread extends Thread {
    // Shared scratch buffer for inbound data; allocated in the constructor
    // with getMaxInboundMessageSize() bytes and wrapped by readMessage().
    private byte[] buf;
    // Loop-termination flag: set by close() and at the end of run(),
    // polled by the select loop in run().
    private volatile boolean stop = false;
    // Most recent IOException recorded by processPending() or run();
    // exposed through getLastError().
    private Throwable lastError = null;
    // Listening channel; only created when serverEnabled is true, otherwise
    // it stays null (run() checks for null before closing it).
    private ServerSocketChannel ssc;
    // Selector that all server and client channels of this thread are
    // registered with.
    private Selector selector;

    // SocketEntry objects queued by sendMessage() and drained by
    // processPending(); every access is guarded by synchronized (pending).
    private LinkedList pending = new LinkedList();

    /**
     * Creates the selector thread: names it after the transport address,
     * allocates the inbound buffer, opens the selector and - when
     * <code>serverEnabled</code> is set - opens, binds and registers the
     * listening server channel.
     *
     * @throws IOException
     *    if the selector or the server channel cannot be opened, bound or
     *    registered. Any channel or selector opened so far is closed again
     *    before the exception is propagated.
     */
    public ServerThread() throws IOException {
      setName("DefaultTCPTransportMapping_"+getAddress());
      buf = new byte[getMaxInboundMessageSize()];
      // Selector for incoming requests
      selector = Selector.open();

      boolean initialized = false;
      try {
        if (serverEnabled) {
          // Create a new server socket and set to non blocking mode
          ssc = ServerSocketChannel.open();
          ssc.configureBlocking(false);

          // Bind the server socket
          InetSocketAddress isa =
              new InetSocketAddress(tcpAddress.getInetAddress(),
                                    tcpAddress.getPort());
          ssc.socket().bind(isa);
          // Register accepts on the server socket with the selector. This
          // step tells the selector that the socket wants to be put on the
          // ready list when accept operations occur, so allowing multiplexed
          // non-blocking I/O to take place.
          ssc.register(selector, SelectionKey.OP_ACCEPT);
        }
        initialized = true;
      }
      finally {
        if (!initialized) {
          // Construction failed (for example the port is already in use):
          // release what has been opened so far, otherwise the selector and
          // the server socket would stay open with nobody able to close them.
          if (ssc != null) {
            try {
              ssc.close();
            }
            catch (IOException ex) {
              logger.warn(ex);
            }
          }
          try {
            selector.close();
          }
          catch (IOException ex) {
            logger.warn(ex);
          }
        }
      }
    }

    /**
     * Registers every socket entry queued by <code>sendMessage</code> with
     * the selector: already connected sockets are registered for
     * <code>OP_WRITE</code>, all others for <code>OP_CONNECT</code>. The entry
     * is attached to the key so that it can be retrieved when the key is
     * selected.
     * <p>
     * If registration fails, the channel is closed, a
     * <code>STATE_CLOSED</code> event is fired and the exception is recorded
     * as the last error.
     */
    private void processPending() {
      // Take a snapshot under the lock and work on it afterwards. Channel
      // registration and listener callbacks must not run while the monitor
      // is held, because that would block every thread calling sendMessage().
      LinkedList batch;
      synchronized (pending) {
        if (pending.isEmpty()) {
          return;
        }
        batch = new LinkedList(pending);
        pending.clear();
      }
      for (Iterator it = batch.iterator(); it.hasNext(); ) {
        SocketEntry entry = (SocketEntry) it.next();
        try {
          int interest = entry.getSocket().isConnected()
              ? SelectionKey.OP_WRITE
              : SelectionKey.OP_CONNECT;
          entry.getSocket().getChannel().register(selector, interest, entry);
        }
        catch (IOException iox) {
          logger.error(iox);
          // Something went wrong, so close the channel and
          // record the failure
          try {
            entry.getSocket().getChannel().close();
          }
          catch (IOException ex) {
            logger.error(ex);
          }
          // Notify listeners even if close() itself failed; the connection
          // is unusable either way.
          TransportStateEvent e =
              new TransportStateEvent(DefaultTcpTransportMapping.this,
                                      entry.getPeerAddress(),
                                      TransportStateEvent.STATE_CLOSED,
                                      iox);
          fireConnectionStateChanged(e);
          lastError = iox;
        }
      }
    }

    /**
     * Returns the error most recently recorded by this thread while
     * registering pending sockets or running the select loop.
     * NOTE(review): the field is neither volatile nor read under a lock, so
     * a caller on another thread may observe a stale value - confirm whether
     * that matters to callers.
     *
     * @return
     *    the last recorded error, or <code>null</code> if none was recorded.
     */
    public Throwable getLastError() {
      return this.lastError;
    }

    /**
     * Queues a message for the given destination and wakes up the selector
     * thread. If no open socket is known for the address, a new non-blocking
     * connection is initiated first and stored in <code>sockets</code>.
     *
     * @param address
     *    the destination; cast to <code>TcpAddress</code> when a new
     *    connection has to be opened.
     * @param message
     *    the raw bytes to send.
     * @throws java.io.IOException
     *    if a new channel cannot be opened or the connect cannot be
     *    initiated. The half-opened channel is closed before the exception
     *    is propagated.
     */
    public void sendMessage(Address address, byte[] message)
        throws java.io.IOException
    {
      Socket s = null;
      SocketEntry entry = (SocketEntry) sockets.get(address);
      if (logger.isDebugEnabled()) {
        logger.debug("Looking up connection for destination '"+address+
                     "' returned: "+entry);
        logger.debug(sockets.toString());
      }
      if (entry != null) {
        s = entry.getSocket();
      }
      if ((s == null) || (s.isClosed())) {
        if (logger.isDebugEnabled()) {
          logger.debug("Socket for address '"+address+
                       "' is closed, opening it...");
        }
        SocketChannel sc = null;
        boolean connecting = false;
        try {
          // Open the channel, set it to non-blocking, initiate connect
          sc = SocketChannel.open();
          sc.configureBlocking(false);
          sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
                                           ((TcpAddress)address).getPort()));
          s = sc.socket();
          entry = new SocketEntry((TcpAddress)address, s);
          entry.addMessage(message);
          sockets.put(address, entry);
          connecting = true;
        }
        catch (IOException iox) {
          logger.error(iox);
          throw iox;
        }
        finally {
          // The channel is not yet reachable through the sockets table when
          // setup fails, so it has to be released here or it leaks.
          if (!connecting && (sc != null)) {
            try {
              sc.close();
            }
            catch (IOException ex) {
              logger.warn(ex);
            }
          }
        }
        if (logger.isDebugEnabled()) {
          logger.debug("Trying to connect to "+address);
        }
      }
      else {
        entry.addMessage(message);
      }

      // Hand the entry over to the selector thread, which registers it in
      // processPending() after the wakeup.
      synchronized (pending) {
        pending.add(entry);
      }
      selector.wakeup();
    }


    /**
     * Select loop of the transport mapping. Until <code>stop</code> is set it
     * waits on the selector, then for every ready key accepts a new
     * connection, reads an incoming message, writes a queued message or
     * finishes an outgoing connect. After every pass the sockets queued by
     * <code>sendMessage</code> are registered through
     * <code>processPending</code>.
     * <p>
     * An <code>IOException</code> escaping the loop is logged and stored as
     * the last error. The server channel is closed whenever the loop ends.
     * If the loop ended without <code>close()</code> having been called, the
     * enclosing mapping's <code>server</code> reference is cleared.
     */
    public void run() {
      // Here's where everything happens. The select method will
      // return when any operations registered above have occurred, the
      // thread has been interrupted, etc.
      try {
        while (!stop) {
          try {
            if (selector.select() > 0) {
              if (stop) {
                break;
              }
              // Someone is ready for I/O, get the ready keys
              Set readyKeys = selector.selectedKeys();
              Iterator it = readyKeys.iterator();

              // Walk through the ready keys and service each channel.
              while (it.hasNext()) {
                SelectionKey sk = (SelectionKey) it.next();
                it.remove();
                if (!sk.isValid()) {
                  // The channel was closed (for example by the connection
                  // timeout) after it had been selected. Querying the ready
                  // set of a cancelled key throws CancelledKeyException.
                  continue;
                }
                SocketChannel readChannel = null;
                TcpAddress incomingAddress = null;
                if (sk.isAcceptable()) {
                  // The key indexes into the selector so you
                  // can retrieve the socket that's ready for I/O
                  ServerSocketChannel nextReady =
                      (ServerSocketChannel) sk.channel();
                  // In non-blocking mode accept() returns null when the
                  // pending connection has vanished in the meantime.
                  SocketChannel accepted = nextReady.accept();
                  if (accepted == null) {
                    continue;
                  }
                  Socket s = accepted.socket();
                  readChannel = accepted;
                  readChannel.configureBlocking(false);
                  readChannel.register(selector,
                                       SelectionKey.OP_READ);

                  incomingAddress = new TcpAddress(s.getInetAddress(),
                      s.getPort());
                  SocketEntry entry = new SocketEntry(incomingAddress, s);
                  sockets.put(incomingAddress, entry);
                  timeoutSocket(entry);
                  TransportStateEvent e =
                      new TransportStateEvent(DefaultTcpTransportMapping.this,
                                              incomingAddress,
                                              TransportStateEvent.
                                              STATE_CONNECTED,
                                              null);
                  fireConnectionStateChanged(e);
                }
                else if (sk.isReadable()) {
                  readChannel = (SocketChannel) sk.channel();
                  incomingAddress =
                      new TcpAddress(readChannel.socket().getInetAddress(),
                                     readChannel.socket().getPort());
                }
                else if (sk.isWritable()) {
                  SocketEntry entry = (SocketEntry) sk.attachment();
                  SocketChannel sc = (SocketChannel) sk.channel();
                  try {
                    if (entry != null) {
                      writeMessage(entry, sc);
                    }
                  }
                  catch (IOException iox) {
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    // incomingAddress is never set on this branch, so the
                    // peer has to be taken from the entry attached to the key.
                    TransportStateEvent e =
                        new TransportStateEvent(DefaultTcpTransportMapping.this,
                                                (entry != null)
                                                ? entry.getPeerAddress()
                                                : null,
                                                TransportStateEvent.
                                                STATE_DISCONNECTED_REMOTELY,
                                                iox);
                    fireConnectionStateChanged(e);
                    sk.cancel();
                  }
                }
                else if (sk.isConnectable()) {
                  try {
                    SocketEntry entry = (SocketEntry) sk.attachment();
                    SocketChannel sc = (SocketChannel) sk.channel();
                    boolean established = sc.isConnected();
                    if ((!established) && (sc.finishConnect())) {
                      established = true;
                      sc.configureBlocking(false);
                      logger.debug("Connected to " + entry.getPeerAddress());
                      // make sure the connection is closed if it is not used
                      // for the configured timeout
                      timeoutSocket(entry);
                      sc.register(selector,
                                  SelectionKey.OP_WRITE,
                                  entry);
                    }
                    // Only report the connection once it really exists:
                    // finishConnect() returns false while the connect is
                    // still in progress.
                    if (established) {
                      TransportStateEvent e =
                          new TransportStateEvent(
                              DefaultTcpTransportMapping.this,
                              (entry != null) ? entry.getPeerAddress() : null,
                              TransportStateEvent.STATE_CONNECTED,
                              null);
                      fireConnectionStateChanged(e);
                    }
                  }
                  catch (IOException iox) {
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    sk.cancel();
                  }
                }

                if (readChannel != null) {
                  try {
                    readMessage(sk, readChannel, incomingAddress);
                  }
                  catch (IOException iox) {
                    // IO exception -> channel closed remotely
                    if (logger.isDebugEnabled()) {
                      iox.printStackTrace();
                    }
                    logger.warn(iox);
                    sk.cancel();
                    readChannel.close();
                    TransportStateEvent e =
                        new TransportStateEvent(DefaultTcpTransportMapping.this,
                                                incomingAddress,
                                                TransportStateEvent.
                                                STATE_DISCONNECTED_REMOTELY,
                                                iox);
                    fireConnectionStateChanged(e);
                  }
                }
              }
            }
          }
          catch (NullPointerException npex) {
            // There seems to happen a NullPointerException within the select()
            npex.printStackTrace();
            logger.warn("NullPointerException within select()?");
          }
          processPending();
        }
      }
      catch (IOException iox) {
        logger.error(iox);
        lastError = iox;
      }
      finally {
        // Release the listening socket however the loop ended; otherwise
        // the port stays bound after a failure inside the loop.
        if (ssc != null) {
          try {
            ssc.close();
          }
          catch (IOException ex) {
            logger.error(ex);
            lastError = ex;
          }
        }
      }
      if (!stop) {
        stop = true;
        synchronized (DefaultTcpTransportMapping.this) {
          server = null;
        }
      }
    }

    /**
     * Reads data from a readable channel. If the socket entry of the peer
     * holds a continuation buffer from an earlier partial read, the data is
     * appended to it. Otherwise the message header is read into the shared
     * buffer, the total length is decoded and as much of the body as is
     * available is read. A complete message is dispatched; an incomplete one
     * is parked in the entry's read buffer until more data arrives.
     *
     * @param sk
     *    the selection key of the channel; cancelled when the peer has
     *    closed the connection.
     * @param readChannel
     *    the channel to read from.
     * @param incomingAddress
     *    the peer address, used to look up the socket entry and as the
     *    source address of dispatched messages and events.
     * @throws IOException
     *    if reading from, closing or re-registering the channel fails.
     */
    private void readMessage(SelectionKey sk, SocketChannel readChannel,
                             TcpAddress incomingAddress) throws IOException {
      // note that socket has been used
      SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
      if (entry != null) {
        entry.used();
        ByteBuffer readBuffer = entry.getReadBuffer();
        if (readBuffer != null) {
          int count = readChannel.read(readBuffer);
          if (count < 0) {
            // End of stream in the middle of a message. Re-registering here
            // would make the selector report the key as readable forever.
            entry.setReadBuffer(null);
            closeRemotelyClosedChannel(sk, readChannel, incomingAddress);
          }
          else if (readBuffer.hasRemaining()) {
            readChannel.register(selector,
                                 SelectionKey.OP_READ,
                                 entry);
          }
          else {
            // The message is complete: detach the buffer first so that the
            // next read starts with a fresh header.
            entry.setReadBuffer(null);
            dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
          }
          return;
        }
      }
      int headerLength = messageLengthDecoder.getMinHeaderLength();
      ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
      byteBuffer.limit(headerLength);
      long bytesRead = readChannel.read(byteBuffer);
      if (logger.isDebugEnabled()) {
        logger.debug("Reading header "+bytesRead+" bytes from " +
                     incomingAddress);
      }
      if (bytesRead == headerLength) {
        MessageLength messageLength =
            messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
        if (logger.isDebugEnabled()) {
          logger.debug("Message length is "+messageLength);
        }
        int length = messageLength.getMessageLength();
        if ((length > getMaxInboundMessageSize()) || (length <= 0)) {
          logger.error("Received message length "+messageLength+
                       " is greater than inboundBufferSize "+
                       getMaxInboundMessageSize());
          if (entry != null) {
            synchronized(entry) {
              entry.getSocket().close();
              logger.info("Socket to "+entry.getPeerAddress()+
                          " closed due to an error");
            }
          }
          else {
            // No entry is known for this peer, close the channel directly.
            readChannel.close();
            logger.info("Socket to "+incomingAddress+
                        " closed due to an error");
          }
        }
        else {
          byteBuffer.limit(length);
          readChannel.read(byteBuffer);
          if (!byteBuffer.hasRemaining()) {
            dispatchMessage(incomingAddress, byteBuffer, length);
          }
          else if (entry != null) {
            // Keep what has arrived so far in a buffer sized for the whole
            // message and positioned behind the received bytes, so that the
            // next read appends instead of overwriting.
            int received = byteBuffer.position();
            byte[] message = new byte[length];
            System.arraycopy(buf, 0, message, 0, received);
            ByteBuffer partial = ByteBuffer.wrap(message);
            partial.position(received);
            entry.setReadBuffer(partial);
          }
          else {
            // Without an entry there is nowhere to keep the partial message
            // and the stream could not be re-synchronized afterwards.
            logger.error("No socket entry for "+incomingAddress+
                         ", closing connection with incomplete message");
            sk.cancel();
            readChannel.close();
            return;
          }
          readChannel.register(selector,
                               SelectionKey.OP_READ,
                               entry);
        }
      }
      else if (bytesRead < 0) {
        closeRemotelyClosedChannel(sk, readChannel, incomingAddress);
      }
      // NOTE(review): a header that arrives only partially
      // (0 < bytesRead < headerLength) is dropped here, as before - confirm
      // whether such short reads need to be buffered as well.
    }

    /**
     * Cancels the key and closes the channel of a connection whose peer has
     * reached end of stream, then fires a
     * <code>STATE_DISCONNECTED_REMOTELY</code> event for the peer address.
     */
    private void closeRemotelyClosedChannel(SelectionKey sk,
                                            SocketChannel readChannel,
                                            TcpAddress incomingAddress)
        throws IOException {
      logger.debug("Socket closed remotely");
      sk.cancel();
      readChannel.close();
      TransportStateEvent e =
          new TransportStateEvent(DefaultTcpTransportMapping.this,
                                  incomingAddress,
                                  TransportStateEvent.
                                  STATE_DISCONNECTED_REMOTELY,
                                  null);
      fireConnectionStateChanged(e);
    }

    /**
     * Flips the buffer and hands the first <code>bytesRead</code> bytes of
     * its backing array to <code>fireProcessMessage</code>. When asynchronous
     * message processing is supported the bytes are copied into a new array
     * first; otherwise the backing array itself is wrapped.
     *
     * @param incomingAddress
     *    the address the message was received from.
     * @param byteBuffer
     *    the array-backed buffer holding the message.
     * @param bytesRead
     *    the number of message bytes at the start of the backing array.
     */
    private void dispatchMessage(TcpAddress incomingAddress,
                                 ByteBuffer byteBuffer, long bytesRead) {
      byteBuffer.flip();
      final int length = (int) bytesRead;
      if (logger.isDebugEnabled()) {
        OctetString dump = new OctetString(byteBuffer.array(), 0, length);
        logger.debug("Received message from " + incomingAddress +
                     " with length " + bytesRead + ": " +
                     dump.toHexString());
      }
      final ByteBuffer bis;
      if (!isAsyncMsgProcessingSupported()) {
        // Synchronous processing can work directly on the backing array.
        bis = ByteBuffer.wrap(byteBuffer.array(), 0, length);
      }
      else {
        // Hand out a private copy of the message bytes.
        byte[] copy = new byte[length];
        System.arraycopy(byteBuffer.array(), 0, copy, 0, length);
        bis = ByteBuffer.wrap(copy);
      }
      fireProcessMessage(incomingAddress, bis);
    }

    /**
     * Takes the next queued message of the entry, writes it to the channel
     * and re-registers the channel for <code>OP_READ</code> (without an
     * attachment). Does nothing when the entry has no message queued.
     * NOTE(review): the channel is written to once and the result of
     * <code>write</code> is not checked; on a non-blocking channel this may
     * send only part of the message - confirm and handle if needed.
     *
     * @param entry
     *    the socket entry supplying the message.
     * @param sc
     *    the channel to write to.
     * @throws IOException
     *    if writing to or registering the channel fails.
     */
    private void writeMessage(SocketEntry entry, SocketChannel sc) throws
        IOException {
      final byte[] payload = entry.nextMessage();
      if (payload == null) {
        return;
      }
      sc.write(ByteBuffer.wrap(payload));
      if (logger.isDebugEnabled()) {
        logger.debug("Send message with length " +
                     payload.length + " to " +
                     entry.getPeerAddress() + ": " +
                     new OctetString(payload).toHexString());
      }
      sc.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Requests termination of the select loop: sets the stop flag and
     * interrupts the thread currently referenced by the enclosing mapping's
     * <code>server</code> field, if there is one.
     */
    public void close() {
      stop = true;
      final ServerThread current = server;
      if (current == null) {
        return;
      }
      current.interrupt();
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -