📄 chan.c
字号:
** Mark a channel as an active writer. Don't reset the Out->Left field** since we could have buffered I/O already in there.*/voidWCHANadd(cp) register CHANNEL *cp;{ if (cp->Out.Left > 0) { FD_SET(cp->fd, &WCHANmask); if (cp->fd > CHANlastfd) CHANlastfd = cp->fd; }}/*** Remove a channel from the set of writers.*/voidWCHANremove(cp) register CHANNEL *cp;{ if (FD_ISSET(cp->fd, &WCHANmask)) { FD_CLR(cp->fd, &WCHANmask); if (cp->Out.Left <= 0) { /* No data left -- reset used so we don't grow the buffer. */ cp->Out.Used = 0; cp->Out.Left = 0; } if (cp->fd == CHANlastfd) { /* This was the highest descriptor, get a new highest. */ while (!FD_ISSET(CHANlastfd, &RCHANmask) && !FD_ISSET(CHANlastfd, &WCHANmask) && CHANlastfd > 1) CHANlastfd--; } }}/*** Set a channel to start off with the contents of an existing channel.*/voidWCHANsetfrombuffer(cp, bp) CHANNEL *cp; BUFFER *bp;{ WCHANset(cp, &bp->Data[bp->Used], bp->Left);}/*** Read in text data, return the amount we read.*/intCHANreadtext(cp) register CHANNEL *cp;{ register int i; register BUFFER *bp; char *p; p = CHANname(cp); /* Read in whatever is there. */ bp = &cp->In; bp->Left = bp->Size - bp->Used; i = read(cp->fd, &bp->Data[bp->Used], bp->Left - 1); if (i < 0) { syslog(L_ERROR, "%s cant read %m", p); return -1; } if (i == 0) { syslog(L_NOTICE, "%s readclose", p); CHANclose(cp, p); return 0; } /* Update values, grow buffer if we're getting close. */ bp->Used += i; bp->Left -= i; if (bp->Left <= LOW_WATER) { i = GROW_AMOUNT(bp->Size); bp->Size += i; bp->Left += i; RENEW(bp->Data, char, bp->Size); } return i;}/*** If I/O backs up a lot, we can get EMSGSIZE on some systems. If that** happens we want to do the I/O in chunks. We assume stdio's BUFSIZ is** a good chunk value.*/STATIC intlargewrite(fd, p, length) register int fd; register char *p; register int length;{ register int i; register char *save; do { /* Try the standard case -- write it all. */ i = write(fd, (POINTER)p, (SIZE_T)length); if (i > 0 || (i < 0 && errno != EMSGSIZE && errno != EINTR)) return i; } while (i < 0 && errno == EINTR); /* Write it in pieces. */ for (save = p, i = 0; length; p += i, length -= i) { i = write(fd, (POINTER)p, (SIZE_T)(length > BUFSIZ ? BUFSIZ : length)); if (i <= 0) break; } /* Return error, or partial results if we got something. */ return p == save ? i : p - save;}/*** Try to flush out the buffer. Use this only on file channels!*/BOOLWCHANflush(cp) register CHANNEL *cp;{ register BUFFER *bp; register int i; /* If nothing in there, or nothing left, nothing to do. */ bp = &cp->Out; if (bp->Left == 0) return TRUE; /* Write it. */ while (bp->Left > 0) { i = write(cp->fd, (POINTER)&bp->Data[bp->Used], (SIZE_T)bp->Left); if (i < 0) { syslog(L_ERROR, "%s cant flush count %d %m", CHANname(cp), bp->Left); return FALSE; } if (i == 0) { syslog(L_ERROR, "%s cant flush count %d", CHANname(cp), bp->Left); return FALSE; } bp->Left -= i; bp->Used += i; if (bp->Left <= 0) WCHANremove(cp); } return TRUE;}/*** Wakeup routine called after a write channel was put to sleep.*/STATIC FUNCTYPECHANwakeup(cp) CHANNEL *cp;{ syslog(L_NOTICE, "%s wakeup", CHANname(cp)); WCHANadd(cp);}/*** Attempting to write would block; stop output or give up.*/STATIC voidCHANwritesleep(cp, p) register CHANNEL *cp; char *p;{ int i; if ((i = ++(cp->BlockedWrites)) > BAD_IO_COUNT) switch (cp->Type) { default: break; case CTnntp: case CTfile: case CTexploder: case CTprocess: syslog(L_ERROR, "%s blocked closing", p); SITEchanclose(cp); CHANclose(cp, p); return; } i *= BLOCK_BACKOFF; syslog(L_ERROR, "%s blocked sleeping %d", p, i); SCHANadd(cp, (time_t)(Now.time + i), (POINTER)NULL, CHANwakeup, (POINTER)NULL);}#if defined(INND_FIND_BAD_FDS)/*** We got an unknown error in select. Find out the culprit.** Not really ready for production use yet, and it's expensive, too.*/STATIC voidCHANdiagnose(){ FDSET Test; int i; struct timeval t; FD_ZERO(&Test); for (i = CHANlastfd; i >= 0; i--) { if (FD_ISSET(i, &RCHANmask)) { FD_SET(i, &Test); t.tv_sec = 0; t.tv_usec = 0; if (select(i + 1, &Test, (FDSET *)NULL, (FDSET *)NULL, &t) < 0 && errno != EINTR) { syslog(L_ERROR, "%s Bad Read File %d", LogName, i); FD_CLR(i, &RCHANmask); /* Probably do something about the file descriptor here; call * CHANclose on it? */ } FD_CLR(i, &Test); } if (FD_ISSET(i, &WCHANmask)) { FD_SET(i, &Test); t.tv_sec = 0; t.tv_usec = 0; if (select(i + 1, (FDSET *)NULL, &Test, (FDSET *)NULL, &t) < 0 && errno != EINTR) { syslog(L_ERROR, "%s Bad Write File %d", LogName, i); FD_CLR(i, &WCHANmask); /* Probably do something about the file descriptor here; call * CHANclose on it? */ } FD_CLR(i, &Test); } }}#endif /* defined(INND_FIND_BAD_FDS) *//*** Main I/O loop. Wait for data, call the channel's handler when there is** something to read or when the queued write is finished. In order to** be fair (i.e., don't always give descriptor n priority over n+1), we** remember where we last had something and pick up from there.*/voidCHANreadloop(){ static char EXITING[] = "INND exiting because of signal\n"; static int fd; register int i; register int startpoint; register int count; register int lastfd; int oerrno; register CHANNEL *cp; register BUFFER *bp; FDSET MyRead; FDSET MyWrite; struct timeval MyTime; long silence; char *p; for ( ; ; ) { /* See if any processes died. */ PROCscan(); /* Wait for data, note the time. */ MyRead = RCHANmask; MyWrite = WCHANmask; MyTime = TimeOut; count = select(CHANlastfd + 1, &MyRead, &MyWrite, (FDSET *)NULL, &MyTime); if (GotTerminate) { (void)write(2, EXITING, STRLEN(EXITING)); CleanupAndExit(0, "received signal"); } if (count < 0) { if (errno != EINTR) { syslog(L_ERROR, "%s cant select %m", LogName);#if defined(INND_FIND_BAD_FDS) CHANdiagnose();#endif /* defined(INND_FIND_BAD_FDS) */ } continue; } if (count == 0) { /* No channels active, so flush and skip if nobody's * sleeping. */ if (Mode == OMrunning) ICDwrite(); if (SCHANcount == 0) continue; } /* Update the "reasonably accurate" time. */ if (GetTimeInfo(&Now) < 0) syslog(L_ERROR, "%s cant gettimeinfo %m", LogName); /* Try the control channel first. */ if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) { count--; (*CHANcc->Reader)(CHANcc); FD_CLR(CHANccfd, &MyRead); } /* Loop through all active channels. Somebody could have closed * closed a channel so we double-check the global mask before * looking at what select returned. The code here is written so * that a channel could be reading and writing and sleeping at the * same time, even though that's not possible. (Just as well, * since in SysVr4 the count would be wrong.) */ lastfd = CHANlastfd; if (lastfd < CHANlastsleepfd) lastfd = CHANlastsleepfd; if (fd > lastfd) fd = 0; startpoint = fd; do { cp = &CHANtable[fd]; if (cp->Type == CTfree) goto Next; /* Anything to read? */ if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) { count--; cp->LastActive = Now.time; (*cp->Reader)(cp); } /* Possibly recheck for dead children so we don't get SIGPIPE * on readerless channels. */ if (PROCneedscan) PROCscan(); /* Ready to write? */ if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) { count--; bp = &cp->Out; if (bp->Left) { cp->LastActive = Now.time; i = largewrite(fd, &bp->Data[bp->Used], bp->Left); if (i <= 0) { oerrno = errno; p = CHANname(cp); errno = oerrno; if (i < 0) syslog(L_ERROR, "%s cant write %m", p); else syslog(L_ERROR, "%s cant write", p); cp->BadWrites++; if (i < 0 && oerrno == EPIPE) { SITEchanclose(cp); CHANclose(cp, p); } else if (i < 0 && oerrno == EWOULDBLOCK) { WCHANremove(cp); CHANwritesleep(cp, p); } else if (cp->BadWrites >= BAD_IO_COUNT) { syslog(L_ERROR, "%s sleeping", p); WCHANremove(cp); SCHANadd(cp, (time_t)(Now.time + PAUSE_RETRY_TIME), (POINTER)NULL, CHANwakeup, (POINTER)NULL); } } else { cp->BadWrites = 0; cp->BlockedWrites = 0; bp->Left -= i; bp->Used += i; if (bp->Left <= 0) { WCHANremove(cp); (*cp->WriteDone)(cp); } } } } /* Coming off a sleep? */ if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) { cp->LastActive = Now.time; SCHANremove(cp); (*cp->Waker)(cp); } /* Has this channel been inactive very long? */ if (cp->Type == CTnntp && cp->LastActive + cp->NextLog < Now.time) { p = CHANname(cp); silence = Now.time - cp->LastActive; cp->NextLog += CHANNEL_INACTIVE_TIME; syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L); if (silence > PEER_TIMEOUT) { syslog(L_NOTICE, "%s timeout", p); CHANclose(cp, p); } } Next: /* Bump pointer, modulo the table size. */ if (fd >= lastfd) fd = 0; else fd++; /* If there is nothing to do, break out. */ if (count == 0 && SCHANcount == 0) break; } while (fd != startpoint); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -