📄 ipctest.c
字号:
/* $Id: ipctest.c,v 1.22.2.6 2005/08/17 08:27:58 sunjd Exp $ */#include <string.h>#include <errno.h>#include <stdlib.h>#include <unistd.h>#include <stdio.h>#include <sys/types.h>#include <sys/wait.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_poll.h>#include <clplumbing/GSource.h>#include <clplumbing/ipc.h>#define MAXERRORS 1000typedef int (*TestFunc_t)(IPC_Channel*chan, int count);static int channelpair(TestFunc_t client, TestFunc_t server, int count);#if 0static void clientserverpair(IPC_Channel* channels[2]);#endifstatic int echoserver(IPC_Channel*, int repcount);static int echoclient(IPC_Channel*, int repcount);static int asyn_echoserver(IPC_Channel*, int repcount);static int asyn_echoclient(IPC_Channel*, int repcount);static int mainloop_server(IPC_Channel* chan, int repcount);static int mainloop_client(IPC_Channel* chan, int repcount);static int checksock(IPC_Channel* channel);static void checkifblocked(IPC_Channel* channel);static int (*PollFunc)(struct pollfd * fds, unsigned int, int)= (int (*)(struct pollfd * fds, unsigned int, int)) poll;static gboolean checkmsg(IPC_Message* rmsg, const char * who, int rcount);static intchannelpair(TestFunc_t clientfunc, TestFunc_t serverfunc, int count){ IPC_Channel* channels[2]; int rc = 0; int waitstat = 0; switch (fork()) { case -1: cl_perror("can't fork"); exit(1); break; default: /* Parent */ while (wait(&waitstat) > 0) { if (WIFEXITED(waitstat)) { rc += WEXITSTATUS(waitstat); }else{ rc += 1; } } if (rc > 127) { rc = 127; } exit(rc); break; case 0: /* Child */ break; } /* Child continues here... */ if (ipc_channel_pair(channels) != IPC_OK) { cl_perror("Can't create ipc channel pair"); exit(1); } checksock(channels[0]); checksock(channels[1]); switch (fork()) { case -1: cl_perror("can't fork"); exit(1); break; case 0: /* echo "client" Child */ channels[1]->ops->destroy(channels[1]); channels[1] = NULL; rc = clientfunc(channels[0], count); exit (rc > 127 ? 127 : rc); break; default: break; } channels[0]->ops->destroy(channels[0]); channels[0] = NULL; rc = serverfunc(channels[1], count); wait(&waitstat); if (WIFEXITED(waitstat)) { rc += WEXITSTATUS(waitstat); }else{ rc += 1; } return(rc);}#if 0static voidclientserverpair(IPC_Channel* channels[2]){ char path[] = IPC_PATH_ATTR; char commpath[] = "/var/run/foobar" GHashTable * wattrs; IPC_WAIT_CONNECTION* wconn; wattrs = g_hash_table_new(g_str_hash, g_str_equal); g_hash_table_insert(wattrs, path, commpath); wconn = ipc_wait_conn_constructor(IPC_ANYTYPE, wconnattrs); if (wconn == NULL) { cl_perror("Can't create wait connection"); exit(1); }}#endifstatic voidcheckifblocked(IPC_Channel* chan){ if (chan->ops->is_sending_blocked(chan)) { cl_log(LOG_INFO, "Sending is blocked."); chan->ops->resume_io(chan); }}#ifdef CHEAT_CHECKSextern long SeqNums[32];#endifstatic inttransport_tests(int iterations){ int rc = 0;#ifdef CHEAT_CHECKS memset(SeqNums, 0, sizeof(SeqNums));#endif rc += channelpair(echoclient, echoserver, iterations);#ifdef CHEAT_CHECKS memset(SeqNums, 0, sizeof(SeqNums));#endif rc += channelpair(asyn_echoclient, asyn_echoserver, iterations);#ifdef CHEAT_CHECKS memset(SeqNums, 0, sizeof(SeqNums));#endif rc += channelpair(mainloop_client, mainloop_server, iterations); return rc;}intmain(int argc, char ** argv){ int rc = 0; cl_log_set_entity("ipctest"); cl_log_enable_stderr(TRUE); rc += transport_tests(10000);#if 0 /* Broken for the moment - need to fix it long term */ cl_log(LOG_INFO, "NOTE: Enabling poll(2) replacement code."); PollFunc = cl_poll; g_main_set_poll_func(cl_glibpoll); ipc_set_pollfunc(cl_poll); rc += transport_tests(50000);#endif cl_log(LOG_INFO, "TOTAL errors: %d", rc); return (rc > 127 ? 127 : rc);}static intchecksock(IPC_Channel* channel){ if (!IPC_ISRCONN(channel)) { cl_log(LOG_ERR, "Channel status is %d" ", not IPC_CONNECT", channel->ch_status); return 1; } return 0;}static voidEOFcheck(IPC_Channel* chan){ int fd = chan->ops->get_recv_select_fd(chan); struct pollfd pf[1]; int rc; cl_log(LOG_INFO, "channel state: %d", chan->ch_status); if (chan->recv_queue->current_qlen > 0) { cl_log(LOG_INFO, "EOF Receive queue has %d messages in it" , chan->recv_queue->current_qlen); } if (fd <= 0) { cl_log(LOG_INFO, "EOF receive fd: %d", fd); } pf[0].fd = fd; pf[0].events = POLLIN|POLLHUP; pf[0].revents = 0; rc = poll(pf, 1, 0); if (rc < 0) { cl_perror("failed poll(2) call in EOFcheck"); return; } /* Got input? */ if (pf[0].revents & POLLIN) { cl_log(LOG_INFO, "EOF socket %d (still) has input ready (real poll)" , fd); } if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) { cl_log(LOG_INFO, "EOFcheck poll(2) bits: 0x%lx" , (unsigned long)pf[0].revents); } pf[0].fd = fd; pf[0].events = POLLIN|POLLHUP; pf[0].revents = 0; rc = PollFunc(pf, 1, 0); if (rc < 0) { cl_perror("failed PollFunc() call in EOFcheck"); return; } /* Got input? */ if (pf[0].revents & POLLIN) { cl_log(LOG_INFO, "EOF socket %d (still) has input ready (PollFunc())" , fd); } if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) { cl_log(LOG_INFO, "EOFcheck PollFunc() bits: 0x%lx" , (unsigned long)pf[0].revents); }}static intechoserver(IPC_Channel* wchan, int repcount){ char str[256]; int j; int errcount = 0; IPC_Message wmsg; IPC_Message* rmsg; wmsg.msg_private = NULL; wmsg.msg_done = NULL; wmsg.msg_body = str; wmsg.msg_ch = wchan; cl_log(LOG_INFO, "Echo server: %d reps pid %d.", repcount, getpid()); for (j=1; j <= repcount ;++j, rmsg != NULL && (rmsg->msg_done(rmsg),1)) { int rc; snprintf(str, sizeof(str)-1, "String-%d", j); wmsg.msg_len = strlen(str)+1; if ((rc = wchan->ops->send(wchan, &wmsg)) != IPC_OK) { cl_log(LOG_ERR , "echotest: send failed %d rc iter %d" , rc, j); ++errcount; continue; } /*fprintf(stderr, "+"); */ wchan->ops->waitout(wchan); checkifblocked(wchan); /*fprintf(stderr, "S"); */ /* Try and induce a failure... */ if (j == repcount) { sleep(1); } while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR); if (rc != IPC_OK) { cl_log(LOG_ERR , "echotest server: waitin failed %d rc iter %d" " errno=%d" , rc, j, errno); cl_perror("waitin"); exit(1); } /*fprintf(stderr, "-"); */ if ((rc = wchan->ops->recv(wchan, &rmsg)) != IPC_OK) { cl_log(LOG_ERR , "echotest server: recv failed %d rc iter %d" " errno=%d" , rc, j, errno); cl_perror("recv"); ++errcount; rmsg=NULL; continue; } /*fprintf(stderr, "s"); */ if (rmsg->msg_len != wmsg.msg_len) { cl_log(LOG_ERR , "echotest: length mismatch [%lu,%lu] iter %d" , (unsigned long)rmsg->msg_len , (unsigned long)wmsg.msg_len, j); ++errcount; continue; } if (strncmp(rmsg->msg_body, wmsg.msg_body, wmsg.msg_len) != 0) { cl_log(LOG_ERR , "echotest: data mismatch. iteration %d" , j); ++errcount; continue; } } cl_log(LOG_INFO, "echoserver: %d errors", errcount);#if 0 cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)wchan);#endif wchan->ops->destroy(wchan); wchan = NULL; return errcount;}static intechoclient(IPC_Channel* rchan, int repcount){ int j; int errcount = 0; IPC_Message* rmsg; cl_log(LOG_INFO, "Echo client: %d reps pid %d." , repcount, (int)getpid()); for (j=1; j <= repcount ;++j) { int rc; while ((rc = rchan->ops->waitin(rchan)) == IPC_INTR); if (rc != IPC_OK) { cl_log(LOG_ERR , "echotest client: waitin failed %d rc iter %d" " errno=%d" , rc, j, errno); cl_perror("waitin"); exit(1); } /*fprintf(stderr, "/"); */ if ((rc = rchan->ops->recv(rchan, &rmsg)) != IPC_OK) { cl_log(LOG_ERR , "echoclient: recv failed %d rc iter %d" " errno=%d" , rc, j, errno); cl_perror("recv"); ++errcount; --j; rmsg=NULL; continue; } /*fprintf(stderr, "c"); */ if ((rc = rchan->ops->send(rchan, rmsg)) != IPC_OK) { cl_log(LOG_ERR , "echoclient: send failed %d rc iter %d" , rc, j); cl_log(LOG_INFO, "Message being sent: %s" , (char*)rmsg->msg_body); ++errcount; continue; } /*fprintf(stderr, "%%"); */ rchan->ops->waitout(rchan); checkifblocked(rchan); /*fprintf(stderr, "C"); */ } cl_log(LOG_INFO, "echoclient: %d errors", errcount);#if 0 cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)rchan);#endif rchan->ops->destroy(rchan); rchan = NULL; return errcount;}static voidechomsgbody(void * body, int niter, size_t * len){ char * str = body; sprintf(str, "String-%d", niter); *len = strlen(str)+1;}static voidmsg_free(IPC_Message* msg){ #if 0 memset(msg->msg_body, 0xAA, msg->msg_len);#endif free(msg->msg_body);#if 0 memset(msg, 0x55, sizeof(*msg));#endif free(msg);}static IPC_Message*newmessage(IPC_Channel* chan, int niter){ IPC_Message* msg; msg = malloc(sizeof(*msg)); if (msg) { msg->msg_private = NULL; msg->msg_done = msg_free; msg->msg_ch = chan; msg->msg_body = malloc(32); echomsgbody(msg->msg_body, niter, &msg->msg_len); } return msg;}void dump_ipc_info(IPC_Channel* chan);static intcheckinput(IPC_Channel* chan, const char * where, int* rdcount, int maxcount){ IPC_Message* rmsg = NULL; int errs = 0; int rc; while (chan->ops->is_message_pending(chan) && errs < 10 && *rdcount < maxcount) { if (chan->ch_status == IPC_DISCONNECT && *rdcount < maxcount){ cl_log(LOG_ERR , "checkinput1[0x%lx %s]: EOF in iter %d" , (unsigned long)chan, where, *rdcount); EOFcheck(chan); } if (rmsg != NULL) { rmsg->msg_done(rmsg); rmsg = NULL; } if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) { if (chan->ch_status == IPC_DISCONNECT) { cl_log(LOG_ERR , "checkinput2[0x%lx %s]: EOF in iter %d" , (unsigned long)chan, where, *rdcount); EOFcheck(chan); return errs; } cl_log(LOG_ERR , "checkinput[%s]: recv" " failed: rc %d rdcount %d errno=%d" , where, rc, *rdcount, errno); cl_perror("recv"); rmsg=NULL; ++errs; continue; } *rdcount += 1; if (!checkmsg(rmsg, where, *rdcount)) { dump_ipc_info(chan); ++errs; } if (*rdcount < maxcount && chan->ch_status == IPC_DISCONNECT){ cl_log(LOG_ERR , "checkinput3[0x%lx %s]: EOF in iter %d" , (unsigned long)chan, where, *rdcount); EOFcheck(chan); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -