📄 channel.c
字号:
#include <u.h>
#include <libc.h>
#include <thread.h>
#include "threadimpl.h"

/*
 * Channel operations for the thread library: creation and release of
 * channels, the general alt() multiplexer, and the send/recv
 * convenience wrappers built on top of it.
 *
 * All channel state (buffer indices and the per-channel table of
 * queued Alts) is manipulated while holding chanlock.
 */

static Lock chanlock;		/* central channel access lock */

static void enqueue(Alt*, Channel**);
static void dequeue(Alt*);
static int canexec(Alt*);
static int altexec(Alt*, int);

/*
 * Release channel c; the caller must hold chanlock.
 * If any Alt is still queued on the channel, the channel is only
 * marked freed; dequeue() calls back here when an entry is removed
 * from a channel marked that way.
 */
static void
_chanfree(Channel *c)
{
	int i, inuse;

	inuse = 0;
	for(i = 0; i < c->nentry; i++)
		if(c->qentry[i]){
			inuse = 1;
			break;
		}
	if(inuse)
		c->freed = 1;
	else{
		free(c->qentry);	/* free(nil) is a no-op */
		free(c);
	}
}

/*
 * Public entry point: release channel c under chanlock.
 */
void
chanfree(Channel *c)
{
	lock(&chanlock);
	_chanfree(c);
	unlock(&chanlock);
}

/*
 * Initialize caller-provided channel storage for elemcnt buffered
 * elements of elemsize bytes each.
 * Returns 1 on success, -1 if c is nil, elemcnt is negative or
 * elemsize is not positive.
 *
 * NOTE(review): nentry and qentry are not reset here, yet emptyentry()
 * asserts that they are consistent; presumably callers pass zeroed
 * storage -- confirm against callers.
 */
int
chaninit(Channel *c, int elemsize, int elemcnt)
{
	if(elemcnt < 0 || elemsize <= 0 || c == nil)
		return -1;
	c->f = 0;
	c->n = 0;
	c->freed = 0;
	c->e = elemsize;
	c->s = elemcnt;
	_threaddebug(DBGCHAN, "chaninit %p", c);
	return 1;
}

/*
 * Allocate a channel with room for elemcnt buffered elements of
 * elemsize bytes each.  Returns nil if elemcnt is negative or
 * elemsize is not positive.
 *
 * NOTE(review): only e and s are set explicitly; the remaining fields
 * are assumed to come back zeroed from _threadmalloc (second argument
 * 1) -- confirm in threadimpl.h.
 */
Channel*
chancreate(int elemsize, int elemcnt)
{
	Channel *c;

	if(elemcnt < 0 || elemsize <= 0)
		return nil;
	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
	c->e = elemsize;
	c->s = elemcnt;
	_threaddebug(DBGCHAN, "chancreate %p", c);
	return c;
}

/*
 * Perform one of the operations described by the array alts, which
 * is terminated by an entry whose op is CHANEND or CHANNOBLK.
 * Entries with op CHANNOP are skipped.
 *
 * Returns the index of the entry that was executed.  If the list ends
 * in CHANNOBLK and nothing can proceed immediately, returns the index
 * of that terminating entry.  Returns -1 if an entry has a nil
 * channel or if the wait was interrupted.
 */
