actor.scala

来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 874 行 · 第 1/2 页

SCALA
874
字号
   * @param  f    a partial function with message patterns and actions
   */
  def react(f: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.self == this, "react on channel belonging to other actor")
    // links: act on a pending exit request before touching the mailbox
    if (shouldExit) exit()
    this.synchronized {
      tick()
      val matched = mailbox.extractFirst((msg: Any) => f.isDefinedAt(msg))
      if (matched ne null) {
        // a matching message is already queued: hand it to the scheduler
        sessions = List(matched.session)
        scheduleActor(f, matched.msg)
      } else {
        // nothing matches yet: remember what is awaited and detach
        waitingFor = f.isDefinedAt
        continuation = f
        isDetached = true
      }
      // control never returns to the caller of react
      throw new SuspendActorException
    }
  }

  /**
   * Receives a message from this actor's mailbox within a certain
   * time span.
   * <p>
   * This method never returns: it always ends by throwing
   * <code>SuspendActorException</code>. The rest of the computation
   * therefore has to be contained in the actions of the partial
   * function.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.self == this, "react on channel belonging to other actor")
    // links: act on a pending exit request before touching the mailbox
    if (shouldExit) exit()
    this.synchronized {
      tick()
      // first, remove spurious TIMEOUT message from mailbox if any
      // (the extracted element itself is not needed)
      mailbox.extractFirst((msg: Any) => msg == TIMEOUT)
      val matched = mailbox.extractFirst((msg: Any) => f.isDefinedAt(msg))
      if (matched ne null) {
        // a matching message is already queued: hand it to the scheduler
        sessions = List(matched.session)
        scheduleActor(f, matched.msg)
      } else {
        // nothing matches yet: request a timeout, then detach
        waitingFor = f.isDefinedAt
        TimerThread.requestTimeout(this, f, msec)
        timeoutPending = true
        continuation = f
        isDetached = true
      }
      // control never returns to the caller of reactWithin
      throw new SuspendActorException
    }
  }

  /**
   * The behavior of an actor is specified by implementing this
   * abstract method. Note that the preferred way to create actors
   * is through the <code>actor</code> method
   * defined in object <code>Actor</code>.
   */
  def act(): Unit

  /**
   * Sends <code>msg</code> to this actor (asynchronous).
*/
  def !(msg: Any): Unit = {
    // the currently running actor is recorded as the sender
    send(msg, Actor.self)
  }

  /**
   * Forwards <code>msg</code> to this actor (asynchronous).
   * The message is sent with <code>Actor.sender</code> as its sender
   * rather than the currently running actor.
   */
  def forward(msg: Any): Unit = {
    send(msg, Actor.sender)
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous).
   *
   * @param  msg the message to be sent
   * @return     the reply
   */
  def !?(msg: Any): Any = {
    // a fresh channel owned by the calling actor carries the reply
    val answers = Actor.self.freshReplyChannel
    send(msg, answers)
    answers.receive {
      case reply => reply
    }
  }

  /**
   * Sends <code>msg</code> to this actor and awaits reply
   * (synchronous) within <code>msec</code> milliseconds.
   *
   * @param  msec the time span before timeout
   * @param  msg  the message to be sent
   * @return      <code>None</code> in case of timeout, otherwise
   *              <code>Some(x)</code> where <code>x</code> is the reply
   */
  def !?(msec: Long, msg: Any): Option[Any] = {
    val answers = Actor.self.freshReplyChannel
    send(msg, answers)
    answers.receiveWithin(msec) {
      case TIMEOUT => None
      case reply   => Some(reply)
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val futureCh = new Channel[Any](Actor.self)
    send(msg, futureCh)
    new Future[Any](futureCh) {
      // returns the cached reply if there is one, otherwise receives it
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case answer => value = Some(answer); answer
        }

      // polls the channel with a zero timeout while no reply is cached
      def isSet = value match {
        case Some(_) => true
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case answer  => value = Some(answer); true
        }
      }
    }
  }

  /**
   * Sends <code>msg</code> to this actor and immediately
   * returns a future representing the reply value.
   * The reply is post-processed using the partial function
   * <code>f</code>. This also makes it possible to recover a more
   * precise type for the reply value.
*/
  def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
    val ftch = new Channel[Any](Actor.self)
    send(msg, ftch)
    new Future[A](ftch) {
      // returns the cached value if present, otherwise receives the
      // reply, applies `f` to it and caches the result
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case any => value = Some(f(any)); value.get
        }
      // polls the channel with a zero timeout while no value is cached
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(f(any)); true
        }
        case Some(_) => true
      }
    }
  }

  /**
   * Replies with <code>msg</code> to the sender.
   */
  def reply(msg: Any) {
    sender ! msg
  }

  // most recently created reply channel (null until one is created)
  private var rc: Channel[Any] = null

  /** Returns the reply channel created by the last call to
   *  <code>freshReplyChannel</code>, or <code>null</code> if none.
   */
  private[actors] def replyChannel = rc

  /** Creates a new channel owned by this actor, remembers it as the
   *  current reply channel and returns it.
   */
  private[actors] def freshReplyChannel: Channel[Any] =
    { rc = new Channel[Any](this); rc }

  /**
   * Receives the next message from this actor's mailbox.
   */
  def ? : Any = receive {
    case x => x
  }

  /** The output channel at the head of <code>sessions</code>. */
  def sender: OutputChannel[Any] = sessions.head

  /** This actor itself. */
  def receiver: Actor = this

  // handler stored by react/reactWithin while no matching message is queued
  private var continuation: PartialFunction[Any, Unit] = null
  // set by reactWithin after a timeout has been requested
  private var timeoutPending = false
  // accessed in Reaction
  private[actors] var isDetached = false
  // true while the actor is blocked in suspendActor/suspendActorFor
  private var isWaiting = false

  /**
   * Schedules a <code>Reaction</code> that applies a handler to
   * <code>msg</code>. If <code>f</code> is <code>null</code> the stored
   * continuation is used instead; if both are <code>null</code> nothing
   * is scheduled.
   *
   * @param f   the handler to run, or <code>null</code> to use the
   *            stored continuation
   * @param msg the message passed to the reaction
   */
  // guarded by lock of this
  protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
    if ((f eq null) && (continuation eq null)) {
      // do nothing (timeout is handled instead)
    }
    else {
      val task = new Reaction(this,
                              if (f eq null) continuation else f,
                              msg)
      Scheduler execute task
    }

  /** Reports this actor to the scheduler via <code>Scheduler.tick</code>. */
  private def tick(): Unit =
    Scheduler tick this

  // hook replaced by `seq`; a no-op by default
  private[actors] var kill: () => Unit = () => {}

  /**
   * Blocks on this actor's monitor until <code>resumeActor</code>
   * clears <code>isWaiting</code>. Interrupts are ignored and the wait
   * is simply retried. Afterwards a pending exit request is acted on.
   */
  private def suspendActor() {
    isWaiting = true
    while (isWaiting) {
      try {
        wait()
      } catch {
        case _: InterruptedException =>
      }
    }
    // links: check if we should exit
    if (shouldExit) exit()
  }

  /**
   * Blocks on this actor's monitor for at most <code>msec</code>
   * milliseconds or until <code>resumeActor</code> is called. After an
   * interrupt the wait is resumed for the remaining time only.
   * Afterwards a pending exit request is acted on.
   *
   * @param msec the maximum time to wait, in milliseconds
   */
  private def suspendActorFor(msec: Long) {
    val ts = Platform.currentTime
    var waittime = msec
    var fromExc = false
    isWaiting = true
    while (isWaiting) {
      try {
        fromExc = false
        wait(waittime)
      } catch {
        case _: InterruptedException => {
          fromExc = true
          val now = Platform.currentTime
          val waited = now-ts
          waittime = msec-waited
          // Stop once the time budget is used up. The bound must include
          // zero: wait(0) would block until notified instead of timing
          // out, and a negative argument is rejected by wait.
          if (waittime <= 0) { isWaiting = false }
        }
      }
      // wait returned without an interrupt (notify or timeout): done
      if (!fromExc) { isWaiting = false }
    }
    // links: check if we should exit
    if (shouldExit) exit()
  }

  /** Clears <code>isWaiting</code> and notifies a thread waiting on
   *  this actor's monitor.
   */
  private def resumeActor() {
    isWaiting = false
    notify()
  }

  /**
   * Starts this actor.
   *
   * @return this actor
   */
  def start(): Actor = synchronized {
    // Reset various flags.
    //
    // Note that we do *not* reset `trapExit`. The reason is that
    // users should be able to set the field in the constructor
    // and before `act` is called.
    exitReason = 'normal
    exiting = false
    shouldExit = false
    Scheduler start new Reaction(this)
    this
  }

  /**
   * Evaluates <code>first</code> after installing a <code>kill</code>
   * hook on the current actor; the hook restores the previous hook and
   * schedules <code>next</code> as a continuation. Once
   * <code>first</code> has been evaluated a
   * <code>KillActorException</code> is thrown.
   * NOTE(review): the code that catches that exception and invokes
   * <code>kill</code> is not visible here.
   */
  private def seq[a, b](first: => a, next: => b): Unit = {
    val s = Actor.self
    val killNext = s.kill
    s.kill = () => {
      s.kill = killNext
      // to avoid stack overflow:
      // instead of directly executing `next`,
      // schedule as continuation
      scheduleActor({ case _ => next }, 1)
      throw new SuspendActorException
    }
    first
    throw new KillActorException
  }

  // actors this actor is linked to
  private[actors] var links: List[Actor] = Nil

  /**
   * Links <code>self</code> to actor <code>to</code>.
   * The link is recorded on both actors.
   *
   * @param to the actor to link to
   * @return   the actor <code>to</code>
   */
  def link(to: Actor): Actor = {
    assert(Actor.self == this, "link called on actor different from self")
    links = to :: links
    to.linkTo(this)
    to
  }

  /**
   * Links <code>self</code> to actor defined by <code>body</code>.
   * The new actor is started before it is returned.
   *
   * @param body the behavior of the new actor
   * @return     the newly created, linked and started actor
   */
  def link(body: => Unit): Actor = {
    val actor = new Actor {
      def act() = body
    }
    link(actor)
    actor.start()
    actor
  }

  /** Records <code>to</code> in this actor's links while holding this
   *  actor's lock.
   */
  private[actors] def linkTo(to: Actor) = synchronized {
    links = to :: links
  }

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
*/
  def unlink(from: Actor): Unit = {
    assert(Actor.self == this, "unlink called on actor different from self")
    links = links.remove((other: Actor) => from == other)
    // drop the reverse link as well
    from.unlinkFrom(this)
  }

  /** Removes <code>from</code> from this actor's links while holding
   *  this actor's lock.
   */
  private[actors] def unlinkFrom(from: Actor) = synchronized {
    links = links.remove((other: Actor) => from == other)
  }

  // when true, exits of linked actors arrive as Exit messages
  var trapExit = false
  private[actors] var exitReason: AnyRef = 'normal
  private[actors] var exiting = false
  // set when a linked actor asks this actor to terminate
  private[actors] var shouldExit = false

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   *
   * @param reason the exit reason recorded before terminating
   */
  def exit(reason: AnyRef): Nothing = {
    exitReason = reason
    exit()
  }

  /**
   * Terminates with the currently recorded exit reason
   * (<code>'normal</code> unless it has been changed), notifying linked
   * actors first when there are any.
   */
  def exit(): Nothing = {
    // links
    if (!links.isEmpty) {
      exitLinked()
    }
    throw new ExitActorException
  }

  /** Marks this actor as exiting, unlinks every linked actor and
   *  passes the exit reason on to those not already exiting.
   */
  // Assume !links.isEmpty
  private[actors] def exitLinked(): Unit = {
    exiting = true
    // remove this from links
    links = links.remove((other: Actor) => this == other)
    // exit linked processes
    for (linked <- links) {
      unlink(linked)
      if (!linked.exiting) {
        linked.exit(this, exitReason)
      }
    }
  }

  /** Records <code>reason</code> as the exit reason, then behaves like
   *  the parameterless <code>exitLinked</code>.
   */
  // Assume !links.isEmpty
  private[actors] def exitLinked(reason: AnyRef): Unit = {
    exitReason = reason
    exitLinked()
  }

  /** Handles the exit of the linked actor <code>from</code>: with
   *  <code>trapExit</code> set, an <code>Exit</code> message is sent to
   *  this actor; otherwise, for a reason other than
   *  <code>'normal</code>, this actor is flagged to exit and is resumed
   *  or rescheduled.
   */
  // Assume !this.exiting
  private[actors] def exit(from: Actor, reason: AnyRef): Unit = {
    if (trapExit) {
      this ! Exit(from, reason)
    }
    else if (reason != 'normal) {
      this.synchronized {
        shouldExit = true
        exitReason = reason
        if (isSuspended) {
          resumeActor()
        }
        else if (isDetached) {
          scheduleActor(null, null)
        }
      }
    }
  }
}

/** <p>
 *    This object is used as the timeout pattern in
 *    <a href="Actor.html#receiveWithin(Long)" target="contentFrame">
 *    <code>receiveWithin</code></a> and
 *    <a href="Actor.html#reactWithin(Long)" target="contentFrame">
 *    <code>reactWithin</code></a>.
 *  </p>
 *  <p>
 *    The following example demonstrates its usage:
 *  </p><pre>
 *    receiveWithin(500) {
 *      <b>case</b> (x, y) <b>=&gt;</b> ...
 *      <b>case</b> TIMEOUT <b>=&gt;</b> ...
 *    }</pre>
 *
 *  @version 0.9.8
 *  @author Philipp Haller
 */
case object TIMEOUT

/** Message sent to an actor with <code>trapExit</code> set when the
 *  linked actor <code>from</code> exits with <code>reason</code>.
 */
case class Exit(from: Actor, reason: AnyRef)

/** <p>
 *    This class is used to manage control flow of actor
 *    executions.
 *  </p>
 *
 * @version 0.9.8
 * @author Philipp Haller
 */
private[actors] class SuspendActorException extends Throwable {
  /*
   * For efficiency reasons we do not fill in
   * the execution stack trace.
   */
  override def fillInStackTrace(): Throwable = this
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?