⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nioendpoint.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        return workerThread;
    }


    /**
     * Return a worker to the idle pool once it has finished its current task.
     * <p>
     * While holding the monitor of <code>workers</code>, the worker is pushed
     * back onto the stack, the busy-thread counter is decremented and a single
     * thread waiting on that monitor is notified.
     *
     * @param workerThread The worker being handed back for reuse
     */
    protected void recycleWorkerThread(Worker workerThread) {
        synchronized (workers) {
            // Make the worker available again before adjusting the counter
            workers.push(workerThread);
            curThreadsBusy = curThreadsBusy - 1;
            // Wake up one thread waiting on the workers monitor
            workers.notify();
        }
    }


    /**
     * Dispatch a freshly accepted raw socket channel for processing.
     * <p>
     * When no executor is configured the channel is assigned to a worker
     * obtained from <code>getWorkerThread()</code>; otherwise it is wrapped in
     * a <code>SocketOptionsProcessor</code> and submitted to the executor.
     *
     * @param socket the accepted channel
     * @return <code>true</code> if the channel was dispatched,
     *         <code>false</code> if dispatching threw (the failure is logged)
     */
    protected boolean processSocket(SocketChannel socket) {
        boolean dispatched;
        try {
            if (executor != null) {
                executor.execute(new SocketOptionsProcessor(socket));
            } else {
                getWorkerThread().assign(socket);
            }
            dispatched = true;
        } catch (Throwable failure) {
            // Typically an OOM (or similar) while creating a thread, or the
            // pool and its queue being full
            log.error(sm.getString("endpoint.process.fail"), failure);
            dispatched = false;
        }
        return dispatched;
    }
    /**
     * Dispatch the given channel for processing.
     * <p>
     * Without an executor the channel is assigned to a worker obtained from
     * <code>getWorkerThread()</code>; with one, a <code>SocketProcessor</code>
     * wrapping the channel is submitted to it.
     *
     * @param socket the channel to process
     * @return <code>true</code> if the channel was dispatched,
     *         <code>false</code> if dispatching threw (the failure is logged)
     */
    protected boolean processSocket(NioChannel socket) {
        try {
            if (executor != null) {
                executor.execute(new SocketProcessor(socket));
                return true;
            }
            getWorkerThread().assign(socket);
            return true;
        } catch (Throwable problem) {
            // Either thread creation failed (OOM or similar), or the pool
            // and its queue are saturated
            log.error(sm.getString("endpoint.process.fail"), problem);
            return false;
        }
    }


    /**
     * Dispatch the given channel for processing of a socket event.
     * <p>
     * Without an executor the channel and status are assigned to a worker
     * obtained from <code>getWorkerThread()</code>; with one, a
     * <code>SocketEventProcessor</code> is submitted to it.
     *
     * @param socket the channel the event relates to
     * @param status the event status to process
     * @return <code>true</code> if the event was dispatched,
     *         <code>false</code> if dispatching threw (the failure is logged)
     */
    protected boolean processSocket(NioChannel socket, SocketStatus status) {
        boolean accepted = true;
        try {
            final boolean useWorkerPool = (executor == null);
            if (useWorkerPool) {
                getWorkerThread().assign(socket, status);
            } else {
                executor.execute(new SocketEventProcessor(socket, status));
            }
        } catch (Throwable error) {
            // Thread creation failed (OOM or similar), or the pool and its
            // queue have no capacity left
            log.error(sm.getString("endpoint.process.fail"), error);
            accepted = false;
        }
        return accepted;
    }


    // --------------------------------------------------- Acceptor Inner Class


    /**
     * Server socket acceptor thread.
     * <p>
     * Accepts connections from <code>serverSock</code> and hands each one to
     * <code>processSocket(SocketChannel)</code>. A connection that cannot be
     * handed off is closed here so that it does not leak.
     */
    protected class Acceptor implements Runnable {


        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        public void run() {

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused. 'running' is checked as well so
                // that a shutdown requested while paused ends this thread
                // instead of leaving it sleeping forever.
                while (paused && running) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }

                try {
                    // Accept the next incoming connection from the server socket
                    SocketChannel socket = serverSock.accept();
                    if (socket != null) {
                        // Hand this socket off to an appropriate processor,
                        // unless the endpoint was paused or stopped meanwhile
                        boolean handedOff =
                            running && (!paused) && processSocket(socket);
                        if (!handedOff) {
                            // Nobody took ownership of the connection, so it
                            // must be closed here
                            try {
                                socket.socket().close();
                                socket.close();
                            } catch (IOException ix) {
                                if (log.isDebugEnabled()) {
                                    log.debug("", ix);
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }

                // The processor will recycle itself when it finishes

            }

        }

    }


    // ----------------------------------------------------- Poller Inner Classes

    /**
     * 
     * PollerEvent, cacheable object for poller events to avoid GC.
     * <p>
     * An event either registers a channel with its poller's selector (when
     * the requested ops equal <code>OP_REGISTER</code>) or updates the
     * interest ops of a channel's existing selection key.
     */
    public class PollerEvent implements Runnable {
        
        /** Channel this event applies to. */
        protected NioChannel socket;
        /** Requested interest ops, or <code>OP_REGISTER</code>. */
        protected int interestOps;
        /** Attachment passed to the selector on registration. */
        protected KeyAttachment key;

        /**
         * Create an event; equivalent to {@link #reset(NioChannel, KeyAttachment, int)}.
         *
         * @param ch     channel the event applies to
         * @param k      attachment used when registering, may be <code>null</code>
         * @param intOps interest ops to apply, or <code>OP_REGISTER</code>
         */
        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }
    
        /**
         * Re-initialise this event so the instance can be reused.
         *
         * @param ch     channel the event applies to
         * @param k      attachment used when registering, may be <code>null</code>
         * @param intOps interest ops to apply, or <code>OP_REGISTER</code>
         */
        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }
    
        /**
         * Clear all references held by this event.
         */
        public void reset() {
            reset(null, null, 0);
        }
    
        /**
         * Apply the event to the selector of the channel's poller.
         */
        public void run() {
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                // keyFor() returns null when the channel is not registered
                // with this selector, so nothing may be dereferenced before
                // the null check.
                final SelectionKey sk = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    if (sk != null) {
                        final KeyAttachment att = (KeyAttachment) sk.attachment();
                        sk.interestOps(interestOps);
                        if (att != null) {
                            att.interestOps(interestOps);
                        }
                    }
                }
                catch (CancelledKeyException ckx) {
                    try {
                        if (sk != null && sk.attachment() != null) {
                            KeyAttachment ka = (KeyAttachment) sk.attachment();
                            ka.setError(true); //set to collect this socket immediately
                        }
                        try {
                            socket.close();
                        }
                        catch (Exception ignore) {}
                        // Fall back to a forced close if the channel is still open
                        if (socket.isOpen())
                            socket.close(true);
                    }
                    catch (Exception ignore) {}
                }
            }//end if
        }//run
        
        public String toString() {
            return super.toString()+"[intOps="+this.interestOps+"]";
        }
    }
    /**
     * Poller class.
     * <p>
     * Owns a {@link Selector} and a queue of pending events. The
     * {@link #run()} loop drains the queue, performs a select, dispatches
     * ready keys through <code>processSocket</code> and finally checks the
     * registered keys for timeouts.
     */
    public class Poller implements Runnable {

        protected Selector selector;
        /** Events queued through {@link #addEvent(Runnable)}, drained by {@link #events()}. */
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
        
        /**
         * Set by {@link #destroy()} and checked by {@link #run()}; volatile so
         * that the run loop sees the change made by the destroying thread.
         */
        protected volatile boolean close = false;
        protected long nextExpiration = 0;//optimize expiration handling

        protected int keepAliveCount = 0;
        public int getKeepAliveCount() { return keepAliveCount; }
        
        /** Number of events added since the last select; limits wakeup calls. */
        protected AtomicLong wakeupCounter = new AtomicLong(0L);



        /**
         * Create a poller with a newly opened selector.
         *
         * @throws IOException if the selector cannot be opened
         */
        public Poller() throws IOException {
            this.selector = Selector.open();
        }
        
        public Selector getSelector() { return selector;}

        /**
         * Initialise the poller by resetting the keep-alive counter.
         */
        protected void init() {
            keepAliveCount = 0;
        }

        /**
         * Destroy the poller: flag it as closing, drop all pending events and
         * wake up the selector so that the run loop notices the flag.
         */
        protected void destroy() {
            close = true;
            events.clear();
            selector.wakeup();
        }
        
        /**
         * Queue an event for execution by {@link #events()}.
         * <p>
         * The selector is only woken up for the first two events added since
         * the last select; the counter is reset after every select in
         * {@link #run()}.
         *
         * @param event the work to queue
         */
        public void addEvent(Runnable event) {
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
        }

        /**
         * Queue a request to set the interest ops of the given socket to
         * <code>OP_READ</code>.
         *
         * @param socket to add to the poller
         */
        public void add(final NioChannel socket) {
            add(socket,SelectionKey.OP_READ);
        }
        
        /**
         * Queue a request to set the interest ops of the given socket. The
         * request is carried by a {@link PollerEvent} taken from
         * <code>eventCache</code> when one is available.
         *
         * @param socket      to add to the poller
         * @param interestOps the interest ops to apply
         */
        public void add(final NioChannel socket, final int interestOps) {
            PollerEvent r = eventCache.poll();
            if ( r==null) r = new PollerEvent(socket,null,interestOps);
            else r.reset(socket,null,interestOps);
            addEvent(r);
        }
        
        /**
         * Run every queued event. Executed {@link PollerEvent}s are reset and
         * returned to <code>eventCache</code>; an event that throws is logged
         * and is not returned to the cache.
         *
         * @return <code>true</code> if the queue was non-empty when this
         *         method was entered
         */
        public boolean events() {
            // isEmpty() answers in constant time, whereas size() on a
            // ConcurrentLinkedQueue has to traverse the whole queue
            boolean result = !events.isEmpty();
            Runnable r = null;
            while ( (r = events.poll()) != null ) {
                try {
                    r.run();
                    if ( r instanceof PollerEvent ) {
                        ((PollerEvent)r).reset();
                        eventCache.offer((PollerEvent)r);
                    }
                } catch ( Exception x ) {
                    log.error("",x);
                }
            }
            return result;
        }
        
        /**
         * Associate the socket with this poller and queue its registration
         * with the selector. The attachment is taken from
         * <code>keyCache</code> when one is available.
         *
         * @param socket the channel to register
         */
        public void register(final NioChannel socket)
        {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment();
            ka.reset(this,socket);
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }
        
        /**
         * Handle a key that is cancelled, in error or timed out. Comet
         * connections are passed to <code>processSocket</code> with the given
         * status; for all others the key is cancelled, its channel closed and
         * its attachment removed. Failures are not propagated.
         *
         * @param key    the selection key to clean up
         * @param status the status passed on for comet connections
         */
        public void cancelledKey(SelectionKey key, SocketStatus status) {
            try {
                KeyAttachment ka = (KeyAttachment) key.attachment();
                if (ka != null && ka.getComet()) {
                    //the comet event takes care of clean up
                    processSocket(ka.getChannel(), status);
                }else {
                    if (key.isValid()) key.cancel();
                    if (key.channel().isOpen()) key.channel().close();
                    key.attach(null);
                }
            } catch (Throwable e) {
                if ( log.isDebugEnabled() ) log.error("",e);
                // Ignore
            }
        }
        /**
         * The background thread that runs queued events, selects ready keys,
         * hands the corresponding channels off to an appropriate processor
         * and checks for timeouts.
         */
        public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                // Loop if endpoint is paused; stop waiting as soon as the
                // poller is destroyed or the endpoint is stopped
                while (paused && running && !close) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                boolean hasEvents = false;

                hasEvents = (hasEvents | events());
                // Time to terminate?
                if (close) return;

                int keyCount = 0;
                try {
                    keyCount = selector.select(selectorTimeout);
                    wakeupCounter.set(0);
                    if ( close ) { selector.close(); return; }
                } catch ( NullPointerException x ) {
                    //sun bug 5076772 on windows JDK 1.5
                    if ( wakeupCounter == null || selector == null ) throw x;
                    continue;
                } catch ( CancelledKeyException x ) {
                    //sun bug 5076772 on windows JDK 1.5
                    if ( wakeupCounter == null || selector == null ) throw x;
                    continue;
                } catch (Throwable x) {
                    log.error("",x);
                    continue;
                }
                //either we timed out or we woke up, process events first
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    KeyAttachment attachment = (KeyAttachment)sk.attachment();
                    try {
                        if ( sk.isValid() && attachment != null ) {
                            attachment.access();
                            sk.attach(attachment);
                            sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
                            attachment.interestOps(0);
                            NioChannel channel = attachment.getChannel();
                            if (sk.isReadable() || sk.isWritable() ) {
                                if ( attachment.getComet() ) {
                                    if (!processSocket(channel, SocketStatus.OPEN))
                                        processSocket(channel, SocketStatus.DISCONNECT);
                                } else {
                                    // named so that it does not shadow the 'close' field
                                    boolean dispatchFailed = (!processSocket(channel));
                                    if ( dispatchFailed ) {
                                        channel.close();
                                        channel.getIOChannel().socket().close();
                                    }
                                }
                            } 
                        } else {
                            //invalid key
                            cancelledKey(sk, SocketStatus.ERROR);
                        }
                    } catch ( CancelledKeyException ckx ) {
                        cancelledKey(sk, SocketStatus.ERROR);
                    } catch (Throwable t) {
                        log.error("",t);
                    }
                }//while
                //process timeouts
                timeout(keyCount,hasEvents);
            }//while
            synchronized (this) {
                this.notifyAll();
            }

        }

        /**
         * Check all keys registered with the selector for error flags and
         * read timeouts, and compute the time of the next check.
         *
         * @param keyCount  number of keys returned by the last select
         * @param hasEvents whether queued events were processed in this
         *                  iteration of the run loop
         */
        protected void timeout(int keyCount, boolean hasEvents) {
            long now = System.currentTimeMillis();
            //don't process timeouts too frequently, but if the selector simply timed out
            //then we can check timeouts to avoid gaps
            if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return;
            nextExpiration = now + (long)socketProperties.getSoTimeout();
            //timeout
            Set<SelectionKey> keys = selector.keys();
            for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
                SelectionKey key = iter.next();
                try {
                    KeyAttachment ka = (KeyAttachment) key.attachment();
                    if ( ka == null ) {
                        cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
                    } else if ( ka.getError() ) {
                        cancelledKey(key, SocketStatus.DISCONNECT);
                    }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        //only timeout sockets that we are waiting for a read from
                        long delta = now - ka.getLastAccess();
                        // a timeout of -1 on the attachment means "use the socket default"
                        long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout());
                        boolean isTimedout = delta > timeout;
                        if (isTimedout) {
                            key.interestOps(0); 
                            ka.interestOps(0); //avoid duplicate timeout calls
                            cancelledKey(key, SocketStatus.TIMEOUT);
                        } else {
                            // remember the earliest moment at which a key can expire
                            long nextTime = now+(timeout-delta);
                            nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;
                        }
                    }//end if
                }catch ( CancelledKeyException ckx ) {
                    cancelledKey(key, SocketStatus.ERROR);
                }
            }//for
        }
    }
    
    public static class KeyAttachment {
        
        public KeyAttachment() {
            
        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -