/* File: ipctest.c  (source-viewer page header; viewer control: "Font size:") */
} return errs;}static intasyn_echoserver(IPC_Channel* wchan, int repcount){ int rdcount = 0; int wrcount = 0; int errcount = 0; int blockedcount = 0; IPC_Message* wmsg; int lastcount = -1; const char* w = "asyn_echoserver"; cl_log(LOG_INFO, "Asyn echo server: %d reps pid %d." , repcount, (int)getpid()); while (rdcount < repcount) { int rc; do { ++wrcount; if (wrcount > repcount) { break; } wmsg = newmessage(wchan, wrcount); /*fprintf(stderr, "s"); */ if ((rc = wchan->ops->send(wchan, wmsg)) != IPC_OK) { cl_log(LOG_ERR , "asyn_echoserver: send failed" " %d rc iter %d" , rc, wrcount); ++errcount; continue; } lastcount = wrcount; if (wchan->ops->is_sending_blocked(wchan)) { /* fprintf(stderr, "b"); */ ++blockedcount; }else{ blockedcount = 0; } errcount += checkinput(wchan, w, &rdcount, repcount); if (wrcount < repcount && wchan->ch_status == IPC_DISCONNECT) { ++errcount; break; } }while (wrcount < repcount && blockedcount < 10 && wchan->ch_status != IPC_DISCONNECT); if (wrcount < repcount) { /* fprintf(stderr, "B"); */ } wchan->ops->waitout(wchan); errcount += checkinput(wchan, w, &rdcount, repcount); if (wrcount >= repcount && rdcount < repcount) { while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR); if (rc != IPC_OK) { cl_log(LOG_ERR , "asyn_echoserver: waitin()" " failed %d rc rdcount %d errno=%d" , rc, rdcount, errno); cl_perror("waitin"); exit(1); } } if (wchan->ch_status == IPC_DISCONNECT && rdcount < repcount) { cl_log(LOG_ERR , "asyn_echoserver: EOF in iter %d" , rdcount); EOFcheck(wchan); ++errcount; break; } } cl_log(LOG_INFO, "asyn_echoserver: %d errors", errcount);#if 0 cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)wchan);#endif wchan->ops->destroy(wchan); wchan = NULL; return errcount;}static intasyn_echoclient(IPC_Channel* chan, int repcount){ int rdcount = 0; int wrcount = 0; int errcount = 0; IPC_Message* rmsg; int rfd = chan->ops->get_recv_select_fd(chan); int wfd = chan->ops->get_send_select_fd(chan); gboolean rdeqwr = (rfd == wfd); 
cl_log(LOG_INFO, "Async Echo client: %d reps pid %d." , repcount, (int)getpid()); ipc_set_pollfunc(PollFunc); while (rdcount < repcount && errcount < repcount) { int rc; struct pollfd pf[2]; int nfd = 1; pf[0].fd = rfd; pf[0].events = POLLIN|POLLHUP; if (chan->ops->is_sending_blocked(chan)) { if (rdeqwr) { pf[0].events |= POLLOUT; }else{ nfd = 2; pf[1].fd = wfd; pf[1].events = POLLOUT|POLLHUP; } } /* Have input? */ /* fprintf(stderr, "i"); */ while (chan->ops->is_message_pending(chan) && rdcount < repcount) { /*fprintf(stderr, "r"); */ if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) { if (!IPC_ISRCONN(chan)) { cl_log(LOG_ERR , "Async echoclient: disconnect" " iter %d", rdcount+1); ++errcount; return errcount; } cl_log(LOG_ERR , "Async echoclient: recv" " failed %d rc iter %d errno=%d" , rc, rdcount+1, errno); cl_perror("recv"); rmsg=NULL; ++errcount; cl_log(LOG_INFO, "sleep(1)"); sleep(1); continue; } /*fprintf(stderr, "c"); */ ++rdcount; if ((rc = chan->ops->send(chan, rmsg)) != IPC_OK) { ++errcount; cl_perror("send"); cl_log(LOG_ERR , "Async echoclient: send failed" " rc %d, iter %d", rc, rdcount); cl_log(LOG_INFO, "Message being sent: %s" , (char*)rmsg->msg_body); if (!IPC_ISRCONN(chan)) { cl_log(LOG_ERR , "Async echoclient: EOF(2)" " iter %d", rdcount+1); EOFcheck(chan); return errcount; } continue; }else{ ++wrcount; } /*fprintf(stderr, "x"); */ } if (rdcount >= repcount) { break; } /* * At this point it is possible that the POLLOUT bit * being on is no longer necessary, but this will only * cause an extra (false) output poll iteration at worst... * This is because (IIRC) both is_sending_blocked(), and * is_message_pending() both perform a resume_io(). * This might be confusing, but -- oh well... */ /* fprintf(stderr, "P"); cl_log(LOG_INFO, "poll[%d, 0x%x]" , pf[0].fd, pf[0].events); cl_log(LOG_DEBUG, "poll[%d, 0x%x]..." , pf[0].fd, pf[0].events); fprintf(stderr, "%%"); cl_log(LOG_DEBUG, "CallingPollFunc()"); */ rc = PollFunc(pf, nfd, -1); /* Bad poll? 
*/ if (rc <= 0) { cl_perror("Async echoclient: bad poll rc." " %d rc iter %d", rc, rdcount); ++errcount; continue; } /* Error indication? */ if ((pf[0].revents & (POLLERR|POLLNVAL)) != 0) { cl_log(LOG_ERR , "Async echoclient: bad poll revents." " revents: 0x%x iter %d", pf[0].revents, rdcount); ++errcount; continue; } /* HUP without input... Premature EOF... */ if ((pf[0].revents & POLLHUP) && ((pf[0].revents&POLLIN) == 0)) { cl_log(LOG_ERR , "Async echoclient: premature pollhup." " revents: 0x%x iter %d", pf[0].revents, rdcount); EOFcheck(chan); ++errcount; continue; } /* Error indication? */ if (nfd > 1 && (pf[1].revents & (POLLERR|POLLNVAL)) != 0) { cl_log(LOG_ERR , "Async echoclient: bad poll revents[1]." " revents: 0x%x iter %d", pf[1].revents, rdcount); ++errcount; continue; } /* Output unblocked (only) ? */ if (pf[nfd-1].revents & POLLOUT) { /*fprintf(stderr, "R");*/ chan->ops->resume_io(chan); }else if ((pf[0].revents & POLLIN) == 0) { /* Neither I nor O available... */ cl_log(LOG_ERR , "Async echoclient: bad events." " revents: 0x%x iter %d", pf[0].revents, rdcount); ++errcount; } } cl_poll_ignore(rfd); cl_poll_ignore(wfd); cl_log(LOG_INFO, "Async echoclient: %d errors, %d reads, %d writes" , errcount, rdcount, wrcount);#if 0 cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)chan);#endif chan->ops->destroy(chan); chan = NULL; return errcount;}struct iterinfo { int wcount; int rcount; int errcount; IPC_Channel* chan; int max; gboolean sendingsuspended;};static GMainLoop* loop = NULL;static gbooleans_send_msg(gpointer data){ struct iterinfo*i = data; IPC_Message* wmsg; int rc; /* Flow control? 
*/ if (i->chan->send_queue->current_qlen >= i->chan->send_queue->max_qlen-2) { i->sendingsuspended = TRUE; return FALSE; } if (i->sendingsuspended) { i->sendingsuspended = FALSE; } ++i->wcount; wmsg = newmessage(i->chan, i->wcount); /*fprintf(stderr, "s");*/ if ((rc = i->chan->ops->send(i->chan, wmsg)) != IPC_OK) { cl_log(LOG_ERR , "s_send_msg: send failed" " %d rc iter %d" , rc, i->wcount); cl_log(LOG_ERR , "s_send_msg: channel status: %d qlen: %d" , i->chan->ch_status , i->chan->send_queue->current_qlen); ++i->errcount; if (i->chan->ch_status != IPC_CONNECT) { cl_log(LOG_ERR, "s_send_msg: Exiting."); return FALSE; } if (i->errcount >= MAXERRORS) { g_main_quit(loop); return FALSE; } } return i->wcount < i->max;}static gbooleans_rcv_msg(IPC_Channel* chan, gpointer data){ struct iterinfo*i = data; i->errcount += checkinput(chan, "s_rcv_msg", &i->rcount, i->max); if (i->sendingsuspended && !chan->ops->is_sending_blocked(chan)) { i->sendingsuspended = FALSE; g_idle_add(s_send_msg, data); } if (chan->ch_status == IPC_DISCONNECT || i->rcount >= i->max || i->errcount > MAXERRORS) { if (i->rcount < i->max) { ++i->errcount; cl_log(LOG_INFO, "Early exit from s_rcv_msg"); } g_main_quit(loop); return FALSE; } return TRUE;}static gbooleancheckmsg(IPC_Message* rmsg, const char * who, int rcount){ char str[256]; size_t len; echomsgbody(str, rcount, &len); if (rmsg->msg_len != len) { cl_log(LOG_ERR , "checkmsg[%s]: length mismatch" " [expected %u, got %lu] iteration %d" , who, (unsigned)len , (unsigned long)rmsg->msg_len , rcount); cl_log(LOG_ERR , "checkmsg[%s]: expecting [%s]" , who, str); cl_log(LOG_ERR , "checkmsg[%s]: got [%s] instead" , who, (const char *)rmsg->msg_body); return FALSE; } if (strncmp(rmsg->msg_body, str, len) != 0) { cl_log(LOG_ERR , "checkmsg[%s]: data mismatch" ". 
input iteration %d" , who, rcount); cl_log(LOG_ERR , "checkmsg[%s]: expecting [%s]" , who, str); cl_log(LOG_ERR , "checkmsg[%s]: got [%s] instead" , who, (const char *)rmsg->msg_body); return FALSE;#if 0 }else if (strcmp(who, "s_rcv_msg") == 0) {#if 0 || strcmp(who, "s_echo_msg") == 0) {#endif cl_log(LOG_ERR , "checkmsg[%s]: data Good" "! input iteration %d" , who, rcount);#endif } return TRUE;}static gbooleans_echo_msg(IPC_Channel* chan, gpointer data){ struct iterinfo* i = data; int rc; IPC_Message* rmsg; while (chan->ops->is_message_pending(chan)) { if (chan->ch_status == IPC_DISCONNECT) { break; } if (i->chan->send_queue->current_qlen >= i->chan->send_queue->max_qlen-2) { i->sendingsuspended = TRUE; cl_log(LOG_INFO , "s_echo_msg: Sending suspended."); goto retout; } i->sendingsuspended = FALSE; if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) { cl_log(LOG_ERR , "s_echo_msg: recv failed %d rc iter %d" " errno=%d" , rc, i->rcount+1, errno); cl_perror("recv"); ++i->errcount; goto retout; } i->rcount++; if (!checkmsg(rmsg, "s_echo_msg", i->rcount)) { ++i->errcount; } /*fprintf(stderr, "c");*/ if ((rc = chan->ops->send(chan, rmsg)) != IPC_OK) { cl_log(LOG_ERR , "s_echo_msg: send failed %d rc iter %d qlen %d" , rc, i->rcount, chan->send_queue->current_qlen); cl_perror("s_echo_msg:send"); ++i->errcount; }else{ i->wcount+=1; } }retout: /*fprintf(stderr, "%%");*/ if (i->rcount >= i->max || chan->ch_status == IPC_DISCONNECT || i->errcount > MAXERRORS) { chan->ops->waitout(chan); g_main_quit(loop); return FALSE; } return TRUE;}static voidinit_iterinfo(struct iterinfo * i, IPC_Channel* chan, int max){ memset(i, 0, sizeof(*i)); i->chan = chan; i->max = max;}static intmainloop_server(IPC_Channel* chan, int repcount){ struct iterinfo info; GCHSource* msgchan; guint sendmsgsrc; loop = g_main_new(FALSE); init_iterinfo(&info, chan, repcount); sendmsgsrc = g_idle_add(s_send_msg, &info); msgchan = G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan , FALSE, s_rcv_msg, &info, NULL); 
cl_log(LOG_INFO, "Mainloop echo server: %d reps pid %d.", repcount, (int)getpid()); g_main_run(loop); g_main_destroy(loop); g_source_remove(sendmsgsrc); loop = NULL; cl_log(LOG_INFO, "Mainloop echo server: %d errors", info.errcount); return info.errcount;}static intmainloop_client(IPC_Channel* chan, int repcount){ struct iterinfo info; loop = g_main_new(FALSE); init_iterinfo(&info, chan, repcount); G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan , FALSE, s_echo_msg, &info, NULL); cl_log(LOG_INFO, "Mainloop echo client: %d reps pid %d.", repcount, (int)getpid()); g_main_run(loop); g_main_destroy(loop); loop = NULL; cl_log(LOG_INFO, "Mainloop echo client: %d errors, %d read %d written" , info.errcount, info.rcount, info.wcount); return info.errcount;}
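/*
 * Source-viewer keyboard shortcuts (web page residue, not part of ipctest.c):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */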