scheduler.java

来自「JGRoups源码」· Java 代码 · 共 246 行

JAVA
246
字号
// $Id: Scheduler.java,v 1.13 2006/09/11 14:09:51 belaban Exp $package org.jgroups.util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Global;/** * Implementation of a priority scheduler. The scheduler maintains a queue to the end of which * all tasks are added. It continually looks at the first queue element, assigns a thread to * it, runs the thread and waits for completion. When a new <em>priority task</em> is added, * it will be added to the head of the queue and the scheduler will be interrupted. In this * case, the currently handled task is suspended, and the one at the head of the queue * handled. This is recursive: a priority task can always be interrupted by another priority * task.  Resursion ends when no more priority tasks are added, or when the thread pool is * exhausted. *  * @author Bela Ban */public class Scheduler implements Runnable {    final Queue              queue=new Queue();    Thread             sched_thread=null;    Task               current_task=null;    ThreadPool         pool=null;    SchedulerListener  listener=null;    protected static final Log log=LogFactory.getLog(Scheduler.class);    /** Process items on the queue concurrently. The default is to wait until the processing of an item     * has completed before fetching the next item from the queue. Note that setting this to true     * may destroy the properties of a protocol stack, e.g total or causal order may not be     * guaranteed. Set this to true only if you know what you're doing ! */    boolean            concurrent_processing=false;    /** max number of threads, will only be allocated when needed */    int                NUM_THREADS=128;    static final int          WAIT_FOR_THREAD_AVAILABILITY=3000;    static final int          THREAD_JOIN_TIMEOUT=1000;    public Scheduler() {        String tmp=Util.getProperty(new String[]{Global.SCHEDULER_MAX_THREADS}, null, null, false, "128");        this.NUM_THREADS=Integer.parseInt(tmp);    }    public Scheduler(int num_threads) {        this.NUM_THREADS=num_threads;    }    public void setListener(SchedulerListener l) {        listener=l;    }    public boolean getConcurrentProcessing() {        return concurrent_processing;    }    public void setConcurrentProcessing(boolean process_concurrently) {        this.concurrent_processing=process_concurrently;    }    public void run() {        while(sched_thread != null) {            if(queue.closed()) break;            try {                current_task=(Task)queue.peek(); // get the first task in the queue (blocks until available)                if(current_task == null) { // @remove                    if(log.isWarnEnabled()) log.warn("current task is null, queue.size()=" + queue.size() +                            ", queue.closed()=" + queue.closed() + ", continuing");                    continue;                }                if(current_task.suspended) {                    current_task.suspended=false;                    current_task.thread.resume();                    if(listener != null) listener.resumed(current_task.target);                }                else {                    if(current_task.thread == null) {                        current_task.thread=pool.getThread();                        if(current_task.thread == null) { // thread pool exhausted                            if(log.isWarnEnabled()) log.warn("thread pool exhausted, waiting for " +                                    WAIT_FOR_THREAD_AVAILABILITY + "ms before retrying");                            Util.sleep(WAIT_FOR_THREAD_AVAILABILITY);                            continue;                        }                    }                    // if we get here, current_task.thread and current_task.target are guaranteed to be non-null                    if(listener != null) listener.started(current_task.target);                    if(current_task.thread.assignTask(current_task.target) == false)                        continue;                }                if(sched_thread.isInterrupted()) { // will continue at "catch(InterruptedException)" below                    // sched_thread.interrupt();                    // changed on suggestion from Victor Cardoso: sched_thread.interrupt() does *not* throw an                    // InterruptedException, so we don't land in the catch clause, but rather execute the code below                    // (which we don't want) - bela April 15 2004                    throw new InterruptedException();                }                if(concurrent_processing == false) { // this is the default: process serially                    synchronized(current_task.thread) {                        while(!current_task.thread.done() && !current_task.thread.suspended)                            current_task.thread.wait();                    }                    if(listener != null) listener.stopped(current_task.target);                }                queue.removeElement(current_task);            }            catch(InterruptedException interrupted) {                if(sched_thread == null || queue.closed()) break;                if(current_task.thread != null) {                    current_task.thread.suspend();                    if(listener != null) listener.suspended(current_task.target);                    current_task.suspended=true;                }                Thread.interrupted(); // clears the interrupt-flag            }            catch(QueueClosedException closed_ex) {                return;            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception=" + Util.print(ex));            }        }         if(log.isTraceEnabled()) log.trace("scheduler thread terminated");    }    public void addPrio(Runnable task) {        Task new_task=new Task(task);        boolean do_interrupt=false;        try {            synchronized(queue) { // sync against add()                if(queue.size() == 0)                    queue.add(new_task);                else {                    queue.addAtHead(new_task);                    do_interrupt=true;                }            }            if(do_interrupt) // moved out of 'synchronized(queue)' to minimize lock contention                sched_thread.interrupt();        }        catch(Throwable e) {            if(log.isErrorEnabled()) log.error("exception=" + e);        }    }    public void add(Runnable task) {        Task new_task=new Task(task);        try {            synchronized(queue) { // sync against addPrio()                queue.add(new_task);            }        }        catch(Exception e) {            if(log.isErrorEnabled()) log.error("exception=" + e);        }    }    public void start() {        if(queue.closed())            queue.reset();        if(sched_thread == null) {            pool=new ThreadPool(NUM_THREADS);            sched_thread=new Thread(this, "Scheduler main thread");            sched_thread.setDaemon(true);            sched_thread.start();        }    }    /**     * Stop the scheduler thread. The thread may be waiting for its next task (queue.peek()) or it may be waiting on     * the currently executing thread. In the first case, closing the queue will throw a QueueClosed exception which     * terminates the scheduler thread. In the second case, after closing the queue, we interrupt the scheduler thread,     * which then checks whether the queue is closed. If this is the case, the scheduler thread terminates.     */    public void stop() {        Thread tmp=null;        // 1. Close the queue        queue.close(false); // will stop thread at next peek();        // 2. Interrupt the scheduler thread        if(sched_thread != null && sched_thread.isAlive()) {            tmp=sched_thread;            sched_thread=null;            tmp.interrupt();            try {                tmp.join(THREAD_JOIN_TIMEOUT);            }            catch(Exception ex) {            }            if(tmp.isAlive())                if(log.isErrorEnabled()) log.error("scheduler thread is still not dead  !!!");        }        sched_thread=null;        // 3. Delete the thread pool        if(pool != null) {            pool.destroy();            pool=null;        }    }    public static class Task {        ReusableThread thread=null;        Runnable target=null;        boolean suspended=false;        Task(Runnable target) {            this.target=target;        }        public String toString() {            return "[thread=" + thread + ", target=" + target + ", suspended=" + suspended + ']';        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?