📄 qio.c
字号:
b = b->next; } /* copy bytes from there */ for(sofar = 0; sofar < len;){ if(n > len - sofar) n = len - sofar; memmove(nb->wp, p, n); qcopycnt += n; sofar += n; nb->wp += n; b = b->next; if(b == nil) break; n = BLEN(b); p = b->rp; } iunlock(&q->lk); return nb;}/* * called by non-interrupt code */Queue*qopen(int limit, int msg, void (*kick)(void*), void *arg){ Queue *q; q = malloc(sizeof(Queue)); if(q == 0) return 0; q->limit = q->inilim = limit; q->kick = kick; q->arg = arg; q->state = msg; q->state |= Qstarve; q->eof = 0; q->noblock = 0; return q;}/* open a queue to be bypassed */Queue*qbypass(void (*bypass)(void*, Block*), void *arg){ Queue *q; q = malloc(sizeof(Queue)); if(q == 0) return 0; q->limit = 0; q->arg = arg; q->bypass = bypass; q->state = 0; return q;}static intnotempty(void *a){ Queue *q = a; return (q->state & Qclosed) || q->bfirst != 0;}/* * wait for the queue to be non-empty or closed. * called with q ilocked. */static intqwait(Queue *q){ /* wait for data */ for(;;){ if(q->bfirst != nil) break; if(q->state & Qclosed){ if(++q->eof > 3) return -1; if(*q->err && strcmp(q->err, Ehungup) != 0) return -1; return 0; } q->state |= Qstarve; /* flag requesting producer to wake me */ iunlock(&q->lk); sleep(&q->rr, notempty, q); ilock(&q->lk); } return 1;}/* * add a block list to a queue */voidqaddlist(Queue *q, Block *b){ /* queue the block */ if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->len += blockalloclen(b); q->dlen += blocklen(b); while(b->next) b = b->next; q->blast = b;}/* * called with q ilocked */Block*qremove(Queue *q){ Block *b; b = q->bfirst; if(b == nil) return nil; q->bfirst = b->next; b->next = nil; q->dlen -= BLEN(b); q->len -= BALLOC(b); QDEBUG checkb(b, "qremove"); return b;}/* * copy the contents of a string of blocks into * memory. emptied blocks are freed. return * pointer to first unconsumed block. */Block*bl2mem(uchar *p, Block *b, int n){ int i; Block *next; for(; b != nil; b = next){ i = BLEN(b); if(i > n){ memmove(p, b->rp, n); b->rp += n; return b; } memmove(p, b->rp, i); n -= i; p += i; b->rp += i; next = b->next; freeb(b); } return nil;}/* * copy the contents of memory into a string of blocks. * return nil on error. */Block*mem2bl(uchar *p, int len){ int n; Block *b, *first, **l; first = nil; l = &first; if(waserror()){ freeblist(first); nexterror(); } do { n = len; if(n > Maxatomic) n = Maxatomic; *l = b = allocb(n); memmove(b->wp, p, n); b->wp += n; p += n; len -= n; l = &b->next; } while(len > 0); poperror(); return first;}/* * put a block back to the front of the queue * called with q ilocked */voidqputback(Queue *q, Block *b){ b->next = q->bfirst; if(q->bfirst == nil) q->blast = b; q->bfirst = b; q->len += BALLOC(b); q->dlen += BLEN(b);}/* * flow control, get producer going again * called with q ilocked */static voidqwakeup_iunlock(Queue *q){ int dowakeup = 0; /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } iunlock(&q->lk); /* wakeup flow controlled writers */ if(dowakeup){ if(q->kick) q->kick(q->arg); wakeup(&q->wr); }}/* * get next block from a queue (up to a limit) */Block*qbread(Queue *q, int len){ Block *b, *nb; int n; qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); nexterror(); } ilock(&q->lk); switch(qwait(q)){ case 0: /* queue closed */ iunlock(&q->lk); qunlock(&q->rlock); poperror(); return nil; case -1: /* multiple reads on a closed queue */ iunlock(&q->lk); error(q->err); } /* if we get here, there's at least one block in the queue */ b = qremove(q); n = BLEN(b); /* split block if it's too big and this is not a message queue */ nb = b; if(n > len){ if((q->state&Qmsg) == 0){ n -= len; b = allocb(n); memmove(b->wp, nb->rp+len, n); b->wp += n; qputback(q, b); } nb->wp = nb->rp + len; } /* restart producer */ qwakeup_iunlock(q); poperror(); qunlock(&q->rlock); return nb;}/* * read a queue. if no data is queued, post a Block * and wait on its Rendez. */longqread(Queue *q, void *vp, int len){ Block *b, *first, **l; int m, n; qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); nexterror(); } ilock(&q->lk);again: switch(qwait(q)){ case 0: /* queue closed */ iunlock(&q->lk); qunlock(&q->rlock); poperror(); return 0; case -1: /* multiple reads on a closed queue */ iunlock(&q->lk); error(q->err); } /* if we get here, there's at least one block in the queue */ if(q->state & Qcoalesce){ /* when coalescing, 0 length blocks just go away */ b = q->bfirst; if(BLEN(b) <= 0){ freeb(qremove(q)); goto again; } /* grab the first block plus as many * following blocks as will completely * fit in the read. */ n = 0; l = &first; m = BLEN(b); for(;;) { *l = qremove(q); l = &b->next; n += m; b = q->bfirst; if(b == nil) break; m = BLEN(b); if(n+m > len) break; } } else { first = qremove(q); n = BLEN(first); } /* copy to user space outside of the ilock */ iunlock(&q->lk); b = bl2mem(vp, first, len); ilock(&q->lk); /* take care of any left over partial block */ if(b != nil){ n -= BLEN(b); if(q->state & Qmsg) freeb(b); else qputback(q, b); } /* restart producer */ qwakeup_iunlock(q); poperror(); qunlock(&q->rlock); return n;}static intqnotfull(void *a){ Queue *q = a; return q->len < q->limit || (q->state & Qclosed);}ulong noblockcnt;/* * add a block to a queue obeying flow control */longqbwrite(Queue *q, Block *b){ int n, dowakeup; Proc *p; n = BLEN(b); if(q->bypass){ (*q->bypass)(q->arg, b); return n; } dowakeup = 0; qlock(&q->wlock); if(waserror()){ if(b != nil) freeb(b); qunlock(&q->wlock); nexterror(); } ilock(&q->lk); /* give up if the queue is closed */ if(q->state & Qclosed){ iunlock(&q->lk); error(q->err); } /* if nonblocking, don't queue over the limit */ if(q->len >= q->limit){ if(q->noblock){ iunlock(&q->lk); freeb(b); noblockcnt += n; qunlock(&q->wlock); poperror(); return n; } } /* queue the block */ if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; b->next = 0; q->len += BALLOC(b); q->dlen += n; QDEBUG checkb(b, "qbwrite"); b = nil; /* make sure other end gets awakened */ if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(&q->lk); /* get output going again */ if(q->kick && (dowakeup || (q->state&Qkick))) q->kick(q->arg); /* wakeup anyone consuming at the other end */ if(dowakeup){ p = wakeup(&q->rr); /* if we just wokeup a higher priority process, let it run */ if(p != nil && p->priority > up->priority) sched(); } /* * flow control, wait for queue to get below the limit * before allowing the process to continue and queue * more. We do this here so that postnote can only * interrupt us after the data has been queued. This * means that things like 9p flushes and ssl messages * will not be disrupted by software interrupts. * * Note - this is moderately dangerous since a process * that keeps getting interrupted and rewriting will * queue infinite crud. */ for(;;){ if(q->noblock || qnotfull(q)) break; ilock(&q->lk); q->state |= Qflow; iunlock(&q->lk); sleep(&q->wr, qnotfull, q); } USED(b); qunlock(&q->wlock); poperror(); return n;}/* * write to a queue. only Maxatomic bytes at a time is atomic. */intqwrite(Queue *q, void *vp, int len){ int n, sofar; Block *b; uchar *p = vp; QDEBUG if(!islo()) print("qwrite hi %lux\n", getcallerpc(&q)); sofar = 0; do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = allocb(n); if(waserror()){ freeb(b); nexterror(); } memmove(b->wp, p+sofar, n); poperror(); b->wp += n; qbwrite(q, b); sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return len;}/* * used by print() to write to a queue. Since we may be splhi or not in * a process, don't qlock. * * this routine merges adjacent blocks if block n+1 will fit into * the free space of block n. */intqiwrite(Queue *q, void *vp, int len){ int n, sofar, dowakeup; Block *b; uchar *p = vp; dowakeup = 0; sofar = 0; do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = iallocb(n); if(b == nil) break; memmove(b->wp, p+sofar, n); b->wp += n; ilock(&q->lk); /* we use an artificially high limit for kernel prints since anything * over the limit gets dropped */ if(q->dlen >= 16*1024){ iunlock(&q->lk); freeb(b); break; } QDEBUG checkb(b, "qiwrite"); if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; q->len += BALLOC(b); q->dlen += n; if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(&q->lk); if(dowakeup){ if(q->kick) q->kick(q->arg); wakeup(&q->rr); } sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return sofar;}/* * be extremely careful when calling this, * as there is no reference accounting */voidqfree(Queue *q){ qclose(q); free(q);}/* * Mark a queue as closed. No further IO is permitted. * All blocks are released. */voidqclose(Queue *q){ Block *bfirst; if(q == nil) return; /* mark it */ ilock(&q->lk); q->state |= Qclosed; q->state &= ~(Qflow|Qstarve); strcpy(q->err, Ehungup); bfirst = q->bfirst; q->bfirst = 0; q->len = 0; q->dlen = 0; q->noblock = 0; iunlock(&q->lk); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr);}/* * Mark a queue as closed. Wakeup any readers. Don't remove queued * blocks. */voidqhangup(Queue *q, char *msg){ /* mark it */ ilock(&q->lk); q->state |= Qclosed; if(msg == 0 || *msg == 0) strcpy(q->err, Ehungup); else strncpy(q->err, msg, ERRMAX-1); iunlock(&q->lk); /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr);}/* * return non-zero if the q is hungup */intqisclosed(Queue *q){ return q->state & Qclosed;}/* * mark a queue as no longer hung up */voidqreopen(Queue *q){ ilock(&q->lk); q->state &= ~Qclosed; q->state |= Qstarve; q->eof = 0; q->limit = q->inilim; iunlock(&q->lk);}/* * return bytes queued */intqlen(Queue *q){ return q->dlen;}/* * return space remaining before flow control */intqwindow(Queue *q){ int l; l = q->limit - q->len; if(l < 0) l = 0; return l;}/* * return true if we can read without blocking */intqcanread(Queue *q){ return q->bfirst!=0;}/* * change queue limit */voidqsetlimit(Queue *q, int limit){ q->limit = limit;}/* * set blocking/nonblocking */voidqnoblock(Queue *q, int onoff){ q->noblock = onoff;}/* * flush the output queue */voidqflush(Queue *q){ Block *bfirst; /* mark it */ ilock(&q->lk); bfirst = q->bfirst; q->bfirst = 0; q->len = 0; q->dlen = 0; iunlock(&q->lk); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ wakeup(&q->wr);}intqfull(Queue *q){ return q->state & Qflow;}intqstate(Queue *q){ return q->state;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -