⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipctest.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
	}	return errs;}static intasyn_echoserver(IPC_Channel* wchan, int repcount){	int		rdcount = 0;	int		wrcount = 0;	int		errcount = 0;	int		blockedcount = 0;	IPC_Message*	wmsg;	int		lastcount = -1;	const char*	w = "asyn_echoserver";	cl_log(LOG_INFO, "Asyn echo server: %d reps pid %d."	,	repcount, (int)getpid());	while (rdcount < repcount) {		int	rc;		do {			++wrcount;			if (wrcount > repcount) {				break;			}			wmsg = newmessage(wchan, wrcount);			/*fprintf(stderr, "s"); */			if ((rc = wchan->ops->send(wchan, wmsg)) != IPC_OK) {				cl_log(LOG_ERR				,	"asyn_echoserver: send failed"				" %d rc iter %d"				,	rc, wrcount);				++errcount;				continue;			}			lastcount = wrcount;						if (wchan->ops->is_sending_blocked(wchan)) {				/* fprintf(stderr, "b"); */				++blockedcount;			}else{				blockedcount = 0;			}			errcount += checkinput(wchan, w, &rdcount, repcount);			if (wrcount < repcount			&&	wchan->ch_status == IPC_DISCONNECT) {				++errcount;				break;			}		}while (wrcount < repcount && blockedcount < 10		&&	wchan->ch_status != IPC_DISCONNECT);		if (wrcount < repcount) {			/* fprintf(stderr, "B"); */		}		wchan->ops->waitout(wchan);		errcount += checkinput(wchan, w, &rdcount, repcount);		if (wrcount >= repcount && rdcount < repcount) {			while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR);						if (rc != IPC_OK) {				cl_log(LOG_ERR				,	"asyn_echoserver: waitin()"				" failed %d rc rdcount %d errno=%d"				,	rc, rdcount, errno);				cl_perror("waitin");				exit(1);			}		}		if (wchan->ch_status == IPC_DISCONNECT		&&	rdcount < repcount) {			cl_log(LOG_ERR			,	"asyn_echoserver: EOF in iter %d"			,	rdcount);			EOFcheck(wchan);			++errcount;			break;		}	}	cl_log(LOG_INFO, "asyn_echoserver: %d errors", errcount);#if 0	cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)wchan);#endif	wchan->ops->destroy(wchan); wchan = NULL;	return errcount;}static intasyn_echoclient(IPC_Channel* chan, int repcount){	int		rdcount = 0;	int		wrcount = 0;	int		errcount = 0;	IPC_Message*	rmsg;	int		rfd = chan->ops->get_recv_select_fd(chan);	int		wfd = chan->ops->get_send_select_fd(chan);	gboolean	rdeqwr = (rfd == wfd);	cl_log(LOG_INFO, "Async Echo client: %d reps pid %d."	,	repcount, (int)getpid());	ipc_set_pollfunc(PollFunc);	while (rdcount < repcount && errcount < repcount) {		int		rc;		struct pollfd 	pf[2];		int		nfd = 1;		pf[0].fd	= rfd;		pf[0].events	= POLLIN|POLLHUP;		if (chan->ops->is_sending_blocked(chan)) {			if (rdeqwr) {				pf[0].events |= POLLOUT;			}else{				nfd = 2;				pf[1].fd = wfd;				pf[1].events = POLLOUT|POLLHUP;			}		}		/* Have input? */		/* fprintf(stderr, "i"); */		while (chan->ops->is_message_pending(chan)		&&	rdcount < repcount) {			/*fprintf(stderr, "r"); */			if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {				if (!IPC_ISRCONN(chan)) {					cl_log(LOG_ERR					,	"Async echoclient: disconnect"					" iter %d", rdcount+1);					++errcount;					return errcount;				}				cl_log(LOG_ERR				,	"Async echoclient: recv"				" failed %d rc iter %d errno=%d"				,	rc, rdcount+1, errno);				cl_perror("recv");				rmsg=NULL;				++errcount;				cl_log(LOG_INFO, "sleep(1)");				sleep(1);				continue;			}			/*fprintf(stderr, "c"); */			++rdcount;			if ((rc = chan->ops->send(chan, rmsg))			!=	IPC_OK) {				++errcount;				cl_perror("send");				cl_log(LOG_ERR				,	"Async echoclient: send failed"				" rc %d, iter %d", rc, rdcount);				cl_log(LOG_INFO, "Message being sent: %s"				,		(char*)rmsg->msg_body);				if (!IPC_ISRCONN(chan)) {					cl_log(LOG_ERR					,	"Async echoclient: EOF(2)"					" iter %d", rdcount+1);					EOFcheck(chan);					return errcount;				}				continue;			}else{				++wrcount;			}			/*fprintf(stderr, "x"); */		}		if (rdcount >= repcount) {			break;		}		/*		 * At this point it is possible that the POLLOUT bit		 * being on is no longer necessary, but this will only		 * cause an extra (false) output poll iteration at worst...		 * This is because (IIRC) both is_sending_blocked(), and 		 * is_message_pending() both perform a resume_io().		 * This might be confusing, but -- oh well...		 */		/*		  fprintf(stderr, "P");		  cl_log(LOG_INFO, "poll[%d, 0x%x]"		  ,	pf[0].fd, pf[0].events);		  cl_log(LOG_DEBUG, "poll[%d, 0x%x]..."		  ,	pf[0].fd, pf[0].events);		  fprintf(stderr, "%%");		  cl_log(LOG_DEBUG, "CallingPollFunc()");		*/		rc = PollFunc(pf, nfd, -1);		/* Bad poll? */		if (rc <= 0) {			cl_perror("Async echoclient: bad poll rc."			" %d rc iter %d", rc, rdcount);			++errcount;			continue;		}		/* Error indication? */		if ((pf[0].revents & (POLLERR|POLLNVAL)) != 0) {			cl_log(LOG_ERR			,	"Async echoclient: bad poll revents."			" revents: 0x%x iter %d", pf[0].revents, rdcount);			++errcount;			continue;		}		/* HUP without input... Premature EOF... */		if ((pf[0].revents & POLLHUP)		&&	((pf[0].revents&POLLIN) == 0)) {			cl_log(LOG_ERR			,	"Async echoclient: premature pollhup."			" revents: 0x%x iter %d", pf[0].revents, rdcount);			EOFcheck(chan);			++errcount;			continue;		}		/* Error indication? */		if (nfd > 1		&&	(pf[1].revents & (POLLERR|POLLNVAL)) != 0) {			cl_log(LOG_ERR			,	"Async echoclient: bad poll revents[1]."			" revents: 0x%x iter %d", pf[1].revents, rdcount);			++errcount;			continue;		}		/* Output unblocked (only) ? */		if (pf[nfd-1].revents & POLLOUT) {			/*fprintf(stderr, "R");*/			chan->ops->resume_io(chan);		}else if ((pf[0].revents & POLLIN) == 0) {			/* Neither I nor O available... */			cl_log(LOG_ERR			,	"Async echoclient: bad events."			" revents: 0x%x iter %d", pf[0].revents, rdcount);			++errcount;		}	}	cl_poll_ignore(rfd);	cl_poll_ignore(wfd);	cl_log(LOG_INFO, "Async echoclient: %d errors, %d reads, %d writes"	,	errcount, rdcount, wrcount);#if 0	cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)chan);#endif	chan->ops->destroy(chan); chan = NULL;	return errcount;}struct iterinfo {	int		wcount;	int		rcount;	int		errcount;	IPC_Channel*	chan;	int		max;	gboolean	sendingsuspended;};static GMainLoop*	loop = NULL;static gbooleans_send_msg(gpointer data){	struct iterinfo*i = data;	IPC_Message*	wmsg;	int		rc;		/* Flow control? */	if (i->chan->send_queue->current_qlen	>=	i->chan->send_queue->max_qlen-2) {		i->sendingsuspended = TRUE;		return FALSE;	}	if (i->sendingsuspended) {		i->sendingsuspended = FALSE;	}	++i->wcount;		wmsg = newmessage(i->chan, i->wcount);	/*fprintf(stderr, "s");*/	if ((rc = i->chan->ops->send(i->chan, wmsg)) != IPC_OK) {		cl_log(LOG_ERR		,	"s_send_msg: send failed"		" %d rc iter %d"		,	rc, i->wcount);		cl_log(LOG_ERR		,	"s_send_msg: channel status: %d qlen: %d"		,	i->chan->ch_status		,	i->chan->send_queue->current_qlen);		++i->errcount;		if (i->chan->ch_status != IPC_CONNECT) {			cl_log(LOG_ERR,	"s_send_msg: Exiting.");			return FALSE;		}		if (i->errcount >= MAXERRORS) {			g_main_quit(loop);			return FALSE;		}	}	return i->wcount < i->max;}static gbooleans_rcv_msg(IPC_Channel* chan, gpointer data){	struct iterinfo*i = data;	i->errcount += checkinput(chan, "s_rcv_msg", &i->rcount, i->max);	if (i->sendingsuspended	&&	!chan->ops->is_sending_blocked(chan)) {		i->sendingsuspended = FALSE;		g_idle_add(s_send_msg, data);	}	if (chan->ch_status == IPC_DISCONNECT	||	i->rcount >= i->max || i->errcount > MAXERRORS) {		if (i->rcount < i->max) {			++i->errcount;			cl_log(LOG_INFO, "Early exit from s_rcv_msg");		}		g_main_quit(loop);		return FALSE;	}	return TRUE;}static gbooleancheckmsg(IPC_Message* rmsg, const char * who, int rcount){	char		str[256];	size_t		len;	echomsgbody(str, rcount, &len);	if (rmsg->msg_len != len) {		cl_log(LOG_ERR		,	"checkmsg[%s]: length mismatch"		" [expected %u, got %lu] iteration %d"		,	who, (unsigned)len		,	(unsigned long)rmsg->msg_len		,	rcount);		cl_log(LOG_ERR		,	"checkmsg[%s]: expecting [%s]"		,	who, str);		cl_log(LOG_ERR		,	"checkmsg[%s]: got [%s] instead"		,	who, (const char *)rmsg->msg_body);		return FALSE;	}	if (strncmp(rmsg->msg_body, str, len) != 0) {		cl_log(LOG_ERR		,	"checkmsg[%s]: data mismatch"		". input iteration %d"		,	who, rcount);		cl_log(LOG_ERR		,	"checkmsg[%s]: expecting [%s]"		,	who, str);		cl_log(LOG_ERR		,	"checkmsg[%s]: got [%s] instead"		,	who, (const char *)rmsg->msg_body);		return FALSE;#if 0	}else if (strcmp(who, "s_rcv_msg") == 0) {#if 0	||	strcmp(who, "s_echo_msg") == 0) {#endif		cl_log(LOG_ERR		,	"checkmsg[%s]: data Good"		"! input iteration %d"		,	who, rcount);#endif	}	return TRUE;}static gbooleans_echo_msg(IPC_Channel* chan, gpointer data){	struct iterinfo*	i = data;	int			rc;	IPC_Message*		rmsg;	while (chan->ops->is_message_pending(chan)) {		if (chan->ch_status == IPC_DISCONNECT) {			break;		}		if (i->chan->send_queue->current_qlen		>=	i->chan->send_queue->max_qlen-2) {			i->sendingsuspended = TRUE;			cl_log(LOG_INFO			,	"s_echo_msg: Sending suspended.");			goto retout;		}		i->sendingsuspended = FALSE;		if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"s_echo_msg: recv failed %d rc iter %d"			" errno=%d"			,	rc, i->rcount+1, errno);			cl_perror("recv");			++i->errcount;			goto retout;		}		i->rcount++;		if (!checkmsg(rmsg, "s_echo_msg", i->rcount)) {			++i->errcount;		}		/*fprintf(stderr, "c");*/		if ((rc = chan->ops->send(chan, rmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"s_echo_msg: send failed %d rc iter %d qlen %d"			,	rc, i->rcount, chan->send_queue->current_qlen);			cl_perror("s_echo_msg:send");			++i->errcount;		}else{			i->wcount+=1;		}	}retout:	/*fprintf(stderr, "%%");*/	if (i->rcount >= i->max || chan->ch_status == IPC_DISCONNECT	||	i->errcount > MAXERRORS) {		chan->ops->waitout(chan);		g_main_quit(loop);		return FALSE;	}	return TRUE;}static voidinit_iterinfo(struct iterinfo * i, IPC_Channel* chan, int max){	memset(i, 0, sizeof(*i));	i->chan = chan;	i->max = max;}static intmainloop_server(IPC_Channel* chan, int repcount){	struct iterinfo info;	GCHSource*	msgchan;	guint		sendmsgsrc;	loop = g_main_new(FALSE);	init_iterinfo(&info, chan, repcount);	sendmsgsrc = g_idle_add(s_send_msg, &info);	msgchan = G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan	,	FALSE, s_rcv_msg, &info, NULL);	cl_log(LOG_INFO, "Mainloop echo server: %d reps pid %d.", repcount, (int)getpid());	g_main_run(loop);	g_main_destroy(loop);	g_source_remove(sendmsgsrc);	loop = NULL;	cl_log(LOG_INFO, "Mainloop echo server: %d errors", info.errcount);	return info.errcount;}static intmainloop_client(IPC_Channel* chan, int repcount){	struct iterinfo info;	loop = g_main_new(FALSE);	init_iterinfo(&info, chan, repcount);	G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan	,	FALSE, s_echo_msg, &info, NULL);	cl_log(LOG_INFO, "Mainloop echo client: %d reps pid %d.", repcount, (int)getpid());	g_main_run(loop);	g_main_destroy(loop);	loop = NULL;	cl_log(LOG_INFO, "Mainloop echo client: %d errors, %d read %d written"	,	info.errcount, info.rcount, info.wcount);	return info.errcount;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -