pilib.scala

来自「JAVA 语言的函数式编程扩展」· SCALA 代码 · 共 208 行

SCALA
208
字号
/*                     __                                               *\**     ________ ___   / /  ___     Scala API                            ****    / __/ __// _ | / /  / _ |    (c) 2003-2007, LAMP/EPFL             ****  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **** /____/\___/_/ |_/____/_/ | |                                         ****                          |/                                          **\*                                                                      */// $Id: pilib.scala 12390 2007-07-23 12:19:46Z michelou $package scala.concurrent/** <p> *    Library for using Pi-calculus concurrent primitives in *    <a href="http://scala-lang.org/" target="_top">Scala</a>. As an *    example, the definition of a two-place buffer using the <code>pilib</code> *    library looks like: *  </p><pre> *  <b>def</b> Buffer[a](put: Chan[a], get: Chan[a]) { *    <b>def</b> B0 { choice ( put * { x => B1(x) } ) } *    <b>def</b> B1(x: a) { choice ( get(x) * B0, put * { y => B2(x, y) } ) } *    <b>def</b> B2(x: a, y: a) { choice ( get(x) * B1(y) ) } *    B0 *  } *  </pre> * *  @see     <a href="http://scala-lang.org/docu/papers.html" target="_top"> *           PiLib: A Hosted Language for Pi-Calculus Style Concurrency</a> *  @author  Vincent Cremet, Martin Odersky *  @version 1.0 */ object pilib {  //////////////////////////////// SPAWN /////////////////////////////////  /**   * Run several processes in parallel using the following syntax:   * <code>spawn &lt; p<sub>1</sub> | ... | p<sub>n</sub> &gt;</code>   */  abstract class Spawn {    def <(p: => Unit): Spawn    def |(p: => Unit): Spawn    def > : Unit  }  val spawn = new Spawn {  //object spawn extends Spawn { // BUG !    def <(p: => Unit): Spawn = { scala.concurrent.ops.spawn(p); this }    def |(p: => Unit): Spawn = { scala.concurrent.ops.spawn(p); this }    def > : Unit = ()  }  /////////////////////////// GUARDED PROCESSES //////////////////////////  /** Untyped channel. */  class UChan {    /** Default log function. */    var log = (x: Any) => ()  }  /** An untyped guarded process.   *   *  @param n         channel name   *  @param polarity  input (true) or output (false)   *  @param v         transmitted value   *  @param c         continuation   */  case class UGP(n: UChan, polarity: Boolean, v: Any, c: Any => Any)  /** Typed guarded process. */  class GP[a](n: UChan, polarity: Boolean, v: Any, c: Any => a) {    val untyped = UGP(n, polarity, v, c)  }  //////////////////////////////// CHANNELS //////////////////////////////  /**   * Name on which one can emit, receive or that can be emitted or received   * during a communication.   */  class Chan[A] extends UChan with Function1[A, Product[A]] {    var defaultValue: A = _    /** Creates an input guarded process. */    def input[B](c: A => B) =      new GP(this, true, (), x => c(x.asInstanceOf[A]))    /** Creates an input guarded process. */    def output[B](v: A, c: () => B) =      new GP(this, false, v, x => c())    /** Blocking read. */    def read = {      var res: A = defaultValue      choice ( input(x => res = x) )      res    }    /** Blocking write. */    def write(x: A) =      choice ( output(x, () => ()) )    /** Syntactic sugar for input. */    def *[B](f: A => B) =      input(f)    /** Syntactic sugar for output. */    def apply(v: A) =      new Product(this, v)    /** Attach a function to be evaluated at each communication event     *  on this channel. Replace previous attached function.     */    def attach(f: A => Unit) =      log = x => f(x.asInstanceOf[A])  }  class Product[A](c: Chan[A], v: A) {    def *[B](f: => B) = c.output(v, () => f)  }  /////////////////////// SUM OF GUARDED PROCESSES ///////////////////////  case class Sum(gs: List[UGP]) {    /** Continuation of the sum. */    var cont: () => Any = _    var initialized = false    /** Block if not initialized otherwise continue with the     *  continuation.     */    def continue = synchronized {      if (!initialized) wait()      cont()    }    /** Set the values of parameters and awake the sleeping sum.     *     *  @param f ...     */    def set(f: () => Any) = synchronized {      cont = f      initialized = true      notify()    }  }  ///////////////////////////// COMMUNICATION ////////////////////////////  private var sums: List[Sum] = Nil  /** Test if two lists of guarded processes can communicate.   *   *  @param gs1 ...   *  @param gs2 ...   *  @return    ...   */  private def matches(gs1: List[UGP], gs2: List[UGP]): Option[(() => Unit, () => Any, () => Any)] =    (gs1, gs2) match {      case (Nil, _) => None      case (_, Nil) => None      case (UGP(a1, d1, v1, c1) :: rest1, UGP(a2, d2, v2, c2) :: rest2) =>        if (a1 == a2 && d1 == !d2)          Some(((() => if (d1) a1.log(v2) else a1.log(v1)), (() => c1(v2)), (() => c2(v1))))        else matches(gs1, rest2) match {          case None => matches(rest1, gs2)          case Some(t) => Some(t)        }    }  /** Test if the given sum can react with one of the pending sums.   *  If yes then do the reaction otherwise append the sum at the end   *  of the pending sums.   *   *  @param s1 ...   *  @param ss ...   *  @return   ...   */  private def compare(s1: Sum, ss: List[Sum]): List[Sum] =    ss match {      case Nil => ss ::: List(s1)      case s2 :: rest => matches(s1.gs, s2.gs) match {        case None => s2 :: compare(s1, rest)        case Some((log, c1, c2)) =>          log()          s1.set(c1)          s2.set(c2)          rest      }    }  /** Pi-calculus non-deterministic choice.   *   *  @param s ...   *  @return  ...   */  def choice[A](s: GP[A]*): A = {    val sum = Sum(s.toList map { _.untyped })    synchronized { sums = compare(sum, sums) }    (sum.continue).asInstanceOf[A]  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?