⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.c

📁 这是一个同样来自贝尔实验室的和UNIX有着渊源的操作系统, 其简洁的设计和实现易于我们学习和理解
💻 C
字号:
/* * Sun RPC client. */#include <u.h>#include <libc.h>#include <thread.h>#include <sunrpc.h>typedef struct Out Out;struct Out{	char err[ERRMAX];	/* error string */	Channel *creply;	/* send to finish rpc */	uchar *p;			/* pending request packet */	int n;				/* size of request */	ulong tag;			/* flush tag of pending request */	ulong xid;			/* xid of pending request */	ulong st;			/* first send time */	ulong t;			/* resend time */	int nresend;		/* number of resends */	SunRpc rpc;		/* response rpc */};static voidudpThread(void *v){	uchar *p, *buf;	Ioproc *io;	int n;	SunClient *cli;	enum { BufSize = 65536 };	cli = v;	buf = emalloc(BufSize);	io = ioproc();	p = nil;	for(;;){		n = ioread(io, cli->fd, buf, BufSize);		if(n <= 0)			break;		p = emalloc(4+n);		memmove(p+4, buf, n);		p[0] = n>>24;		p[1] = n>>16;		p[2] = n>>8;		p[3] = n;		if(sendp(cli->readchan, p) == 0)			break;		p = nil;	}	free(p);	closeioproc(io);	while(send(cli->dying, nil) == -1)		;}static voidnetThread(void *v){	uchar *p, buf[4];	Ioproc *io;	uint n, tot;	int done;	SunClient *cli;	cli = v;	io = ioproc();	tot = 0;	p = nil;	for(;;){		n = ioreadn(io, cli->fd, buf, 4);		if(n != 4)			break;		n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];		if(cli->chatty)			fprint(2, "%.8ux...", n);		done = n&0x80000000;		n &= ~0x80000000;		if(tot == 0){			p = emalloc(4+n);			tot = 4;		}else			p = erealloc(p, tot+n);		if(ioreadn(io, cli->fd, p+tot, n) != n)			break;		tot += n;		if(done){			p[0] = tot>>24;			p[1] = tot>>16;			p[2] = tot>>8;			p[3] = tot;			if(sendp(cli->readchan, p) == 0)				break;			p = nil;			tot = 0;		}	}	free(p);	closeioproc(io);	while(send(cli->dying, 0) == -1)		;}static voidtimerThread(void *v){	Ioproc *io;	SunClient *cli;	cli = v;	io = ioproc();	for(;;){		if(iosleep(io, 200) < 0)			break;		if(sendul(cli->timerchan, 0) == 0)			break;	}	closeioproc(io);	while(send(cli->dying, 0) == -1)		;}static ulongmsec(void){	return nsec()/1000000;}static ulongtwait(ulong rtt, int nresend){	ulong t;	t = rtt;	if(nresend <= 1)		{}	else if(nresend <= 3)		t *= 2;	else if(nresend <= 18)		t <<= nresend-2;	else		t = 60*1000;	if(t > 60*1000)		t = 60*1000;	return t;}static voidrpcMuxThread(void *v){	uchar *buf, *p, *ep;	int i, n, nout, mout;	ulong t, xidgen, tag;	Alt a[5];	Out *o, **out;	SunRpc rpc;	SunClient *cli;	cli = v;	mout = 16;	nout = 0;	out = emalloc(mout*sizeof(out[0]));	xidgen = truerand();	a[0].op = CHANRCV;	a[0].c = cli->rpcchan;	a[0].v = &o;	a[1].op = CHANNOP;	a[1].c = cli->timerchan;	a[1].v = nil;	a[2].op = CHANRCV;	a[2].c = cli->flushchan;	a[2].v = &tag;	a[3].op = CHANRCV;	a[3].c = cli->readchan;	a[3].v = &buf;	a[4].op = CHANEND;	for(;;){		switch(alt(a)){		case 0:	/* o = <-rpcchan */			if(o == nil)				goto Done;			cli->nsend++;			/* set xid */			o->xid = ++xidgen;			if(cli->needcount)				p = o->p+4;			else				p = o->p;			p[0] = xidgen>>24;			p[1] = xidgen>>16;			p[2] = xidgen>>8;			p[3] = xidgen;			if(write(cli->fd, o->p, o->n) != o->n){				free(o->p);				o->p = nil;				snprint(o->err, sizeof o->err, "write: %r");				sendp(o->creply, 0);				break;			}			if(nout >= mout){				mout *= 2;				out = erealloc(out, mout*sizeof(out[0]));			}			o->st = msec();			o->nresend = 0;			o->t = o->st + twait(cli->rtt.avg, 0);if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);			out[nout++] = o;			a[1].op = CHANRCV;			break;		case 1:	/* <-timerchan */			t = msec();			for(i=0; i<nout; i++){				o = out[i];				if((int)(t - o->t) > 0){if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);					if(cli->maxwait && t - o->st >= cli->maxwait){						free(o->p);						o->p = nil;						strcpy(o->err, "timeout");						sendp(o->creply, 0);						out[i--] = out[--nout];						continue;					}					cli->nresend++;					o->nresend++;					o->t = t + twait(cli->rtt.avg, o->nresend);					if(write(cli->fd, o->p, o->n) != o->n){						free(o->p);						o->p = nil;						snprint(o->err, sizeof o->err, "rewrite: %r");						sendp(o->creply, 0);						out[i--] = out[--nout];						continue;					}				}			}			/* stop ticking if no work; rpcchan will turn it back on */			if(nout == 0)				a[1].op = CHANNOP;			break;					case 2:	/* tag = <-flushchan */			for(i=0; i<nout; i++){				o = out[i];				if(o->tag == tag){					out[i--] = out[--nout];					strcpy(o->err, "flushed");					free(o->p);					o->p = nil;					sendp(o->creply, 0);				}			}			break;		case 3:	/* buf = <-readchan */			p = buf;			n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];			p += 4;			ep = p+n;			if(sunRpcUnpack(p, ep, &p, &rpc) < 0){				fprint(2, "in: %.*H unpack failed\n", n, buf+4);				free(buf);				break;			}			if(cli->chatty)				fprint(2, "in: %B\n", &rpc);			if(rpc.iscall){				fprint(2, "did not get reply\n");				free(buf);				break;			}			o = nil;			for(i=0; i<nout; i++){				o = out[i];				if(o->xid == rpc.xid)					break;			}			if(i==nout){				if(cli->chatty) fprint(2, "did not find waiting request\n");				free(buf);				break;			}			out[i] = out[--nout];			free(o->p);			o->p = nil;			if(rpc.status == SunSuccess){				o->p = buf;				o->rpc = rpc;			}else{				o->p = nil;				free(buf);				sunErrstr(rpc.status);				rerrstr(o->err, sizeof o->err);			}			sendp(o->creply, 0);			break;		}	}Done:	free(out);	sendp(cli->dying, 0);}SunClient*sunDial(char *address){	int fd;	SunClient *cli;	if((fd = dial(address, 0, 0, 0)) < 0)		return nil;	cli = emalloc(sizeof(SunClient));	cli->fd = fd;	cli->maxwait = 15000;	cli->rtt.avg = 1000;	cli->dying = chancreate(sizeof(void*), 0);	cli->rpcchan = chancreate(sizeof(Out*), 0);	cli->timerchan = chancreate(sizeof(ulong), 0);	cli->flushchan = chancreate(sizeof(ulong), 0);	cli->readchan = chancreate(sizeof(uchar*), 0);	if(strstr(address, "udp!")){		cli->needcount = 0;		cli->nettid = threadcreate(udpThread, cli, SunStackSize);		cli->timertid = threadcreate(timerThread, cli, SunStackSize);	}else{		cli->needcount = 1;		cli->nettid = threadcreate(netThread, cli, SunStackSize);		/* assume reliable: don't need timer */		/* BUG: netThread should know how to redial */	}	threadcreate(rpcMuxThread, cli, SunStackSize);	return cli;}voidsunClientClose(SunClient *cli){	int n;	/*	 * Threadints get you out of any stuck system calls	 * or thread rendezvouses, but do nothing if the thread	 * is in the ready state.  Keep interrupting until it takes.	 */	n = 0;	if(!cli->timertid)		n++;	while(n < 2){		threadint(cli->nettid);		if(cli->timertid)			threadint(cli->timertid);		yield();		while(nbrecv(cli->dying, nil) == 1)			n++;	}	sendp(cli->rpcchan, 0);	recvp(cli->dying);	/* everyone's gone: clean up */	close(cli->fd);	chanfree(cli->flushchan);	chanfree(cli->readchan);	chanfree(cli->timerchan);	free(cli);}	voidsunClientFlushRpc(SunClient *cli, ulong tag){	sendul(cli->flushchan, tag);}voidsunClientProg(SunClient *cli, SunProg *p){	if(cli->nprog%16 == 0)		cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));	cli->prog[cli->nprog++] = p;}intsunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree){	uchar *bp, *p, *ep;	int i, n1, n2, n, nn;	Out o;	SunProg *prog;	SunStatus ok;	for(i=0; i<cli->nprog; i++)		if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)			break;	if(i==cli->nprog){		werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);		return -1;	}	prog = cli->prog[i];	if(cli->chatty){		fprint(2, "out: %B\n", &tx->rpc);		fprint(2, "\t%C\n", tx);	}	n1 = sunRpcSize(&tx->rpc);	n2 = sunCallSize(prog, tx);	n = n1+n2;	if(cli->needcount)		n += 4;	bp = emalloc(n);	ep = bp+n;	p = bp;	if(cli->needcount){		nn = n-4;		p[0] = (nn>>24)|0x80;		p[1] = nn>>16;		p[2] = nn>>8;		p[3] = nn;		p += 4;	}	if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess	|| (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){		sunErrstr(ok);		free(bp);		return -1;	}	if(p != ep){		werrstr("rpc: packet size mismatch");		free(bp);		return -1;	}	memset(&o, 0, sizeof o);	o.creply = chancreate(sizeof(void*), 0);	o.tag = tag;	o.p = bp;	o.n = n;	sendp(cli->rpcchan, &o);	recvp(o.creply);	chanfree(o.creply);	if(o.p == nil){		werrstr("%s", o.err);		return -1;	}	p = o.rpc.data;	ep = p+o.rpc.ndata;	rx->rpc = o.rpc;	rx->rpc.proc = tx->rpc.proc;	rx->rpc.prog = tx->rpc.prog;	rx->rpc.vers = tx->rpc.vers;	rx->type = (rx->rpc.proc<<1)|1;	if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){		sunErrstr(ok);		werrstr("unpack: %r");		free(o.p);		return -1;	}	if(cli->chatty){		fprint(2, "in: %B\n", &rx->rpc);		fprint(2, "in:\t%C\n", rx);	}	if(tofree)		*tofree = o.p;	else		free(o.p);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -