📄 sync-var.sml
字号:
(* sync-var.sml * * COPYRIGHT (c) 1995 AT&T Bell Laboratories. * COPYRIGHT (c) 1989-1991 John H. Reppy * * The implementation of Id-style synchronizing memory cells. *)structure SyncVar :> SYNC_VAR = struct structure R = RepTypes structure S = Scheduler type 'a cont = 'a SMLofNJ.Cont.cont val callcc = SMLofNJ.Cont.callcc val throw = SMLofNJ.Cont.throw (* the underlying representation of both ivars and mvars is the same. *) datatype 'a cell = CELL of { priority : int ref, readQ : (R.trans_id ref * 'a cont) Q.queue, value : 'a option ref } type 'a ivar = 'a cell type 'a mvar = 'a cell exception Put fun newCell () = CELL{priority = ref 0, readQ = Q.queue(), value=ref NONE} fun sameCell (CELL{value=v1, ...}, CELL{value=v2, ...}) = (v1 = v2) (* create a new transaction ID *) fun mkId () = ref(R.TRANS(S.getCurThread())) (* given a transaction ID, get its thread ID and mark it cancelled. *) fun getIdFromTrans (transId as ref(R.TRANS tid)) = ( transId := R.CANCEL; tid) (* bump a priority value by one, returning the old value *) fun bumpPriority (p as ref n) = (p := n+1; n) datatype 'a q_item = NoItem | Item of (R.trans_id ref * 'a cont) (* functions to clean channel input and output queues *) local fun clean [] = [] | clean ((ref R.CANCEL, _)::r) = clean r | clean l = l fun cleanRev ([], l) = l | cleanRev ((ref R.CANCEL, _)::r, l) = cleanRev (r, l) | cleanRev (x::r, l) = cleanRev (r, x::l) in fun cleanAndChk (priority, R.Q{front, rear}) = let fun cleanFront [] = cleanRear (! rear) | cleanFront f = (case (clean f) of [] => cleanRear (! rear) | f' => (front := f'; bumpPriority priority) (* end case *)) and cleanRear [] = 0 | cleanRear r = ( rear := []; case (cleanRev (r, [])) of [] => 0 | rr => (front := rr; bumpPriority priority) (* end case *)) in cleanFront (! front) end fun cleanAndRemove (R.Q{front, rear, ...}) = let fun cleanFront [] = cleanRear (! rear) | cleanFront f = (case (clean f) of [] => cleanRear (! rear) | (item::rest) => (front := rest; Item item) (* end case *)) and cleanRear [] = NoItem | cleanRear r = ( rear := []; case (cleanRev (r, [])) of [] => NoItem | (item::rest) => (front := rest; Item item) (* end case *)) in cleanFront (! front) end fun cleanAndEnqueue (R.Q{front, rear, ...}, item) = let fun cleanFront [] = cleanRear (! rear) | cleanFront f = (case (clean f) of [] => cleanRear (! rear) | f' => (front := f'; rear := item :: (! rear)) (* end case *)) and cleanRear [] = (front := [item]) | cleanRear r = (case (cleanRev (r, [])) of [] => (front := [item]; rear := []) | rr => (rear := [item]; front := rr) (* end case *)) in cleanFront (! front) end end (* local *) (* When a thread is resumed after being blocked on an iGet or mGet operation, * there may be other threads also blocked on the variable. This function * is used to propagate the message to all of the threads that are blocked * on the variable (or until one of them takes the value in the mvar case). * It must be called from an atomic region; when the readQ is finally empty, * we leave the atomic region. We must use "cleanAndRemove" to get items * from the readQ in the unlikely event that a single thread executes a * choice of multiple gets on the same variable. *) fun relayMsg (readQ, msg) = (case (cleanAndRemove readQ) of NoItem => S.atomicEnd() | (Item(transId, kont)) => callcc (fn myKont => ( S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId); throw kont msg)) (* end case *)) fun impossible () = raise Fail "SyncVar: impossible" (** I-variables **) val iVar = newCell val sameIVar = sameCell fun iPut (CELL{priority, readQ, value}, x) = ( S.atomicBegin(); case !value of NONE => ( value := SOME x; case (cleanAndRemove readQ) of NoItem => S.atomicEnd() | (Item(transId, kont)) => callcc (fn myKont => ( S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId); priority := 1; throw kont x)) (* end case *)) | (SOME _) => (S.atomicEnd(); raise Put) (* end case *)) fun iGet (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => let val msg = callcc (fn k => ( Q.enqueue (readQ, (mkId(), k)); S.atomicDispatch ())) in relayMsg (readQ, msg); msg end | (SOME v) => (S.atomicEnd(); v) (* end case *)) fun iGetEvt (CELL{priority, readQ, value}) = let fun blockFn {transId, cleanUp, next} = let val msg = callcc (fn k => ( Q.enqueue (readQ, (transId, k)); next (); impossible())) in cleanUp(); relayMsg (readQ, msg); msg end fun pollFn () = (case !value of NONE => R.BLOCKED blockFn | (SOME v) => R.ENABLED{ prio=bumpPriority priority, doFn=(fn () => (priority := 1; S.atomicEnd(); v)) } (* end case *)) in R.BEVT[pollFn] end fun iGetPoll (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => NONE | (SOME v) => (S.atomicEnd(); SOME v) (* end case *)) (** M-variables **) val mVar = newCell fun mVarInit x = CELL{priority = ref 0, readQ = Q.queue(), value=ref(SOME x)} val sameMVar = sameCell fun mPut (CELL{priority, readQ, value}, x) = ( S.atomicBegin(); case !value of NONE => ( value := SOME x; case (cleanAndRemove readQ) of NoItem => S.atomicEnd() | (Item(transId, kont)) => callcc (fn myKont => ( S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId); priority := 1; throw kont x)) (* end case *)) | (SOME _) => (S.atomicEnd(); raise Put) (* end case *)) fun mTake (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => let val v = callcc (fn k => ( Q.enqueue (readQ, (mkId(), k)); S.atomicDispatch ())) in value := NONE; S.atomicEnd(); v end | (SOME v) => (value := NONE; S.atomicEnd(); v) (* end case *)) fun mTakeEvt (CELL{priority, readQ, value}) = let fun blockFn {transId, cleanUp, next} = let val v = callcc (fn k => ( Q.enqueue (readQ, (transId, k)); next (); impossible())) in cleanUp(); value := NONE; S.atomicEnd(); v end fun pollFn () = (case !value of NONE => R.BLOCKED blockFn | (SOME v) => R.ENABLED{ prio=bumpPriority priority, doFn=(fn () => (value := NONE; S.atomicEnd(); v)) } (* end case *)) in R.BEVT[pollFn] end fun mTakePoll (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => NONE | (SOME v) => (value := NONE; S.atomicEnd(); SOME v) (* end case *)) fun mGet (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => let val v = callcc (fn k => ( Q.enqueue (readQ, (mkId(), k)); S.atomicDispatch ())) in relayMsg (readQ, v); v end | (SOME v) => (S.atomicEnd(); v) (* end case *)) fun mGetEvt (CELL{priority, readQ, value}) = let fun blockFn {transId, cleanUp, next} = let val v = callcc (fn k => ( Q.enqueue (readQ, (transId, k)); next (); impossible())) in cleanUp(); relayMsg (readQ, v); v end fun pollFn () = (case !value of NONE => R.BLOCKED blockFn | (SOME v) => R.ENABLED{ prio=bumpPriority priority, doFn=(fn () => (S.atomicEnd(); v)) } (* end case *)) in R.BEVT[pollFn] end fun mGetPoll (CELL{priority, readQ, value}) = ( S.atomicBegin(); case !value of NONE => NONE | (SOME v) => (S.atomicEnd(); SOME v) (* end case *)) (* Swap the current contents of the cell with a new value. This function * has the effect of an mTake followed by an mPut, except that it is * guaranteed to be atomic. It is also somewhat more efficient. *) fun mSwap (CELL{priority, readQ, value}, newV) = ( S.atomicBegin(); case !value of NONE => let val v = callcc (fn k => ( Q.enqueue (readQ, (mkId(), k)); S.atomicDispatch ())) in value := SOME newV; (* relay the new value to any other blocked threads *) relayMsg (readQ, newV); v end | (SOME v) => (value := SOME newV; S.atomicEnd(); v) (* end case *)) fun mSwapEvt (CELL{priority, readQ, value}, newV) = let fun blockFn {transId, cleanUp, next} = let val v = callcc (fn k => ( Q.enqueue (readQ, (transId, k)); next (); impossible())) in cleanUp(); value := SOME newV; relayMsg (readQ, newV); v end fun pollFn () = (case !value of NONE => R.BLOCKED blockFn | (SOME v) => R.ENABLED{ prio=bumpPriority priority, doFn=(fn () => (value := SOME newV; S.atomicEnd(); v)) } (* end case *)) in R.BEVT[pollFn] end end; (* SyncVar *)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -