actor.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 874 行 · 第 1/2 页
SCALA
874 行
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2007, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id: Actor.scala 14557 2008-04-09 09:29:54Z phaller $

package scala.actors

import scala.collection.mutable.{HashSet, Queue}
import scala.compat.Platform

/**
 * The <code>Actor</code> object provides functions for the definition of
 * actors, as well as actor operations, such as
 * <code>receive</code>, <code>react</code>, <code>reply</code>,
 * etc.
 *
 * @version 0.9.14
 * @author Philipp Haller
 */
object Actor {

  /** Per-thread slot holding the actor currently associated with a thread. */
  private[actors] val tl = new ThreadLocal[Actor]

  /**
   * Returns the currently executing actor. Should be used instead
   * of <code>this</code> in all blocks of code executed by
   * actors.
   *
   * If the current thread has no actor stored in thread-local storage,
   * a new <code>ActorProxy</code> for the thread is created, stored and
   * returned.
   *
   * @return returns the currently executing actor.
   */
  def self: Actor = {
    val current = tl.get
    if (null ne current) current
    else {
      val proxy = new ActorProxy(currentThread)
      tl.set(proxy)
      proxy
    }
  }

  /**
   * Resets an actor proxy associated with the current thread.
   * It replaces the implicit <code>ActorProxy</code> instance
   * of the current thread (if any) with a new instance.
   *
   * This permits to re-use the current thread as an actor
   * even if its <code>ActorProxy</code> has died for some reason.
   */
  def resetProxy: Unit = tl.get match {
    // A type pattern never matches null, so an empty slot is left alone.
    case _: ActorProxy => tl.set(new ActorProxy(currentThread))
    case _ => ()
  }

  /**
   * Removes any reference to an <code>Actor</code> instance
   * currently stored in thread-local storage.
   *
   * This allows to release references from threads that are
   * potentially long-running or being re-used (e.g. inside
   * a thread pool). Permanent references in thread-local storage
   * are a potential memory leak.
   */
  def clearSelf: Unit = tl.set(null)

  /**
   * <p>This function is used for the definition of actors.</p>
   * <p>The following example demonstrates its usage:</p><pre>
   * import scala.actors.Actor._
   * ...
   * val a = actor {
   *   ...
   * }
   * </pre>
   *
   * @param body the code block to be executed by the newly created actor
   * @return     the newly created actor. Note that it is automatically started.
   */
  def actor(body: => Unit): Actor = {
    val created = new Actor {
      def act() = body
    }
    created.start()
    created
  }

  /**
   * Receives the next message from the mailbox of the current actor
   * <code>self</code>.
   */
  def ? : Any = self.?

  /**
   * Receives a message from the mailbox of
   * <code>self</code>. Blocks if no message matching any of the
   * cases of <code>f</code> can be received.
   *
   * @param f a partial function specifying patterns and actions
   * @return  the result of processing the received message
   */
  def receive[A](f: PartialFunction[Any, A]): A =
    self.receive(f)

  /**
   * Receives a message from the mailbox of
   * <code>self</code>. Blocks at most <code>msec</code>
   * milliseconds if no message matching any of the cases of
   * <code>f</code> can be received. If no message could be
   * received the <code>TIMEOUT</code> action is executed if
   * specified.
   *
   * @param msec the time span before timeout
   * @param f    a partial function specifying patterns and actions
   * @return     the result of processing the received message
   */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R =
    self.receiveWithin(msec)(f)

  /**
   * Lightweight variant of <code>receive</code>.
   *
   * Actions in <code>f</code> have to contain the rest of the
   * computation of <code>self</code>, as this method will never
   * return.
   *
   * @param f a partial function specifying patterns and actions
   * @return  this function never returns
   */
  def react(f: PartialFunction[Any, Unit]): Nothing =
    self.react(f)

  /**
   * Lightweight variant of <code>receiveWithin</code>.
   *
   * Actions in <code>f</code> have to contain the rest of the
   * computation of <code>self</code>, as this method will never
   * return.
   *
   * @param msec the time span before timeout
   * @param f    a partial function specifying patterns and actions
   * @return     this function never returns
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing =
    self.reactWithin(msec)(f)

  /**
   * Makes <code>self</code> react to messages with <code>f</code> over
   * and over again. Every message is taken out of the mailbox; messages
   * for which <code>f</code> is not defined are dropped without being
   * processed.
   *
   * @param f a partial function specifying patterns and actions
   * @return  this function never returns
   */
  def eventloop(f: PartialFunction[Any, Unit]): Nothing = {
    // Resolve the current actor once and hand the same instance to the handler.
    val current = self
    current.react(new RecursiveProxyHandler(current, f))
  }

  /**
   * Handler used by <code>eventloop</code>: accepts every message, applies
   * <code>f</code> when it is defined for the message, and then makes
   * actor <code>a</code> react with this same handler again.
   *
   * @param a the actor on whose behalf messages are handled
   * @param f the user-supplied partial function
   */
  private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])
          extends PartialFunction[Any, Unit] {
    def isDefinedAt(m: Any): Boolean =
      true // events are immediately removed from the mailbox

    def apply(m: Any): Unit = {
      if (f.isDefinedAt(m)) f(m)
      // Re-arm on the actor this handler was created for instead of
      // looking the actor up in thread-local storage a second time.
      a.react(this)
    }
  }

  /**
   * Returns the actor which sent the last received message.
   */
  def sender: OutputChannel[Any] = self.sender

  /**
   * Send <code>msg</code> to the actor waiting in a call to
   * <code>!?</code>.
   */
  def reply(msg: Any): Unit = self.reply(msg)

  /**
   * Send <code>()</code> to the actor waiting in a call to
   * <code>!?</code>.
   */
  def reply(): Unit = self.reply(())

  /**
   * Returns the number of messages in <code>self</code>'s mailbox
   *
   * @return the number of messages in <code>self</code>'s mailbox
   */
  def mailboxSize: Int = self.mailboxSize

  /**
   * Wrapper around a code block that allows it to be sequenced with
   * another block via <code>andThen</code>.
   */
  private[actors] trait Body[a] {
    def andThen[b](other: => b): Unit
  }

  /**
   * Implicitly wraps a code block in a <code>Body</code> whose
   * <code>andThen</code> delegates to <code>self.seq(body, other)</code>.
   *
   * @param body the code block to wrap
   */
  implicit def mkBody[a](body: => a) = new Body[a] {
    def andThen[b](other: => b): Unit = self.seq(body, other)
  }

  /**
   * Causes <code>self</code> to repeatedly execute
   * <code>body</code>.
   *
   * @param body the code block to be executed
   */
  def loop(body: => Unit): Unit = body andThen loop(body)

  /**
   * Causes <code>self</code> to repeatedly execute
   * <code>body</code> while the condition
   * <code>cond</code> is <code>true</code>.
   *
   * @param cond the condition to test
   * @param body the code block to be executed
   */
  def loopWhile(cond: => Boolean)(body: => Unit): Unit =
    if (cond) { body andThen loopWhile(cond)(body) }
    else continue

  /**
   * Links <code>self</code> to actor <code>to</code>.
   *
   * @param to the actor to link to
   * @return   the result of <code>self.link(to)</code>
   */
  def link(to: Actor): Actor = self.link(to)

  /**
   * Links <code>self</code> to actor defined by <code>body</code>.
   *
   * @param body the code block defining the actor to link to
   * @return     the result of <code>self.link(body)</code>
   */
  def link(body: => Unit): Actor = self.link(body)

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
   *
   * @param from the actor to unlink from
   */
  def unlink(from: Actor): Unit = self.unlink(from)

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   */
  def exit(reason: AnyRef): Nothing = self.exit(reason)

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, 'normal)</code> to <code>a</code>.
   * </p>
   */
  def exit(): Nothing = self.exit()

  /**
   * Throws a <code>KillActorException</code>. Used by
   * <code>loopWhile</code> once its condition evaluates to
   * <code>false</code>.
   */
  def continue: Unit = throw new KillActorException
}

/**
 * <p>
 *   This class provides an implementation of event-based actors.
 *   The main ideas of our approach are explained in the two papers
 * </p>
 * <ul>
 *   <li>
 *     <a href="http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf">
 *     <span style="font-weight:bold; white-space:nowrap;">Event-Based
 *     Programming without Inversion of Control</span></a>,<br/>
 *     Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i>, and
 *   </li>
 *   <li>
 *     <a href="http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf">
 *     <span style="font-weight:bold; white-space:nowrap;">Actors that
 *     Unify Threads and Events</span></a>,<br/>
 *     Philipp Haller and Martin Odersky, <i>Proc. COORDINATION 2007</i>.
* </li> * </ul> * * @version 0.9.14 * @author Philipp Haller */@serializabletrait Actor extends OutputChannel[Any] { private var received: Option[Any] = None private val waitingForNone = (m: Any) => false private var waitingFor: Any => Boolean = waitingForNone private var isSuspended = false private val mailbox = new MessageQueue private var sessions: List[OutputChannel[Any]] = Nil /** * Returns the number of messages in this actor's mailbox * * @return the number of messages in this actor's mailbox */ def mailboxSize: Int = synchronized { mailbox.size } /** * Sends <code>msg</code> to this actor (asynchronous) supplying * explicit reply destination. * * @param msg the message to send * @param replyTo the reply destination */ def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { tick() if (waitingFor(msg)) { received = Some(msg) if (isSuspended) sessions = replyTo :: sessions else sessions = List(replyTo) waitingFor = waitingForNone if (timeoutPending) { timeoutPending = false TimerThread.trashRequest(this) } if (isSuspended) resumeActor() else // assert continuation != null Scheduler.execute(new Reaction(this, continuation, msg)) } else { mailbox.append(msg, replyTo) } } /** * Receives a message from this actor's mailbox. * * @param f a partial function with message patterns and actions * @return result of processing the received value */ def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt isSuspended = true suspendActor() } else { received = Some(qel.msg) sessions = qel.session :: sessions } waitingFor = waitingForNone isSuspended = false } val result = f(received.get) sessions = sessions.tail result } /** * Receives a message from this actor's mailbox within a certain * time span. 
* * @param msec the time span before timeout * @param f a partial function with message patterns and actions * @return result of processing the received value */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { if (msec == 0) { if (f.isDefinedAt(TIMEOUT)) return f(TIMEOUT) else error("unhandled timeout") } else { waitingFor = f.isDefinedAt isSuspended = true received = None suspendActorFor(msec) if (received.isEmpty) { if (f.isDefinedAt(TIMEOUT)) { waitingFor = waitingForNone isSuspended = false val result = f(TIMEOUT) return result } else error("unhandled timeout") } } } else { received = Some(qel.msg) sessions = qel.session :: sessions } waitingFor = waitingForNone isSuspended = false } val result = f(received.get) sessions = sessions.tail result } /** * Receives a message from this actor's mailbox. * <p> * This method never returns. Therefore, the rest of the computation * has to be contained in the actions of the partial function. *
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?