📄 bin-io-fn.sml
字号:
(* bin-io-fn.sml * * COPYRIGHT (c) 1995 AT&T Bell Laboratories. * * This is the CML version of the BinIO functor. *)functor BinIOFn ( structure OSPrimIO : OS_PRIM_IO where type PrimIO.array = BinPrimIO.array where type PrimIO.vector = BinPrimIO.vector where type PrimIO.elem = BinPrimIO.elem where type PrimIO.pos = BinPrimIO.pos where type PrimIO.reader = BinPrimIO.reader where type PrimIO.writer = BinPrimIO.writer ) : CML_BIN_IO = struct structure PIO = OSPrimIO.PrimIO structure A = Word8Array structure V = Word8Vector structure Pos = Position structure SV = SyncVar (* assign to an MVar *) fun mUpdate (mv, x) = (SV.mTake mv; SV.mPut(mv, x)) (* an element for initializing buffers *) val someElem = (0w0 : Word8.word) val vecExtract = V.extract val vecSub = V.sub val arrUpdate = A.update val empty = V.fromList[] fun dummyCleaner () = () structure StreamIO = struct type vector = V.vector type elem = V.elem type reader = PIO.reader type writer = PIO.writer type pos = PIO.pos (*** Functional input streams ***) datatype instream = ISTRM of (in_buffer * int) and in_buffer = IBUF of { basePos : pos option, more : more SV.mvar, (* when this cell is empty, it means that *) (* there is an outstanding request to the *) (* server to extend the stream. *) data : vector, info : info } and more = MORE of in_buffer (* forward link to additional data *) | NOMORE (* placeholder for forward link *) | TERMINATED (* termination of the stream *) and info = INFO of { reader : reader, readVec : int -> vector, readVecEvt : int -> vector CML.event, closed : bool ref, getPos : unit -> pos option, tail : more SV.mvar SV.mvar, (* points to the more cell of the last buffer *) cleanTag : CleanIO.tag } fun infoOfIBuf (IBUF{info, ...}) = info fun chunkSzOfIBuf buf = let val INFO{reader=PIO.RD{chunkSize, ...}, ...} = infoOfIBuf buf in chunkSize end fun readVec (IBUF{info=INFO{readVec=f, ...}, ...}) = f fun inputExn (INFO{reader=PIO.RD{name, ...}, ...}, mlOp, exn) = raise IO.Io{function=mlOp, name=name, cause=exn} datatype more_data = EOF | DATA of in_buffer (* extend the stream by a chunk. * Invariant: the more m-variable is empty on entry and full on exit. *) fun extendStream (readFn, mlOp, buf as IBUF{more, info, ...}) = (let val INFO{getPos, tail, ...} = info val basePos = getPos() val chunk = readFn (chunkSzOfIBuf buf) in if (V.length chunk = 0) then (SV.mPut (more, NOMORE); EOF) else let val newMore = SV.mVar() val buf' = IBUF{ basePos = basePos, data = chunk, more = newMore, info = info } in (* note that we do not fill the newMore cell until * after the tail has been updated. This ensures * that someone attempting to access the tail will * not acquire the lock until after we are done. *) mUpdate (tail, newMore); SV.mPut (more, MORE buf'); (* releases lock!! *) SV.mPut (newMore, NOMORE); DATA buf' end end handle ex => ( SV.mPut (more, NOMORE); inputExn(info, mlOp, ex))) (* get the next buffer in the stream, extending it if necessary. If * the stream must be extended, we lock it by taking the value from the * more cell; the extendStream function is responsible for filling in * the cell. *) fun getBuffer (readFn, mlOp) (buf as IBUF{more, info, ...}) = let fun get TERMINATED = EOF | get (MORE buf') = DATA buf' | get NOMORE = (case SV.mTake more of NOMORE => extendStream (readFn, mlOp, buf) | next => (SV.mPut(more, next); get next) (* end case *)) in get (SV.mGet more) end (* read a chunk that is at least the specified size *) fun readChunk buf = let val INFO{readVec, reader=PIO.RD{chunkSize, ...}, ...} = infoOfIBuf buf in case (chunkSize - 1) of 0 => (fn n => readVec n) | k => (* round up to next multiple of chunkSize *) (fn n => readVec(Int.quot(n+k, chunkSize) * chunkSize)) (* end case *) end fun generalizedInput getBuf = let fun get (ISTRM(buf as IBUF{data, ...}, pos)) = let val len = V.length data in if (pos < len) then (vecExtract(data, pos, NONE), ISTRM(buf, len)) else (case (getBuf buf) of EOF => (empty, ISTRM(buf, len)) | (DATA rest) => get (ISTRM(rest, 0)) (* end case *)) end in get end (* terminate an input stream *) fun terminate (info as INFO{tail, cleanTag, ...}) = let val m = SV.mGet tail in case SV.mTake m of (m' as MORE _) => (SV.mPut(m, m'); terminate info) | TERMINATED => SV.mPut(m, TERMINATED) | _ => ( CleanIO.removeCleaner cleanTag; SV.mPut(m, TERMINATED)) (* end case *) end (* find the end of the stream *) fun findEOS (buf as IBUF{more, data, ...}) = (case (SV.mGet more) of (MORE buf) => findEOS buf | _ => ISTRM(buf, V.length data) (* end case *)) fun input (strm as ISTRM(buf, _)) = generalizedInput (getBuffer (readVec buf, "input")) strm fun input1 (ISTRM(buf, pos)) = let val IBUF{data, more, ...} = buf in if (pos < V.length data) then SOME(vecSub(data, pos), ISTRM(buf, pos+1)) else let fun get (MORE buf) = input1 (ISTRM(buf, 0)) | get TERMINATED = NONE | get NOMORE = (case SV.mTake more of NOMORE => ( case extendStream (readVec buf, "input1", buf) of EOF => NONE | (DATA rest) => input1 (ISTRM(rest, 0)) (* end case *)) | next => (SV.mPut(more, next); get next) (* end case *)) in get (SV.mGet more) end end fun inputN (ISTRM(buf, pos), n) = let fun join (item, (list, strm)) = (item::list, strm) fun inputList (buf as IBUF{data, ...}, i, n) = let val len = V.length data val remain = len-i in if (remain >= n) then ([vecExtract(data, i, SOME n)], ISTRM(buf, i+n)) else join ( vecExtract(data, i, NONE), nextBuf(buf, n-remain)) end and nextBuf (buf as IBUF{more, data, ...}, n) = let fun get (MORE buf) = inputList (buf, 0, n) | get TERMINATED = ([], ISTRM(buf, V.length data)) | get NOMORE = (case (SV.mTake more) of NOMORE => (case extendStream (readVec buf, "inputN", buf) of EOF => ([], ISTRM(buf, V.length data)) | (DATA rest) => inputList (rest, 0, n) (* end case *)) | next => (SV.mPut(more, next); get next) (* end case *)) in get (SV.mGet more) end val (data, strm) = inputList (buf, pos, n) in (V.concat data, strm) end fun inputAll (strm as ISTRM(buf, _)) = let val INFO{reader=PIO.RD{avail, ...}, ...} = infoOfIBuf buf (* read a chunk that is as large as the available input. Note * that for systems that use CR-LF for #"\n", the size will be * too large, but this should be okay. *) fun bigChunk _ = let val delta = (case avail() of NONE => chunkSzOfIBuf buf | (SOME n) => n (* end case *)) in readChunk buf delta end val bigInput = generalizedInput (getBuffer (bigChunk, "inputAll")) fun loop (v, strm) = if (V.length v = 0) then [] else v :: loop(bigInput strm) val data = V.concat (loop (bigInput strm)) in (data, findEOS buf) end fun input1Evt _ = raise Fail "input1Evt unimplemented" fun inputEvt _ = raise Fail "inputEvt unimplemented" fun inputNEvt _ = raise Fail "inputNEvt unimplemented" fun inputAllEvt _ = raise Fail "inputAllEvt unimplemented" (* Return SOME k, if k <= amount characters can be read without blocking. *) fun canInput (strm as ISTRM(buf, pos), amount) = let(****** val readVecNB = (case buf of (IBUF{info as INFO{readVecNB=NONE, ...}, ...}) => inputExn(info, "canInput", IO.NonblockingNotSupported) | (IBUF{info=INFO{readVecNB=SOME f, ...}, ...}) => f (* end case *))******) fun tryInput (buf as IBUF{data, ...}, i, n) = let val len = V.length data val remain = len - i in if (remain >= n) then SOME n else nextBuf (buf, n - remain) end and nextBuf (IBUF{more, ...}, n) = let fun get (MORE buf) = tryInput (buf, 0, n) | get TERMINATED = SOME(amount - n)(****** | get NOMORE = (case SV.mTake more of NOMORE => (( case extendStream (readVecNB, "canInput", buf) of EOF => SOME(amount - n) | (DATA b) => tryInput (b, 0, n) (* end case *)) handle IO.Io{cause=WouldBlock, ...} => SOME(amount - n)) | next => (SV.mPut(more, next); get next) (* end case *))******) | get NOMORE = SOME(amount - n) in get (SV.mGet more) end in if (amount < 0) then raise Size else tryInput (buf, pos, amount) end fun closeIn (ISTRM(buf, _)) = (case (infoOfIBuf buf) of INFO{closed=ref true, ...} => () | (info as INFO{closed, reader=PIO.RD{close, ...}, ...}) => ( terminate info; closed := true; close() handle ex => inputExn(info, "closeIn", ex)) (* end case *)) fun endOfStream (ISTRM(buf as IBUF{more, ...}, pos)) = ( case SV.mTake more of (next as MORE _) => (SV.mPut(more, next); false) | next => let val IBUF{data, info=INFO{closed, ...}, ...} = buf in if (pos = V.length data) then (case (next, !closed) of (NOMORE, false) => ( case extendStream (readVec buf, "endOfStream", buf) of EOF => true | _ => false (* end case *)) | _ => (SV.mPut(more, next); true) (* end case *)) else (SV.mPut(more, next); false) end (* end case *)) fun mkInstream (reader, optData) = let val PIO.RD{readVec, readVecEvt, getPos, setPos, ...} = reader val getPos = (case (getPos, setPos) of (SOME f, SOME _) => (fn () => SOME(f())) | _ => (fn () => NONE) (* end case *)) val more = SV.mVarInit NOMORE val tag = CleanIO.addCleaner dummyCleaner val info = INFO{ reader=reader, readVec=readVec, readVecEvt=readVecEvt, closed = ref false, getPos = getPos, tail = SV.mVarInit more, cleanTag = tag } val buf = (case optData of NONE => IBUF{ basePos = getPos(), data=empty, info=info, more=more }(** What should we do about the position in this case ?? **)(** Suggestion: When building a stream with supplied initial data, ** nothing can be said about the positions inside that initial ** data (who knows where that data even came from!). **) | (SOME v) => IBUF{ basePos = NONE, data=v, info=info, more=more} (* end case *)) val strm = ISTRM(buf, 0) in CleanIO.rebindCleaner (tag, fn () => closeIn strm); strm end
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -