scheduler.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 545 行 · 第 1/2 页
JAVA
545 行
} else if (log.isDebugEnabled()) { log.debug("schedule: queue element added, notification not performed"); } } catch (InterruptedException ie) { if (log.isInfoEnabled()) log.info("schedule: failed to add new ready runnable instance "+runnable+" to scheduler", ie); Thread.currentThread().interrupt(); } catch (FifoQueueException ex) { if (log.isInfoEnabled()) log.info("schedule: failed to add new ready runnable instance "+runnable+" to scheduler", ex); throw new UndeclaredThrowableException(ex); } } /** * This method is used to schedule a ready runnable in the system. The * interval is used as the key for determining which queue to add the * runnable. * @param interval * The queue to add the runnable to. * @param runnable * The element to run when interval expires. * * @throws java.lang.RuntimeException * Thrown if an error occurs adding the element to the queue. */ public synchronized void schedule(long interval, final ReadyRunnable runnable) { final long timeToRun = getCurrentTime()+interval; ReadyRunnable timeKeeper = new ReadyRunnable() { public boolean isReady() { return getCurrentTime() >= timeToRun && runnable.isReady(); } public void run() { runnable.run(); } public String toString() { return runnable.toString()+" (ready in "+Math.max(0, timeToRun-getCurrentTime())+"ms)"; } }; schedule(timeKeeper, interval); } /** * This returns the current time for the scheduler */ public long getCurrentTime() { return System.currentTimeMillis(); } /** * Starts the fiber. * * @throws java.lang.IllegalStateException * Thrown if the fiber is already running. */ public synchronized void start() { if (m_worker != null) throw new IllegalStateException("The fiber has already run or is running"); Category log = ThreadCategory.getInstance(getClass()); m_runner.start(); m_worker = new Thread(this, getName()); m_worker.start(); m_status = STARTING; if (log.isDebugEnabled()) log.debug("start: scheduler started"); } /** * Stops the fiber. If the fiber has never been run then an exception is * generated. * * @throws java.lang.IllegalStateException * Throws if the fiber has never been started. */ public synchronized void stop() { if (m_worker == null) throw new IllegalStateException("The fiber has never been started"); Category log = ThreadCategory.getInstance(getClass()); m_status = STOP_PENDING; m_worker.interrupt(); m_runner.stop(); if (log.isDebugEnabled()) log.debug("stop: scheduler stopped"); } /** * Pauses the scheduler if it is current running. If the fiber has not been * run or has already stopped then an exception is generated. * * @throws java.lang.IllegalStateException * Throws if the operation could not be completed due to the * fiber's state. */ public synchronized void pause() { if (m_worker == null) throw new IllegalStateException("The fiber has never been started"); if (m_status == STOPPED || m_status == STOP_PENDING) throw new IllegalStateException("The fiber is not running or a stop is pending"); if (m_status == PAUSED) return; m_status = PAUSE_PENDING; notifyAll(); } /** * Resumes the scheduler if it has been paused. If the fiber has not been * run or has already stopped then an exception is generated. * * @throws java.lang.IllegalStateException * Throws if the operation could not be completed due to the * fiber's state. */ public synchronized void resume() { if (m_worker == null) throw new IllegalStateException("The fiber has never been started"); if (m_status == STOPPED || m_status == STOP_PENDING) throw new IllegalStateException("The fiber is not running or a stop is pending"); if (m_status == RUNNING) return; m_status = RESUME_PENDING; notifyAll(); } /** * Returns the current of this fiber. * * @return The current status. */ public synchronized int getStatus() { if (m_worker != null && m_worker.isAlive() == false) m_status = STOPPED; return m_status; } /** * Returns the name of this fiber. * */ public String getName() { return m_runner.getName(); } /** * The main method of the scheduler. This method is responsible for checking * the runnable queues for ready objects and then enqueuing them into the * thread pool for execution. * */ public void run() { Category log = ThreadCategory.getInstance(getClass()); synchronized (this) { m_status = RUNNING; } if (log.isDebugEnabled()) log.debug("run: scheduler running"); // Loop until a fatal exception occurs or until // the thread is interrupted. // for (;;) { // block if there is nothing in the queue(s) // When something is added to the queue it // signals us to wakeup // synchronized (this) { if (m_status != RUNNING && m_status != PAUSED && m_status != PAUSE_PENDING && m_status != RESUME_PENDING) { if (log.isDebugEnabled()) log.debug("run: status = " + m_status + ", time to exit"); break; } if (m_scheduled == 0) { try { if (log.isDebugEnabled()) log.debug("run: no interfaces scheduled, waiting..."); wait(); } catch (InterruptedException ex) { break; } } } // cycle through the queues checking for // what's ready to run. The queues are keyed // by the interval, but the mapped elements // are peekable fifo queues. // int runned = 0; FifoQueue out = m_runner.getRunQueue(); synchronized (m_queues) { // get an iterator so that we can cycle // through the queue elements. // Iterator iter = m_queues.entrySet().iterator(); while (iter.hasNext()) { // Peak for Runnable objects until // there are no more ready runnables // // Also, only go through each queue once! // if we didn't add a count then it would // be possible to starve other queues. // PeekableFifoQueue in = (PeekableFifoQueue) ((Map.Entry) iter.next()).getValue(); ReadyRunnable readyRun = null; int maxLoops = in.size(); do { try { readyRun = (ReadyRunnable) in.peek(); if (readyRun != null && readyRun.isReady()) { if (log.isDebugEnabled()) { log.debug("run: found ready runnable "+readyRun); } // Pop the interface/readyRunnable from the // queue for execution. // in.remove(); // Add runnable to the execution queue out.add(readyRun); ++runned; } } catch (InterruptedException ex) { return; // jump all the way out } catch (FifoQueueException qe) { throw new UndeclaredThrowableException(qe); } } while (readyRun != null && readyRun.isReady() && --maxLoops > 0); } } // Wait for 1 second if there were no runnables // executed during this loop, otherwise just // start over. // synchronized (this) { m_scheduled -= runned; if (runned == 0) { try { wait(1000); } catch (InterruptedException ex) { break; // exit for loop } } } } // end for(;;) if (log.isDebugEnabled()) log.debug("run: scheduler exiting, state = STOPPED"); synchronized (this) { m_status = STOPPED; } } // end run}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?