comm_select.c

来自「-」· C语言 代码 · 共 883 行 · 第 1/2 页

C
883
字号
	    FD_SET(fd, &write_mask);	    if (fd > maxfd)		maxfd = fd;	}    }    if (maxfd++ == 0)	return incame;#if !ALARM_UPDATES_TIME    getCurrentTime();#endif    Counter.syscalls.selects++;    if (select(maxfd, &read_mask, &write_mask, NULL, &zero_tv) < 1)	return incame;    for (i = 0; i < nfds; i++) {	fd = fds[i];	if (FD_ISSET(fd, &read_mask)) {	    if ((hdl = fd_table[fd].read_handler) != NULL) {		fd_table[fd].read_handler = NULL;		commUpdateReadBits(fd, NULL);		hdl(fd, &incame);	    } else {		debug(5, 1) ("comm_select_incoming: NULL read handler\n");	    }	}	if (FD_ISSET(fd, &write_mask)) {	    if ((hdl = fd_table[fd].write_handler) != NULL) {		fd_table[fd].write_handler = NULL;		commUpdateWriteBits(fd, NULL);		hdl(fd, &incame);	    } else {		debug(5, 1) ("comm_select_incoming: NULL write handler\n");	    }	}    }    return incame;}static voidcomm_select_icp_incoming(void){    int nfds = 0;    int fds[2];    int nevents;    icp_io_events = 0;    if (theInIcpConnection >= 0)	fds[nfds++] = theInIcpConnection;    if (theInIcpConnection != theOutIcpConnection)	if (theOutIcpConnection >= 0)	    fds[nfds++] = theOutIcpConnection;    if (nfds == 0)	return;    nevents = comm_check_incoming_select_handlers(nfds, fds);    incoming_icp_interval += Config.comm_incoming.icp_average - nevents;    if (incoming_icp_interval < 0)	incoming_icp_interval = 0;    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)	incoming_icp_interval = MAX_INCOMING_INTERVAL;    if (nevents > INCOMING_ICP_MAX)	nevents = INCOMING_ICP_MAX;    statHistCount(&Counter.comm_icp_incoming, nevents);}static voidcomm_select_http_incoming(void){    int nfds = 0;    int fds[MAXHTTPPORTS];    int j;    int nevents;    http_io_events = 0;    for (j = 0; j < NHttpSockets; j++) {	if (HttpSockets[j] < 0)	    continue;	if (commDeferRead(HttpSockets[j]))	    continue;	fds[nfds++] = HttpSockets[j];    }    nevents = comm_check_incoming_select_handlers(nfds, fds);    incoming_http_interval += Config.comm_incoming.http_average - nevents;    if (incoming_http_interval < 0)	incoming_http_interval = 0;    if (incoming_http_interval > MAX_INCOMING_INTERVAL)	incoming_http_interval = MAX_INCOMING_INTERVAL;    if (nevents > INCOMING_HTTP_MAX)	nevents = INCOMING_HTTP_MAX;    statHistCount(&Counter.comm_http_incoming, nevents);}#define DEBUG_FDBITS 0/* Select on all sockets; call handlers for those that are ready. */intcomm_select(int msec){    fd_set readfds;    fd_set writefds;    PF *hdl = NULL;    int fd;    int maxfd;    int num;    int callicp = 0, callhttp = 0;    int maxindex;    int k;    int j;#if DEBUG_FDBITS    int i;#endif    fd_mask *fdsp;    fd_mask tmask;    static time_t last_timeout = 0;    struct timeval poll_time;    double timeout = current_dtime + (msec / 1000.0);    fde *F;    do {#if !ALARM_UPDATES_TIME	getCurrentTime();#endif#if USE_ASYNC_IO	aioCheckCallbacks();#endif	if (commCheckICPIncoming)	    comm_select_icp_incoming();	if (commCheckHTTPIncoming)	    comm_select_http_incoming();	callicp = callhttp = 0;	maxfd = Biggest_FD + 1;	xmemcpy(&readfds, &global_readfds,	    howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES);	xmemcpy(&writefds, &global_writefds,	    howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES);	/* remove stalled FDs */	maxindex = howmany(maxfd, FD_MASK_BITS);	fdsp = (fd_mask *) & readfds;	for (j = 0; j < maxindex; j++) {	    if ((tmask = fdsp[j]) == 0)		continue;	/* no bits here */	    for (k = 0; k < FD_MASK_BITS; k++) {		if (!EBIT_TEST(tmask, k))		    continue;		/* Found a set bit */		fd = (j * FD_MASK_BITS) + k;		if (commDeferRead(fd))		    FD_CLR(fd, &readfds);	    }	}#if DEBUG_FDBITS	for (i = 0; i < maxfd; i++) {	    /* Check each open socket for a handler. */	    if (fd_table[i].read_handler && !commDeferRead(i)) {		assert(FD_ISSET(i, &readfds));	    }	    if (fd_table[i].write_handler) {		assert(FD_ISSET(i, &writefds));	    }	}#endif	if (nreadfds + nwritefds == 0) {	    assert(shutting_down);	    return COMM_SHUTDOWN;	}	if (msec > MAX_POLL_TIME)	    msec = MAX_POLL_TIME;#ifdef _SQUID_OS2_	if (msec < 0)	    msec = MAX_POLL_TIME;#endif	for (;;) {	    poll_time.tv_sec = msec / 1000;	    poll_time.tv_usec = (msec % 1000) * 1000;	    Counter.syscalls.selects++;	    num = select(maxfd, &readfds, &writefds, NULL, &poll_time);	    Counter.select_loops++;	    if (num >= 0)		break;	    if (ignoreErrno(errno))		break;	    debug(50, 0) ("comm_select: select failure: %s\n",		xstrerror());	    examine_select(&readfds, &writefds);	    return COMM_ERROR;	    /* NOTREACHED */	}	if (num < 0)	    continue;	debug(5, num ? 5 : 8) ("comm_select: %d FDs ready at %d\n",	    num, (int) squid_curtime);	statHistCount(&Counter.select_fds_hist, num);	/* Check lifetime and timeout handlers ONCE each second.	 * Replaces brain-dead check every time through the loop! */	if (squid_curtime > last_timeout) {	    last_timeout = squid_curtime;	    checkTimeouts();	}	if (num == 0)	    continue;	/* Scan return fd masks for ready descriptors */	fdsp = (fd_mask *) & readfds;	maxindex = howmany(maxfd, FD_MASK_BITS);	for (j = 0; j < maxindex; j++) {	    if ((tmask = fdsp[j]) == 0)		continue;	/* no bits here */	    for (k = 0; k < FD_MASK_BITS; k++) {		if (!EBIT_TEST(tmask, k))		    continue;		/* Found a set bit */		fd = (j * FD_MASK_BITS) + k;#if DEBUG_FDBITS		debug(5, 9) ("FD %d bit set for reading\n", fd);		assert(FD_ISSET(fd, &readfds));#endif		if (fdIsIcp(fd)) {		    callicp = 1;		    continue;		}		if (fdIsHttp(fd)) {		    callhttp = 1;		    continue;		}		F = &fd_table[fd];		debug(5, 6) ("comm_select: FD %d ready for reading\n", fd);		if (F->read_handler) {		    hdl = F->read_handler;		    F->read_handler = NULL;		    commUpdateReadBits(fd, NULL);		    hdl(fd, F->read_data);		    Counter.select_fds++;		}		if (commCheckICPIncoming)		    comm_select_icp_incoming();		if (commCheckHTTPIncoming)		    comm_select_http_incoming();		EBIT_CLR(tmask, k);	/* this bit is done */		if (tmask == 0)		    break;	/* and no more bits left */	    }	}	fdsp = (fd_mask *) & writefds;	for (j = 0; j < maxindex; j++) {	    if ((tmask = fdsp[j]) == 0)		continue;	/* no bits here */	    for (k = 0; k < FD_MASK_BITS; k++) {		if (!EBIT_TEST(tmask, k))		    continue;		/* Found a set bit */		fd = (j * FD_MASK_BITS) + k;#if DEBUG_FDBITS		debug(5, 9) ("FD %d bit set for writing\n", fd);		assert(FD_ISSET(fd, &writefds));#endif		if (fdIsIcp(fd)) {		    callicp = 1;		    continue;		}		if (fdIsHttp(fd)) {		    callhttp = 1;		    continue;		}		F = &fd_table[fd];		debug(5, 5) ("comm_select: FD %d ready for writing\n", fd);		if (F->write_handler) {		    hdl = F->write_handler;		    F->write_handler = NULL;		    commUpdateWriteBits(fd, NULL);		    hdl(fd, F->write_data);		    Counter.select_fds++;		}		if (commCheckICPIncoming)		    comm_select_icp_incoming();		if (commCheckHTTPIncoming)		    comm_select_http_incoming();		EBIT_CLR(tmask, k);	/* this bit is done */		if (tmask == 0)		    break;	/* and no more bits left */	    }	}	if (callicp)	    comm_select_icp_incoming();	if (callhttp)	    comm_select_http_incoming();	return COMM_OK;    } while (timeout > current_dtime);    debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime);    return COMM_TIMEOUT;}#endifvoidcomm_select_init(void){    zero_tv.tv_sec = 0;    zero_tv.tv_usec = 0;    cachemgrRegister("comm_incoming",	"comm_incoming() stats",	commIncomingStats, 0, 1);    FD_ZERO(&global_readfds);    FD_ZERO(&global_writefds);    nreadfds = nwritefds = 0;}#if !HAVE_POLL/* * examine_select - debug routine. * * I spend the day chasing this core dump that occurs when both the client * and the server side of a cache fetch simultaneoulsy abort the * connection.  While I haven't really studied the code to figure out how * it happens, the snippet below may prevent the cache from exitting: *  * Call this from where the select loop fails. */static intexamine_select(fd_set * readfds, fd_set * writefds){    int fd = 0;    fd_set read_x;    fd_set write_x;    struct timeval tv;    close_handler *ch = NULL;    fde *F = NULL;    struct stat sb;    debug(5, 0) ("examine_select: Examining open file descriptors...\n");    for (fd = 0; fd < Squid_MaxFD; fd++) {	FD_ZERO(&read_x);	FD_ZERO(&write_x);	tv.tv_sec = tv.tv_usec = 0;	if (FD_ISSET(fd, readfds))	    FD_SET(fd, &read_x);	else if (FD_ISSET(fd, writefds))	    FD_SET(fd, &write_x);	else	    continue;	Counter.syscalls.selects++;	errno = 0;	if (!fstat(fd, &sb)) {	    debug(5, 5) ("FD %d is valid.\n", fd);	    continue;	}	F = &fd_table[fd];	debug(5, 0) ("FD %d: %s\n", fd, xstrerror());	debug(5, 0) ("WARNING: FD %d has handlers, but it's invalid.\n", fd);	debug(5, 0) ("FD %d is a %s called '%s'\n",	    fd,	    fdTypeStr[F->type],	    F->desc);	debug(5, 0) ("tmout:%p read:%p write:%p\n",	    F->timeout_handler,	    F->read_handler,	    F->write_handler);	for (ch = F->close_handler; ch; ch = ch->next)	    debug(5, 0) (" close handler: %p\n", ch->handler);	if (F->close_handler) {	    commCallCloseHandlers(fd);	} else if (F->timeout_handler) {	    debug(5, 0) ("examine_select: Calling Timeout Handler\n");	    F->timeout_handler(fd, F->timeout_data);	}	F->close_handler = NULL;	F->timeout_handler = NULL;	F->read_handler = NULL;	F->write_handler = NULL;	FD_CLR(fd, readfds);	FD_CLR(fd, writefds);    }    return 0;}#endifstatic voidcheckTimeouts(void){    int fd;    fde *F = NULL;    PF *callback;    for (fd = 0; fd <= Biggest_FD; fd++) {	F = &fd_table[fd];	if (!F->flags.open)	    continue;	if (F->timeout == 0)	    continue;	if (F->timeout > squid_curtime)	    continue;	debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);	if (F->timeout_handler) {	    debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd);	    callback = F->timeout_handler;	    F->timeout_handler = NULL;	    callback(fd, F->timeout_data);	} else {	    debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd);	    comm_close(fd);	}    }}static voidcommIncomingStats(StoreEntry * sentry){    StatCounters *f = &Counter;    storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n",	incoming_icp_interval >> INCOMING_FACTOR);    storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n",	incoming_http_interval >> INCOMING_FACTOR);    storeAppendPrintf(sentry, "\n");    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");#ifdef HAVE_POLL    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");#else    storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n");#endif    statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);#ifdef HAVE_POLL    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");#else    storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n");#endif    statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);}voidcommUpdateReadBits(int fd, PF * handler){    if (handler && !FD_ISSET(fd, &global_readfds)) {	FD_SET(fd, &global_readfds);	nreadfds++;    } else if (!handler && FD_ISSET(fd, &global_readfds)) {	FD_CLR(fd, &global_readfds);	nreadfds--;    }}voidcommUpdateWriteBits(int fd, PF * handler){    if (handler && !FD_ISSET(fd, &global_writefds)) {	FD_SET(fd, &global_writefds);	nwritefds++;    } else if (!handler && FD_ISSET(fd, &global_writefds)) {	FD_CLR(fd, &global_writefds);	nwritefds--;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?