actor.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 874 行 · 第 1/2 页
SCALA
874 行
* @param f a partial function with message patterns and actions */ def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt continuation = f isDetached = true } else { sessions = List(qel.session) scheduleActor(f, qel.msg) } throw new SuspendActorException } } /** * Receives a message from this actor's mailbox within a certain * time span. * <p> * This method never returns. Therefore, the rest of the computation * has to be contained in the actions of the partial function. * * @param msec the time span before timeout * @param f a partial function with message patterns and actions */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt TimerThread.requestTimeout(this, f, msec) timeoutPending = true continuation = f isDetached = true } else { sessions = List(qel.session) scheduleActor(f, qel.msg) } throw new SuspendActorException } } /** * The behavior of an actor is specified by implementing this * abstract method. Note that the preferred way to create actors * is through the <code>actor</code> method * defined in object <code>Actor</code>. */ def act(): Unit /** * Sends <code>msg</code> to this actor (asynchronous). */ def !(msg: Any) { send(msg, Actor.self) } /** * Forwards <code>msg</code> to this actor (asynchronous). */ def forward(msg: Any) { send(msg, Actor.sender) } /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous). * * @param msg the message to be sent * @return the reply */ def !?(msg: Any): Any = { val replyCh = Actor.self.freshReplyChannel send(msg, replyCh) replyCh.receive { case x => x } } /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous) within <code>msec</code> milliseconds. * * @param msec the time span before timeout * @param msg the message to be sent * @return <code>None</code> in case of timeout, otherwise * <code>Some(x)</code> where <code>x</code> is the reply */ def !?(msec: Long, msg: Any): Option[Any] = { val replyCh = Actor.self.freshReplyChannel send(msg, replyCh) replyCh.receiveWithin(msec) { case TIMEOUT => None case x => Some(x) } } /** * Sends <code>msg</code> to this actor and immediately * returns a future representing the reply value. */ def !!(msg: Any): Future[Any] = { val ftch = new Channel[Any](Actor.self) send(msg, ftch) new Future[Any](ftch) { def apply() = if (isSet) value.get else ch.receive { case any => value = Some(any); any } def isSet = value match { case None => ch.receiveWithin(0) { case TIMEOUT => false case any => value = Some(any); true } case Some(_) => true } } } /** * Sends <code>msg</code> to this actor and immediately * returns a future representing the reply value. * The reply is post-processed using the partial function * <code>f</code>. This also allows to recover a more * precise type for the reply value. */ def !: Future[A] = { val ftch = new Channel[Any](Actor.self) send(msg, ftch) new Future[A](ftch) { def apply() = if (isSet) value.get else ch.receive { case any => value = Some(f(any)); value.get } def isSet = value match { case None => ch.receiveWithin(0) { case TIMEOUT => false case any => value = Some(f(any)); true } case Some(_) => true } } } /** * Replies with <code>msg</code> to the sender. */ def reply(msg: Any) { sender ! msg } private var rc: Channel[Any] = null private[actors] def replyChannel = rc private[actors] def freshReplyChannel: Channel[Any] = { rc = new Channel[Any](this); rc } /** * Receives the next message from this actor's mailbox. */ def ? : Any = receive { case x => x } def sender: OutputChannel[Any] = sessions.head def receiver: Actor = this private var continuation: PartialFunction[Any, Unit] = null private var timeoutPending = false // accessed in Reaction private[actors] var isDetached = false private var isWaiting = false // guarded by lock of this protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = if ((f eq null) && (continuation eq null)) { // do nothing (timeout is handled instead) } else { val task = new Reaction(this, if (f eq null) continuation else f, msg) Scheduler execute task } private def tick(): Unit = Scheduler tick this private[actors] var kill: () => Unit = () => {} private def suspendActor() { isWaiting = true while (isWaiting) { try { wait() } catch { case _: InterruptedException => } } // links: check if we should exit if (shouldExit) exit() } private def suspendActorFor(msec: Long) { val ts = Platform.currentTime var waittime = msec var fromExc = false isWaiting = true while (isWaiting) { try { fromExc = false wait(waittime) } catch { case _: InterruptedException => { fromExc = true val now = Platform.currentTime val waited = now-ts waittime = msec-waited if (waittime < 0) { isWaiting = false } } } if (!fromExc) { isWaiting = false } } // links: check if we should exit if (shouldExit) exit() } private def resumeActor() { isWaiting = false notify() } /** * Starts this actor. */ def start(): Actor = synchronized { // Reset various flags. // // Note that we do *not* reset `trapExit`. The reason is that // users should be able to set the field in the constructor // and before `act` is called. exitReason = 'normal exiting = false shouldExit = false Scheduler start new Reaction(this) this } private def seq[a, b](first: => a, next: => b): Unit = { val s = Actor.self val killNext = s.kill s.kill = () => { s.kill = killNext // to avoid stack overflow: // instead of directly executing `next`, // schedule as continuation scheduleActor({ case _ => next }, 1) throw new SuspendActorException } first throw new KillActorException } private[actors] var links: List[Actor] = Nil /** * Links <code>self</code> to actor <code>to</code>. * * @param to ... * @return ... */ def link(to: Actor): Actor = { assert(Actor.self == this, "link called on actor different from self") links = to :: links to.linkTo(this) to } /** * Links <code>self</code> to actor defined by <code>body</code>. */ def link(body: => Unit): Actor = { val actor = new Actor { def act() = body } link(actor) actor.start() actor } private[actors] def linkTo(to: Actor) = synchronized { links = to :: links } /** * Unlinks <code>self</code> from actor <code>from</code>. */ def unlink(from: Actor) { assert(Actor.self == this, "unlink called on actor different from self") links = links.remove(from.==) from.unlinkFrom(this) } private[actors] def unlinkFrom(from: Actor) = synchronized { links = links.remove(from.==) } var trapExit = false private[actors] var exitReason: AnyRef = 'normal private[actors] var exiting = false private[actors] var shouldExit = false /** * <p> * Terminates execution of <code>self</code> with the following * effect on linked actors: * </p> * <p> * For each linked actor <code>a</code> with * <code>trapExit</code> set to <code>true</code>, send message * <code>Exit(self, reason)</code> to <code>a</code>. * </p> * <p> * For each linked actor <code>a</code> with * <code>trapExit</code> set to <code>false</code> (default), * call <code>a.exit(reason)</code> if * <code>reason != 'normal</code>. * </p> */ def exit(reason: AnyRef): Nothing = { exitReason = reason exit() } /** * Terminates with exit reason <code>'normal</code>. */ def exit(): Nothing = { // links if (!links.isEmpty) exitLinked() throw new ExitActorException } // Assume !links.isEmpty private[actors] def exitLinked() { exiting = true // remove this from links links = links.remove(this.==) // exit linked processes links.foreach((linked: Actor) => { unlink(linked) if (!linked.exiting) linked.exit(this, exitReason) }) } // Assume !links.isEmpty private[actors] def exitLinked(reason: AnyRef) { exitReason = reason exitLinked() } // Assume !this.exiting private[actors] def exit(from: Actor, reason: AnyRef) { if (trapExit) { this ! Exit(from, reason) } else if (reason != 'normal) this.synchronized { shouldExit = true exitReason = reason if (isSuspended) resumeActor() else if (isDetached) scheduleActor(null, null) } }}/** <p> * This object is used as the timeout pattern in * <a href="Actor.html#receiveWithin(Long)" target="contentFrame"> * <code>receiveWithin</code></a> and * <a href="Actor.html#reactWithin(Long)" target="contentFrame"> * <code>reactWithin</code></a>. * </p> * <p> * The following example demonstrates its usage: * </p><pre> * receiveWithin(500) { * <b>case</b> (x, y) <b>=></b> ... * <b>case</b> TIMEOUT <b>=></b> ... * }</pre> * * @version 0.9.8 * @author Philipp Haller */case object TIMEOUTcase class Exit(from: Actor, reason: AnyRef)/** <p> * This class is used to manage control flow of actor * executions. * </p> * * @version 0.9.8 * @author Philipp Haller */private[actors] class SuspendActorException extends Throwable { /* * For efficiency reasons we do not fill in * the execution stack trace. */ override def fillInStackTrace(): Throwable = this}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?