int
alt(Alt *alts)
{
	Alt *a, *xa;
	Channel volatile *c;
	int n, s;
	void* r;
	Thread *t;

	/*
	 * The point of going splhi here is that note handlers
	 * might reasonably want to use channel operations,
	 * but that will hang if the note comes while we hold the
	 * chanlock.  Instead, we delay the note until we've dropped
	 * the lock.
	 */
	t = _threadgetproc()->thread;
	if(t->moribund || _threadexitsallstatus)
		yield();	/* won't return */
	s = _procsplhi();
	lock(&chanlock);
	t->alt = alts;
	t->chan = Chanalt;

	/*
	 * test whether any channels can proceed.
	 * nrand(++n)==0 picks uniformly among the runnable entries
	 * without a second pass.
	 */
	n = 0;
	a = nil;

	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
		xa->entryno = -1;
		if(xa->op == CHANNOP)
			continue;

		c = xa->c;
		if(c==nil){
			unlock(&chanlock);
			_procsplx(s);
			t->chan = Channone;
			return -1;
		}
		if(canexec(xa))
			if(nrand(++n) == 0)
				a = xa;
	}

	if(a==nil){
		/* nothing can proceed */
		if(xa->op == CHANNOBLK){
			unlock(&chanlock);
			_procsplx(s);
			t->chan = Channone;
			return xa - alts;
		}

		/* enqueue on all channels. */
		c = nil;
		for(xa=alts; xa->op!=CHANEND; xa++){
			if(xa->op==CHANNOP)
				continue;
			enqueue(xa, &c);
		}

		/*
		 * wait for successful rendezvous.
		 * we can't just give up if the rendezvous
		 * is interrupted -- someone else might come
		 * along and try to rendezvous with us, so
		 * we need to be here.
		 */
	Again:
		unlock(&chanlock);
		_procsplx(s);
		r = _threadrendezvous(&c, 0);
		s = _procsplhi();
		lock(&chanlock);

		if(r==(void*)~0){		/* interrupted */
			if(c!=nil)		/* someone will meet us; go back */
				goto Again;
			c = (Channel*)~0;	/* so no one tries to meet us */
		}

		/* dequeue from channels, find selected one */
		a = nil;
		for(xa=alts; xa->op!=CHANEND; xa++){
			if(xa->op==CHANNOP)
				continue;
			if(xa->c == c)
				a = xa;
			dequeue(xa);
		}
		unlock(&chanlock);
		_procsplx(s);
		if(a == nil){	/* we were interrupted */
			assert(c==(Channel*)~0);
			/* reset like every other return path does */
			t->chan = Channone;
			return -1;
		}
	}else{
		altexec(a, s);	/* unlocks chanlock, does splx */
	}
	_sched();
	t->chan = Channone;
	return a - alts;
}

/*
 * Run a single send or receive (op) of value v on channel c by
 * building a one-entry Alt list; nb selects the nonblocking form.
 * Returns 1 if the operation completed, 0 if a nonblocking operation
 * could not proceed, -1 if alt() reported an error or interruption.
 */
static int
runop(int op, Channel *c, void *v, int nb)
{
	int r;
	Alt a[2];

	/*
	 * we could do this without calling alt,
	 * but the only reason would be performance,
	 * and i'm not convinced it matters.
	 */
	a[0].op = op;
	a[0].c = c;
	a[0].v = v;
	a[1].op = CHANEND;
	if(nb)
		a[1].op = CHANNOBLK;
	switch(r=alt(a)){
	case -1:	/* interrupted */
		return -1;
	case 1:	/* nonblocking, didn't accomplish anything */
		assert(nb);
		return 0;
	case 0:
		return 1;
	default:
		fprint(2, "ERROR: channel alt returned %d\n", r);
		abort();
		return -1;
	}
}

/* Blocking receive from c into v; see runop() for return values. */
int
recv(Channel *c, void *v)
{
	return runop(CHANRCV, c, v, 0);
}

/* Nonblocking receive from c into v; see runop() for return values. */
int
nbrecv(Channel *c, void *v)
{
	return runop(CHANRCV, c, v, 1);
}

/* Blocking send of the value at v on c; see runop() for return values. */
int
send(Channel *c, void *v)
{
	return runop(CHANSND, c, v, 0);
}

/* Nonblocking send of the value at v on c; see runop() for return values. */
int
nbsend(Channel *c, void *v)
{
	return runop(CHANSND, c, v, 1);
}

/*
 * Abort the program unless channel c carries elements of exactly
 * sz bytes.  Used by the typed wrappers below.
 */
static void
channelsize(Channel *c, int sz)
{
	if(c->e != sz){
		fprint(2, "expected channel with elements of size %d, got size %d\n",
			sz, c->e);
		abort();
	}
}

/* Send the ulong v on c; returns what send() returns. */
int
sendul(Channel *c, ulong v)
{
	channelsize(c, sizeof(ulong));
	return send(c, &v);
}

/* Receive a ulong from c; returns ~0 if recv() fails. */
ulong
recvul(Channel *c)
{
	ulong v;

	channelsize(c, sizeof(ulong));
	if(recv(c, &v) < 0)
		return ~0;
	return v;
}

/* Send the pointer v on c; returns what send() returns. */
int
sendp(Channel *c, void *v)
{
	channelsize(c, sizeof(void*));
	return send(c, &v);
}

/* Receive a pointer from c; returns nil if recv() fails. */
void*
recvp(Channel *c)
{
	void *v;

	channelsize(c, sizeof(void*));
	if(recv(c, &v) < 0)
		return nil;
	return v;
}

/* Nonblocking send of the ulong v on c; returns what nbsend() returns. */
int
nbsendul(Channel *c, ulong v)
{
	channelsize(c, sizeof(ulong));
	return nbsend(c, &v);
}

/*
 * Nonblocking receive of a ulong from c.
 * Returns 0 when nothing was received.
 */
ulong
nbrecvul(Channel *c)
{
	ulong v;

	channelsize(c, sizeof(ulong));
	/* <= 0: v is only filled in when nbrecv reports success */
	if(nbrecv(c, &v) <= 0)
		return 0;
	return v;
}

/* Nonblocking send of the pointer v on c; returns what nbsend() returns. */
int
nbsendp(Channel *c, void *v)
{
	channelsize(c, sizeof(void*));
	return nbsend(c, &v);
}

/*
 * Nonblocking receive of a pointer from c.
 * Returns nil when nothing was received.
 */
void*
nbrecvp(Channel *c)
{
	void *v;

	channelsize(c, sizeof(void*));
	/* <= 0: v is only filled in when nbrecv reports success */
	if(nbrecv(c, &v) <= 0)
		return nil;
	return v;
}

/*
 * Return the index of an unused slot in c->qentry, growing the table
 * by 16 zeroed slots when every existing slot is occupied.
 * Exits via sysfatal if the table cannot be grown.
 */
static int
emptyentry(Channel *c)
{
	int i, extra;

	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));

	for(i=0; i<c->nentry; i++)
		if(c->qentry[i]==nil)
			return i;

	/* table full: i is now the old nentry, i.e. the first new slot */
	extra = 16;
	c->nentry += extra;
	c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
	if(c->qentry == nil)
		sysfatal("realloc channel entries: %r");
	memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
	return i;
}

/*
 * Record Alt a in its channel's table of queued operations.
 * c is stored as the Alt's tag: canexec() and altexec() only consider
 * queued entries whose *tag is still nil, and altexec() stores the
 * chosen channel through it.
 */
static void
enqueue(Alt *a, Channel **c)
{
	int i;

	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
	a->tag = c;
	i = emptyentry(a->c);
	a->c->qentry[i] = a;
}

/*
 * Remove Alt a from its channel's table, if present.  If the channel
 * was marked freed, retry the deferred free now that one fewer entry
 * is queued.
 */
static void
dequeue(Alt *a)
{
	int i;
	Channel *c;

	c = a->c;
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i]==a){
			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
			c->qentry[i] = nil;
			if(c->freed)
				_chanfree(c);
			return;
		}
}

/*
 * Report whether the operation described by a can complete right now:
 * either a queued Alt with the opposite operation is still unclaimed
 * (its *tag is nil), or the channel buffer has room (send) or data
 * (receive).  Returns 1 if so, 0 otherwise.
 */
static int
canexec(Alt *a)
{
	int i, otherop;
	Channel *c;

	c = a->c;
	/* are there senders or receivers blocked? */
	otherop = (CHANSND+CHANRCV) - a->op;
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
			return 1;
		}

	/* is there room in the channel? */
	if((a->op==CHANSND && c->n < c->s)
	|| (a->op==CHANRCV && c->n > 0)){
		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
		return 1;
	}

	return 0;
}

/*
 * Claim the buffer slot used by a's operation and return its address:
 * the slot at the extraction point for a receive, the next free slot
 * for a send.  When willreplace is set the element count n is left
 * unchanged, because the caller immediately refills (or has already
 * accounted for) the slot.
 * Aborts if the buffer cannot satisfy the operation.
 */
static void*
altexecbuffered(Alt *a, int willreplace)
{
	uchar *v;
	Channel *c;

	c = a->c;
	/* use buffered channel queue */
	if(a->op==CHANRCV && c->n > 0){
		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
		v = c->v + c->e*(c->f%c->s);
		if(!willreplace)
			c->n--;
		/*
		 * keep f reduced modulo s so that it cannot grow
		 * without bound and wrap; c->s is nonzero here
		 * because 0 < c->n and n only grows while n < s.
		 */
		c->f = (c->f+1) % c->s;
		return v;
	}
	if(a->op==CHANSND && c->n < c->s){
		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
		v = c->v + c->e*((c->f+c->n)%c->s);
		if(!willreplace)
			c->n++;
		return v;
	}
	abort();
	return nil;
}

/*
 * Copy sz bytes from src to dst.  A nil dst discards the value;
 * a nil src stores zeros instead.
 */
static void
altcopy(void *dst, void *src, int sz)
{
	if(dst){
		if(src)
			memmove(dst, src, sz);
		else
			memset(dst, 0, sz);
	}
}

/*
 * Execute the operation described by a, which canexec() has reported
 * as runnable.  Called with chanlock held; releases chanlock and
 * restores the level spl before returning.  Always returns 1.
 */
static int
altexec(Alt *a, int spl)
{
	volatile Alt *b;
	int i, n, otherop;
	Channel *c;
	void *me, *waiter, *buf;

	c = a->c;

	/* rendezvous with others */
	otherop = (CHANSND+CHANRCV) - a->op;
	n = 0;
	b = nil;
	me = a->v;
	/* choose uniformly among the unclaimed queued counterparts */
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
			if(nrand(++n) == 0)
				b = c->qentry[i];
	if(b != nil){
		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
		waiter = b->v;
		if(c->s && c->n){
			/*
			 * if buffer is full and there are waiters
			 * and we're meeting a waiter,
			 * we must be receiving.
			 *
			 * we use the value in the channel buffer,
			 * copy the waiter's value into the channel buffer
			 * on behalf of the waiter, and then wake the waiter.
			 */
			if(a->op!=CHANRCV)
				abort();
			buf = altexecbuffered(a, 1);
			altcopy(me, buf, c->e);
			altcopy(buf, waiter, c->e);
		}else{
			if(a->op==CHANRCV)
				altcopy(me, waiter, c->e);
			else
				altcopy(waiter, me, c->e);
		}
		*b->tag = c;	/* commits us to rendezvous */
		_threaddebug(DBGCHAN, "unlocking the chanlock");
		unlock(&chanlock);
		_procsplx(spl);
		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
		/* retry for as long as the rendezvous reports interruption */
		while(_threadrendezvous(b->tag, 0) == (void*)~0)
			;
		return 1;
	}

	/* no counterpart waiting: go through the channel buffer */
	buf = altexecbuffered(a, 0);
	if(a->op==CHANRCV)
		altcopy(me, buf, c->e);
	else
		altcopy(buf, me, c->e);

	unlock(&chanlock);
	_procsplx(spl);
	return 1;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -