timescheduler.java

来自「JGRoups源码」· Java 代码 · 共 564 行 · 第 1/2 页

JAVA
564
字号
// $Id: TimeScheduler.java,v 1.14 2006/10/11 19:01:37 vlada Exp $

package org.jgroups.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Fixed-delay and fixed-rate single thread scheduler
 * <p/>
 * The scheduler supports varying scheduling intervals by asking the task
 * every time for its next preferred scheduling interval. Scheduling can
 * either be <i>fixed-delay</i> or <i>fixed-rate</i>. The notions are
 * borrowed from <tt>java.util.Timer</tt> and retain the same meaning.
 * I.e. in fixed-delay scheduling, the task's new schedule is calculated
 * as:<br>
 * new_schedule = time_task_starts + scheduling_interval
 * <p/>
 * In fixed-rate scheduling, the next schedule is calculated as:<br>
 * new_schedule = time_task_was_supposed_to_start + scheduling_interval
 * <p/>
 * The scheduler internally holds a queue of tasks sorted in ascending order
 * according to their next execution time. A task is removed from the queue
 * if it is cancelled, i.e. if <tt>TimeScheduler.Task.cancelled()</tt>
 * returns true.
 * <p/>
 * The scheduler internally uses a <tt>java.util.SortedSet</tt> to keep tasks
 * sorted. <tt>java.util.Timer</tt> uses an array arranged as a binary heap
 * that doesn't shrink. It is likely that the latter arrangement is faster.
 * <p/>
 * Initially, the scheduler is in <tt>SUSPEND</tt>ed mode, <tt>start()</tt>
 * need not be called: if a task is added, the scheduler gets started
 * automatically. Calling <tt>start()</tt> starts the scheduler if it's
 * suspended or stopped else has no effect. Once <tt>stop()</tt> is called,
 * added tasks will not restart it: <tt>start()</tt> has to be called to
 * restart the scheduler.
 */
public class TimeScheduler {

    /**
     * The interface that submitted tasks must implement
     */
    public interface Task {
        /**
         * @return true if task is cancelled and shouldn't be scheduled
         *         again
         */
        boolean cancelled();

        /**
         * @return the next schedule interval
         */
        long nextInterval();

        /**
         * Execute the task
         */
        void run();
    }

    /**
     * A {@link Task} that can additionally be cancelled by its owner.
     */
    public interface CancellableTask extends Task {
        /**
         * Cancels the task. After calling this, {@link #cancelled()} returns true. If the task was already
         * cancelled, this is a no-op
         */
        void cancel();
    }

    /**
     * Internal task class: pairs a user task with its next execution time
     * and its scheduling mode, and defines the ordering used by the queue.
     */
    private static class IntTask implements Comparable {
        /**
         * The user task
         */
        public final Task task;

        /**
         * The next execution time
         */
        public long sched;

        /**
         * Whether this task is scheduled fixed-delay or fixed-rate
         */
        public final boolean relative;

        /**
         * @param task     the task to schedule and execute
         * @param sched    the next schedule
         * @param relative whether scheduling for this task is soft or hard
         *                 (see <tt>TimeScheduler.add()</tt>)
         */
        public IntTask(Task task, long sched, boolean relative) {
            this.task=task;
            this.sched=sched;
            this.relative=relative;
        }

        /**
         * Orders tasks by next execution time, breaking ties consistently
         * by the hash codes of the contained user tasks.
         *
         * @param obj the object to compare against
         *            <p/>
         *            <pre>
         *            If obj is not instance of <tt>IntTask</tt>, then return -1
         *            If obj is instance of <tt>IntTask</tt>, compare the
         *            contained tasks' next execution times. If these times are equal,
         *            then order them randomly <b>but</b> consistently!: compare
         *            their <tt>hashCode()</tt> values
         *            </pre>
         * @return a negative value, zero or a positive value if this task sorts
         *         before, equal to or after <tt>obj</tt>
         */
        public int compareTo(Object obj) {
            IntTask other;

            if(!(obj instanceof IntTask)) return (-1);

            other=(IntTask)obj;
            if(sched < other.sched) return (-1);
            if(sched > other.sched) return (1);

            // Compare the hash codes instead of subtracting them: the difference of
            // two ints can overflow and flip its sign, which would make the ordering
            // inconsistent and break the sorted set.
            // NOTE(review): two distinct tasks with the same schedule and the same
            // hash code still compare as equal here, so a sorted set would treat
            // them as duplicates - confirm whether that can happen in practice.
            int my_hash=task.hashCode();
            int other_hash=other.task.hashCode();
            if(my_hash < other_hash) return (-1);
            if(my_hash > other_hash) return (1);
            return (0);
        }

