comm_select.c
来自「-」· C语言 代码 · 共 883 行 · 第 1/2 页
C
883 行
FD_SET(fd, &write_mask); if (fd > maxfd) maxfd = fd; } } if (maxfd++ == 0) return incame;#if !ALARM_UPDATES_TIME getCurrentTime();#endif Counter.syscalls.selects++; if (select(maxfd, &read_mask, &write_mask, NULL, &zero_tv) < 1) return incame; for (i = 0; i < nfds; i++) { fd = fds[i]; if (FD_ISSET(fd, &read_mask)) { if ((hdl = fd_table[fd].read_handler) != NULL) { fd_table[fd].read_handler = NULL; commUpdateReadBits(fd, NULL); hdl(fd, &incame); } else { debug(5, 1) ("comm_select_incoming: NULL read handler\n"); } } if (FD_ISSET(fd, &write_mask)) { if ((hdl = fd_table[fd].write_handler) != NULL) { fd_table[fd].write_handler = NULL; commUpdateWriteBits(fd, NULL); hdl(fd, &incame); } else { debug(5, 1) ("comm_select_incoming: NULL write handler\n"); } } } return incame;}static voidcomm_select_icp_incoming(void){ int nfds = 0; int fds[2]; int nevents; icp_io_events = 0; if (theInIcpConnection >= 0) fds[nfds++] = theInIcpConnection; if (theInIcpConnection != theOutIcpConnection) if (theOutIcpConnection >= 0) fds[nfds++] = theOutIcpConnection; if (nfds == 0) return; nevents = comm_check_incoming_select_handlers(nfds, fds); incoming_icp_interval += Config.comm_incoming.icp_average - nevents; if (incoming_icp_interval < 0) incoming_icp_interval = 0; if (incoming_icp_interval > MAX_INCOMING_INTERVAL) incoming_icp_interval = MAX_INCOMING_INTERVAL; if (nevents > INCOMING_ICP_MAX) nevents = INCOMING_ICP_MAX; statHistCount(&Counter.comm_icp_incoming, nevents);}static voidcomm_select_http_incoming(void){ int nfds = 0; int fds[MAXHTTPPORTS]; int j; int nevents; http_io_events = 0; for (j = 0; j < NHttpSockets; j++) { if (HttpSockets[j] < 0) continue; if (commDeferRead(HttpSockets[j])) continue; fds[nfds++] = HttpSockets[j]; } nevents = comm_check_incoming_select_handlers(nfds, fds); incoming_http_interval += Config.comm_incoming.http_average - nevents; if (incoming_http_interval < 0) incoming_http_interval = 0; if (incoming_http_interval > MAX_INCOMING_INTERVAL) incoming_http_interval = MAX_INCOMING_INTERVAL; if (nevents > INCOMING_HTTP_MAX) nevents = INCOMING_HTTP_MAX; statHistCount(&Counter.comm_http_incoming, nevents);}#define DEBUG_FDBITS 0/* Select on all sockets; call handlers for those that are ready. */intcomm_select(int msec){ fd_set readfds; fd_set writefds; PF *hdl = NULL; int fd; int maxfd; int num; int callicp = 0, callhttp = 0; int maxindex; int k; int j;#if DEBUG_FDBITS int i;#endif fd_mask *fdsp; fd_mask tmask; static time_t last_timeout = 0; struct timeval poll_time; double timeout = current_dtime + (msec / 1000.0); fde *F; do {#if !ALARM_UPDATES_TIME getCurrentTime();#endif#if USE_ASYNC_IO aioCheckCallbacks();#endif if (commCheckICPIncoming) comm_select_icp_incoming(); if (commCheckHTTPIncoming) comm_select_http_incoming(); callicp = callhttp = 0; maxfd = Biggest_FD + 1; xmemcpy(&readfds, &global_readfds, howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES); xmemcpy(&writefds, &global_writefds, howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES); /* remove stalled FDs */ maxindex = howmany(maxfd, FD_MASK_BITS); fdsp = (fd_mask *) & readfds; for (j = 0; j < maxindex; j++) { if ((tmask = fdsp[j]) == 0) continue; /* no bits here */ for (k = 0; k < FD_MASK_BITS; k++) { if (!EBIT_TEST(tmask, k)) continue; /* Found a set bit */ fd = (j * FD_MASK_BITS) + k; if (commDeferRead(fd)) FD_CLR(fd, &readfds); } }#if DEBUG_FDBITS for (i = 0; i < maxfd; i++) { /* Check each open socket for a handler. */ if (fd_table[i].read_handler && !commDeferRead(i)) { assert(FD_ISSET(i, &readfds)); } if (fd_table[i].write_handler) { assert(FD_ISSET(i, &writefds)); } }#endif if (nreadfds + nwritefds == 0) { assert(shutting_down); return COMM_SHUTDOWN; } if (msec > MAX_POLL_TIME) msec = MAX_POLL_TIME;#ifdef _SQUID_OS2_ if (msec < 0) msec = MAX_POLL_TIME;#endif for (;;) { poll_time.tv_sec = msec / 1000; poll_time.tv_usec = (msec % 1000) * 1000; Counter.syscalls.selects++; num = select(maxfd, &readfds, &writefds, NULL, &poll_time); Counter.select_loops++; if (num >= 0) break; if (ignoreErrno(errno)) break; debug(50, 0) ("comm_select: select failure: %s\n", xstrerror()); examine_select(&readfds, &writefds); return COMM_ERROR; /* NOTREACHED */ } if (num < 0) continue; debug(5, num ? 5 : 8) ("comm_select: %d FDs ready at %d\n", num, (int) squid_curtime); statHistCount(&Counter.select_fds_hist, num); /* Check lifetime and timeout handlers ONCE each second. * Replaces brain-dead check every time through the loop! */ if (squid_curtime > last_timeout) { last_timeout = squid_curtime; checkTimeouts(); } if (num == 0) continue; /* Scan return fd masks for ready descriptors */ fdsp = (fd_mask *) & readfds; maxindex = howmany(maxfd, FD_MASK_BITS); for (j = 0; j < maxindex; j++) { if ((tmask = fdsp[j]) == 0) continue; /* no bits here */ for (k = 0; k < FD_MASK_BITS; k++) { if (!EBIT_TEST(tmask, k)) continue; /* Found a set bit */ fd = (j * FD_MASK_BITS) + k;#if DEBUG_FDBITS debug(5, 9) ("FD %d bit set for reading\n", fd); assert(FD_ISSET(fd, &readfds));#endif if (fdIsIcp(fd)) { callicp = 1; continue; } if (fdIsHttp(fd)) { callhttp = 1; continue; } F = &fd_table[fd]; debug(5, 6) ("comm_select: FD %d ready for reading\n", fd); if (F->read_handler) { hdl = F->read_handler; F->read_handler = NULL; commUpdateReadBits(fd, NULL); hdl(fd, F->read_data); Counter.select_fds++; } if (commCheckICPIncoming) comm_select_icp_incoming(); if (commCheckHTTPIncoming) comm_select_http_incoming(); EBIT_CLR(tmask, k); /* this bit is done */ if (tmask == 0) break; /* and no more bits left */ } } fdsp = (fd_mask *) & writefds; for (j = 0; j < maxindex; j++) { if ((tmask = fdsp[j]) == 0) continue; /* no bits here */ for (k = 0; k < FD_MASK_BITS; k++) { if (!EBIT_TEST(tmask, k)) continue; /* Found a set bit */ fd = (j * FD_MASK_BITS) + k;#if DEBUG_FDBITS debug(5, 9) ("FD %d bit set for writing\n", fd); assert(FD_ISSET(fd, &writefds));#endif if (fdIsIcp(fd)) { callicp = 1; continue; } if (fdIsHttp(fd)) { callhttp = 1; continue; } F = &fd_table[fd]; debug(5, 5) ("comm_select: FD %d ready for writing\n", fd); if (F->write_handler) { hdl = F->write_handler; F->write_handler = NULL; commUpdateWriteBits(fd, NULL); hdl(fd, F->write_data); Counter.select_fds++; } if (commCheckICPIncoming) comm_select_icp_incoming(); if (commCheckHTTPIncoming) comm_select_http_incoming(); EBIT_CLR(tmask, k); /* this bit is done */ if (tmask == 0) break; /* and no more bits left */ } } if (callicp) comm_select_icp_incoming(); if (callhttp) comm_select_http_incoming(); return COMM_OK; } while (timeout > current_dtime); debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime); return COMM_TIMEOUT;}#endifvoidcomm_select_init(void){ zero_tv.tv_sec = 0; zero_tv.tv_usec = 0; cachemgrRegister("comm_incoming", "comm_incoming() stats", commIncomingStats, 0, 1); FD_ZERO(&global_readfds); FD_ZERO(&global_writefds); nreadfds = nwritefds = 0;}#if !HAVE_POLL/* * examine_select - debug routine. * * I spend the day chasing this core dump that occurs when both the client * and the server side of a cache fetch simultaneoulsy abort the * connection. While I haven't really studied the code to figure out how * it happens, the snippet below may prevent the cache from exitting: * * Call this from where the select loop fails. */static intexamine_select(fd_set * readfds, fd_set * writefds){ int fd = 0; fd_set read_x; fd_set write_x; struct timeval tv; close_handler *ch = NULL; fde *F = NULL; struct stat sb; debug(5, 0) ("examine_select: Examining open file descriptors...\n"); for (fd = 0; fd < Squid_MaxFD; fd++) { FD_ZERO(&read_x); FD_ZERO(&write_x); tv.tv_sec = tv.tv_usec = 0; if (FD_ISSET(fd, readfds)) FD_SET(fd, &read_x); else if (FD_ISSET(fd, writefds)) FD_SET(fd, &write_x); else continue; Counter.syscalls.selects++; errno = 0; if (!fstat(fd, &sb)) { debug(5, 5) ("FD %d is valid.\n", fd); continue; } F = &fd_table[fd]; debug(5, 0) ("FD %d: %s\n", fd, xstrerror()); debug(5, 0) ("WARNING: FD %d has handlers, but it's invalid.\n", fd); debug(5, 0) ("FD %d is a %s called '%s'\n", fd, fdTypeStr[F->type], F->desc); debug(5, 0) ("tmout:%p read:%p write:%p\n", F->timeout_handler, F->read_handler, F->write_handler); for (ch = F->close_handler; ch; ch = ch->next) debug(5, 0) (" close handler: %p\n", ch->handler); if (F->close_handler) { commCallCloseHandlers(fd); } else if (F->timeout_handler) { debug(5, 0) ("examine_select: Calling Timeout Handler\n"); F->timeout_handler(fd, F->timeout_data); } F->close_handler = NULL; F->timeout_handler = NULL; F->read_handler = NULL; F->write_handler = NULL; FD_CLR(fd, readfds); FD_CLR(fd, writefds); } return 0;}#endifstatic voidcheckTimeouts(void){ int fd; fde *F = NULL; PF *callback; for (fd = 0; fd <= Biggest_FD; fd++) { F = &fd_table[fd]; if (!F->flags.open) continue; if (F->timeout == 0) continue; if (F->timeout > squid_curtime) continue; debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd); if (F->timeout_handler) { debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd); callback = F->timeout_handler; F->timeout_handler = NULL; callback(fd, F->timeout_data); } else { debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd); comm_close(fd); } }}static voidcommIncomingStats(StoreEntry * sentry){ StatCounters *f = &Counter; storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n", incoming_icp_interval >> INCOMING_FACTOR); storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n", incoming_http_interval >> INCOMING_FACTOR); storeAppendPrintf(sentry, "\n"); storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");#ifdef HAVE_POLL storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");#else storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n");#endif statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);#ifdef HAVE_POLL storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");#else storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n");#endif statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);}voidcommUpdateReadBits(int fd, PF * handler){ if (handler && !FD_ISSET(fd, &global_readfds)) { FD_SET(fd, &global_readfds); nreadfds++; } else if (!handler && FD_ISSET(fd, &global_readfds)) { FD_CLR(fd, &global_readfds); nreadfds--; }}voidcommUpdateWriteBits(int fd, PF * handler){ if (handler && !FD_ISSET(fd, &global_writefds)) { FD_SET(fd, &global_writefds); nwritefds++; } else if (!handler && FD_ISSET(fd, &global_writefds)) { FD_CLR(fd, &global_writefds); nwritefds--; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?