📄 queue.c
字号:
NoConnect = FALSE; /* ** Create ourselves an envelope */ CurEnv = &QueueEnvelope; e = newenvelope(&QueueEnvelope, CurEnv); e->e_flags = BlankEnvelope.e_flags; /* ** Make sure the alias database is open. */ initmaps(FALSE, e); /* ** Start making passes through the queue. ** First, read and sort the entire queue. ** Then, process the work in that order. ** But if you take too long, start over. */ /* order the existing work requests */ (void) orderq(FALSE); /* process them once at a time */ while (WorkQ != NULL) { WORK *w = WorkQ; WorkQ = WorkQ->w_next; /* ** Ignore jobs that are too expensive for the moment. */ if (shouldqueue(w->w_pri, w->w_ctime)) { if (Verbose) printf("\nSkipping %s\n", w->w_name + 2); } else { pid_t pid; extern pid_t dowork(); pid = dowork(w->w_name + 2, ForkQueueRuns, FALSE, e); errno = 0; if (pid != 0) (void) waitfor(pid); } free(w->w_name); free((char *) w); } /* exit without the usual cleanup */ e->e_id = NULL; finis();}/*** ORDERQ -- order the work queue.**** Parameters:** doall -- if set, include everything in the queue (even** the jobs that cannot be run because the load** average is too high). Otherwise, exclude those** jobs.**** Returns:** The number of request in the queue (not necessarily** the number of requests in WorkQ however).**** Side Effects:** Sets WorkQ to the queue of available work, in order.*/# define NEED_P 001# define NEED_T 002# define NEED_R 004# define NEED_S 010orderq(doall) bool doall;{ register struct dirent *d; register WORK *w; DIR *f; register int i; WORK wlist[QUEUESIZE+1]; int wn = -1; extern workcmpf(); if (tTd(41, 1)) { printf("orderq:\n"); if (QueueLimitId != NULL) printf("\tQueueLimitId = %s\n", QueueLimitId); if (QueueLimitSender != NULL) printf("\tQueueLimitSender = %s\n", QueueLimitSender); if (QueueLimitRecipient != NULL) printf("\tQueueLimitRecipient = %s\n", QueueLimitRecipient); } /* clear out old WorkQ */ for (w = WorkQ; w != NULL; ) { register WORK *nw = w->w_next; WorkQ = nw; free(w->w_name); free((char *) w); w = nw; } /* open the queue directory */ f = opendir("."); if (f == NULL) { syserr("orderq: cannot open \"%s\" as \".\"", QueueDir); return (0); } /* ** Read the work directory. */ while ((d = readdir(f)) != NULL) { FILE *cf; register char *p; char lbuf[MAXNAME]; extern bool strcontainedin(); /* is this an interesting entry? */ if (d->d_name[0] != 'q' || d->d_name[1] != 'f') continue; if (QueueLimitId != NULL && !strcontainedin(QueueLimitId, d->d_name)) continue; /* ** Check queue name for plausibility. This handles ** both old and new type ids. */ p = d->d_name + 2; if (isupper(p[0]) && isupper(p[2])) p += 3; else if (isupper(p[1])) p += 2; else p = d->d_name; for (i = 0; isdigit(*p); p++) i++; if (i < 5 || *p != '\0') { if (Verbose) printf("orderq: bogus qf name %s\n", d->d_name);#ifdef LOG if (LogLevel > 3) syslog(LOG_CRIT, "orderq: bogus qf name %s", d->d_name);#endif if (strlen(d->d_name) >= MAXNAME) d->d_name[MAXNAME - 1] = '\0'; strcpy(lbuf, d->d_name); lbuf[0] = 'Q'; (void) rename(d->d_name, lbuf); continue; } /* yes -- open control file (if not too many files) */ if (++wn >= QUEUESIZE) continue; cf = fopen(d->d_name, "r"); if (cf == NULL) { /* this may be some random person sending hir msgs */ /* syserr("orderq: cannot open %s", cbuf); */ if (tTd(41, 2)) printf("orderq: cannot open %s (%d)\n", d->d_name, errno); errno = 0; wn--; continue; } w = &wlist[wn]; w->w_name = newstr(d->d_name); /* make sure jobs in creation don't clog queue */ w->w_pri = 0x7fffffff; w->w_ctime = 0; /* extract useful information */ i = NEED_P | NEED_T; if (QueueLimitSender != NULL) i |= NEED_S; if (QueueLimitRecipient != NULL) i |= NEED_R; while (i != 0 && fgets(lbuf, sizeof lbuf, cf) != NULL) { extern long atol(); extern bool strcontainedin(); switch (lbuf[0]) { case 'P': w->w_pri = atol(&lbuf[1]); i &= ~NEED_P; break; case 'T': w->w_ctime = atol(&lbuf[1]); i &= ~NEED_T; break; case 'R': if (QueueLimitRecipient != NULL && strcontainedin(QueueLimitRecipient, &lbuf[1])) i &= ~NEED_R; break; case 'S': if (QueueLimitSender != NULL && strcontainedin(QueueLimitSender, &lbuf[1])) i &= ~NEED_S; break; } } (void) fclose(cf); if ((!doall && shouldqueue(w->w_pri, w->w_ctime)) || bitset(NEED_R|NEED_S, i)) { /* don't even bother sorting this job in */ wn--; } } (void) closedir(f); wn++; /* ** Sort the work directory. */ qsort((char *) wlist, min(wn, QUEUESIZE), sizeof *wlist, workcmpf); /* ** Convert the work list into canonical form. ** Should be turning it into a list of envelopes here perhaps. */ WorkQ = NULL; for (i = min(wn, QUEUESIZE); --i >= 0; ) { w = (WORK *) xalloc(sizeof *w); w->w_name = wlist[i].w_name; w->w_pri = wlist[i].w_pri; w->w_ctime = wlist[i].w_ctime; w->w_next = WorkQ; WorkQ = w; } if (tTd(40, 1)) { for (w = WorkQ; w != NULL; w = w->w_next) printf("%32s: pri=%ld\n", w->w_name, w->w_pri); } return (wn);}/*** WORKCMPF -- compare function for ordering work.**** Parameters:** a -- the first argument.** b -- the second argument.**** Returns:** -1 if a < b** 0 if a == b** +1 if a > b**** Side Effects:** none.*/workcmpf(a, b) register WORK *a; register WORK *b;{ long pa = a->w_pri; long pb = b->w_pri; if (pa == pb) return (0); else if (pa > pb) return (1); else return (-1);}/*** DOWORK -- do a work request.**** Parameters:** id -- the ID of the job to run.** forkflag -- if set, run this in background.** requeueflag -- if set, reinstantiate the queue quickly.** This is used when expanding aliases in the queue.** If forkflag is also set, it doesn't wait for the** child.** e - the envelope in which to run it.**** Returns:** process id of process that is running the queue job.**** Side Effects:** The work request is satisfied if possible.*/pid_tdowork(id, forkflag, requeueflag, e) char *id; bool forkflag; bool requeueflag; register ENVELOPE *e;{ register pid_t pid; extern bool readqf(); if (tTd(40, 1)) printf("dowork(%s)\n", id); /* ** Fork for work. */ if (forkflag) { pid = fork(); if (pid < 0) { syserr("dowork: cannot fork"); return 0; } else if (pid > 0) { /* parent -- clean out connection cache */ mci_flush(FALSE, NULL); } } else { pid = 0; } if (pid == 0) { /* ** CHILD ** Lock the control file to avoid duplicate deliveries. ** Then run the file as though we had just read it. ** We save an idea of the temporary name so we ** can recover on interrupt. */ /* set basic modes, etc. */ (void) alarm(0); clearenvelope(e, FALSE); e->e_flags |= EF_QUEUERUN|EF_GLOBALERRS; e->e_errormode = EM_MAIL; e->e_id = id; GrabTo = UseErrorsTo = FALSE; ExitStat = EX_OK; if (forkflag) { disconnect(1, e); OpMode = MD_DELIVER; }# ifdef LOG if (LogLevel > 76) syslog(LOG_DEBUG, "%s: dowork, pid=%d", e->e_id, getpid());# endif /* LOG */ /* don't use the headers from sendmail.cf... */ e->e_header = NULL; /* read the queue control file -- return if locked */ if (!readqf(e)) { if (tTd(40, 4)) printf("readqf(%s) failed\n", e->e_id); if (forkflag) exit(EX_OK); else return 0; } e->e_flags |= EF_INQUEUE; eatheader(e, requeueflag); if (requeueflag) queueup(e, TRUE, FALSE); /* do the delivery */ sendall(e, SM_DELIVER); /* finish up and exit */ if (forkflag) finis(); else dropenvelope(e); } e->e_id = NULL; return pid;}/*** READQF -- read queue file and set up environment.**** Parameters:** e -- the envelope of the job to run.**** Returns:** TRUE if it successfully read the queue file.** FALSE otherwise.**** Side Effects:** The queue file is returned locked.*/boolreadqf(e) register ENVELOPE *e;{ register FILE *qfp; ADDRESS *ctladdr; struct stat st; char *bp; char qf[20]; char buf[MAXLINE]; extern long atol(); extern ADDRESS *setctluser(); /* ** Read and process the file. */ strcpy(qf, queuename(e, 'q')); qfp = fopen(qf, "r+"); if (qfp == NULL) { if (tTd(40, 8)) printf("readqf(%s): fopen failure (%s)\n", qf, errstring(errno)); if (errno != ENOENT) syserr("readqf: no control file %s", qf); return FALSE; } if (!lockfile(fileno(qfp), qf, NULL, LOCK_EX|LOCK_NB)) { /* being processed by another queuer */ if (tTd(40, 8)) printf("readqf(%s): locked\n", qf); if (Verbose) printf("%s: locked\n", e->e_id);# ifdef LOG if (LogLevel > 19) syslog(LOG_DEBUG, "%s: locked", e->e_id);# endif /* LOG */ (void) fclose(qfp); return FALSE; } /* ** Check the queue file for plausibility to avoid attacks. */ if (fstat(fileno(qfp), &st) < 0) { /* must have been being processed by someone else */ if (tTd(40, 8)) printf("readqf(%s): fstat failure (%s)\n", qf, errstring(errno)); fclose(qfp); return FALSE; } if (st.st_uid != geteuid()) {# ifdef LOG if (LogLevel > 0) { syslog(LOG_ALERT, "%s: bogus queue file, uid=%d, mode=%o", e->e_id, st.st_uid, st.st_mode); }# endif /* LOG */ if (tTd(40, 8)) printf("readqf(%s): bogus file\n", qf); rename(qf, queuename(e, 'Q')); fclose(qfp); return FALSE; } if (st.st_size == 0) { /* must be a bogus file -- just remove it */ (void) unlink(qf); fclose(qfp); return FALSE; } if (st.st_nlink == 0) { /* ** Race condition -- we got a file just as it was being ** unlinked. Just assume it is zero length. */ fclose(qfp);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -