timescheduler.java
来自「JGRoups源码」· Java 代码 · 共 564 行 · 第 1/2 页
JAVA
564 行
/**
 * Set the thread state to running, then create and start the scheduler
 * thread unless one is already alive.
 * <p/>
 * Every caller visible in this excerpt invokes this while holding the
 * <tt>queue</tt> monitor.
 */
private void _start() {
    thread_state=RUN;

    // only start if not yet running
    if(thread == null || !thread.isAlive()) {
        thread=new Thread(Util.getGlobalThreadGroup(), new Loop(), THREAD_NAME);
        thread.setDaemon(true);
        thread.start();
    }
}


/**
 * Restart the suspended thread. Resuming requires exactly the same steps
 * as a fresh start, so this delegates to {@link #_start()} instead of
 * duplicating its body.
 */
private void _unsuspend() {
    _start();
}


/**
 * Set the thread state to suspended and drop the reference to the
 * scheduler thread.
 */
private void _suspend() {
    thread_state=SUSPEND;
    thread=null;
}


/**
 * Set the thread state to stopping
 */
private void _stopping() {
    thread_state=STOPPING;
}


/**
 * Set the thread state to stopped and drop the reference to the
 * scheduler thread.
 */
private void _stop() {
    thread_state=STOP;
    thread=null;
}


/**
 * Scheduling loop.
 * NOTE(review): presumably invoked from <tt>Loop</tt>, which is declared
 * outside this excerpt - confirm.
 * <p/>
 * If the task queue is empty, sleep until a task comes in or, if slept
 * for too long (<tt>suspend_interval</tt>), suspend the thread.
 * <p/>
 * Get the first task; if its running time hasn't been reached then wait
 * a bit and retry. Else reschedule the task and then run it (outside of
 * the <tt>queue</tt> monitor).
 * <p/>
 * The loop ends when the thread is interrupted, when the thread
 * reference has been cleared, when it suspends itself, or when
 * {@link #stop()} has switched the state to <tt>STOPPING</tt>.
 */
private void _run() {
    IntTask intTask;
    Task task;
    long currTime, execTime, waitTime, intervalTime, schedTime;

    while(true) {
        // NOTE(review): 'thread' is written under the 'queue' monitor by the
        // code in this excerpt but is read here under 'this'; kept as in the
        // original - confirm whether code outside this excerpt relies on it.
        synchronized(this) {
            if(thread == null || thread.isInterrupted()) return;
        }

        synchronized(queue) {
            // A task may have swallowed the interrupt sent by stop(); the
            // state flag is the reliable stop signal, so honour it here.
            if(thread_state == STOPPING) return;

            if(queue.isEmpty()) {
                try {
                    queue.wait(suspend_interval);
                }
                catch(InterruptedException ex) {
                    return;
                }

                // wait() may return normally even though stop() ran in the
                // meantime; do not overwrite STOPPING with SUSPEND.
                if(thread_state == STOPPING) return;

                // still nothing to do after suspend_interval: suspend
                if(queue.isEmpty()) {
                    _suspend();
                    return;
                }
            }

            intTask=queue.getFirst();
            synchronized(intTask) {
                task=intTask.task;
                if(task.cancelled()) {
                    queue.removeFirst();
                    continue;
                }
                currTime=System.currentTimeMillis();
                execTime=intTask.sched;
                if((waitTime=execTime - currTime) <= 0) {
                    // Due now: reschedule the task before running it.
                    // relative  -> next run counted from the current time
                    // !relative -> next run counted from the scheduled time
                    intervalTime=task.nextInterval();
                    schedTime=intTask.relative ?
                            currTime + intervalTime : execTime + intervalTime;
                    queue.rescheduleFirst(schedTime);
                }
            }

            // Not due yet: wait for the remaining time, then re-examine the
            // head of the queue.
            if(waitTime > 0) {
                try {
                    queue.wait(waitTime);
                }
                catch(InterruptedException ex) {
                    return;
                }
                continue;
            }
        }

        // Run the task without holding the queue monitor
        long start=System.currentTimeMillis(), stop, diff;
        try {
            if(log.isDebugEnabled()) log.debug("Running task " + task);
            task.run();
            stop=System.currentTimeMillis();
            diff=stop-start;
            if(diff >= MAX_EXECUTION_TIME) {
                if(log.isWarnEnabled()) log.warn("task " + task + " took " + diff + "ms to execute, " +
                        "please check why it is taking so long. It is delaying other tasks");
            }
        }
        catch(Throwable ex) {
            // a failing task must not kill the scheduler thread
            log.error("failed running task " + task, ex);
        }
    }
}


/**
 * Create a scheduler that executes tasks in dynamically adjustable
 * intervals
 *
 * @param suspend_interval the time that the scheduler will wait for
 * at least one task to be placed in the task queue before suspending
 * the scheduling thread
 */
public TimeScheduler(long suspend_interval) {
    super();
    queue=new TaskQueue();
    this.suspend_interval=suspend_interval;
}


/**
 * Create a scheduler that executes tasks in dynamically adjustable
 * intervals, using <tt>SUSPEND_INTERVAL</tt> as the suspend interval.
 */
public TimeScheduler() {
    this(SUSPEND_INTERVAL);
}


/**
 * Set the suspend interval.
 *
 * @param s the new suspend interval (passed to <tt>queue.wait()</tt>
 * by the scheduling loop)
 */
public void setSuspendInterval(long s) {
    this.suspend_interval=s;
}


/**
 * @return the current suspend interval
 */
public long getSuspendInterval() {
    return suspend_interval;
}


/**
 * @return the string representation of the task queue, or
 * <tt>"&lt;empty&gt;"</tt> if there is no queue
 */
public String dumpTaskQueue() {
    return queue != null? queue.toString() : "<empty>";
}


/**
 * Add a task for execution at adjustable intervals. A task whose
 * <tt>nextInterval()</tt> is negative is not added.
 * <p/>
 * If the scheduler is running, the scheduling thread is notified; if it
 * is suspended, it is restarted. In the stopping and stopped states the
 * task is only placed in the queue.
 *
 * @param t the task to execute
 * @param relative scheduling scheme:
 * <p/>
 * <tt>true</tt>:<br>
 * Task is rescheduled relative to the last time it <i>actually</i>
 * started execution
 * <p/>
 * <tt>false</tt>:<br>
 * Task is scheduled relative to its <i>last</i> execution schedule. This
 * has the effect that the time between two consecutive executions of
 * the task remains the same.
 */
public void add(Task t, boolean relative) {
    long interval, sched;

    if((interval=t.nextInterval()) < 0) return;
    sched=System.currentTimeMillis() + interval;

    synchronized(queue) {
        queue.add(new IntTask(t, sched, relative));
        switch(thread_state) {
            case RUN:
                queue.notifyAll();
                break;
            case SUSPEND:
                _unsuspend();
                break;
            case STOPPING:
                break;
            case STOP:
                break;
        }
    }
}


/**
 * Add a task for execution at adjustable intervals, using the relative
 * scheduling scheme (see {@link #add(Task, boolean)}).
 *
 * @param t the task to execute
 */
public void add(Task t) {
    add(t, true);
}


/**
 * Answers the number of tasks currently in the queue.
 * @return The number of tasks currently in the queue.
 */
public int size() {
    return queue.size();
}


/**
 * Start the scheduler, if it's suspended or stopped. Does nothing while
 * the scheduler is running or stopping.
 */
public void start() {
    synchronized(queue) {
        switch(thread_state) {
            case RUN:
                break;
            case SUSPEND:
                _unsuspend();
                break;
            case STOPPING:
                break;
            case STOP:
                _start();
                break;
        }
    }
}


/**
 * Stop the scheduler if it's running. Switch to stopped, if it's
 * suspended. When a running scheduler is stopped, the task queue is
 * cleared once the scheduling thread has ended. Does nothing if the
 * scheduler is already stopping or stopped.
 *
 * @throws InterruptedException if interrupted while waiting for thread
 * to return
 */
public void stop() throws InterruptedException {
    // Local copy of the thread reference: the scheduling thread may clear
    // the field (see _suspend()) as soon as the queue monitor is released,
    // so the field must not be re-read for the join() below.
    Thread runner;

    // i. Switch to STOPPING, interrupt thread
    // ii. Wait until thread ends
    // iii. Clear the task queue, switch to STOPPED,
    synchronized(queue) {
        switch(thread_state) {
            case RUN:
                _stopping();
                break;
            case SUSPEND:
                _stop();
                return;
            case STOPPING:
                return;
            case STOP:
                return;
        }
        runner=thread;
        if(runner != null) runner.interrupt();
    }

    if(runner != null) runner.join();

    synchronized(queue) {
        queue.clear();
        _stop();
    }
}
} // closes the enclosing TimeScheduler class, whose header lies before this excerpt
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?