⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.sml

📁 这是我们参加06年全国开源软件的竞赛作品
💻 SML
字号:
(* channel.sml * * COPYRIGHT (c) 1995 AT&T Bell Laboratories. * COPYRIGHT (c) 1989-1991 John H. Reppy * * The representation of synchronous channels. * * To ensure that we always leave the atomic region exactly once, we * require that the blocking operation be responsible for leaving the * atomic region (in the event case, it must also execute the clean-up * action).  The doFn always transfers control to the blocked thread * without leaving the atomic region.  Note thet the send (and sendEvt) * blockFns run using the receiver's thread ID. *)structure Channel : sig    type 'a event    include CHANNEL    val resetChan : 'a chan -> unit  end = struct    structure T = Thread    structure S = Scheduler    structure R = RepTypes    type 'a event = 'a Event.event    type 'a cont = 'a SMLofNJ.Cont.cont    val callcc = SMLofNJ.Cont.callcc    val throw = SMLofNJ.Cont.throw  (* Some inline functions to improve performance *)    fun enqueue (R.Q{rear, ...}, x) = rear := x :: !rear    datatype 'a chan = CHAN of {        priority : int ref,        inQ      : (R.trans_id ref * 'a cont) R.queue,        outQ     : (R.trans_id ref * (R.thread_id * 'a cont) cont) R.queue      }    fun resetChan (CHAN{priority, inQ, outQ}) = (	  priority := 1;	  Q.reset inQ;	  Q.reset outQ)    fun channel () = CHAN{priority=ref 1, inQ=Q.queue(), outQ=Q.queue()}  (* sameChannel : ('a chan * 'a chan) -> bool *)    fun sameChannel (CHAN{inQ=in1, ...}, CHAN{inQ=in2, ...}) =	  Q.sameQ(in1, in2)  (* create a new transaction ID *)    fun mkId () = ref(R.TRANS(S.getCurThread()))  (* given a transaction ID, get its thread ID and mark it cancelled. *)    fun getIdFromTrans (transId as ref(R.TRANS tid)) = (	  transId := R.CANCEL;	  tid)  (* given a transaction ID, set the current thread to its thread ID   * and mark it cancelled.   *)    fun setCurThread transId = S.setCurThread(getIdFromTrans transId)    datatype 'a q_item      = NoItem      | Item of (R.trans_id ref * 'a cont)  (* bump a priority value by one, returning the old value *)    fun bumpPriority (p as ref n) = (p := n+1; n)  (* functions to clean channel input and output queues *)    local      fun clean [] = []	| clean ((ref R.CANCEL, _)::r) = clean r	| clean l = l      fun cleanRev ([], l) = l	| cleanRev ((ref R.CANCEL, _)::r, l) = cleanRev (r, l)	| cleanRev (x::r, l) = cleanRev (r, x::l)      fun cleanAll l = let	    fun rev ([], l) = l	      | rev (x::r, l) = rev(r, x::l)	    in	      rev (cleanRev (l, []), [])	    end    in    fun cleanAndChk (priority, R.Q{front, rear}) = let	  fun cleanFront [] = cleanRear (! rear)	    | cleanFront f = (case (clean f)		 of [] => cleanRear (! rear)		  | f' => (front := f'; bumpPriority priority)		(* end case *))	  and cleanRear [] = 0	    | cleanRear r = (		rear := [];		case (cleanRev (r, []))		 of [] => 0		  | rr => (front := rr; bumpPriority priority)		(* end case *))	  in	    cleanFront (! front)	  end    fun cleanAndRemove (R.Q{front, rear, ...}) = let	  fun cleanFront [] = cleanRear (! rear)	    | cleanFront f = (case (clean f)		 of [] => cleanRear (! rear)		  | (item::rest) => (front := rest; Item item)		(* end case *))	  and cleanRear [] = NoItem	    | cleanRear r = (		rear := [];		case (cleanRev (r, []))		 of [] => NoItem		  | (item::rest) => (front := rest; Item item)		(* end case *))	  in	    cleanFront (! front)	  end    fun cleanAndEnqueue (R.Q{front, rear, ...}, item) = (case cleanAll(!front)	   of [] => (front := cleanRev(!rear, [item]); rear := [])	    | f => (front := f; rear := item :: cleanAll(! rear))	  (* end case *))    end (* local *)    fun impossible () = raise Fail "Channel: impossible"    fun send (CHAN{priority, inQ, outQ}, msg) = (	  S.atomicBegin();	  case (cleanAndRemove inQ)           of Item(rid, rkont) => callcc (fn sendK => (                S.enqueueAndSwitchCurThread(sendK, getIdFromTrans rid);		priority := 1;                throw rkont msg))            | NoItem => let		val (recvId, recvK) = callcc (fn sendK => (			enqueue (outQ, (mkId(), sendK));			S.atomicDispatch()))		in		  S.atomicSwitchTo (recvId, recvK, msg)		end          (* end case *))    fun sendEvt (CHAN{priority, inQ, outQ}, msg) = let	  fun doFn () = let		val (transId, rkont) = Q.dequeue inQ		in		  callcc (fn sendK => (		    S.enqueueAndSwitchCurThread(sendK, getIdFromTrans transId);		    priority := 1;		    throw rkont msg))		end	  fun blockFn {transId, cleanUp, next} = let		val (recvId, recvK) = callcc (fn sendK => (			cleanAndEnqueue (outQ, (transId, sendK));			next();			impossible ()))		in		  cleanUp();		  S.atomicSwitchTo (recvId, recvK, msg)		end	  fun pollFn () = (case (cleanAndChk (priority, inQ))		 of 0 => R.BLOCKED blockFn		  | p => R.ENABLED{prio=p, doFn=doFn}		(* end case *))	  in	    R.BEVT[pollFn]	  end    fun sendPoll (CHAN{priority, inQ, outQ}, msg) = callcc (fn sendK => (	  S.atomicBegin();	  case (cleanAndRemove inQ)           of Item(rid, rkont) => (		callcc (fn sendK => (		  S.enqueueAndSwitchCurThread(sendK, getIdFromTrans rid);		  priority := 1;                  throw rkont msg));		true)            | NoItem => (S.atomicEnd(); false)          (* end case *)))    fun recv (CHAN{priority, inQ, outQ}) = callcc (fn recvK => (	  S.atomicBegin ();	  case (cleanAndRemove outQ)	   of Item(transId, sendK) => let		val myId = S.getCurThread()		in		  setCurThread transId;		  priority := 1;		  throw sendK (myId, recvK)		end	    | NoItem => (		enqueue (inQ, (mkId(), recvK));		S.atomicDispatch())	  (* end case *)))    fun recvEvt (CHAN{priority, inQ, outQ}) = let	  fun doFn () = let		val (transId, sendK) = Q.dequeue outQ		val myId = S.getCurThread()		in		  setCurThread transId;		  priority := 1;		  callcc (fn recvK => throw sendK (myId, recvK))		end	  fun blockFn {transId, cleanUp, next} = let		val msg = callcc (fn recvK => (		      cleanAndEnqueue (inQ, (transId, recvK));		      next ();		      impossible()))		in		  cleanUp();		  S.atomicEnd();		  msg		end	  fun pollFn () = (case (cleanAndChk (priority, outQ))		 of 0 => R.BLOCKED blockFn		  | p => R.ENABLED{prio=p, doFn=doFn}		(* end case *))	  in	    R.BEVT[pollFn]	  end    fun recvPoll (CHAN{priority, inQ, outQ}) = (	  S.atomicBegin ();	  case (cleanAndRemove outQ)	   of Item(transId, sendK) => SOME(callcc (fn recvK => 		let		val myId = S.getCurThread()		in		  setCurThread transId;		  priority := 1;		  throw sendK (myId, recvK)		end))	    | NoItem => (S.atomicEnd(); NONE)	  (* end case *))  end

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -