scheduler.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 545 行 · 第 1/2 页

JAVA
545
字号
            } else if (log.isDebugEnabled()) {                log.debug("schedule: queue element added, notification not performed");            }        } catch (InterruptedException ie) {            if (log.isInfoEnabled())                log.info("schedule: failed to add new ready runnable instance "+runnable+" to scheduler", ie);            Thread.currentThread().interrupt();        } catch (FifoQueueException ex) {            if (log.isInfoEnabled())                log.info("schedule: failed to add new ready runnable instance "+runnable+" to scheduler", ex);            throw new UndeclaredThrowableException(ex);        }    }    /**     * This method is used to schedule a ready runnable in the system. The     * interval is used as the key for determining which queue to add the     * runnable.     * @param interval     *            The queue to add the runnable to.     * @param runnable     *            The element to run when interval expires.     *      * @throws java.lang.RuntimeException     *             Thrown if an error occurs adding the element to the queue.     */    public synchronized void schedule(long interval, final ReadyRunnable runnable) {        final long timeToRun = getCurrentTime()+interval;        ReadyRunnable timeKeeper = new ReadyRunnable() {            public boolean isReady() {                return getCurrentTime() >= timeToRun && runnable.isReady();            }                        public void run() {                runnable.run();            }                        public String toString() { return runnable.toString()+" (ready in "+Math.max(0, timeToRun-getCurrentTime())+"ms)"; }        };        schedule(timeKeeper, interval);    }        /**     * This returns the current time for the scheduler     */    public long getCurrentTime() {        return System.currentTimeMillis();    }    /**     * Starts the fiber.     *      * @throws java.lang.IllegalStateException     *             Thrown if the fiber is already running.     
*/
    public synchronized void start() {
        if (m_worker != null) {
            throw new IllegalStateException("The fiber has already run or is running");
        }

        final Category logger = ThreadCategory.getInstance(getClass());

        // Start the runner first, then create and start the scheduling thread.
        m_runner.start();
        m_worker = new Thread(this, getName());
        m_worker.start();
        m_status = STARTING;

        if (logger.isDebugEnabled()) {
            logger.debug("start: scheduler started");
        }
    }

    /**
     * Stops the fiber: marks the status as STOP_PENDING, interrupts the
     * scheduling thread and stops the runner. If the fiber has never been
     * started an exception is generated.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the fiber has never been started.
     */
    public synchronized void stop() {
        if (m_worker == null) {
            throw new IllegalStateException("The fiber has never been started");
        }

        final Category logger = ThreadCategory.getInstance(getClass());

        m_status = STOP_PENDING;
        m_worker.interrupt();
        m_runner.stop();

        if (logger.isDebugEnabled()) {
            logger.debug("stop: scheduler stopped");
        }
    }

    /**
     * Requests a pause of the scheduler if it is currently running. The
     * status is set to PAUSE_PENDING and all threads waiting on this object
     * are notified. Calling this while already PAUSED is a no-op.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the fiber has never been started, has stopped, or
     *             has a stop pending.
     */
    public synchronized void pause() {
        if (m_worker == null) {
            throw new IllegalStateException("The fiber has never been started");
        }

        final boolean stoppedOrStopping = (m_status == STOPPED || m_status == STOP_PENDING);
        if (stoppedOrStopping) {
            throw new IllegalStateException("The fiber is not running or a stop is pending");
        }

        if (m_status != PAUSED) {
            m_status = PAUSE_PENDING;
            notifyAll();
        }
    }

    /**
     * Resumes the scheduler if it has been paused. If the fiber has not been
     * run or has already stopped then an exception is generated.
*
     * @throws java.lang.IllegalStateException
     *             Throws if the operation could not be completed due to the
     *             fiber's state.
     */
    public synchronized void resume() {
        if (m_worker == null) {
            throw new IllegalStateException("The fiber has never been started");
        }
        if (m_status == STOPPED || m_status == STOP_PENDING) {
            throw new IllegalStateException("The fiber is not running or a stop is pending");
        }

        // Already running: nothing to resume.
        if (m_status == RUNNING) {
            return;
        }

        m_status = RESUME_PENDING;
        notifyAll();
    }

    /**
     * Returns the current status of this fiber. If a worker thread exists but
     * is not alive, the recorded status is first updated to STOPPED.
     *
     * @return The current status.
     */
    public synchronized int getStatus() {
        if (m_worker != null && !m_worker.isAlive()) {
            m_status = STOPPED;
        }
        return m_status;
    }

    /**
     * Returns the name of this fiber, which is the name reported by the
     * runner.
     *
     * @return The fiber name.
     */
    public String getName() {
        return m_runner.getName();
    }

    /**
     * The main method of the scheduler. This method is responsible for
     * checking the runnable queues for ready objects and then enqueuing them
     * into the runner's run queue for execution.
     *
     * <p>
     * The loop ends when the status leaves the set {RUNNING, PAUSED,
     * PAUSE_PENDING, RESUME_PENDING} or when the thread is interrupted. On
     * every exit path, including an interrupt raised while scanning the
     * queues and an unexpected runtime exception, the status is set to
     * STOPPED before the method returns.
     * </p>
     *
     * <p>
     * NOTE(review): this loop does not itself act on PAUSE_PENDING or
     * RESUME_PENDING; it keeps dispatching in those states. Confirm whether
     * pausing is expected to suspend dispatching.
     * </p>
     *
     * @throws java.lang.reflect.UndeclaredThrowableException
     *             Thrown if a queue operation fails with a FifoQueueException.
     */
    public void run() {
        Category log = ThreadCategory.getInstance(getClass());

        synchronized (this) {
            m_status = RUNNING;
        }

        if (log.isDebugEnabled())
            log.debug("run: scheduler running");

        try {
            // Loop until a fatal exception occurs or until
            // the thread is interrupted.
            //
            for (;;) {
                // Block if there is nothing in the queue(s).
                // The wait is ended by a notify on this object
                // or by an interrupt.
                //
                synchronized (this) {
                    if (m_status != RUNNING && m_status != PAUSED && m_status != PAUSE_PENDING && m_status != RESUME_PENDING) {
                        if (log.isDebugEnabled())
                            log.debug("run: status = " + m_status + ", time to exit");
                        break;
                    }

                    if (m_scheduled == 0) {
                        try {
                            if (log.isDebugEnabled())
                                log.debug("run: no interfaces scheduled, waiting...");
                            wait();
                        } catch (InterruptedException ex) {
                            break;
                        }
                    }
                }

                // Cycle through the queues checking for
                // what's ready to run. The mapped values
                // are peekable fifo queues.
                //
                int runned = 0;
                boolean interrupted = false;
                FifoQueue out = m_runner.getRunQueue();

                synchronized (m_queues) {
                    Iterator iter = m_queues.entrySet().iterator();
                    while (!interrupted && iter.hasNext()) {
                        // Peek for runnable objects until there are
                        // no more ready runnables.
                        //
                        // Each queue is visited for at most as many
                        // elements as it held when the scan reached it;
                        // without that cap one busy queue could starve
                        // the others.
                        //
                        PeekableFifoQueue in = (PeekableFifoQueue) ((Map.Entry) iter.next()).getValue();
                        ReadyRunnable readyRun = null;
                        int maxLoops = in.size();
                        do {
                            try {
                                readyRun = (ReadyRunnable) in.peek();
                                if (readyRun != null && readyRun.isReady()) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("run: found ready runnable "+readyRun);
                                    }

                                    // Pop the ready runnable from its queue
                                    // and hand it to the execution queue.
                                    //
                                    in.remove();
                                    out.add(readyRun);
                                    ++runned;
                                }
                            } catch (InterruptedException ex) {
                                // Stop scanning, but leave through the common
                                // exit path so the dispatch count and the
                                // STOPPED status are still recorded.
                                interrupted = true;
                                break;
                            } catch (FifoQueueException qe) {
                                throw new UndeclaredThrowableException(qe);
                            }
                        } while (readyRun != null && readyRun.isReady() && --maxLoops > 0);
                    }
                }

                // Account for what was dispatched, then wait up to
                // 1 second if nothing was dispatched during this
                // pass; otherwise just start over.
                //
                synchronized (this) {
                    m_scheduled -= runned;

                    if (interrupted) {
                        break; // exit for loop
                    }

                    if (runned == 0) {
                        try {
                            wait(1000);
                        } catch (InterruptedException ex) {
                            break; // exit for loop
                        }
                    }
                }
            } // end for(;;)
        } finally {
            if (log.isDebugEnabled())
                log.debug("run: scheduler exiting, state = STOPPED");

            synchronized (this) {
                m_status = STOPPED;
            }
        }
    } // end run
}

⌨️ 快捷键说明

复制代码：Ctrl + C
搜索代码：Ctrl + F
全屏模式：F11
增大字号：Ctrl + =
减小字号：Ctrl + -
显示快捷键：?