scheduler.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 305 行
SCALA
305 行
/* __ *\** ________ ___ / / ___ Scala API **** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **** /____/\___/_/ |_/____/_/ | | **** |/ **\* */// $Id: Scheduler.scala 13978 2008-02-13 17:32:01Z phaller $package scala.actorsimport compat.Platformimport java.lang.{Runnable, Thread, InterruptedException}import scala.collection.Setimport scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}/** * The <code>Scheduler</code> object is used by * <code>Actor</code> to execute tasks of an execution of an actor. * * @version 0.9.10 * @author Philipp Haller */object Scheduler { private var sched: IScheduler = { var s: IScheduler = new FJTaskScheduler2 s.start() s } def impl = sched def impl_= (scheduler: IScheduler) = { sched = scheduler sched.start() } private var tasks: LinkedQueue = null private var pendingCount = 0 def snapshot(): Unit = { tasks = sched.snapshot() pendingCount = sched.asInstanceOf[FJTaskScheduler2].getPendingCount sched.shutdown() } def restart(): Unit = synchronized { sched = { var s: IScheduler = new FJTaskScheduler2 s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount) s.start() s } TimerThread.restart() while (!tasks.isEmpty()) { sched.execute(tasks.take().asInstanceOf[FJTask]) } tasks = null } def start(task: Runnable) = sched.start(task) def execute(task: Runnable) = { val t = currentThread if (t.isInstanceOf[FJTaskRunner]) { val tr = t.asInstanceOf[FJTaskRunner] tr.push(new FJTask { def run() { task.run() } }) } else sched.execute(task) } def tick(a: Actor) = sched.tick(a) def terminated(a: Actor) = sched.terminated(a) def pendReaction: Unit = sched.pendReaction private val termHandlers = new HashMap[Actor, () => Unit] def onTerminate(a: Actor)(f: => Unit) { termHandlers += (a -> (() => f)) } def unPendReaction(a: Actor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { case Some(handler) => handler() // remove mapping termHandlers -= a case None => // do nothing } // notify scheduler sched.unPendReaction } def shutdown() = sched.shutdown() def onLockup(handler: () => Unit) = sched.onLockup(handler) def onLockup(millis: Int)(handler: () => Unit) = sched.onLockup(millis)(handler) def printActorDump = sched.printActorDump}/** * This abstract class provides a common interface for all * schedulers used to execute actor tasks. * * @version 0.9.8 * @author Philipp Haller */trait IScheduler { def start(): Unit def start(task: Runnable): Unit def execute(task: Runnable): Unit def getTask(worker: WorkerThread): Runnable def tick(a: Actor): Unit def terminated(a: Actor): Unit def pendReaction: Unit def unPendReaction: Unit def snapshot(): LinkedQueue def shutdown(): Unit def onLockup(handler: () => Unit): Unit def onLockup(millis: Int)(handler: () => Unit): Unit def printActorDump: Unit val QUIT_TASK = new Reaction(null) { override def run(): Unit = {} override def toString() = "QUIT_TASK" }}/** * This scheduler executes the tasks of an actor on a single * thread (the current thread). * * @version 0.9.9 * @author Philipp Haller */class SingleThreadedScheduler extends IScheduler { def start() {} val taskQ = new scala.collection.mutable.Queue[Runnable] def start(task: Runnable) { // execute task immediately on same thread task.run() while (taskQ.length > 0) { val nextTask = taskQ.dequeue nextTask.run() } } def execute(task: Runnable) { val a = Actor.tl.get.asInstanceOf[Actor] if ((null ne a) && a.isInstanceOf[ActorProxy]) { // execute task immediately on same thread task.run() while (taskQ.length > 0) { val nextTask = taskQ.dequeue nextTask.run() } } else { // queue task for later execution taskQ += task } } def getTask(worker: WorkerThread): Runnable = null def tick(a: Actor) {} def terminated(a: Actor) {} def pendReaction {} def unPendReaction {} def shutdown() {} def snapshot(): LinkedQueue = { null } def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} def printActorDump {}}/** * The <code>QuickException</code> class is used to manage control flow * of certain schedulers and worker threads. * * @version 0.9.8 * @author Philipp Haller */private[actors] class QuitException extends Throwable { /* For efficiency reasons we do not fill in the execution stack trace. */ override def fillInStackTrace(): Throwable = this}/** * <p> * The class <code>WorkerThread</code> is used by schedulers to execute * actor tasks on multiple threads. * </p> * <p> * !!ACHTUNG: If you change this, make sure you understand the following * proof of deadlock-freedom!! * </p> * <p> * We proof that there is no deadlock between the scheduler and * any worker thread possible. For this, note that the scheduler * only acquires the lock of a worker thread by calling * <code>execute</code>. This method is only called when the worker thread * is in the idle queue of the scheduler. On the other hand, a * worker thread only acquires the lock of the scheduler when it * calls <code>getTask</code>. At the only callsite of <code>getTask</code>, * the worker thread holds its own lock. * </p> * <p> * Thus, deadlock can only occur when a worker thread calls * <code>getTask</code> while it is in the idle queue of the scheduler, * because then the scheduler might call (at any time!) <code>execute</code> * which tries to acquire the lock of the worker thread. In such * a situation the worker thread would be waiting to acquire the * lock of the scheduler and vice versa. * </p> * <p> * Therefore, to prove deadlock-freedom, it suffices to ensure * that a worker thread will never call <code>getTask</code> when * it is in the idle queue of the scheduler. * </p> * <p> * A worker thread enters the idle queue of the scheduler when * <code>getTask</code> returns <code>null</code>. Then it will also stay * in the while-loop W (<code>while (task eq null)</code>) until * <code>task</code> becomes non-null. The only way this can happen is * through a call of <code>execute</code> by the scheduler. Before every * call of <code>execute</code> the worker thread is removed from the idle * queue of the scheduler. Only then--after executing its task-- * the worker thread may call <code>getTask</code>. However, the scheduler * is unable to call <code>execute</code> as the worker thread is not in * the idle queue any more. In fact, the scheduler made sure * that this is the case even _before_ calling <code>execute</code> and * thus releasing the worker thread from the while-loop W. Thus, * the property holds for every possible interleaving of thread * execution. QED * </p> * * @version 0.9.8 * @author Philipp Haller */class WorkerThread(sched: IScheduler) extends Thread { private var task: Runnable = null private[actors] var running = true def execute(r: Runnable) = synchronized { task = r notify() } override def run(): Unit = try { while (running) { if (task ne null) { try { task.run() } catch { case consumed: InterruptedException => if (!running) throw new QuitException } } this.synchronized { task = sched.getTask(this) while (task eq null) { try { wait() } catch { case consumed: InterruptedException => if (!running) throw new QuitException } } if (task == sched.QUIT_TASK) { running = false } } } } catch { case consumed: QuitException => // allow thread to quit }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?