⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipctest.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: ipctest.c,v 1.22.2.6 2005/08/17 08:27:58 sunjd Exp $ */#include <string.h>#include <errno.h>#include <stdlib.h>#include <unistd.h>#include <stdio.h>#include <sys/types.h>#include <sys/wait.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_poll.h>#include <clplumbing/GSource.h>#include <clplumbing/ipc.h>#define	MAXERRORS	1000typedef int (*TestFunc_t)(IPC_Channel*chan, int count);static int channelpair(TestFunc_t client, TestFunc_t server, int count);#if 0static void clientserverpair(IPC_Channel* channels[2]);#endifstatic int echoserver(IPC_Channel*, int repcount);static int echoclient(IPC_Channel*, int repcount);static int asyn_echoserver(IPC_Channel*, int repcount);static int asyn_echoclient(IPC_Channel*, int repcount);static int mainloop_server(IPC_Channel* chan, int repcount);static int mainloop_client(IPC_Channel* chan, int repcount);static int checksock(IPC_Channel* channel);static void checkifblocked(IPC_Channel* channel);static int (*PollFunc)(struct pollfd * fds, unsigned int, int)=	(int (*)(struct pollfd * fds, unsigned int, int))  poll;static gboolean checkmsg(IPC_Message* rmsg, const char * who, int rcount);static intchannelpair(TestFunc_t	clientfunc, TestFunc_t serverfunc, int count){	IPC_Channel* channels[2];	int		rc  = 0;	int		waitstat = 0;	switch (fork()) {		case -1:			cl_perror("can't fork");			exit(1);			break;		default: /* Parent */			while (wait(&waitstat) > 0) {				if (WIFEXITED(waitstat)) {					rc += WEXITSTATUS(waitstat);				}else{					rc += 1;				}			}			if (rc > 127) {				rc = 127;			}			exit(rc);			break;		case 0:	/* Child */			break;	}	/* Child continues here... */	if (ipc_channel_pair(channels) != IPC_OK) {		cl_perror("Can't create ipc channel pair");		exit(1);	}	checksock(channels[0]);	checksock(channels[1]);	switch (fork()) {		case -1:			cl_perror("can't fork");			exit(1);			break;		case 0:		/* echo "client" Child */			channels[1]->ops->destroy(channels[1]);			channels[1] = NULL;			rc = clientfunc(channels[0], count);			exit (rc > 127 ? 127 : rc);			break;		default:			break;	}	channels[0]->ops->destroy(channels[0]);	channels[0] = NULL;	rc = serverfunc(channels[1], count);	wait(&waitstat);	if (WIFEXITED(waitstat)) {		rc += WEXITSTATUS(waitstat);	}else{		rc += 1;	}	return(rc);}#if 0static voidclientserverpair(IPC_Channel* channels[2]){	char			path[] = IPC_PATH_ATTR;	char			commpath[] = "/var/run/foobar"	GHashTable *		wattrs;	IPC_WAIT_CONNECTION*	wconn;	wattrs = g_hash_table_new(g_str_hash, g_str_equal);	g_hash_table_insert(wattrs, path, commpath);	wconn = ipc_wait_conn_constructor(IPC_ANYTYPE, wconnattrs);	if (wconn == NULL) {		cl_perror("Can't create wait connection");		exit(1);	}}#endifstatic voidcheckifblocked(IPC_Channel* chan){	if (chan->ops->is_sending_blocked(chan)) {		cl_log(LOG_INFO, "Sending is blocked.");		chan->ops->resume_io(chan);	}}#ifdef CHEAT_CHECKSextern long	SeqNums[32];#endifstatic inttransport_tests(int iterations){	int	rc = 0;#ifdef CHEAT_CHECKS	memset(SeqNums, 0, sizeof(SeqNums));#endif	rc += channelpair(echoclient, echoserver, iterations);#ifdef CHEAT_CHECKS	memset(SeqNums, 0, sizeof(SeqNums));#endif	rc += channelpair(asyn_echoclient, asyn_echoserver, iterations);#ifdef CHEAT_CHECKS	memset(SeqNums, 0, sizeof(SeqNums));#endif	rc += channelpair(mainloop_client, mainloop_server, iterations);	return rc;}intmain(int argc, char ** argv){	int	rc = 0;	cl_log_set_entity("ipctest");	cl_log_enable_stderr(TRUE);	rc += transport_tests(10000);#if 0	/* Broken for the moment - need to fix it long term */	cl_log(LOG_INFO, "NOTE: Enabling poll(2) replacement code.");	PollFunc = cl_poll;	g_main_set_poll_func(cl_glibpoll);	ipc_set_pollfunc(cl_poll);	rc += transport_tests(50000);#endif		cl_log(LOG_INFO, "TOTAL errors: %d", rc);	return (rc > 127 ? 127 : rc);}static intchecksock(IPC_Channel* channel){	if (!IPC_ISRCONN(channel)) {		cl_log(LOG_ERR, "Channel status is %d"		", not IPC_CONNECT", channel->ch_status);		return 1;	}	return 0;}static voidEOFcheck(IPC_Channel* chan){	int		fd = chan->ops->get_recv_select_fd(chan);	struct pollfd 	pf[1];	int		rc;	cl_log(LOG_INFO, "channel state: %d", chan->ch_status);	if (chan->recv_queue->current_qlen > 0) {		cl_log(LOG_INFO, "EOF Receive queue has %d messages in it"		,	chan->recv_queue->current_qlen);	}	if (fd <= 0) {		cl_log(LOG_INFO, "EOF receive fd: %d", fd);	}	pf[0].fd	= fd;	pf[0].events	= POLLIN|POLLHUP;	pf[0].revents	= 0;	rc = poll(pf, 1, 0);	if (rc < 0) {		cl_perror("failed poll(2) call in EOFcheck");		return;	}	/* Got input? */	if (pf[0].revents & POLLIN) {		cl_log(LOG_INFO, "EOF socket %d (still) has input ready (real poll)"		,	fd);	}	if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) {		cl_log(LOG_INFO, "EOFcheck poll(2) bits: 0x%lx"		,	(unsigned long)pf[0].revents);	}	pf[0].fd	= fd;	pf[0].events	= POLLIN|POLLHUP;	pf[0].revents	= 0;	rc = PollFunc(pf, 1, 0);	if (rc < 0) {		cl_perror("failed PollFunc() call in EOFcheck");		return;	}	/* Got input? */	if (pf[0].revents & POLLIN) {		cl_log(LOG_INFO, "EOF socket %d (still) has input ready (PollFunc())"		,	fd);	}	if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) {		cl_log(LOG_INFO, "EOFcheck PollFunc() bits: 0x%lx"		,	(unsigned long)pf[0].revents);	}}static intechoserver(IPC_Channel* wchan, int repcount){	char	str[256];	int	j;	int	errcount = 0;	IPC_Message	wmsg;	IPC_Message*	rmsg;	wmsg.msg_private = NULL;	wmsg.msg_done = NULL;	wmsg.msg_body = str;	wmsg.msg_ch = wchan;	cl_log(LOG_INFO, "Echo server: %d reps pid %d.", repcount, getpid());	for (j=1; j <= repcount	;++j, rmsg != NULL && (rmsg->msg_done(rmsg),1)) {		int	rc;		snprintf(str, sizeof(str)-1, "String-%d", j);		wmsg.msg_len = strlen(str)+1;		if ((rc = wchan->ops->send(wchan, &wmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"echotest: send failed %d rc iter %d"			,	rc, j);			++errcount;			continue;		}		/*fprintf(stderr, "+"); */		wchan->ops->waitout(wchan);		checkifblocked(wchan);		/*fprintf(stderr, "S"); */		/* Try and induce a failure... */		if (j == repcount) {			sleep(1);		}		while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR);				if (rc != IPC_OK) {			cl_log(LOG_ERR			,	"echotest server: waitin failed %d rc iter %d"			" errno=%d"			,	rc, j, errno);			cl_perror("waitin");			exit(1);		}		/*fprintf(stderr, "-"); */		if ((rc = wchan->ops->recv(wchan, &rmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"echotest server: recv failed %d rc iter %d"			" errno=%d"			,	rc, j, errno);			cl_perror("recv");			++errcount;			rmsg=NULL;			continue;		}		/*fprintf(stderr, "s"); */		if (rmsg->msg_len != wmsg.msg_len) {			cl_log(LOG_ERR			,	"echotest: length mismatch [%lu,%lu] iter %d"			,	(unsigned long)rmsg->msg_len			,	(unsigned long)wmsg.msg_len, j);			++errcount;			continue;		}		if (strncmp(rmsg->msg_body, wmsg.msg_body, wmsg.msg_len)		!= 0) {			cl_log(LOG_ERR			,	"echotest: data mismatch. iteration %d"			,	j);			++errcount;			continue;		}			}	cl_log(LOG_INFO, "echoserver: %d errors", errcount);#if 0	cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)wchan);#endif	wchan->ops->destroy(wchan); wchan = NULL;	return errcount;}static intechoclient(IPC_Channel* rchan, int repcount){	int	j;	int	errcount = 0;	IPC_Message*	rmsg;	cl_log(LOG_INFO, "Echo client: %d reps pid %d."	,	repcount, (int)getpid());	for (j=1; j <= repcount ;++j) {		int	rc;		while ((rc = rchan->ops->waitin(rchan)) == IPC_INTR);				if (rc != IPC_OK) {			cl_log(LOG_ERR			,	"echotest client: waitin failed %d rc iter %d"			" errno=%d"			,	rc, j, errno);			cl_perror("waitin");			exit(1);		}		/*fprintf(stderr, "/"); */		if ((rc = rchan->ops->recv(rchan, &rmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"echoclient: recv failed %d rc iter %d"			" errno=%d"			,	rc, j, errno);			cl_perror("recv");			++errcount;			--j;			rmsg=NULL;			continue;		}		/*fprintf(stderr, "c"); */		if ((rc = rchan->ops->send(rchan, rmsg)) != IPC_OK) {			cl_log(LOG_ERR			,	"echoclient: send failed %d rc iter %d"			,	rc, j);			cl_log(LOG_INFO, "Message being sent: %s"			,		(char*)rmsg->msg_body);			++errcount;			continue;		}		/*fprintf(stderr, "%%"); */		rchan->ops->waitout(rchan);		checkifblocked(rchan);		/*fprintf(stderr, "C"); */	}	cl_log(LOG_INFO, "echoclient: %d errors", errcount);#if 0	cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)rchan);#endif	rchan->ops->destroy(rchan); rchan = NULL;	return errcount;}static voidechomsgbody(void * body, int niter, size_t * len){	char *	str = body;	sprintf(str, "String-%d", niter);	*len = strlen(str)+1;}static voidmsg_free(IPC_Message* msg){	#if 0	memset(msg->msg_body, 0xAA, msg->msg_len);#endif	free(msg->msg_body);#if 0	memset(msg, 0x55, sizeof(*msg));#endif	free(msg);}static IPC_Message*newmessage(IPC_Channel* chan, int niter){	IPC_Message*	msg;	msg = malloc(sizeof(*msg));	if (msg) {		msg->msg_private = NULL;		msg->msg_done = msg_free;		msg->msg_ch = chan;		msg->msg_body = malloc(32);		echomsgbody(msg->msg_body, niter, &msg->msg_len);	}	return msg;}void dump_ipc_info(IPC_Channel* chan);static intcheckinput(IPC_Channel* chan, const char * where, int* rdcount, int maxcount){	IPC_Message*	rmsg = NULL;	int		errs = 0;	int		rc;	while (chan->ops->is_message_pending(chan)	&&	errs < 10 && *rdcount < maxcount) {		if (chan->ch_status == IPC_DISCONNECT && *rdcount < maxcount){			cl_log(LOG_ERR			,	"checkinput1[0x%lx %s]: EOF in iter %d"			,	(unsigned long)chan, where, *rdcount);			EOFcheck(chan);		}		if (rmsg != NULL) {			rmsg->msg_done(rmsg);			rmsg = NULL;		}		if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {			if (chan->ch_status == IPC_DISCONNECT) {				cl_log(LOG_ERR				,	"checkinput2[0x%lx %s]: EOF in iter %d"				,	(unsigned long)chan, where, *rdcount);				EOFcheck(chan);				return errs;			}			cl_log(LOG_ERR			,	"checkinput[%s]: recv"			" failed: rc %d  rdcount %d errno=%d"			,	where, rc, *rdcount, errno);			cl_perror("recv");			rmsg=NULL;			++errs;			continue;		}		*rdcount += 1;		if (!checkmsg(rmsg, where, *rdcount)) {			dump_ipc_info(chan);			++errs;		}		if (*rdcount < maxcount && chan->ch_status == IPC_DISCONNECT){			cl_log(LOG_ERR			,	"checkinput3[0x%lx %s]: EOF in iter %d"			,	(unsigned long)chan, where, *rdcount);			EOFcheck(chan);		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -