inet_gethost.c

来自「OTP是开放电信平台的简称」· C语言 代码 · 共 2,324 行 · 第 1/4 页

C
2,324
字号
	}    }    return 0;}static void main_loop(void){    AddrByte *inbuff = NULL;    int insize;    int i,w;#ifdef WIN32    HANDLE handles[64];    DWORD num_handles;    DWORD index;    QueItem *qi;#else    size_t inbuff_size = 0;    fd_set fds;    int max_fd;#endif    int new_data;    int save_serial;    /* It's important that the free workers list is handled first */    Worker *workers[3] = {free_workers, busy_workers, stalled_workers};    int *wsizes[3] = {&num_free_workers, &num_busy_workers,		      &num_stalled_workers};    int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy, 			       &handle_io_stalled};    Worker *cw;    AddrByte domainbuff[DOMAINNAME_MAX];#ifdef WIN32    {	DWORD dummy;	/* Create the reader and writer */	if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) {	    fatal("Could not create message que! errno = %d.",GetLastError());	}	if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy))	    == NULL) {	    fatal("Could not create writer thread! errno = %d.",GetLastError());	}	if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy)) 	    == NULL) {	    fatal("Could not create reader thread! errno = %d.",GetLastError());	}	DEBUGF(4,("Created reader and writer threads."));    }#endif    for(;;) {#ifdef WIN32	num_handles = 0;	handles[num_handles++] = event_mesq(from_erlang);	for (w = 0; w < 3; ++w) {	    for (i = 0; i < *wsizes[w]; ++i) {		handles[num_handles++] = event_mesq(workers[w][i].readfrom);	    }	}	if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE))	    == WAIT_FAILED) {	    fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError());	}	w = 0;	index -= WAIT_OBJECT_0;	DEBUGF(4,("Got data on index %d.",index));	if (index > 0) {	    if (((int)index - 1) < *wsizes[0]) {		(*handlers[0])(index - 1);	    } else if (((int)index - 1) <  ((*wsizes[0]) + (*wsizes[1]))) {		(*handlers[1])(index - 1 - (*wsizes[0]));	    } else {		(*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1]));	    }	}	new_data = (index == 0);#else	max_fd = 0;	FD_ZERO(&fds);	FD_SET(0,&fds);	for (w = 0; w < 3; ++w) {	    for (i = 0; i < *wsizes[w]; ++i) {		FD_SET(workers[w][i].readfrom,&fds);		if (workers[w][i].readfrom > max_fd) {		    max_fd =  workers[w][i].readfrom;		}	    }	}	for (;;) {	    if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) {		if (errno == EINTR) {		    continue;		} else {		    fatal("Select failed (invalid internal structures?), "			  "errno = %d.",errno);		}	    }	    break;	}	for (w = 0; w < 3; ++w) {	    for (i = 0; i < *wsizes[w]; ++i) {		if (FD_ISSET(workers[w][i].readfrom, &fds)) {		    int hres = (*handlers[w])(i);		    if (hres < 0) {			return;		    } else {			i -= hres; /* We'll retry this position, if hres == 1. 				      The position is usually				      replaced with another worker, 				      a worker with				      I/O usually changes state as we 				      use blocking file I/O */		    }		}	    }	}	new_data = FD_ISSET(0,&fds);#endif	check_que();	/* Now check for new requests... */	if (new_data) { /* Erlang... */	    OpType op;#ifdef WIN32	    if (!deque_mesq(from_erlang,&qi)) {		DEBUGF(1,("Erlang has closed."));		return;	    }	    insize = qi->req_size;	    inbuff = qi->request;	    DEBUGF(4,("Got data from erlang."));	    DEBUGF(4,("OPeration == %d.",get_op(inbuff)));#else	    insize = read_request(&inbuff, &inbuff_size);	    if (insize == 0) { /* Other errors taken care of in 				    read_request */		DEBUGF(1,("Erlang has closed."));		return;	    }#endif	    op = get_op(inbuff);	    if (op == OP_CANCEL_REQUEST) {		SerialType serial = get_serial(inbuff);		if (!clean_que_of(serial)) {		    for (i = 0; i <  num_busy_workers; ++i) {			if (busy_workers[i].serial == serial) {		    			    if (busy_workers[i].que_size) {				restart_worker(&busy_workers[i]);				start_que_request(&busy_workers[i]);			    } else {				stall_worker(i);				check_que();			    }			    break;			}		    }		}#ifdef WIN32		FREE(qi);#endif		continue; /* New select */	    } else if (op == OP_CONTROL) {		CtlType ctl;		SerialType serial = get_serial(inbuff);		if (serial != INVALID_SERIAL) {		    fatal("Invalid serial: %d.", serial);		}		switch (ctl = get_ctl(inbuff)) {		case SETOPT_DEBUG_LEVEL:		    debug_level = get_debug_level(inbuff);		    DEBUGF(debug_level, ("debug_level = %d", debug_level));		    for (w = 0; w < 3; ++w) {			for (i = 0; i < *wsizes[w]; i++) {			    int res;			    cw = &(workers[w][i]);#ifdef WIN32			    if ((res = send_mes_to_worker(qi, cw)) == 0) {				QueItem *m =				    ALLOC(sizeof(QueItem) - 1 + qi->req_size);				memcpy(qi->request, m->request,				       (m->req_size = qi->req_size));				m->next = NULL;				qi = m;			    }#else			    res = send_request_to_worker(inbuff, insize, cw);#endif			    if (res != 0) {			        kill_worker(cw);				(*wsizes[w])--;				*cw = workers[w][*wsizes[w]];			    }			}	            }	            break;		default:		    warning("Unknown control requested from erlang (%d), "			    "message discarded.", (int) ctl);		    break;		} #ifdef WIN32		FREE(qi);#endif		continue; /* New select */	    } else {		ProtoType proto;		if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) {		    warning("Unknown operation requested from erlang (%d), "			    "message discarded.", op);#ifdef WIN32		    FREE(qi);#endif		    continue;		}		if ((proto = get_proto(inbuff)) != PROTO_IPV4 &&		    proto != PROTO_IPV6) {		    warning("Unknown protocol requested from erlang (%d), "			    "message discarded.", proto);#ifdef WIN32		    FREE(qi);#endif		    continue;		}		if (get_domainname(inbuff,insize,domainbuff) < 0) {		    warning("Malformed message sent from erlang, no domain, "			    "message discarded.", op);#ifdef WIN32		    FREE(qi);#endif		    continue;		}	    }   	    if (BEE_GREEDY()) {		DEBUGF(4,("Beeing greedy!"));		if ((cw = pick_worker_greedy(domainbuff)) != NULL) {		    /* Put it in the worker specific que if the 		       domainname matches... */#ifndef WIN32		    		    QueItem *qi = ALLOC(sizeof(QueItem) - 1 +					insize);		    qi->req_size = insize;		    memcpy(&(qi->request), inbuff, insize);		    qi->next = NULL;#endif		    if (!cw->que_first) {			cw->que_first = cw->que_last = qi;		    } else {			cw->que_last->next = qi;			cw->que_last = qi;		    }		    ++(cw->que_size);		    continue;		}		/* Otherwise busyness as usual */	    }	    save_serial = get_serial(inbuff);	    while ((cw = pick_worker()) != NULL) {		int res;#ifdef WIN32		res = send_mes_to_worker(qi,cw);#else		res = send_request_to_worker(inbuff, insize, cw);#endif		if (res == 0) {		    break;		} else {		    kill_last_picked_worker();		}	    }	    if (cw == NULL) {		/* Insert into que */#ifndef WIN32		QueItem *qi = ALLOC(sizeof(QueItem) - 1 +				    insize);		qi->req_size = insize;		memcpy(&(qi->request), inbuff, insize);		qi->next = NULL;#endif		if (!que_first) {		    que_first = que_last = qi;		} else {		    que_last->next = qi;		    que_last = qi;		}	    } else {		cw->serial = save_serial;		domaincopy(cw->domain, domainbuff);	    }	}    }}/* * Main process worker administration */static void init_workers(int max){    max_workers = max;    num_busy_workers = 0;    num_free_workers = 0;    num_stalled_workers = 0;    busy_workers = ALLOC(sizeof(Worker) * max_workers);    free_workers = ALLOC(sizeof(Worker) * max_workers);    stalled_workers = ALLOC(sizeof(Worker) * max_workers);#ifndef WIN32    init_signals();#endif}#ifdef WIN32static void kill_worker(Worker *pw){    /* Cannot really kill a thread in win32, have to just leave it to die */    close_mesq(pw->writeto);    close_mesq(pw->readfrom);    pw->state = WORKER_EMPTY;}#elsestatic void kill_worker(Worker *pw){    fd_set fds;    struct timeval tmo;    int selret;    static char buff[1024];    DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d", 	      (long) pw->pid,	      (int) pw->readfrom, 	      (int) pw->serial));    kill(pw->pid, SIGUSR1);    /* This is all just to check that the child died, not        really necessary */    for(;;) {	FD_ZERO(&fds);	FD_SET(pw->readfrom, &fds);	tmo.tv_usec=0;	tmo.tv_sec = CHILDWAIT_TMO;	selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo);	if (selret < 0) {	    if (errno != EINTR) {		warning("Unable to select on dying child file descriptor, "			"errno = %d.",errno);		break;	    }	} else if (selret == 0) {	    warning("Timeout waiting for child process to die, "		    "ignoring child (pid = %d).", pw->pid);	    break;	} else {	    int ret;	    if ((ret = read(pw->readfrom, buff, 1024)) < 0) {		if (errno != EINTR) {		    warning("Child file descriptor not closed properly, "			    "errno = %d", errno);		    break;		}	    } else if (ret == 0) {		break;	    }	    /* continue */	}    }    /* Waiting is done by signal handler... */    close(pw->readfrom);    close(pw->writeto);    pw->state = WORKER_EMPTY;    /* Leave rest as is... */}static void kill_all_workers(void) /* Emergency function, will not check that the children died... */{    int i;    for (i = 0; i < num_busy_workers; ++i) {	kill(busy_workers[i].pid, SIGUSR1);    }    for (i = 0; i < num_free_workers; ++i) {	kill(free_workers[i].pid, SIGUSR1);    }    for (i = 0; i < num_stalled_workers; ++i) {	kill(stalled_workers[i].pid, SIGUSR1);    }}#endif /* !WIN32 */static Worker *pick_worker(void){    Worker tmp;    if (num_free_workers > 0) {	--num_free_workers;	tmp = free_workers[num_free_workers];    } else if (num_stalled_workers > 0) {	/* "restart" the worker... */	--num_stalled_workers;	kill_worker(&(stalled_workers[num_stalled_workers]));	if (create_worker(&tmp,0) < 0) {	    warning("Unable to create worker process, insufficient "		    "resources");	    return NULL;	}    } else {	if (num_busy_workers == max_workers) {	    return NULL;	} 	if (create_worker(&tmp,0) < 0) {	    warning("Unable to create worker process, insufficient "		    "resources");	    return NULL;	}    }    /* tmp contains a worker now, make it busy and put it in the right        array */    tmp.state = WORKER_BUSY;    busy_workers[num_busy_workers] = tmp;    ++num_busy_workers;    return &(busy_workers[num_busy_workers-1]);}static Worker *pick_worker_greedy(AddrByte *domainbuff){    int i;    int ql = 0;    int found = -1;    for (i=0; i < num_busy_workers; ++i) {	if (domaineq(busy_workers[i].domain, domainbuff)) {	    if ((found < 0) || (busy_workers[i].que_size < 				busy_workers[found].que_size)) {		found = i;		ql = busy_workers[i].que_size;	    }	}    }    if (found >= 0) {	return &busy_workers[found];    }     return NULL;}static void restart_worker(Worker *w){    kill_worker(w);    if (create_worker(w,1) < 0) {	fatal("Unable to create worker process, insufficient resources");    }}static void kill_last_picked_worker(void){    kill_worker( &(busy_workers[num_busy_workers-1]));    --num_busy_workers;}/* * Starts a request qued to a specific worker, check_que starts normally queued requests. * We expect a que here... */static void start_que_request(Worker *w) {    QueItem *qi;    SerialType save_serial;    if (!w->que_first || !w->que_size) {	fatal("Expected que'd requests but found none, "	      "internal datastructure corrupted!");    }    qi = w->que_first;    w->que_first = w->que_first->next;    if (!w->que_first) {	w->que_last = NULL;    }    --(w->que_size);    save_serial = get_serial(qi->request);#ifdef WIN32    while (send_mes_to_worker(qi, w) != 0) {	restart_worker(w);    }#else    while (send_request_to_worker(qi->request, 				  qi->req_size, w) != 0) {	restart_worker(w);    }#endif    w->serial = save_serial;    DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, "	      "Que is %sempty",	      get_serial(qi->request), (long) w->pid, 	      (w->que_first) ? "not " : ""));#ifndef WIN32    FREE(qi);#endif}#ifndef WIN32    /* Signal utilities */static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int){    struct sigaction act, oact;    sigemptyset(&act.sa_mask);    act.sa_flags = 0;    act.sa_handler = func;    sigaction(sig, &act, &oact);    return(oact.sa_handler);}static void sys_sigblock(int sig){    sigset_t mask;    sigemptyset(&mask);    sigaddset(&mask, sig);    sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL);}static void sys_sigrelease(int sig){    sigset_t mask;    sigemptyset(&mask);    sigaddset(&mask, sig);    sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL);}/* Child signal handler */void reap_children(int ignored){    int res;    sys_sigblock(SIGCHLD);    for (;;) {	while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0)	    ;	if (!(res < 0 && errno == EAGAIN)) {	    DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno));	    break;	}    }    sys_sigrelease(SIGCHLD);}static void init_signals(void){    sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result 				       on most (?) platforms. */    sys_sigset(SIGPIPE, SIG_IGN);}#endifstatic void stall_worker(int ndx){    --num_busy_workers;    stalled_workers[num_stalled_workers] = busy_workers[ndx];    stalled_workers[num_stalled_workers].state = WORKER_STALLED;    busy_workers[ndx] = busy_workers[num_busy_workers];    DEBUGF(3, ("Stalled worker[%ld]",	   (long) stalled_workers[num_stalled_workers].pid));    ++num_stalled_workers;}/* * Main loop message passing */#ifndef WIN32static int read_request(AddrByte **buff, size_t *buff_size){    int siz;    int r;    if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) {	if (r == 0) {	    return 0;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?