        /**
         * @return the class name of the user task, or "&lt;unnamed&gt;" if there is none
         */
        public String toString() {
            if(task == null)
                return "<unnamed>";
            else
                return task.getClass().getName();
        }
    }

    /**
     * The scheduler thread's main loop
     */
    private class Loop implements Runnable {
        /**
         * Delegates to <tt>_run()</tt>; anything thrown by it is logged
         * rather than propagated.
         */
        public void run() {
            try {
                _run();
            }
            catch(Throwable t) {
                log.error("exception in loop", t);
            }
        }
    }

    /**
     * The task queue used by the scheduler. Tasks are ordered in increasing
     * order of their next execution time
     */
    private static class TaskQueue {
        /**
         * Sorted list of <tt>IntTask</tt>s
         */
        private final SortedSet set;

        public TaskQueue() {
            super();
            set=new TreeSet();
        }

        /** Inserts a task at the position given by its ordering */
        public void add(IntTask t) {
            set.add(t);
        }

        /** Removes the given task from the queue */
        public void remove(IntTask t) {
            set.remove(t);
        }

        /**
         * @return the task with the earliest next execution time; the
         *         underlying <tt>SortedSet.first()</tt> throws
         *         <tt>NoSuchElementException</tt> if the queue is empty
         */
        public IntTask getFirst() {
            return ((IntTask)set.first());
        }

        /**
         * Removes the first task of the queue. The queue must not be empty:
         * <tt>Iterator.next()</tt> throws <tt>NoSuchElementException</tt> otherwise.
         */
        public void removeFirst() {
            Iterator it=set.iterator();
            it.next();
            it.remove();
        }

        /**
         * Assigns a new execution time to the first task and re-inserts it.
         * The task is removed before <tt>sched</tt> is changed because the
         * set's ordering depends on that field.
         *
         * @param sched the new next execution time of the first task
         */
        public void rescheduleFirst(long sched) {
            Iterator it=set.iterator();
            IntTask t=(IntTask)it.next();
            it.remove();
            t.sched=sched;
            set.add(t);
        }

        public boolean isEmpty() {
            return (set.isEmpty());
        }

        public void clear() {
            set.clear();
        }

        public int size() {
            return set.size();
        }

        public String toString() {
            return set.toString();
        }
    }

    /**
     * Default suspend interval (ms)
     */
    private static final long SUSPEND_INTERVAL=30000;

    /** if it takes more than this to run a task, we emit a warning */
    private static final long MAX_EXECUTION_TIME=5000;

    /**
     * Thread is running
     * <p/>
     * A call to <code>start()</code> has no effect on the thread<br>
     * A call to <code>stop()</code> will stop the thread<br>
     * A call to <code>add()</code> has no effect on the thread
     */
    private static final int RUN=0;

    /**
     * Thread is suspended
     * <p/>
     * A call to <code>start()</code> will recreate the thread<br>
     * A call to <code>stop()</code> will switch the state from suspended
     * to stopped<br>
     * A call to <code>add()</code> will recreate the thread <b>only</b>
     * if it is suspended
     */
    private static final int SUSPEND=1;

    /**
     * A shutdown of the thread is in progress
     * <p/>
     * A call to <code>start()</code> has no effect on the thread<br>
     * A call to <code>stop()</code> has no effect on the thread<br>
     * A call to <code>add()</code> has no effect on the thread<br>
     */
    private static final int STOPPING=2;

    /**
     * Thread is stopped
     * <p/>
     * A call to <code>start()</code> will recreate the thread<br>
     * A call to <code>stop()</code> has no effect on the thread<br>
     * A call to <code>add()</code> has no effect on the thread<br>
     */
    private static final int STOP=3;

    /**
     * TimeScheduler thread name
     */
    private static final String THREAD_NAME="TimeScheduler.Thread";

    /**
     * The scheduler thread
     */
    private Thread thread=null;

    /**
     * The thread's running state
     */
    private int thread_state=SUSPEND;

    /**
     * Time that task queue is empty before suspending the scheduling
     * thread
     */
    private long suspend_interval=SUSPEND_INTERVAL;

    /**
     * The task queue ordered according to task's next execution time
     */
    private final TaskQueue queue;

    protected static final Log log=LogFactory.getLog(TimeScheduler.class);

    /**

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?