⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aprendpoint.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
                timeout = soTimeout;
            }
            if (comet) {
                // FIXME: Find an appropriate timeout value, for now, "longer than usual"
                // semms appropriate
                timeout = soTimeout * 50;
            }
            serverPollset = allocatePoller(size, pool, timeout);
            if (serverPollset == 0 && size > 1024) {
                size = 1024;
                serverPollset = allocatePoller(size, pool, timeout);
            }
            if (serverPollset == 0) {
                size = 62;
                serverPollset = allocatePoller(size, pool, timeout);
            }
            desc = new long[size * 2];
            keepAliveCount = 0;
            addS = new long[size];
            addCount = 0;
        }

        /**
         * Destroy the poller.
         */
        protected void destroy() {
            // Wait for polltime before doing anything, so that the poller threads
            // exit, otherwise parallel descturction of sockets which are still
            // in the poller can cause problems
            try {
                synchronized (this) {
                    this.wait(pollTime / 1000);
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            // Close all sockets in the add queue
            for (int i = 0; i < addCount; i++) {
                if (comet) {
                    processSocket(addS[i], SocketStatus.STOP);
                } else {
                    Socket.destroy(addS[i]);
                }
            }
            // Close all sockets still in the poller
            int rv = Poll.pollset(serverPollset, desc);
            if (rv > 0) {
                for (int n = 0; n < rv; n++) {
                    if (comet) {
                        processSocket(desc[n*2+1], SocketStatus.STOP);
                    } else {
                        Socket.destroy(desc[n*2+1]);
                    }
                }
            }
            Pool.destroy(pool);
            keepAliveCount = 0;
            addCount = 0;
        }

        /**
         * Add specified socket and associated pool to the poller. The socket will
         * be added to a temporary array, and polled first after a maximum amount
         * of time equal to pollTime (in most cases, latency will be much lower,
         * however).
         *
         * @param socket to add to the poller
         */
        public void add(long socket) {
            synchronized (this) {
                // Add socket to the list. Newly added sockets will wait
                // at most for pollTime before being polled
                if (addCount >= addS.length) {
                    // Can't do anything: close the socket right away
                    if (comet) {
                        processSocket(socket, SocketStatus.ERROR);
                    } else {
                        Socket.destroy(socket);
                    }
                    return;
                }
                addS[addCount] = socket;
                addCount++;
                this.notify();
            }
        }

        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        public void run() {

            long maintainTime = 0;
            // Loop until we receive a shutdown command
            while (running) {
                // Loop if endpoint is paused
                while (paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                while (keepAliveCount < 1 && addCount < 1) {
                    // Reset maintain time.
                    maintainTime = 0;
                    try {
                        synchronized (this) {
                            this.wait();
                        }
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                try {
                    // Add sockets which are waiting to the poller
                    if (addCount > 0) {
                        synchronized (this) {
                            for (int i = (addCount - 1); i >= 0; i--) {
                                int rv = Poll.add
                                    (serverPollset, addS[i], Poll.APR_POLLIN);
                                if (rv == Status.APR_SUCCESS) {
                                    keepAliveCount++;
                                } else {
                                    // Can't do anything: close the socket right away
                                    if (comet) {
                                        processSocket(addS[i], SocketStatus.ERROR);
                                    } else {
                                        Socket.destroy(addS[i]);
                                    }
                                }
                            }
                            addCount = 0;
                        }
                    }

                    maintainTime += pollTime;
                    // Pool for the specified interval
                    int rv = Poll.poll(serverPollset, pollTime, desc, true);
                    if (rv > 0) {
                        keepAliveCount -= rv;
                        for (int n = 0; n < rv; n++) {
                            // Check for failed sockets and hand this socket off to a worker
                            if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                                    || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN))) 
                                    || (!comet && (!processSocket(desc[n*2+1])))) {
                                // Close socket and clear pool
                                if (comet) {
                                    processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
                                } else {
                                    Socket.destroy(desc[n*2+1]);
                                }
                                continue;
                            }
                        }
                    } else if (rv < 0) {
                        int errn = -rv;
                        /* Any non timeup or interrupted error is critical */
                        if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
                            if (errn >  Status.APR_OS_START_USERERR) {
                                errn -=  Status.APR_OS_START_USERERR;
                            }
                            log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
                            // Handle poll critical failure
                            synchronized (this) {
                                destroy();
                                init();
                            }
                            continue;
                        }
                    }
                    if (soTimeout > 0 && maintainTime > 1000000L && running) {
                        rv = Poll.maintain(serverPollset, desc, true);
                        maintainTime = 0;
                        if (rv > 0) {
                            keepAliveCount -= rv;
                            for (int n = 0; n < rv; n++) {
                                // Close socket and clear pool
                                if (comet) {
                                    processSocket(desc[n], SocketStatus.TIMEOUT);
                                } else {
                                    Socket.destroy(desc[n]);
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.poll.error"), t);
                }

            }

            synchronized (this) {
                this.notifyAll();
            }

        }
        
    }


    // ----------------------------------------------------- Worker Inner Class


    /**
     * Server processor class.
     */
    protected class Worker implements Runnable {


        protected Thread thread = null;
        protected boolean available = false;
        protected long socket = 0;
        protected SocketStatus status = null;
        protected boolean options = false;


        /**
         * Process an incoming TCP/IP connection on the specified socket.  Any
         * exception that occurs during processing must be logged and swallowed.
         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
         * must assign it to our own thread so that multiple simultaneous
         * requests can be handled.
         *
         * @param socket TCP socket to process
         */
        protected synchronized void assignWithOptions(long socket) {

            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Store the newly available Socket and notify our thread
            this.socket = socket;
            status = null;
            options = true;
            available = true;
            notifyAll();

        }


        /**
         * Process an incoming TCP/IP connection on the specified socket.  Any
         * exception that occurs during processing must be logged and swallowed.
         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
         * must assign it to our own thread so that multiple simultaneous
         * requests can be handled.
         *
         * @param socket TCP socket to process
         */
        protected synchronized void assign(long socket) {

            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Store the newly available Socket and notify our thread
            this.socket = socket;
            status = null;
            options = false;
            available = true;
            notifyAll();

        }


        protected synchronized void assign(long socket, SocketStatus status) {

            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Store the newly available Socket and notify our thread
            this.socket = socket;
            this.status = status;
            options = false;
            available = true;
            notifyAll();

        }


        /**
         * Await a newly assigned Socket from our Connector, or <code>null</code>
         * if we are supposed to shut down.
         */
        protected synchronized long await() {

            // Wait for the Connector to provide a new Socket
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Notify the Connector that we have received this Socket
            long socket = this.socket;
            available = false;
            notifyAll();

            return (socket);

        }


        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        public void run() {

            // Process requests until we receive a shutdown signal
            while (running) {

                // Wait for the next socket to be assigned
                long socket = await();
                if (socket == 0)
                    continue;

                // Process the request from this socket
                if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
                    // Close socket and pool
                    Socket.destroy(socket);
                    socket = 0;
                } else if ((status == null) && ((options && !setSocketOptions(socket)) 
                        || handler.process(socket) == Handler.SocketState.CLOSED)) {
                    // Close socket and pool
                    Socket.destroy(socket);
                    socket = 0;
                }

                // Finish up this request
                recycleWorkerThread(this);

            }

        }


        /**
         * Start the background processing thread.
         */
        public void start() {
            thread = new Thread(this);
            thread.setName(getName() + "-" + (++curThreads));
            thread.setDaemon(true);
            thread.start();
        }


    }


    // ----------------------------------------------- SendfileData Inner Class


    /**
     * SendfileData class.
     */
    public static class SendfileData {
        // File
        public String fileName;
        public long fd;
        public long fdpool;
        // Range information
        public long start;
        public long end;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -