tickedscheduler.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 178 行
SCALA
178 行
/* __ *\** ________ ___ / / ___ Scala API **** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **** /____/\___/_/ |_/____/_/ | | **** |/ **\* */// $Id: TickedScheduler.scala 14416 2008-03-19 01:17:25Z mihaylov $package scala.actorsimport java.lang.{Thread, InterruptedException}import scala.collection.Setimport scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}import scala.compat.Platform/** * <p>This scheduler uses a thread pool to execute tasks that are generated * by the execution of actors.</p> * * @version 0.9.8 * @author Philipp Haller */class TickedScheduler extends Thread with IScheduler { private val tasks = new Queue[Runnable] // Worker threads private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] private val idle = new Queue[WorkerThread] private val ticks = new HashMap[WorkerThread, Long] private var terminating = false private var lastActivity = Platform.currentTime private var pendingReactions = 0 def pendReaction: Unit = synchronized { pendingReactions += 1 } def unPendReaction: Unit = synchronized { pendingReactions -= 1 } def printActorDump {} def start(task: Runnable): Unit = synchronized { pendingReactions += 1 execute(task) } def terminated(a: Actor) {} private var TICK_FREQ = 5 private var CHECK_FREQ = 50 for (i <- List.range(0, 2)) { val worker = new WorkerThread(this) workers += worker worker.start() } def onLockup(handler: () => Unit) { lockupHandler = handler } def onLockup(millis: Int)(handler: () => Unit) { //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ lockupHandler = handler } private var lockupHandler: () => Unit = null override def run() { try { while (!terminating) { this.synchronized { try { wait(CHECK_FREQ) } catch { case _: InterruptedException => if (terminating) throw new QuitException } if (tasks.length > 0) { // check if we need more threads if (Platform.currentTime - lastActivity >= TICK_FREQ) { val newWorker = new WorkerThread(this) workers += newWorker // dequeue item to be processed val item = tasks.dequeue newWorker.execute(item) newWorker.start() } } // tasks.length > 0 else { if (pendingReactions == 0) { // if all worker threads idle terminate if (workers.length == idle.length) { val idleThreads = idle.elements while (idleThreads.hasNext) { val worker = idleThreads.next worker.running = false worker.interrupt() } // terminate timer thread TimerThread.shutdown() throw new QuitException } } } } // sync } // while (!terminating) } catch { case _: QuitException => // allow thread to exit } } /** * @param item the task to be executed. */ def execute(item: Runnable): Unit = synchronized { if (!terminating) { if (idle.length > 0) { val wt = idle.dequeue wt.execute(item) } else tasks += item } } def snapshot(): LinkedQueue = null /** * @param worker the worker thread executing tasks * @return the executed task */ def getTask(worker: WorkerThread) = synchronized { if (terminating) QUIT_TASK if (tasks.length > 0) { val item = tasks.dequeue item } else { idle += worker null } } /** * @param a the actor */ def tick(a: Actor) { lastActivity = Platform.currentTime } /** Shuts down all idle worker threads. */ def shutdown(): Unit = synchronized { terminating = true val idleThreads = idle.elements while (idleThreads.hasNext) { val worker = idleThreads.next worker.running = false worker.interrupt() } // terminate timer thread TimerThread.shutdown() }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?