📄 qio.c
字号:
* space, reallocate to a smaller block */Block*packblock(Block *bp){ Block **l, *nbp; int n; for(l = &bp; *l; l = &(*l)->next){ nbp = *l; n = BLEN(nbp); if((n<<2) < BALLOC(nbp)){ *l = allocb(n); memmove((*l)->wp, nbp->rp, n); (*l)->wp += n; (*l)->next = nbp->next; freeb(nbp); } } return bp;}intqproduce(Queue *q, void *vp, int len){ Block *b; int dowakeup; uchar *p = vp; /* sync with qread */ dowakeup = 0; ilock(q); /* no waiting receivers, room in buffer? */ if(q->len >= q->limit){ q->state |= Qflow; iunlock(q); return -1; } /* save in buffer */ b = iallocb(len); if(b == 0){ iunlock(q); return 0; } memmove(b->wp, p, len); producecnt += len; b->wp += len; if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; /* b->next = 0; done by iallocb() */ q->len += BALLOC(b); q->dlen += BLEN(b); QDEBUG checkb(b, "qproduce"); if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } if(q->len >= q->limit) q->state |= Qflow; iunlock(q); if(dowakeup) wakeup(&q->rr); return len;}/* * copy from offset in the queue */Block*qcopy(Queue *q, int len, ulong offset){ int sofar; int n; Block *b, *nb; uchar *p; nb = allocb(len); ilock(q); /* go to offset */ b = q->bfirst; for(sofar = 0; ; sofar += n){ if(b == nil){ iunlock(q); return nb; } n = BLEN(b); if(sofar + n > offset){ p = b->rp + offset - sofar; n -= offset - sofar; break; } QDEBUG checkb(b, "qcopy"); b = b->next; } /* copy bytes from there */ for(sofar = 0; sofar < len;){ if(n > len - sofar) n = len - sofar; memmove(nb->wp, p, n); qcopycnt += n; sofar += n; nb->wp += n; b = b->next; if(b == nil) break; n = BLEN(b); p = b->rp; } iunlock(q); return nb;}/* * called by non-interrupt code */Queue*qopen(int limit, int msg, void (*kick)(void*), void *arg){ Queue *q; q = malloc(sizeof(Queue)); if(q == 0) return 0; ilock(q); q->limit = q->inilim = limit; q->kick = kick; q->arg = arg; q->state = msg ? Qmsg : 0; q->state |= Qstarve; q->eof = 0; q->noblock = 0; iunlock(q); return q;}static intnotempty(void *a){ Queue *q = a; return (q->state & Qclosed) || q->bfirst != 0;}/* * get next block from a queue (up to a limit) */Block*qbread(Queue *q, int len){ Block *b, *nb; int n, dowakeup; qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); nexterror(); } /* wait for data */ for(;;){ /* sync with qwrite/qproduce */ ilock(q); b = q->bfirst; if(b){ QDEBUG checkb(b, "qbread 0"); break; } if(q->state & Qclosed){ iunlock(q); poperror(); qunlock(&q->rlock); if(++q->eof > 3) error(q->err); return 0; } q->state |= Qstarve; /* flag requesting producer to wake me */ iunlock(q); sleep(&q->rr, notempty, q); } /* remove a buffered block */ q->bfirst = b->next; b->next = 0; n = BLEN(b); q->dlen -= n; q->len -= BALLOC(b); QDEBUG checkb(b, "qbread 1"); /* split block if it's too big and this is not a message-oriented queue */ nb = b; if(n > len){ if((q->state&Qmsg) == 0){ iunlock(q); n -= len; b = allocb(n); memmove(b->wp, nb->rp+len, n); b->wp += n; ilock(q); b->next = q->bfirst; if(q->bfirst == 0) q->blast = b; q->bfirst = b; q->len += BALLOC(b); q->dlen += n; } nb->wp = nb->rp + len; } /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; iunlock(q); /* wakeup flow controlled writers */ if(dowakeup){ if(q->kick) q->kick(q->arg); wakeup(&q->wr); } poperror(); qunlock(&q->rlock); return nb;}/* * read a queue. if no data is queued, post a Block * and wait on its Rendez. */longqread(Queue *q, void *vp, int len){ Block *b; b = qbread(q, len); if(b == 0) return 0; len = BLEN(b); memmove(vp, b->rp, len); freeb(b); return len;}static intqnotfull(void *a){ Queue *q = a; return q->len < q->limit || (q->state & Qclosed);}/* * add a block to a queue obeying flow control */longqbwrite(Queue *q, Block *b){ int n, dowakeup; dowakeup = 0; n = BLEN(b); qlock(&q->wlock); if(waserror()){ qunlock(&q->wlock); nexterror(); } /* flow control */ for(;;){ ilock(q); if(q->state & Qclosed){ iunlock(q); freeb(b); error(q->err); } if(q->len < q->limit) break; if(q->noblock){ iunlock(q); freeb(b); qunlock(&q->wlock); poperror(); return n; } q->state |= Qflow; iunlock(q); sleep(&q->wr, qnotfull, q); } if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; b->next = 0; q->len += BALLOC(b); q->dlen += n; QDEBUG checkb(b, "qbwrite"); if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(q); if(dowakeup){ if(q->kick) q->kick(q->arg); wakeup(&q->rr); } qunlock(&q->wlock); poperror(); return n;}/* * write to a queue. only Maxatomic bytes at a time is atomic. */intqwrite(Queue *q, void *vp, int len){ int n, sofar; Block *b; uchar *p = vp; QDEBUG if(islo() == 0) print("qwrite hi %lux\n", getcallerpc(&q)); sofar = 0; do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = allocb(n); setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); if(waserror()){ freeb(b); nexterror(); } memmove(b->wp, p+sofar, n); poperror(); b->wp += n; qbwrite(q, b); sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return len;}/* * used by print() to write to a queue. Since we may be splhi or not in * a process, don't qlock. */intqiwrite(Queue *q, void *vp, int len){ int n, sofar, dowakeup; Block *b; uchar *p = vp; dowakeup = 0; sofar = 0; do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = allocb(n); memmove(b->wp, p+sofar, n); b->wp += n; ilock(q); QDEBUG checkb(b, "qiwrite"); if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; q->len += BALLOC(b); q->dlen += n; if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(q); if(dowakeup){ if(q->kick) q->kick(q->arg); wakeup(&q->rr); } sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return len;}/* * be extremely careful when calling this, * as there is no reference accounting */voidqfree(Queue *q){ qclose(q); free(q);}/* * Mark a queue as closed. No further IO is permitted. * All blocks are released. */voidqclose(Queue *q){ Block *bfirst; if(q == nil) return; /* mark it */ ilock(q); q->state |= Qclosed; q->state &= ~(Qflow|Qstarve); strcpy(q->err, Ehungup); bfirst = q->bfirst; q->bfirst = 0; q->len = 0; q->dlen = 0; q->noblock = 0; iunlock(q); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr);}/* * Mark a queue as closed. Wakeup any readers. Don't remove queued * blocks. */voidqhangup(Queue *q, char *msg){ /* mark it */ ilock(q); q->state |= Qclosed; if(msg == 0 || *msg == 0) strcpy(q->err, Ehungup); else strncpy(q->err, msg, ERRLEN-1); iunlock(q); /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr);}/* * return non-zero if the q is hungup */intqisclosed(Queue *q){ return q->state & Qclosed;}/* * mark a queue as no longer hung up */voidqreopen(Queue *q){ ilock(q); q->state &= ~Qclosed; q->state |= Qstarve; q->eof = 0; q->limit = q->inilim; iunlock(q);}/* * return bytes queued */intqlen(Queue *q){ return q->dlen;}/* * return space remaining before flow control */intqwindow(Queue *q){ int l; l = q->limit - q->len; if(l < 0) l = 0; return l;}/* * return true if we can read without blocking */intqcanread(Queue *q){ return q->bfirst!=0;}/* * change queue limit */voidqsetlimit(Queue *q, int limit){ q->limit = limit;}/* * set blocking/nonblocking */voidqnoblock(Queue *q, int onoff){ q->noblock = onoff;}/* * flush the output queue */voidqflush(Queue *q){ Block *bfirst; /* mark it */ ilock(q); bfirst = q->bfirst; q->bfirst = 0; q->len = 0; q->dlen = 0; iunlock(q); /* free queued blocks */ freeblist(bfirst); /* wake up readers/writers */ wakeup(&q->wr);}intqfull(Queue *q){ return q->state & Qflow;}intqstate(Queue *q){ return q->state;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -