timescheduler.java

来自「JGRoups源码」· Java 代码 · 共 564 行 · 第 1/2 页

JAVA
564
字号
     * Set the thread state to running, create and start the thread     */    private void _start() {        thread_state=RUN;        // only start if not yet running        if(thread == null || !thread.isAlive()) {            thread=new Thread(Util.getGlobalThreadGroup(), new Loop(), THREAD_NAME);            thread.setDaemon(true);            thread.start();        }    }    /**     * Restart the suspended thread     */    private void _unsuspend() {        thread_state=RUN;        // only start if not yet running        if(thread == null || !thread.isAlive()) {            thread=new Thread(Util.getGlobalThreadGroup(), new Loop(), THREAD_NAME);            thread.setDaemon(true);            thread.start();        }    }    /**     * Set the thread state to suspended     */    private void _suspend() {        thread_state=SUSPEND;        thread=null;    }    /**     * Set the thread state to stopping     */    private void _stopping() {        thread_state=STOPPING;    }    /**     * Set the thread state to stopped     */    private void _stop() {        thread_state=STOP;        thread=null;    }    /**     * If the task queue is empty, sleep until a task comes in or if slept     * for too long, suspend the thread.     * <p/>     * Get the first task, if the running time hasn't been     * reached then wait a bit and retry. Else reschedule the task and then     * run it.     */    private void _run() {        IntTask intTask;        Task task;        long currTime, execTime, waitTime, intervalTime, schedTime;        while(true) {            synchronized(this) {                if(thread == null || thread.isInterrupted()) return;            }            synchronized(queue) {                while(true) {                    if(!queue.isEmpty()) break;                    try {                        queue.wait(suspend_interval);                    }                    catch(InterruptedException ex) {                        return;                    }                    if(!queue.isEmpty()) break;                    _suspend();                    return;                }                intTask=queue.getFirst();                synchronized(intTask) {                    task=intTask.task;                    if(task.cancelled()) {                        queue.removeFirst();                        continue;                    }                    currTime=System.currentTimeMillis();                    execTime=intTask.sched;                    if((waitTime=execTime - currTime) <= 0) {                        // Reschedule the task                        intervalTime=task.nextInterval();                        schedTime=intTask.relative ?                                currTime + intervalTime : execTime + intervalTime;                        queue.rescheduleFirst(schedTime);                    }                }                if(waitTime > 0) {                    //try { queue.wait(Math.min(waitTime, TICK_INTERVAL));                    try {                        queue.wait(waitTime);                    }                    catch(InterruptedException ex) {                        return;                    }                    continue;                }            }            long start=System.currentTimeMillis(), stop, diff;            try {                if(log.isDebugEnabled())                   log.debug("Running task " + task);                                task.run();                stop=System.currentTimeMillis();                diff=stop-start;                if(diff >= MAX_EXECUTION_TIME) {                    if(log.isWarnEnabled())                        log.warn("task " + task + " took " + diff + "ms to execute, " +                                "please check why it is taking so long. It is delaying other tasks");                }            }            catch(Throwable ex) {                log.error("failed running task " + task, ex);            }        }    }    /**     * Create a scheduler that executes tasks in dynamically adjustable     * intervals     *     * @param suspend_interval the time that the scheduler will wait for     *                         at least one task to be placed in the task queue before suspending     *                         the scheduling thread     */    public TimeScheduler(long suspend_interval) {        super();        queue=new TaskQueue();        this.suspend_interval=suspend_interval;    }    /**     * Create a scheduler that executes tasks in dynamically adjustable     * intervals     */    public TimeScheduler() {        this(SUSPEND_INTERVAL);    }    public void setSuspendInterval(long s) {        this.suspend_interval=s;    }    public long getSuspendInterval() {        return suspend_interval;    }    public String dumpTaskQueue() {        return queue != null? queue.toString() : "<empty>";    }    /**     * Add a task for execution at adjustable intervals     *     * @param t        the task to execute     * @param relative scheduling scheme:     *                 <p/>     *                 <tt>true</tt>:<br>     *                 Task is rescheduled relative to the last time it <i>actually</i>     *                 started execution     *                 <p/>     *                 <tt>false</tt>:<br>     *                 Task is scheduled relative to its <i>last</i> execution schedule. This     *                 has the effect that the time between two consecutive executions of     *                 the task remains the same.     */    public void add(Task t, boolean relative) {        long interval, sched;        if((interval=t.nextInterval()) < 0) return;        sched=System.currentTimeMillis() + interval;        synchronized(queue) {            queue.add(new IntTask(t, sched, relative));            switch(thread_state) {                case RUN:                    queue.notifyAll();                    break;                case SUSPEND:                    _unsuspend();                    break;                case STOPPING:                    break;                case STOP:                    break;            }        }    }    /**     * Add a task for execution at adjustable intervals     *     * @param t the task to execute     */    public void add(Task t) {        add(t, true);    }    /**     * Answers the number of tasks currently in the queue.     * @return The number of tasks currently in the queue.     */    public int size() {        return queue.size();    }    /**     * Start the scheduler, if it's suspended or stopped     */    public void start() {        synchronized(queue) {            switch(thread_state) {                case RUN:                    break;                case SUSPEND:                    _unsuspend();                    break;                case STOPPING:                    break;                case STOP:                    _start();                    break;            }        }    }    /**     * Stop the scheduler if it's running. Switch to stopped, if it's     * suspended. Clear the task queue.     *     * @throws InterruptedException if interrupted while waiting for thread     *                              to return     */    public void stop() throws InterruptedException {// i. Switch to STOPPING, interrupt thread// ii. Wait until thread ends// iii. Clear the task queue, switch to STOPPED,        synchronized(queue) {            switch(thread_state) {                case RUN:                    _stopping();                    break;                case SUSPEND:                    _stop();                    return;                case STOPPING:                    return;                case STOP:                    return;            }            thread.interrupt();        }        thread.join();        synchronized(queue) {            queue.clear();            _stop();        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?