fjtaskscheduler2.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 219 行
SCALA
219 行
/* __ *\** ________ ___ / / ___ Scala API **** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **** /____/\___/_/ |_/____/_/ | | **** |/ **\* */// $Id: FJTaskScheduler2.scala 14416 2008-03-19 01:17:25Z mihaylov $package scala.actorsimport compat.Platformimport java.lang.{Runnable, Thread, InterruptedException, System, Runtime}import scala.collection.Setimport scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}import java.lang.ref.{WeakReference, ReferenceQueue}/** * FJTaskScheduler2 * * @version 0.9.12 * @author Philipp Haller */class FJTaskScheduler2 extends Thread with IScheduler { // as long as this thread runs, JVM should not exit setDaemon(false) var printStats = false val rt = Runtime.getRuntime() val minNumThreads = 4 val coreProp = try { System.getProperty("actors.corePoolSize") } catch { case ace: java.security.AccessControlException => null } val maxProp = try { System.getProperty("actors.maxPoolSize") } catch { case ace: java.security.AccessControlException => null } val initCoreSize = if (null ne coreProp) Integer.parseInt(coreProp) else { val numCores = rt.availableProcessors() if (2 * numCores > minNumThreads) 2 * numCores else minNumThreads } val maxSize = if (null ne maxProp) Integer.parseInt(maxProp) else 256 private var coreSize = initCoreSize private val executor = new FJTaskRunnerGroup(coreSize) private var terminating = false private var suspending = false private var lastActivity = Platform.currentTime private var submittedTasks = 0 private var pendingReactions = 0 def pendReaction: Unit = synchronized { pendingReactions += 1 } def unPendReaction: Unit = synchronized { pendingReactions -= 1 } def getPendingCount = synchronized { pendingReactions } def setPendingCount(cnt: Int) = synchronized { pendingReactions = cnt } def printActorDump {} def terminated(a: Actor) {} private val TICK_FREQ = 50 private val CHECK_FREQ = 100 def onLockup(handler: () => Unit) = lockupHandler = handler def onLockup(millis: Int)(handler: () => Unit) = { //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ lockupHandler = handler } private var lockupHandler: () => Unit = null override def run() { try { while (!terminating) { this.synchronized { try { wait(CHECK_FREQ) } catch { case _: InterruptedException => if (terminating) throw new QuitException } if (!suspending) { // check for unreachable actors def drainRefQ() { val wr = refQ.poll if (wr != null) { unPendReaction // continue draining drainRefQ() } } drainRefQ() // check if we need more threads if (Platform.currentTime - lastActivity >= TICK_FREQ && coreSize < maxSize && executor.checkPoolSize()) { //Debug.info(this+": increasing thread pool size") coreSize += 1 lastActivity = Platform.currentTime } else { if (pendingReactions <= 0) { // if all worker threads idle terminate if (executor.getActiveCount() == 0) { Debug.info(this+": initiating shutdown...") // Note that we don't have to shutdown // the FJTaskRunnerGroup since there is // no separate thread associated with it, // and FJTaskRunner threads have daemon status. // terminate timer thread TimerThread.shutdown() throw new QuitException } } } } } // sync } // while (!terminating) } catch { case _: QuitException => // allow thread to exit if (printStats) executor.stats() } } /** * @param item the task to be executed. */ def execute(task: Runnable) { executor.execute(task) } private val refQ = new ReferenceQueue[Actor] private var storedRefs: List[WeakReference[Actor]] = List() def start(task: Runnable) { if (task.isInstanceOf[Reaction]) { val reaction = task.asInstanceOf[Reaction] val wr = new WeakReference[Actor](reaction.a, refQ) //Debug.info("created "+wr+" pointing to "+reaction.a) storedRefs = wr :: storedRefs } pendReaction executor.execute(task) } /** * @param worker the worker thread executing tasks * @return the executed task */ def getTask(worker: WorkerThread) = null /** * @param a the actor */ def tick(a: Actor) { lastActivity = Platform.currentTime } /** Shuts down all idle worker threads. */ def shutdown(): Unit = synchronized { terminating = true // terminate timer thread TimerThread.shutdown() } def snapshot(): LinkedQueue = { suspending = true executor.snapshot() }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?