inet_gethost.c
来自「OTP是开放电信平台的简称」· C语言 代码 · 共 2,324 行 · 第 1/4 页
C
2,324 行
} } return 0;}static void main_loop(void){ AddrByte *inbuff = NULL; int insize; int i,w;#ifdef WIN32 HANDLE handles[64]; DWORD num_handles; DWORD index; QueItem *qi;#else size_t inbuff_size = 0; fd_set fds; int max_fd;#endif int new_data; int save_serial; /* It's important that the free workers list is handled first */ Worker *workers[3] = {free_workers, busy_workers, stalled_workers}; int *wsizes[3] = {&num_free_workers, &num_busy_workers, &num_stalled_workers}; int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy, &handle_io_stalled}; Worker *cw; AddrByte domainbuff[DOMAINNAME_MAX];#ifdef WIN32 { DWORD dummy; /* Create the reader and writer */ if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) { fatal("Could not create message que! errno = %d.",GetLastError()); } if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy)) == NULL) { fatal("Could not create writer thread! errno = %d.",GetLastError()); } if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy)) == NULL) { fatal("Could not create reader thread! errno = %d.",GetLastError()); } DEBUGF(4,("Created reader and writer threads.")); }#endif for(;;) {#ifdef WIN32 num_handles = 0; handles[num_handles++] = event_mesq(from_erlang); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { handles[num_handles++] = event_mesq(workers[w][i].readfrom); } } if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE)) == WAIT_FAILED) { fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError()); } w = 0; index -= WAIT_OBJECT_0; DEBUGF(4,("Got data on index %d.",index)); if (index > 0) { if (((int)index - 1) < *wsizes[0]) { (*handlers[0])(index - 1); } else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) { (*handlers[1])(index - 1 - (*wsizes[0])); } else { (*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1])); } } new_data = (index == 0);#else max_fd = 0; FD_ZERO(&fds); FD_SET(0,&fds); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { FD_SET(workers[w][i].readfrom,&fds); if (workers[w][i].readfrom > max_fd) { max_fd = workers[w][i].readfrom; } } } for (;;) { if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) { if (errno == EINTR) { continue; } else { fatal("Select failed (invalid internal structures?), " "errno = %d.",errno); } } break; } for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { if (FD_ISSET(workers[w][i].readfrom, &fds)) { int hres = (*handlers[w])(i); if (hres < 0) { return; } else { i -= hres; /* We'll retry this position, if hres == 1. The position is usually replaced with another worker, a worker with I/O usually changes state as we use blocking file I/O */ } } } } new_data = FD_ISSET(0,&fds);#endif check_que(); /* Now check for new requests... */ if (new_data) { /* Erlang... */ OpType op;#ifdef WIN32 if (!deque_mesq(from_erlang,&qi)) { DEBUGF(1,("Erlang has closed.")); return; } insize = qi->req_size; inbuff = qi->request; DEBUGF(4,("Got data from erlang.")); DEBUGF(4,("OPeration == %d.",get_op(inbuff)));#else insize = read_request(&inbuff, &inbuff_size); if (insize == 0) { /* Other errors taken care of in read_request */ DEBUGF(1,("Erlang has closed.")); return; }#endif op = get_op(inbuff); if (op == OP_CANCEL_REQUEST) { SerialType serial = get_serial(inbuff); if (!clean_que_of(serial)) { for (i = 0; i < num_busy_workers; ++i) { if (busy_workers[i].serial == serial) { if (busy_workers[i].que_size) { restart_worker(&busy_workers[i]); start_que_request(&busy_workers[i]); } else { stall_worker(i); check_que(); } break; } } }#ifdef WIN32 FREE(qi);#endif continue; /* New select */ } else if (op == OP_CONTROL) { CtlType ctl; SerialType serial = get_serial(inbuff); if (serial != INVALID_SERIAL) { fatal("Invalid serial: %d.", serial); } switch (ctl = get_ctl(inbuff)) { case SETOPT_DEBUG_LEVEL: debug_level = get_debug_level(inbuff); DEBUGF(debug_level, ("debug_level = %d", debug_level)); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; i++) { int res; cw = &(workers[w][i]);#ifdef WIN32 if ((res = send_mes_to_worker(qi, cw)) == 0) { QueItem *m = ALLOC(sizeof(QueItem) - 1 + qi->req_size); memcpy(qi->request, m->request, (m->req_size = qi->req_size)); m->next = NULL; qi = m; }#else res = send_request_to_worker(inbuff, insize, cw);#endif if (res != 0) { kill_worker(cw); (*wsizes[w])--; *cw = workers[w][*wsizes[w]]; } } } break; default: warning("Unknown control requested from erlang (%d), " "message discarded.", (int) ctl); break; } #ifdef WIN32 FREE(qi);#endif continue; /* New select */ } else { ProtoType proto; if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) { warning("Unknown operation requested from erlang (%d), " "message discarded.", op);#ifdef WIN32 FREE(qi);#endif continue; } if ((proto = get_proto(inbuff)) != PROTO_IPV4 && proto != PROTO_IPV6) { warning("Unknown protocol requested from erlang (%d), " "message discarded.", proto);#ifdef WIN32 FREE(qi);#endif continue; } if (get_domainname(inbuff,insize,domainbuff) < 0) { warning("Malformed message sent from erlang, no domain, " "message discarded.", op);#ifdef WIN32 FREE(qi);#endif continue; } } if (BEE_GREEDY()) { DEBUGF(4,("Beeing greedy!")); if ((cw = pick_worker_greedy(domainbuff)) != NULL) { /* Put it in the worker specific que if the domainname matches... */#ifndef WIN32 QueItem *qi = ALLOC(sizeof(QueItem) - 1 + insize); qi->req_size = insize; memcpy(&(qi->request), inbuff, insize); qi->next = NULL;#endif if (!cw->que_first) { cw->que_first = cw->que_last = qi; } else { cw->que_last->next = qi; cw->que_last = qi; } ++(cw->que_size); continue; } /* Otherwise busyness as usual */ } save_serial = get_serial(inbuff); while ((cw = pick_worker()) != NULL) { int res;#ifdef WIN32 res = send_mes_to_worker(qi,cw);#else res = send_request_to_worker(inbuff, insize, cw);#endif if (res == 0) { break; } else { kill_last_picked_worker(); } } if (cw == NULL) { /* Insert into que */#ifndef WIN32 QueItem *qi = ALLOC(sizeof(QueItem) - 1 + insize); qi->req_size = insize; memcpy(&(qi->request), inbuff, insize); qi->next = NULL;#endif if (!que_first) { que_first = que_last = qi; } else { que_last->next = qi; que_last = qi; } } else { cw->serial = save_serial; domaincopy(cw->domain, domainbuff); } } }}/* * Main process worker administration */static void init_workers(int max){ max_workers = max; num_busy_workers = 0; num_free_workers = 0; num_stalled_workers = 0; busy_workers = ALLOC(sizeof(Worker) * max_workers); free_workers = ALLOC(sizeof(Worker) * max_workers); stalled_workers = ALLOC(sizeof(Worker) * max_workers);#ifndef WIN32 init_signals();#endif}#ifdef WIN32static void kill_worker(Worker *pw){ /* Cannot really kill a thread in win32, have to just leave it to die */ close_mesq(pw->writeto); close_mesq(pw->readfrom); pw->state = WORKER_EMPTY;}#elsestatic void kill_worker(Worker *pw){ fd_set fds; struct timeval tmo; int selret; static char buff[1024]; DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d", (long) pw->pid, (int) pw->readfrom, (int) pw->serial)); kill(pw->pid, SIGUSR1); /* This is all just to check that the child died, not really necessary */ for(;;) { FD_ZERO(&fds); FD_SET(pw->readfrom, &fds); tmo.tv_usec=0; tmo.tv_sec = CHILDWAIT_TMO; selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo); if (selret < 0) { if (errno != EINTR) { warning("Unable to select on dying child file descriptor, " "errno = %d.",errno); break; } } else if (selret == 0) { warning("Timeout waiting for child process to die, " "ignoring child (pid = %d).", pw->pid); break; } else { int ret; if ((ret = read(pw->readfrom, buff, 1024)) < 0) { if (errno != EINTR) { warning("Child file descriptor not closed properly, " "errno = %d", errno); break; } } else if (ret == 0) { break; } /* continue */ } } /* Waiting is done by signal handler... */ close(pw->readfrom); close(pw->writeto); pw->state = WORKER_EMPTY; /* Leave rest as is... */}static void kill_all_workers(void) /* Emergency function, will not check that the children died... */{ int i; for (i = 0; i < num_busy_workers; ++i) { kill(busy_workers[i].pid, SIGUSR1); } for (i = 0; i < num_free_workers; ++i) { kill(free_workers[i].pid, SIGUSR1); } for (i = 0; i < num_stalled_workers; ++i) { kill(stalled_workers[i].pid, SIGUSR1); }}#endif /* !WIN32 */static Worker *pick_worker(void){ Worker tmp; if (num_free_workers > 0) { --num_free_workers; tmp = free_workers[num_free_workers]; } else if (num_stalled_workers > 0) { /* "restart" the worker... */ --num_stalled_workers; kill_worker(&(stalled_workers[num_stalled_workers])); if (create_worker(&tmp,0) < 0) { warning("Unable to create worker process, insufficient " "resources"); return NULL; } } else { if (num_busy_workers == max_workers) { return NULL; } if (create_worker(&tmp,0) < 0) { warning("Unable to create worker process, insufficient " "resources"); return NULL; } } /* tmp contains a worker now, make it busy and put it in the right array */ tmp.state = WORKER_BUSY; busy_workers[num_busy_workers] = tmp; ++num_busy_workers; return &(busy_workers[num_busy_workers-1]);}static Worker *pick_worker_greedy(AddrByte *domainbuff){ int i; int ql = 0; int found = -1; for (i=0; i < num_busy_workers; ++i) { if (domaineq(busy_workers[i].domain, domainbuff)) { if ((found < 0) || (busy_workers[i].que_size < busy_workers[found].que_size)) { found = i; ql = busy_workers[i].que_size; } } } if (found >= 0) { return &busy_workers[found]; } return NULL;}static void restart_worker(Worker *w){ kill_worker(w); if (create_worker(w,1) < 0) { fatal("Unable to create worker process, insufficient resources"); }}static void kill_last_picked_worker(void){ kill_worker( &(busy_workers[num_busy_workers-1])); --num_busy_workers;}/* * Starts a request qued to a specific worker, check_que starts normally queued requests. * We expect a que here... */static void start_que_request(Worker *w) { QueItem *qi; SerialType save_serial; if (!w->que_first || !w->que_size) { fatal("Expected que'd requests but found none, " "internal datastructure corrupted!"); } qi = w->que_first; w->que_first = w->que_first->next; if (!w->que_first) { w->que_last = NULL; } --(w->que_size); save_serial = get_serial(qi->request);#ifdef WIN32 while (send_mes_to_worker(qi, w) != 0) { restart_worker(w); }#else while (send_request_to_worker(qi->request, qi->req_size, w) != 0) { restart_worker(w); }#endif w->serial = save_serial; DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, " "Que is %sempty", get_serial(qi->request), (long) w->pid, (w->que_first) ? "not " : ""));#ifndef WIN32 FREE(qi);#endif}#ifndef WIN32 /* Signal utilities */static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int){ struct sigaction act, oact; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = func; sigaction(sig, &act, &oact); return(oact.sa_handler);}static void sys_sigblock(int sig){ sigset_t mask; sigemptyset(&mask); sigaddset(&mask, sig); sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL);}static void sys_sigrelease(int sig){ sigset_t mask; sigemptyset(&mask); sigaddset(&mask, sig); sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL);}/* Child signal handler */void reap_children(int ignored){ int res; sys_sigblock(SIGCHLD); for (;;) { while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0) ; if (!(res < 0 && errno == EAGAIN)) { DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno)); break; } } sys_sigrelease(SIGCHLD);}static void init_signals(void){ sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result on most (?) platforms. */ sys_sigset(SIGPIPE, SIG_IGN);}#endifstatic void stall_worker(int ndx){ --num_busy_workers; stalled_workers[num_stalled_workers] = busy_workers[ndx]; stalled_workers[num_stalled_workers].state = WORKER_STALLED; busy_workers[ndx] = busy_workers[num_busy_workers]; DEBUGF(3, ("Stalled worker[%ld]", (long) stalled_workers[num_stalled_workers].pid)); ++num_stalled_workers;}/* * Main loop message passing */#ifndef WIN32static int read_request(AddrByte **buff, size_t *buff_size){ int siz; int r; if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) { if (r == 0) { return 0;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?