netkernel.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 210 行
SCALA
210 行
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2008, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id: NetKernel.scala 13738 2008-01-18 19:53:56Z michelou $

package scala.actors.remote

import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop

/** Wire message for an asynchronous send; `data` holds the serialized payload. */
case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])

/** Wire message for a synchronous send; `data` holds the serialized payload. */
case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])

/** Wire message carrying the reply to a synchronous send; `data` holds the serialized payload. */
case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])

/** Command for a [[Proxy]]: deliver `msg` to the local actor `a` with `!`. */
case class SendTo(a: Actor, msg: Any)

/** Command for a [[Proxy]]: send `msg` to `a`, wait for its answer and ship it back via `sendReply`. */
case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol)

/** Command for a [[Proxy]]: pass `msg` to the reply channel of the local actor `a`. */
case class ReplyTo(a: Actor, msg: Any)

/** Command for a [[Proxy]]: leave the message loop and exit. */
case object Terminate

/**
 * Routes messages between locally registered actors and remote nodes on top
 * of a `Service`, which supplies the serializer and the transport.
 *
 * The registries `actors`, `names` and `proxies` are plain mutable hash maps;
 * every access to them in this class happens while holding this kernel's
 * monitor.
 *
 * @version 0.9.10
 * @author Philipp Haller
 */
class NetKernel(service: Service) {

  /** Serializes `msg` and hands the bytes to the service for delivery to `node`. */
  def sendToNode(node: Node, msg: AnyRef) = {
    val bytes = service.serializer.serialize(msg)
    service.send(node, bytes)
  }

  /** Sends `msg`, wrapped in a [[NamedSend]], from `sender` to `receiver` on `node`. */
  def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
    val bytes = service.serializer.serialize(msg)
    sendToNode(node, NamedSend(sender, receiver, bytes))
  }

  /** Sends `msg`, wrapped in a [[SyncSend]], from `sender` to `receiver` on `node`. */
  def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
    val bytes = service.serializer.serialize(msg)
    val toSend = SyncSend(sender, receiver, bytes)
    sendToNode(node, toSend)
  }

  /** Sends `msg`, wrapped in a [[Reply]], from `sender` to `receiver` on `node`. */
  def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
    val bytes = service.serializer.serialize(msg)
    val toSend = Reply(sender, receiver, bytes)
    sendToNode(node, toSend)
  }

  // name -> actor and actor -> name; always updated together in `register`
  private val actors = new HashMap[Symbol, Actor]
  private val names = new HashMap[Actor, Symbol]

  /** Proxies created so far, keyed by the (node, actor name) they stand for. */
  val proxies = new HashMap[(Node, Symbol), Actor]

  /** Registers actor `a` under `name`, replacing any earlier entry for either key. */
  def register(name: Symbol, a: Actor): Unit = synchronized {
    actors(name) = a
    names(a) = name
  }

  /**
   * Returns the name under which the calling actor is registered, registering
   * it under a fresh "remotesender" name first if it has none.
   *
   * The lookup and the registration run under the kernel's monitor so that
   * they cannot interleave with a concurrent `register` or `processMsg`.
   */
  def selfName: Symbol = synchronized {
    val me = Actor.self
    names.get(me) match {
      case Some(name) =>
        name
      case None =>
        val freshName = FreshNameCreator.newName("remotesender")
        register(freshName, me)
        freshName
    }
  }

  /** Asynchronously sends `msg` to the actor called `name` on `node`, on behalf of the calling actor. */
  def send(node: Node, name: Symbol, msg: AnyRef): Unit = {
    val senderName = selfName
    namedSend(node, senderName, name, msg)
  }

  /** Sends `msg` as a synchronous request to the actor called `name` on `node`, on behalf of the calling actor. */
  def syncSend(node: Node, name: Symbol, msg: AnyRef): Unit = {
    val senderName = selfName
    namedSyncSend(node, senderName, name, msg)
  }

  /**
   * Creates a new [[Proxy]] for the actor `sym` on `node` and records it in
   * `proxies`, replacing any proxy previously stored for the same key.
   */
  def createProxy(node: Node, sym: Symbol): Actor = synchronized {
    val p = new Proxy(node, sym, this)
    proxies((node, sym)) = p
    p
  }

  /** Returns the proxy recorded for `(senderNode, senderName)`, creating it if there is none. */
  def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized {
    proxies.get((senderNode, senderName)) match {
      case Some(senderProxy) => senderProxy
      case None              => createProxy(senderNode, senderName)
    }
  }

  /**
   * Handles a message that arrived from `senderNode`.
   *
   * [[NamedSend]], [[SyncSend]] and [[Reply]] are deserialized and passed, as
   * [[SendTo]], [[SyncSendTo]] and [[ReplyTo]] commands respectively, to the
   * proxy standing for the remote sender. Any other message is logged and
   * dropped.
   */
  def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
    msg match {
      case cmd@NamedSend(senderName, receiver, data) =>
        Debug.info(this+": processing "+cmd)
        deliver(senderNode, senderName, receiver, data) { (a, payload) =>
          SendTo(a, payload)
        }

      case cmd@SyncSend(senderName, receiver, data) =>
        Debug.info(this+": processing "+cmd)
        deliver(senderNode, senderName, receiver, data) { (a, payload) =>
          SyncSendTo(a, payload, receiver)
        }

      case cmd@Reply(senderName, receiver, data) =>
        Debug.info(this+": processing "+cmd)
        deliver(senderNode, senderName, receiver, data) { (a, payload) =>
          ReplyTo(a, payload)
        }

      case other =>
        // not one of the three wire messages: drop it instead of failing
        // with a MatchError in the caller
        Debug.info(this+": ignoring unrecognized message "+other)
    }
  }

  /**
   * Looks up the local actor registered as `receiver`, deserializes `data`
   * and sends the command built by `wrap` to the proxy of the remote sender.
   *
   * Exceptions raised while deserializing or forwarding are logged and
   * swallowed, so that one bad message does not propagate into the caller of
   * `processMsg`. If no actor is registered as `receiver` the message is lost.
   */
  private def deliver(senderNode: Node, senderName: Symbol, receiver: Symbol,
                      data: Array[Byte])(wrap: (Actor, Any) => Any): Unit = {
    actors.get(receiver) match {
      case Some(a) =>
        try {
          Debug.info(this+": receiver is "+a)
          val payload = service.serializer.deserialize(data)
          Debug.info(this+": deserialized msg is "+payload)
          val senderProxy = getOrCreateProxy(senderNode, senderName)
          Debug.info(this+": created "+senderProxy)
          senderProxy.send(wrap(a, payload), null)
        } catch {
          case e: Exception =>
            Debug.error(this+": caught "+e)
        }
      case None =>
        // message is lost
        Debug.info(this+": lost message")
    }
  }

  /** Sends [[Terminate]] to every proxy, then terminates the service. */
  def terminate(): Unit = {
    // tell all proxies to terminate; hold the lock so that the map is not
    // modified by createProxy while it is being traversed
    synchronized {
      proxies.values foreach { p => p.send(Terminate, null) }
    }
    // tell service to terminate
    service.terminate()
  }
}

/**
 * Local stand-in for the actor called `name` on `node`.
 *
 * Commands ([[SendTo]], [[SyncSendTo]], [[ReplyTo]], [[Terminate]]) placed in
 * its mailbox are executed by `act`; messages sent with `!` or `!?` are
 * shipped to the remote actor through `kernel`.
 *
 * The proxy starts itself as part of construction.
 */
class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
  start()

  /** Message loop: executes commands until a [[Terminate]] is received. */
  override def act(): Unit = {
    Debug.info(this+": waiting to process commands")
    loop {
      react {
        case cmd@SendTo(a, msg) =>
          Debug.info(this+": processing "+cmd)
          a ! msg

        case cmd@SyncSendTo(a, msg, receiver) =>
          Debug.info(this+": processing "+cmd)
          // send with a private reply channel and block until `a` answers
          val replyCh = new Channel[Any](this)
          a.send(msg, replyCh)
          val res = replyCh.receive {
            case x => x
          }
          res match {
            case refmsg: AnyRef =>
              kernel.sendReply(node, receiver, name, refmsg)
            case _ =>
              // e.g. a null reply: it cannot be shipped, but it must not
              // kill this proxy with a MatchError either
              Debug.error(this+": cannot send reply "+res+" remotely")
          }

        case cmd@ReplyTo(a, msg) =>
          Debug.info(this+": processing "+cmd)
          a.replyChannel ! msg

        case cmd@Terminate =>
          Debug.info(this+": processing "+cmd)
          exit()
      }
    }
  }

  /**
   * Sends `msg` to the remote actor this proxy stands for.
   *
   * A value matching the `ch ! m` pattern is put into this proxy's own
   * mailbox instead of being sent remotely. Values that are not `AnyRef`
   * are rejected with `error`.
   */
  override def !(msg: Any): Unit = msg match {
    case ch ! m =>
      // do not send remotely
      this.send(msg, Actor.self.replyChannel)
    case a: AnyRef =>
      kernel.send(node, name, a)
    case _ =>
      error("Cannot send non-AnyRef value remotely.")
  }

  /**
   * Sends `msg` to the remote actor as a synchronous request and blocks the
   * calling actor until a value arrives on its fresh reply channel.
   * Values that are not `AnyRef` are rejected with `error`.
   */
  override def !?(msg: Any): Any = msg match {
    case a: AnyRef =>
      // install the reply channel before sending
      val replyCh = Actor.self.freshReplyChannel
      kernel.syncSend(node, name, a)
      replyCh.receive {
        case x => x
      }
    case _ =>
      error("Cannot send non-AnyRef value remotely.")
  }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?