tcpservice.scala
来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 271 行
SCALA
271 行
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2008, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id: TcpService.scala 13971 2008-02-13 14:58:07Z phaller $

package scala.actors.remote

import java.io.{DataInputStream, DataOutputStream, IOException}
import java.lang.{Thread, SecurityException}
import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException, URLClassLoader}

import scala.collection.mutable.HashMap

/**
 * Object TcpService: registry of running TCP services, keyed by port.
 *
 * @version 0.9.9
 * @author Philipp Haller
 */
object TcpService {
  private val random = new Random
  private val ports = new HashMap[Int, TcpService]

  /**
   * Returns the service registered for `port`, creating, registering and
   * starting a new one if none exists yet.
   *
   * @param port the TCP port the service listens on
   * @param cl   the class loader handed to the service's serializer
   */
  def apply(port: Int, cl: ClassLoader): TcpService =
    ports.get(port) match {
      case Some(service) =>
        service
      case None =>
        val service = new TcpService(port, cl)
        ports(port) = service
        service.start()
        service
    }

  /**
   * Picks a random port in the range 8000..8499 and probes it by briefly
   * binding a server socket to it.
   *
   * @return a port that could be bound at the time of the probe, or the
   *         unprobed candidate if a security manager forbids the probe
   */
  def generatePort: Int = {
    val candidate = 8000 + random.nextInt(500)
    try {
      val probe = new ServerSocket(candidate)
      probe.close()
      candidate
    } catch {
      case ioe: IOException =>
        // The candidate is already in use. Return the result of the retry;
        // the previous code discarded it and handed back the busy port.
        generatePort
      case se: SecurityException =>
        // Probing is not permitted; return the candidate unverified.
        candidate
    }
  }

  /** Maximum number of messages buffered per unreachable node. */
  var BufSize: Int = 65536
}

/**
 * Class TcpService: a thread that accepts incoming connections on `port`
 * and keeps one `TcpServiceWorker` per connected remote node.
 *
 * @version 0.9.10
 * @author Philipp Haller
 */
class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
  val serializer: JavaSerializer = new JavaSerializer(this, cl)

  private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
  def node: Node = internalNode

  // Messages that could not be delivered yet, per node, newest first.
  private val pendingSends = new HashMap[Node, List[Array[Byte]]]

  /**
   * Sends a byte array to another node on the network.
   * If the node is not yet up, up to <code>TcpService.BufSize</code>
   * messages are buffered; further messages are dropped until the buffer
   * has been flushed.
   *
   * @param node the destination node
   * @param data the serialized message
   */
  def send(node: Node, data: Array[Byte]): Unit = synchronized {

    // Buffers the message so that it can be re-sent when the remote
    // net kernel comes up.
    def bufferMsg(t: Throwable) {
      Debug.info(this+": cannot reach "+node+" ("+t+"), buffering message")
      pendingSends.get(node) match {
        case None =>
          pendingSends(node) = List(data)
        case Some(msgs) if msgs.length < TcpService.BufSize =>
          pendingSends(node) = data :: msgs
        case Some(_) =>
          // Buffer is full. Previously this case was missing and the
          // match failed with a MatchError.
          Debug.info(this+": send buffer for "+node+" is full, dropping message")
      }
    }

    // retrieve worker thread (if any) that already has a connection
    getConnection(node) match {
      case Some(worker) =>
        worker transmit data

      case None =>
        // we are not connected, yet
        try {
          val newWorker = connect(node)
          // Flush older, buffered messages first. The buffer is kept
          // newest-first, so reverse it to restore the original order.
          pendingSends.get(node) match {
            case Some(msgs) =>
              msgs.reverse foreach { m => newWorker transmit m }
              pendingSends -= node
            case None =>
              // nothing buffered
          }
          newWorker transmit data
        } catch {
          case uhe: UnknownHostException =>
            bufferMsg(uhe)
          case ioe: IOException =>
            bufferMsg(ioe)
          case se: SecurityException =>
            // best effort: the message is dropped, but leave a trace
            Debug.info(this+": caught "+se)
        }
    }
  }

  /**
   * Asks the accept loop to stop. A throw-away connection to our own port
   * is opened so that the blocking <code>accept()</code> call returns and
   * the loop sees the flag.
   */
  def terminate() {
    shouldTerminate = true
    val wakeUp = new Socket(internalNode.address, internalNode.port)
    wakeUp.close()
  }

  // Written by the thread calling terminate(), polled by the accept loop.
  @volatile private var shouldTerminate = false

  /**
   * Accept loop: every incoming connection gets its own worker, which first
   * reads the identity of the remote node and is then started. On exit the
   * server socket is closed and all workers are halted.
   */
  override def run() {
    try {
      val socket = new ServerSocket(port)
      try {
        while (!shouldTerminate) {
          Debug.info(this+": waiting for new connection...")
          val nextClient = socket.accept()
          if (!shouldTerminate) {
            val worker = new TcpServiceWorker(this, nextClient)
            Debug.info("Started new "+worker)
            worker.readNode
            worker.start()
          } else
            nextClient.close()
        }
      } finally {
        // release the listening port
        socket.close()
      }
    } catch {
      case ioe: IOException =>
        Debug.info(this+": caught "+ioe)
      case sec: SecurityException =>
        Debug.info(this+": caught "+sec)
      case e: Exception =>
        Debug.info(this+": caught "+e)
    } finally {
      Debug.info(this+": shutting down...")
      // Take a snapshot under the lock: halting a worker makes it call
      // nodeDown, which modifies `connections`.
      val workers: List[TcpServiceWorker] = synchronized { connections.values.toList }
      workers foreach { w => w.halt }
    }
  }

  // connection management

  private val connections =
    new scala.collection.mutable.HashMap[Node, TcpServiceWorker]

  /** Registers `worker` as the connection to `node`. */
  private[actors] def addConnection(node: Node, worker: TcpServiceWorker): Unit = synchronized {
    connections(node) = worker
  }

  /** Returns the worker connected to `n`, if any. */
  def getConnection(n: Node): Option[TcpServiceWorker] = synchronized {
    connections.get(n)
  }

  /** Tells whether a connection to `n` is currently registered. */
  def isConnected(n: Node): Boolean = synchronized {
    !connections.get(n).isEmpty
  }

  /**
   * Opens a connection to `n`, announces this node to the peer, starts the
   * worker thread and registers the connection.
   *
   * @return the worker handling the new connection
   */
  def connect(n: Node): TcpServiceWorker = synchronized {
    val sock = new Socket(n.address, n.port)
    val worker = new TcpServiceWorker(this, sock)
    worker.sendNode(n)
    worker.start()
    addConnection(n, worker)
    worker
  }

  /** Removes the connection to `n` (if any) and halts its worker. */
  def disconnectNode(n: Node): Unit = synchronized {
    connections.get(n) match {
      case Some(worker) =>
        connections -= n
        worker.halt
      case None =>
        // do nothing
    }
  }

  /**
   * Tells whether `node` can be reached. If no connection exists yet,
   * one is established as a side effect.
   */
  def isReachable(node: Node): Boolean =
    if (isConnected(node)) true
    else try {
      connect(node)
      true
    } catch {
      case uhe: UnknownHostException => false
      case ioe: IOException => false
      case se: SecurityException => false
    }

  /** Forgets the connection to `mnode`; called by a worker whose connection broke. */
  def nodeDown(mnode: Node): Unit = synchronized {
    connections -= mnode
  }
}

/**
 * Thread serving a single socket connection of a `TcpService`.
 *
 * @param parent the service owning this worker
 * @param so     the connected socket
 */
class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
  val in = so.getInputStream()
  val out = so.getOutputStream()

  val datain = new DataInputStream(in)
  val dataout = new DataOutputStream(out)

  var connectedNode: Node = _

  /** Records `n` as the peer and writes the parent's own node to the peer. */
  def sendNode(n: Node) = {
    connectedNode = n
    parent.serializer.writeObject(dataout, parent.node)
  }

  /** Reads the peer's node from the stream and registers this worker for it. */
  def readNode = {
    val node = parent.serializer.readObject(datain)
    node match {
      case n: Node =>
        connectedNode = n
        parent.addConnection(n, this)
    }
  }

  /** Writes one message: a length prefix followed by the payload. */
  def transmit(data: Array[Byte]): Unit = synchronized {
    Debug.info(this+": transmitting data...")
    dataout.writeInt(data.length)
    dataout.write(data)
    dataout.flush()
  }

  // Cleared by halt (any thread), polled by the read loop in run().
  @volatile var running = true

  /** Closes the socket and asks the read loop to stop. */
  def halt: Unit = synchronized {
    so.close()
    running = false
  }

  /**
   * Read loop: hands every object read from the connection to the parent's
   * kernel. Any exception ends the loop and reports the node as down.
   */
  override def run() {
    try {
      while (running) {
        val msg = parent.serializer.readObject(datain)
        parent.kernel.processMsg(connectedNode, msg)
      }
    } catch {
      case ioe: IOException =>
        Debug.info(this+": caught "+ioe)
        parent nodeDown connectedNode
      case e: Exception =>
        Debug.info(this+": caught "+e)
        parent nodeDown connectedNode
    }
    Debug.info(this+": terminated")
  }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?