⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channelniosocket.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    }

    public MBeanNotificationInfo[] getNotificationInfo() {
        return notifInfo;
    }

    protected class SocketConnection implements ThreadPoolRunnable {
        MsgContext ep;
        MsgAjp recv = new MsgAjp(packetSize);
        boolean inProgress = false;

        SocketConnection(MsgContext ep) {
            this.ep=ep;
        }

        public Object[] getInitData() {
            return null;
        }
    
        public void runIt(Object perTh[]) {
            if(!processConnection(ep)) {
                unregister(ep);
            }
        }

        public boolean isRunning() {
            return inProgress;
        }

        public  void setFinished() {
            inProgress = false;
        }

        /** Process a single ajp connection.
         */
        boolean processConnection(MsgContext ep) {
            try {
                InputStream sis = (InputStream)ep.getNote(isNote);
                boolean haveInput = true;
                while(haveInput) {
                    if( !running || paused ) {
                        return false;
                    }
                    int status= receive( recv, ep );
                    if( status <= 0 ) {
                        if( status==-3)
                            log.debug( "server has been restarted or reset this connection" );
                        else 
                            log.warn("Closing ajp connection " + status );
                        return false;
                    }
                    ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
                    
                    ep.setType( 0 );
                    // Will call next
                    status= invoke( recv, ep );
                    if( status != JkHandler.OK ) {
                        log.warn("processCallbacks status " + status );
                        return false;
                    }
                    synchronized(this) {
                        synchronized(sis) {
                            haveInput = sis.available() > 0;
                        }
                        if(!haveInput) {
                            setFinished();
                        } else {
                            if(log.isDebugEnabled())
                                log.debug("KeepAlive: "+sis.available());
                        }
                    }
                } 
            } catch( Exception ex ) {
                String msg = ex.getMessage();
                if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
                    log.debug( "Server has been restarted or reset this connection");
                else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
                    log.debug( "connection timeout reached");            
                else
                    log.error( "Error, processing connection", ex);
                return false;
            } 
            return true;
        }

        synchronized void  process(SelectionKey sk) {
            if(!sk.isValid()) {
                return;
            }
            if(sk.isReadable()) {
                SocketInputStream sis = (SocketInputStream)ep.getNote(isNote);
                boolean isok = sis.readAvailable();
                if(!inProgress) {
                    if(isok) {
                        if(sis.available() > 0 || !nioIsBroken){
                            inProgress = true;
                            tp.runIt(this);
                        }
                    } else {
                        unregister(ep);
                        return;
                    }
                } 
            }
            if(sk.isWritable()) {
                Object os = ep.getNote(osNote);
                synchronized(os) {
                    os.notify();
                }
            }
        }

        synchronized void unregister(MsgContext ep) {
            try{
                close(ep);
            } catch(Exception e) {
                log.error("Error closing connection", e);
            }
            try{
                Request req = (Request)ep.getRequest();
                if( req != null ) {
                    ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                    if( roname != null ) {
                        Registry.getRegistry(null, null).unregisterComponent(roname);
                    }
                    req.getRequestProcessor().setGlobalProcessor(null);
                }
            } catch( Exception ee) {
                log.error( "Error, releasing connection",ee);
            }
        }

        void register(MsgContext ep) {
            Socket s = (Socket)ep.getNote(socketNote);
            try {
                s.getChannel().register(selector, SelectionKey.OP_READ, this);
            } catch(IOException iex) {
                log.error("Unable to register connection",iex);
                unregister(ep);
            }
        }

    }

    protected class Poller implements ThreadPoolRunnable {

        Poller() {
        }

        public Object[] getInitData() {
            return null;
        }
    
        public void runIt(Object perTh[]) {
            while(running) {
                try {
                    int ns = selector.select(serverTimeout);
                    if(log.isDebugEnabled())
                        log.debug("Selecting "+ns+" channels");
                    if(ns > 0) {
                        Set sels = selector.selectedKeys();
                        Iterator it = sels.iterator();
                        while(it.hasNext()) {
                            SelectionKey sk = (SelectionKey)it.next();
                            if(sk.isValid()) {
                                if(sk.isAcceptable()) {
                                    acceptConnections();
                                } else {
                                    SocketConnection sc = (SocketConnection)sk.attachment();
                                    sc.process(sk);
                                }
                            } else {
                                sk.cancel();
                            }
                            it.remove();
                        }
                    }
                } catch(ClosedSelectorException cse) {
                    log.debug("Selector is closed");
                    return;
                } catch(CancelledKeyException cke) {
                    log.debug("Key Cancelled", cke);
                } catch(IOException iex) {
                    log.warn("IO Error in select",iex);
                } catch(Exception ex) {
                    log.warn("Error processing select",ex);
                }
            }
        }
    }

    protected class SocketInputStream extends InputStream {
        final int BUFFER_SIZE = 8200;
        private ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
        private SocketChannel channel;
        private boolean blocking = false;
        private boolean isClosed = false;
        private volatile boolean dataAvailable = false;

        SocketInputStream(SocketChannel channel) {
            this.channel = channel;
            buffer.limit(0);
        }

        public int available() {
            return buffer.remaining();
        }

        public void mark(int readlimit) {
            buffer.mark();
        }

        public boolean markSupported() {
            return true;
        }

        public void reset() {
            buffer.reset();
        }

        public synchronized int read() throws IOException {
            if(!checkAvailable(1)) {
                block(1);
            }
            return buffer.get();
        }

        private boolean checkAvailable(int nbyte) throws IOException {
            if(isClosed) {
                throw new ClosedChannelException();
            }
            return buffer.remaining() >=  nbyte;
        }

        private int fill(int nbyte) throws IOException {
            int rem = nbyte;
            int read = 0;
            boolean eof = false;
            byte [] oldData = null;
            if(buffer.remaining() > 0) {
                // should rarely happen, so short-lived GC shouldn't hurt
                // as much as allocating a long-lived buffer for this
                if(log.isDebugEnabled())
                    log.debug("Saving old buffer: "+buffer.remaining());
                oldData = new byte[buffer.remaining()];
                buffer.get(oldData);
            }
            buffer.clear();
            if(oldData != null) {
                buffer.put(oldData);
            }
            while(rem > 0) {
                int count = channel.read(buffer);
                if(count < 0) {
                    eof = true;
                    break;
                } else if(count == 0) {
                    log.debug("Failed to recieve signaled read: ");
                    break;
                }
                read += count;
                rem -= count;
            }
            buffer.flip();
            return eof ? -1 : read;
        }

        synchronized boolean readAvailable() {
            if(blocking) {
                dataAvailable = true;
                notify();
            } else if(dataAvailable) {
                log.debug("Race Condition");
            } else {
                int nr=0;

                try {
                    nr = fill(1);
                } catch(ClosedChannelException cce) {
                    log.debug("Channel is closed",cce);
                    nr = -1;
                } catch(IOException iex) {
                    log.warn("Exception processing read",iex);
                    nr = -1; // Can't handle this yet
                }
                if(nr < 0) {
                    isClosed = true;
                    notify();
                    return false;
                } else if(nr == 0) {
                    if(!nioIsBroken) {
                        dataAvailable = (buffer.remaining() <= 0);
                    }
                }
            }
            return true;
        }

        public int read(byte [] data) throws IOException {
            return read(data, 0, data.length);
        }

        public synchronized int read(byte [] data, int offset, int len) throws IOException {
            int olen = len;
            while(!checkAvailable(len)) {
                int avail = buffer.remaining();
                if(avail > 0) {
                    buffer.get(data, offset, avail);
                }
                len -= avail;
                offset += avail;
                block(len);
            }
            buffer.get(data, offset, len);
            return olen;
        }

        private void block(int len) throws IOException {
            if(len <= 0) {
                return;
            }
            if(!dataAvailable) {
                blocking = true;
                if(log.isDebugEnabled())
                    log.debug("Waiting for "+len+" bytes to be available");
                try{
                    wait(socketTimeout);
                }catch(InterruptedException iex) {
                    log.debug("Interrupted",iex);
                }
                blocking = false;
            }
            if(dataAvailable) {
                dataAvailable = false;
                if(fill(len) < 0) {
                    isClosed = true;
                } 
            }
        }
    }

    protected class SocketOutputStream extends OutputStream {
        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
        SocketChannel channel;

        SocketOutputStream(SocketChannel channel) {
            this.channel = channel;
        }

        public void write(int b) throws IOException {
            if(!checkAvailable(1)) {
                flush();
            }
            buffer.put((byte)b);
        }

        public void write(byte [] data) throws IOException {
            write(data, 0, data.length);
        }

        public void write(byte [] data, int offset, int len) throws IOException {
            if(!checkAvailable(len)) {
                flush();
            }
            buffer.put(data, offset, len);
        }

        public void flush() throws IOException {
            buffer.flip();
            while(buffer.hasRemaining()) {
                int count = channel.write(buffer);
                if(count == 0) {
                    synchronized(this) {
                        SelectionKey key = channel.keyFor(selector);
                        key.interestOps(SelectionKey.OP_WRITE);
                        if(log.isDebugEnabled())
                            log.debug("Blocking for channel write: "+buffer.remaining());
                        try {
                            wait();
                        } catch(InterruptedException iex) {
                            // ignore, since can't happen
                        }
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
            buffer.clear();
        }

        private boolean checkAvailable(int len) {
            return buffer.remaining() >= len;
        }
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -