ipctest.c
来自「linux集群服务器软件代码包」· C语言 代码 · 共 1,066 行 · 第 1/2 页
C
1,066 行
){ if (!stopsending){ ++wrcount; if (wrcount > repcount) { break; } wmsg = wchan->ops->new_ipcmsg(wchan, NULL, 32, NULL); echomsgbody(wmsg->msg_body, wrcount, &wmsg->msg_len); if ((rc = wchan->ops->send(wchan, wmsg)) != IPC_OK){ cl_log(LOG_INFO, "channel sstatus in echo server is %d", wchan->ch_status); if (wchan->ch_status != IPC_CONNECT) { cl_log(LOG_ERR , "asyn_echoserver: send failed" " %d rc iter %d" , rc, wrcount); ++errcount; continue; }else {/*send failed because of channel busy * roll back */ --wrcount; } } if (wchan->ops->is_sending_blocked(wchan)) { /* fprintf(stderr, "b"); */ ++blockedcount; }else{ blockedcount = 0; } } errcount += checkinput(wchan, w, &rdcount, repcount); if (wrcount < repcount && wchan->ch_status == IPC_DISCONNECT) { ++errcount; break; } } /* cl_log(LOG_INFO, "async_echoserver: wrcount =%d rdcount=%d B", wrcount, rdcount); */ wchan->ops->waitout(wchan); errcount += checkinput(wchan, w, &rdcount, repcount); if (wrcount >= repcount && rdcount < repcount) { while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR); if (rc != IPC_OK) { cl_log(LOG_ERR , "asyn_echoserver: waitin()" " failed %d rc rdcount %d errno=%d" , rc, rdcount, errno); cl_perror("waitin"); exit(1); } } if (wchan->ch_status == IPC_DISCONNECT && rdcount < repcount) { cl_log(LOG_ERR , "asyn_echoserver: EOF in iter %d" , rdcount); EOFcheck(wchan); ++errcount; break; } blockedcount = 0; } cl_log(LOG_INFO, "asyn_echoserver: %d errors", errcount);#if 0 cl_log(LOG_INFO, "%d destroying channel 0x%lx", getpid(), (unsigned long)wchan);#endif wchan->ops->destroy(wchan); wchan = NULL; return errcount;}static intasyn_echoclient(IPC_Channel* chan, int repcount){ int rdcount = 0; int wrcount = 0; int errcount = 0; IPC_Message* rmsg; int rfd = chan->ops->get_recv_select_fd(chan); int wfd = chan->ops->get_send_select_fd(chan); gboolean rdeqwr = (rfd == wfd); cl_log(LOG_INFO, "Async Echo client: %d reps pid %d." , repcount, (int)getpid()); ipc_set_pollfunc(PollFunc); while (rdcount < repcount && errcount < repcount) { int rc; struct pollfd pf[2]; int nfd = 1; pf[0].fd = rfd; pf[0].events = POLLIN|POLLHUP; if (chan->ops->is_sending_blocked(chan)) { if (rdeqwr) { pf[0].events |= POLLOUT; }else{ nfd = 2; pf[1].fd = wfd; pf[1].events = POLLOUT|POLLHUP; } } /* Have input? */ /* fprintf(stderr, "i"); */ while (chan->ops->is_message_pending(chan) && rdcount < repcount) { /*fprintf(stderr, "r"); */ if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) { if (!IPC_ISRCONN(chan)) { cl_log(LOG_ERR , "Async echoclient: disconnect" " iter %d", rdcount+1); ++errcount; return errcount; } cl_log(LOG_ERR , "Async echoclient: recv" " failed %d rc iter %d errno=%d" , rc, rdcount+1, errno); cl_perror("recv"); rmsg=NULL; ++errcount; cl_log(LOG_INFO, "sleep(1)"); sleep(1); continue; } /*fprintf(stderr, "c"); */ ++rdcount; do { rc = chan->ops->send(chan, rmsg); }while (rc != IPC_OK && chan->ch_status == IPC_CONNECT); if (chan->ch_status != IPC_CONNECT){ ++errcount; cl_perror("send"); cl_log(LOG_ERR , "Async echoclient: send failed" " rc %d, iter %d", rc, rdcount); cl_log(LOG_INFO, "Message being sent: %s" , (char*)rmsg->msg_body); if (!IPC_ISRCONN(chan)) { cl_log(LOG_ERR , "Async echoclient: EOF(2)" " iter %d", rdcount+1); EOFcheck(chan); return errcount; } continue; } ++wrcount; /*fprintf(stderr, "x"); */ } if (rdcount >= repcount) { break; } /* * At this point it is possible that the POLLOUT bit * being on is no longer necessary, but this will only * cause an extra (false) output poll iteration at worst... * This is because (IIRC) both is_sending_blocked(), and * is_message_pending() both perform a resume_io(). * This might be confusing, but -- oh well... */ /* fprintf(stderr, "P"); cl_log(LOG_INFO, "poll[%d, 0x%x]" , pf[0].fd, pf[0].events); cl_log(LOG_DEBUG, "poll[%d, 0x%x]..." , pf[0].fd, pf[0].events); fprintf(stderr, "%%"); cl_log(LOG_DEBUG, "CallingPollFunc()"); */ rc = PollFunc(pf, nfd, -1); /* Bad poll? */ if (rc <= 0) { cl_perror("Async echoclient: bad poll rc." " %d rc iter %d", rc, rdcount); ++errcount; continue; } /* Error indication? */ if ((pf[0].revents & (POLLERR|POLLNVAL)) != 0) { cl_log(LOG_ERR , "Async echoclient: bad poll revents." " revents: 0x%x iter %d", pf[0].revents, rdcount); ++errcount; continue; } /* HUP without input... Premature EOF... */ if ((pf[0].revents & POLLHUP) && ((pf[0].revents&POLLIN) == 0)) { cl_log(LOG_ERR , "Async echoclient: premature pollhup." " revents: 0x%x iter %d", pf[0].revents, rdcount); EOFcheck(chan); ++errcount; continue; } /* Error indication? */ if (nfd > 1 && (pf[1].revents & (POLLERR|POLLNVAL)) != 0) { cl_log(LOG_ERR , "Async echoclient: bad poll revents[1]." " revents: 0x%x iter %d", pf[1].revents, rdcount); ++errcount; continue; } /* Output unblocked (only) ? */ if (pf[nfd-1].revents & POLLOUT) { /*fprintf(stderr, "R");*/ chan->ops->resume_io(chan); }else if ((pf[0].revents & POLLIN) == 0) { /* Neither I nor O available... */ cl_log(LOG_ERR , "Async echoclient: bad events." " revents: 0x%x iter %d", pf[0].revents, rdcount); ++errcount; } } cl_poll_ignore(rfd); cl_poll_ignore(wfd); cl_log(LOG_INFO, "Async echoclient: %d errors, %d reads, %d writes", errcount, rdcount, wrcount);#if 0 cl_log(LOG_INFO, "%d destroying channel 0x%lx",getpid(), (unsigned long)chan);#endif chan->ops->destroy(chan); chan = NULL; return errcount;}struct iterinfo { int wcount; int rcount; int errcount; IPC_Channel* chan; int max; gboolean sendingsuspended;};static GMainLoop* loop = NULL;static gbooleans_send_msg(gpointer data){ struct iterinfo*i = data; IPC_Message* wmsg; int rc; ++i->wcount; wmsg = i->chan->ops->new_ipcmsg(i->chan, NULL, 32, NULL); echomsgbody(wmsg->msg_body, i->wcount, &wmsg->msg_len); /*cl_log(LOG_INFO, "s_send_msg: sending out %d", i->wcount);*/ if ((rc = i->chan->ops->send(i->chan, wmsg)) != IPC_OK) { cl_log(LOG_ERR , "s_send_msg: send failed" " %d rc iter %d" , rc, i->wcount); cl_log(LOG_ERR , "s_send_msg: channel status: %d qlen: %d" , i->chan->ch_status , i->chan->send_queue->current_qlen); ++i->errcount; if (i->chan->ch_status != IPC_CONNECT) { cl_log(LOG_ERR, "s_send_msg: Exiting."); return FALSE; } if (i->errcount >= MAXERRORS) { g_main_quit(loop); return FALSE; } } return !i->sendingsuspended?i->wcount < i->max: FALSE;}static voidmainloop_low_flow_callback(IPC_Channel* ch, void* userdata){ struct iterinfo* i = (struct iterinfo*) userdata; if (userdata == NULL){ cl_log(LOG_ERR, "userdata is NULL"); return; } if (i->sendingsuspended){ i->sendingsuspended = FALSE; g_idle_add(s_send_msg, i); } return; }static voidmainloop_high_flow_callback(IPC_Channel* ch, void* userdata){ struct iterinfo* i = (struct iterinfo*) userdata; if (userdata == NULL){ cl_log(LOG_ERR, "userdata is NULL"); return; } i->sendingsuspended = TRUE; }static gbooleans_rcv_msg(IPC_Channel* chan, gpointer data){ struct iterinfo*i = data; i->errcount += checkinput(chan, "s_rcv_msg", &i->rcount, i->max); if (chan->ch_status == IPC_DISCONNECT || i->rcount >= i->max || i->errcount > MAXERRORS) { if (i->rcount < i->max) { ++i->errcount; cl_log(LOG_INFO, "Early exit from s_rcv_msg"); } g_main_quit(loop); return FALSE; } return TRUE;}static gbooleancheckmsg(IPC_Message* rmsg, const char * who, int rcount){ char str[256]; size_t len; echomsgbody(str, rcount, &len); if (rmsg->msg_len != len) { cl_log(LOG_ERR , "checkmsg[%s]: length mismatch" " [expected %u, got %lu] iteration %d" , who, (unsigned)len , (unsigned long)rmsg->msg_len , rcount); cl_log(LOG_ERR , "checkmsg[%s]: expecting [%s]" , who, str); cl_log(LOG_ERR , "checkmsg[%s]: got [%s] instead" , who, (const char *)rmsg->msg_body); return FALSE; } if (strncmp(rmsg->msg_body, str, len) != 0) { cl_log(LOG_ERR , "checkmsg[%s]: data mismatch" ". input iteration %d" , who, rcount); cl_log(LOG_ERR , "checkmsg[%s]: expecting [%s]" , who, str); cl_log(LOG_ERR , "checkmsg[%s]: got [%s] instead" , who, (const char *)rmsg->msg_body); return FALSE;#if 0 }else if (strcmp(who, "s_rcv_msg") == 0) {#if 0 || strcmp(who, "s_echo_msg") == 0) {#endif cl_log(LOG_ERR , "checkmsg[%s]: data Good" "! input iteration %d" , who, rcount);#endif } return TRUE;}static gbooleans_echo_msg(IPC_Channel* chan, gpointer data){ struct iterinfo* i = data; int rc; IPC_Message* rmsg; while (chan->ops->is_message_pending(chan)) { if (chan->ch_status == IPC_DISCONNECT) { break; } if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) { cl_log(LOG_ERR , "s_echo_msg: recv failed %d rc iter %d" " errno=%d" , rc, i->rcount+1, errno); cl_perror("recv"); ++i->errcount; goto retout; } i->rcount++; if (!checkmsg(rmsg, "s_echo_msg", i->rcount)) { ++i->errcount; } /*cl_log(LOG_INFO, "s_echo_msg: rcount= %d, wcount =%d", i->rcount, i->wcount);*/ do { rc = chan->ops->send(chan, rmsg); }while (rc != IPC_OK && chan->ch_status == IPC_CONNECT); if (chan->ch_status != IPC_CONNECT){ cl_log(LOG_ERR, "s_echo_msg: send failed %d rc iter %d qlen %d", rc, i->rcount, chan->send_queue->current_qlen); cl_perror("send"); i->errcount ++; } i->wcount+=1; /*cl_log(LOG_INFO, "s_echo_msg: end of this ite");*/ } retout: /*fprintf(stderr, "%%");*/ if (i->rcount >= i->max || chan->ch_status == IPC_DISCONNECT || i->errcount > MAXERRORS) { chan->ops->waitout(chan); g_main_quit(loop); return FALSE; } return TRUE;}static voidinit_iterinfo(struct iterinfo * i, IPC_Channel* chan, int max){ memset(i, 0, sizeof(*i)); i->chan = chan; i->max = max; i->sendingsuspended = FALSE;}static intmainloop_server(IPC_Channel* chan, int repcount){ struct iterinfo info; GCHSource* msgchan; guint sendmsgsrc; loop = g_main_new(FALSE); init_iterinfo(&info, chan, repcount); chan->ops->set_high_flow_callback(chan, mainloop_high_flow_callback, &info); chan->ops->set_low_flow_callback(chan, mainloop_low_flow_callback, &info); chan->high_flow_mark = 20; chan->low_flow_mark = 2; sendmsgsrc = g_idle_add(s_send_msg, &info); msgchan = G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan , FALSE, s_rcv_msg, &info, NULL); cl_log(LOG_INFO, "Mainloop echo server: %d reps pid %d.", repcount, (int)getpid()); g_main_run(loop); g_main_destroy(loop); g_source_remove(sendmsgsrc); loop = NULL; cl_log(LOG_INFO, "Mainloop echo server: %d errors", info.errcount); return info.errcount;}static intmainloop_client(IPC_Channel* chan, int repcount){ struct iterinfo info; loop = g_main_new(FALSE); init_iterinfo(&info, chan, repcount); G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan , FALSE, s_echo_msg, &info, NULL); cl_log(LOG_INFO, "Mainloop echo client: %d reps pid %d.", repcount, (int)getpid()); g_main_run(loop); g_main_destroy(loop); loop = NULL; cl_log(LOG_INFO, "Mainloop echo client: %d errors, %d read %d written" , info.errcount, info.rcount, info.wcount); return info.errcount;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?