actor.scala

来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 874 行 · 第 1/2 页

SCALA
874
字号
/*                     __                                               *\**     ________ ___   / /  ___     Scala API                            ****    / __/ __// _ | / /  / _ |    (c) 2005-2007, LAMP/EPFL             ****  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **** /____/\___/_/ |_/____/_/ | |                                         ****                          |/                                          **\*                                                                      */// $Id: Actor.scala 14557 2008-04-09 09:29:54Z phaller $package scala.actorsimport scala.collection.mutable.{HashSet, Queue}import scala.compat.Platform/** * The <code>Actor</code> object provides functions for the definition of * actors, as well as actor operations, such as * <code>receive</code>, <code>react</code>, <code>reply</code>, * etc. * * @version 0.9.14 * @author Philipp Haller */object Actor {  private[actors] val tl = new ThreadLocal[Actor]  /**   * Returns the currently executing actor. Should be used instead   * of <code>this</code> in all blocks of code executed by   * actors.   *   * @return returns the currently executing actor.   */  def self: Actor = {    var a = tl.get.asInstanceOf[Actor]    if (null eq a) {      a = new ActorProxy(currentThread)      tl.set(a)    }    a  }  /**   * Resets an actor proxy associated with the current thread.   * It replaces the implicit <code>ActorProxy</code> instance   * of the current thread (if any) with a new instance.   *   * This permits to re-use the current thread as an actor   * even if its <code>ActorProxy</code> has died for some reason.   */  def resetProxy {    val a = tl.get.asInstanceOf[Actor]    if ((null ne a) && a.isInstanceOf[ActorProxy])      tl.set(new ActorProxy(currentThread))  }  /**   * Removes any reference to an <code>Actor</code> instance   * currently stored in thread-local storage.   *   * This allows to release references from threads that are   * potentially long-running or being re-used (e.g. inside   * a thread pool). Permanent references in thread-local storage   * are a potential memory leak.   */  def clearSelf {    tl.set(null)  }  /**   * <p>This function is used for the definition of actors.</p>   * <p>The following example demonstrates its usage:</p><pre>   * import scala.actors.Actor._   * ...   * val a = actor {   *   ...   * }   * </pre>   *   * @param  body  the code block to be executed by the newly created actor   * @return       the newly created actor. Note that it is automatically started.   */  def actor(body: => Unit): Actor = {    val actor = new Actor {      def act() = body    }    actor.start()    actor  }  /**   * Receives the next message from the mailbox of the current actor   * <code>self</code>.   */  def ? : Any = self.?  /**   * Receives a message from the mailbox of   * <code>self</code>. Blocks if no message matching any of the   * cases of <code>f</code> can be received.   *   * @param  f a partial function specifying patterns and actions   * @return   the result of processing the received message   */  def receive[A](f: PartialFunction[Any, A]): A =    self.receive(f)  /**   * Receives a message from the mailbox of   * <code>self</code>. Blocks at most <code>msec</code>   * milliseconds if no message matching any of the cases of   * <code>f</code> can be received. If no message could be   * received the <code>TIMEOUT</code> action is executed if   * specified.   *   * @param  msec the time span before timeout   * @param  f    a partial function specifying patterns and actions   * @return      the result of processing the received message   */  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R =    self.receiveWithin(msec)(f)  /**   * Lightweight variant of <code>receive</code>.   *   * Actions in <code>f</code> have to contain the rest of the   * computation of <code>self</code>, as this method will never   * return.   *   * @param  f a partial function specifying patterns and actions   * @return   this function never returns   */  def react(f: PartialFunction[Any, Unit]): Nothing =    self.react(f)  /**   * Lightweight variant of <code>receiveWithin</code>.   *   * Actions in <code>f</code> have to contain the rest of the   * computation of <code>self</code>, as this method will never   * return.   *   * @param  msec the time span before timeout   * @param  f    a partial function specifying patterns and actions   * @return      this function never returns   */  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing =    self.reactWithin(msec)(f)  def eventloop(f: PartialFunction[Any, Unit]): Nothing =    self.react(new RecursiveProxyHandler(self, f))  private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])          extends PartialFunction[Any, Unit] {    def isDefinedAt(m: Any): Boolean =      true // events are immediately removed from the mailbox    def apply(m: Any) {      if (f.isDefinedAt(m)) f(m)      self.react(this)    }  }  /**   * Returns the actor which sent the last received message.   */  def sender: OutputChannel[Any] = self.sender  /**   * Send <code>msg</code> to the actor waiting in a call to   * <code>!?</code>.   */  def reply(msg: Any): Unit = self.reply(msg)  /**   * Send <code>()</code> to the actor waiting in a call to   * <code>!?</code>.   */  def reply(): Unit = self.reply(())  /**   * Returns the number of messages in <code>self</code>'s mailbox   *   * @return the number of messages in <code>self</code>'s mailbox   */  def mailboxSize: Int = self.mailboxSize  private[actors] trait Body[a] {    def andThen[b](other: => b): Unit  }  implicit def mkBody[a](body: => a) = new Body[a] {    def andThen[b](other: => b): Unit = self.seq(body, other)  }  /**   * Causes <code>self</code> to repeatedly execute   * <code>body</code>.   *   * @param body the code block to be executed   */  def loop(body: => Unit): Unit = body andThen loop(body)  /**   * Causes <code>self</code> to repeatedly execute   * <code>body</code> while the condition   * <code>cond</code> is <code>true</code>.   *   * @param cond the condition to test   * @param body the code block to be executed   */  def loopWhile(cond: => Boolean)(body: => Unit): Unit =    if (cond) { body andThen loopWhile(cond)(body) }    else continue  /**   * Links <code>self</code> to actor <code>to</code>.   *   * @param  to the actor to link to   * @return    */  def link(to: Actor): Actor = self.link(to)  /**   * Links <code>self</code> to actor defined by <code>body</code>.   *   * @param body ...   * @return     ...   */  def link(body: => Unit): Actor = self.link(body)  /**   * Unlinks <code>self</code> from actor <code>from</code>.   *   * @param from the actor to unlink from   */  def unlink(from: Actor): Unit = self.unlink(from)  /**   * <p>   *   Terminates execution of <code>self</code> with the following   *   effect on linked actors:   * </p>   * <p>   *   For each linked actor <code>a</code> with   *   <code>trapExit</code> set to <code>true</code>, send message   *   <code>Exit(self, reason)</code> to <code>a</code>.   * </p>   * <p>   *   For each linked actor <code>a</code> with   *   <code>trapExit</code> set to <code>false</code> (default),   *   call <code>a.exit(reason)</code> if   *   <code>reason != 'normal</code>.   * </p>   */  def exit(reason: AnyRef): Nothing = self.exit(reason)  /**   * <p>   *   Terminates execution of <code>self</code> with the following   *   effect on linked actors:   * </p>   * <p>   *   For each linked actor <code>a</code> with   *   <code>trapExit</code> set to <code>true</code>, send message   *   <code>Exit(self, 'normal)</code> to <code>a</code>.   * </p>   */  def exit(): Nothing = self.exit()  def continue: Unit = throw new KillActorException}/** * <p> *   This class provides an implementation of event-based actors. *   The main ideas of our approach are explained in the two papers * </p> * <ul> *   <li> *     <a href="http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf"> *     <span style="font-weight:bold; white-space:nowrap;">Event-Based *     Programming without Inversion of Control</span></a>,<br/> *     Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i>, and *   </li> *   <li> *     <a href="http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf"> *     <span style="font-weight:bold; white-space:nowrap;">Actors that *     Unify Threads and Events</span></a>,<br/> *     Philipp Haller and Martin Odersky, <i>Proc. COORDINATION 2007</i>. *   </li> * </ul> * * @version 0.9.14 * @author Philipp Haller */@serializabletrait Actor extends OutputChannel[Any] {  private var received: Option[Any] = None  private val waitingForNone = (m: Any) => false  private var waitingFor: Any => Boolean = waitingForNone  private var isSuspended = false  private val mailbox = new MessageQueue  private var sessions: List[OutputChannel[Any]] = Nil  /**   * Returns the number of messages in this actor's mailbox   *   * @return the number of messages in this actor's mailbox   */  def mailboxSize: Int = synchronized {    mailbox.size  }  /**   * Sends <code>msg</code> to this actor (asynchronous) supplying   * explicit reply destination.   *   * @param  msg      the message to send   * @param  replyTo  the reply destination   */  def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {    tick()    if (waitingFor(msg)) {      received = Some(msg)      if (isSuspended)        sessions = replyTo :: sessions      else        sessions = List(replyTo)      waitingFor = waitingForNone      if (timeoutPending) {        timeoutPending = false        TimerThread.trashRequest(this)      }      if (isSuspended)        resumeActor()      else // assert continuation != null        Scheduler.execute(new Reaction(this, continuation, msg))    } else {      mailbox.append(msg, replyTo)    }  }  /**   * Receives a message from this actor's mailbox.   *   * @param  f    a partial function with message patterns and actions   * @return      result of processing the received value   */  def receive[R](f: PartialFunction[Any, R]): R = {    assert(Actor.self == this, "receive from channel belonging to other actor")    if (shouldExit) exit() // links    this.synchronized {      tick()      val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))      if (null eq qel) {        waitingFor = f.isDefinedAt        isSuspended = true        suspendActor()      } else {        received = Some(qel.msg)        sessions = qel.session :: sessions      }      waitingFor = waitingForNone      isSuspended = false    }    val result = f(received.get)    sessions = sessions.tail    result  }  /**   * Receives a message from this actor's mailbox within a certain   * time span.   *   * @param  msec the time span before timeout   * @param  f    a partial function with message patterns and actions   * @return      result of processing the received value   */  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {    assert(Actor.self == this, "receive from channel belonging to other actor")    if (shouldExit) exit() // links    this.synchronized {      tick()      // first, remove spurious TIMEOUT message from mailbox if any      val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)      val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))      if (null eq qel) {        if (msec == 0) {          if (f.isDefinedAt(TIMEOUT))            return f(TIMEOUT)          else            error("unhandled timeout")        }        else {          waitingFor = f.isDefinedAt          isSuspended = true          received = None          suspendActorFor(msec)          if (received.isEmpty) {            if (f.isDefinedAt(TIMEOUT)) {              waitingFor = waitingForNone              isSuspended = false              val result = f(TIMEOUT)              return result            }            else              error("unhandled timeout")          }        }      } else {        received = Some(qel.msg)        sessions = qel.session :: sessions      }      waitingFor = waitingForNone      isSuspended = false    }    val result = f(received.get)    sessions = sessions.tail    result  }  /**   * Receives a message from this actor's mailbox.   * <p>   * This method never returns. Therefore, the rest of the computation   * has to be contained in the actions of the partial function.   *

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?