📄 qio.c
字号:
#include "u.h"#include "lib.h"#include "mem.h"#include "dat.h"#include "fns.h"#include "error.h"static ulong padblockcnt;static ulong concatblockcnt;static ulong pullupblockcnt;static ulong copyblockcnt;static ulong consumecnt;static ulong producecnt;static ulong qcopycnt;static int debugging;#define QDEBUG if(0)/* * IO queues */struct Queue{ Lock lk; Block* bfirst; /* buffer */ Block* blast; int len; /* bytes allocated to queue */ int dlen; /* data bytes in queue */ int limit; /* max bytes in queue */ int inilim; /* initial limit */ int state; int noblock; /* true if writes return immediately when q full */ int eof; /* number of eofs read by user */ void (*kick)(void*); /* restart output */ void (*bypass)(void*, Block*); /* bypass queue altogether */ void* arg; /* argument to kick */ QLock rlock; /* mutex for reading processes */ Rendez rr; /* process waiting to read */ QLock wlock; /* mutex for writing processes */ Rendez wr; /* process waiting to write */ char err[ERRMAX];};enum{ Maxatomic = 64*1024,};uint qiomaxatomic = Maxatomic;voidixsummary(void){ debugging ^= 1; iallocsummary(); print("pad %lud, concat %lud, pullup %lud, copy %lud\n", padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); print("consume %lud, produce %lud, qcopy %lud\n", consumecnt, producecnt, qcopycnt);}/* * free a list of blocks */voidfreeblist(Block *b){ Block *next; for(; b != 0; b = next){ next = b->next; b->next = 0; freeb(b); }}/* * pad a block to the front (or the back if size is negative) */Block*padblock(Block *bp, int size){ int n; Block *nbp; QDEBUG checkb(bp, "padblock 1"); if(size >= 0){ if(bp->rp - bp->base >= size){ bp->rp -= size; return bp; } if(bp->next) panic("padblock 0x%luX", getcallerpc(&bp)); n = BLEN(bp); padblockcnt++; nbp = allocb(size+n); nbp->rp += size; nbp->wp = nbp->rp; memmove(nbp->wp, bp->rp, n); nbp->wp += n; freeb(bp); nbp->rp -= size; } else { size = -size; if(bp->next) panic("padblock 0x%luX", getcallerpc(&bp)); if(bp->lim - bp->wp >= size) return bp; n = BLEN(bp); padblockcnt++; nbp = allocb(size+n); memmove(nbp->wp, bp->rp, n); nbp->wp += n; freeb(bp); } QDEBUG checkb(nbp, "padblock 1"); return nbp;}/* * return count of bytes in a string of blocks */intblocklen(Block *bp){ int len; len = 0; while(bp) { len += BLEN(bp); bp = bp->next; } return len;}/* * return count of space in blocks */intblockalloclen(Block *bp){ int len; len = 0; while(bp) { len += BALLOC(bp); bp = bp->next; } return len;}/* * copy the string of blocks into * a single block and free the string */Block*concatblock(Block *bp){ int len; Block *nb, *f; if(bp->next == 0) return bp; nb = allocb(blocklen(bp)); for(f = bp; f; f = f->next) { len = BLEN(f); memmove(nb->wp, f->rp, len); nb->wp += len; } concatblockcnt += BLEN(nb); freeblist(bp); QDEBUG checkb(nb, "concatblock 1"); return nb;}/* * make sure the first block has at least n bytes */Block*pullupblock(Block *bp, int n){ int i; Block *nbp; /* * this should almost always be true, it's * just to avoid every caller checking. */ if(BLEN(bp) >= n) return bp; /* * if not enough room in the first block, * add another to the front of the list. */ if(bp->lim - bp->rp < n){ nbp = allocb(n); nbp->next = bp; bp = nbp; } /* * copy bytes from the trailing blocks into the first */ n -= BLEN(bp); while((nbp = bp->next)){ i = BLEN(nbp); if(i > n) { memmove(bp->wp, nbp->rp, n); pullupblockcnt++; bp->wp += n; nbp->rp += n; QDEBUG checkb(bp, "pullupblock 1"); return bp; } else { /* shouldn't happen but why crash if it does */ if(i < 0){ print("pullup negative length packet, called from 0x%p\n", getcallerpc(&bp)); i = 0; } memmove(bp->wp, nbp->rp, i); pullupblockcnt++; bp->wp += i; bp->next = nbp->next; nbp->next = 0; freeb(nbp); n -= i; if(n == 0){ QDEBUG checkb(bp, "pullupblock 2"); return bp; } } } freeb(bp); return 0;}/* * make sure the first block has at least n bytes */Block*pullupqueue(Queue *q, int n){ Block *b; if(BLEN(q->bfirst) >= n) return q->bfirst; q->bfirst = pullupblock(q->bfirst, n); for(b = q->bfirst; b != nil && b->next != nil; b = b->next) ; q->blast = b; return q->bfirst;}/* * trim to len bytes starting at offset */Block *trimblock(Block *bp, int offset, int len){ ulong l; Block *nb, *startb; QDEBUG checkb(bp, "trimblock 1"); if(blocklen(bp) < offset+len) { freeblist(bp); return nil; } while((l = BLEN(bp)) < offset) { offset -= l; nb = bp->next; bp->next = nil; freeb(bp); bp = nb; } startb = bp; bp->rp += offset; while((l = BLEN(bp)) < len) { len -= l; bp = bp->next; } bp->wp -= (BLEN(bp) - len); if(bp->next) { freeblist(bp->next); bp->next = nil; } return startb;}/* * copy 'count' bytes into a new block */Block*copyblock(Block *bp, int count){ int l; Block *nbp; QDEBUG checkb(bp, "copyblock 0"); nbp = allocb(count); for(; count > 0 && bp != 0; bp = bp->next){ l = BLEN(bp); if(l > count) l = count; memmove(nbp->wp, bp->rp, l); nbp->wp += l; count -= l; } if(count > 0){ memset(nbp->wp, 0, count); nbp->wp += count; } copyblockcnt++; QDEBUG checkb(nbp, "copyblock 1"); return nbp;}Block*adjustblock(Block* bp, int len){ int n; Block *nbp; if(len < 0){ freeb(bp); return nil; } if(bp->rp+len > bp->lim){ nbp = copyblock(bp, len); freeblist(bp); QDEBUG checkb(nbp, "adjustblock 1"); return nbp; } n = BLEN(bp); if(len > n) memset(bp->wp, 0, len-n); bp->wp = bp->rp+len; QDEBUG checkb(bp, "adjustblock 2"); return bp;}/* * throw away up to count bytes from a * list of blocks. Return count of bytes * thrown away. */intpullblock(Block **bph, int count){ Block *bp; int n, bytes; bytes = 0; if(bph == nil) return 0; while(*bph != nil && count != 0) { bp = *bph; n = BLEN(bp); if(count < n) n = count; bytes += n; count -= n; bp->rp += n; QDEBUG checkb(bp, "pullblock "); if(BLEN(bp) == 0) { *bph = bp->next; bp->next = nil; freeb(bp); } } return bytes;}/* * get next block from a queue, return null if nothing there */Block*qget(Queue *q){ int dowakeup; Block *b; /* sync with qwrite */ ilock(&q->lk); b = q->bfirst; if(b == nil){ q->state |= Qstarve; iunlock(&q->lk); return nil; } q->bfirst = b->next; b->next = 0; q->len -= BALLOC(b); q->dlen -= BLEN(b); QDEBUG checkb(b, "qget"); /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; iunlock(&q->lk); if(dowakeup) wakeup(&q->wr); return b;}/* * throw away the next 'len' bytes in the queue */intqdiscard(Queue *q, int len){ Block *b; int dowakeup, n, sofar; ilock(&q->lk); for(sofar = 0; sofar < len; sofar += n){ b = q->bfirst; if(b == nil) break; QDEBUG checkb(b, "qdiscard"); n = BLEN(b); if(n <= len - sofar){ q->bfirst = b->next; b->next = 0; q->len -= BALLOC(b); q->dlen -= BLEN(b); freeb(b); } else { n = len - sofar; b->rp += n; q->dlen -= n; } } /* * if writer flow controlled, restart * * This used to be * q->len < q->limit/2 * but it slows down tcp too much for certain write sizes. * I really don't understand it completely. It may be * due to the queue draining so fast that the transmission * stalls waiting for the app to produce more data. - presotto */ if((q->state & Qflow) && q->len < q->limit){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; iunlock(&q->lk); if(dowakeup) wakeup(&q->wr); return sofar;}/* * Interrupt level copy out of a queue, return # bytes copied. */intqconsume(Queue *q, void *vp, int len){ Block *b; int n, dowakeup; uchar *p = vp; Block *tofree = nil; /* sync with qwrite */ ilock(&q->lk); for(;;) { b = q->bfirst; if(b == 0){ q->state |= Qstarve; iunlock(&q->lk); return -1; } QDEBUG checkb(b, "qconsume 1"); n = BLEN(b); if(n > 0) break; q->bfirst = b->next; q->len -= BALLOC(b); /* remember to free this */ b->next = tofree; tofree = b; }; if(n < len) len = n; memmove(p, b->rp, len); consumecnt += n; b->rp += len; q->dlen -= len; /* discard the block if we're done with it */ if((q->state & Qmsg) || len == n){ q->bfirst = b->next; b->next = 0; q->len -= BALLOC(b); q->dlen -= BLEN(b); /* remember to free this */ b->next = tofree; tofree = b; } /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; iunlock(&q->lk); if(dowakeup) wakeup(&q->wr); if(tofree != nil) freeblist(tofree); return len;}intqpass(Queue *q, Block *b){ int dlen, len, dowakeup; /* sync with qread */ dowakeup = 0; ilock(&q->lk); if(q->len >= q->limit){ freeblist(b); iunlock(&q->lk); return -1; } if(q->state & Qclosed){ len = BALLOC(b); freeblist(b); iunlock(&q->lk); return len; } /* add buffer to queue */ if(q->bfirst) q->blast->next = b; else q->bfirst = b; len = BALLOC(b); dlen = BLEN(b); QDEBUG checkb(b, "qpass"); while(b->next){ b = b->next; QDEBUG checkb(b, "qpass"); len += BALLOC(b); dlen += BLEN(b); } q->blast = b; q->len += len; q->dlen += dlen; if(q->len >= q->limit/2) q->state |= Qflow; if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(&q->lk); if(dowakeup) wakeup(&q->rr); return len;}intqpassnolim(Queue *q, Block *b){ int dlen, len, dowakeup; /* sync with qread */ dowakeup = 0; ilock(&q->lk); if(q->state & Qclosed){ freeblist(b); iunlock(&q->lk); return BALLOC(b); } /* add buffer to queue */ if(q->bfirst) q->blast->next = b; else q->bfirst = b; len = BALLOC(b); dlen = BLEN(b); QDEBUG checkb(b, "qpass"); while(b->next){ b = b->next; QDEBUG checkb(b, "qpass"); len += BALLOC(b); dlen += BLEN(b); } q->blast = b; q->len += len; q->dlen += dlen; if(q->len >= q->limit/2) q->state |= Qflow; if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(&q->lk); if(dowakeup) wakeup(&q->rr); return len;}/* * if the allocated space is way out of line with the used * space, reallocate to a smaller block */Block*packblock(Block *bp){ Block **l, *nbp; int n; for(l = &bp; *l; l = &(*l)->next){ nbp = *l; n = BLEN(nbp); if((n<<2) < BALLOC(nbp)){ *l = allocb(n); memmove((*l)->wp, nbp->rp, n); (*l)->wp += n; (*l)->next = nbp->next; freeb(nbp); } } return bp;}intqproduce(Queue *q, void *vp, int len){ Block *b; int dowakeup; uchar *p = vp; /* sync with qread */ dowakeup = 0; ilock(&q->lk); /* no waiting receivers, room in buffer? */ if(q->len >= q->limit){ q->state |= Qflow; iunlock(&q->lk); return -1; } /* save in buffer */ b = iallocb(len); if(b == 0){ iunlock(&q->lk); return 0; } memmove(b->wp, p, len); producecnt += len; b->wp += len; if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; /* b->next = 0; done by iallocb() */ q->len += BALLOC(b); q->dlen += BLEN(b); QDEBUG checkb(b, "qproduce"); if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } if(q->len >= q->limit) q->state |= Qflow; iunlock(&q->lk); if(dowakeup) wakeup(&q->rr); return len;}/* * copy from offset in the queue */Block*qcopy(Queue *q, int len, ulong offset){ int sofar; int n; Block *b, *nb; uchar *p; nb = allocb(len); ilock(&q->lk); /* go to offset */ b = q->bfirst; for(sofar = 0; ; sofar += n){ if(b == nil){ iunlock(&q->lk); return nb; } n = BLEN(b); if(sofar + n > offset){ p = b->rp + offset - sofar; n -= offset - sofar; break; } QDEBUG checkb(b, "qcopy");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -