📄 queue.c
字号:
/* check to see if this is the same as last time */
if (lastctladdr != NULL && uid == lastuid &&
strcmp(lastctladdr->q_paddr, a->q_paddr) == 0)
return;
lastuid = uid;
lastctladdr = a;
if (uid == 0 || user == NULL || user[0] == '\0')
fprintf(tfp, "C");
else
fprintf(tfp, "C%s:%ld:%ld",
denlstring(user, TRUE, FALSE), (long) uid, (long) gid);
fprintf(tfp, ":%s\n", denlstring(a->q_paddr, TRUE, FALSE));
}
/*
** RUNQUEUE -- run the jobs in the queue.
**
** Gets the stuff out of the queue in some presumably logical
** order and processes them.
**
** Parameters:
** forkflag -- TRUE if the queue scanning should be done in
** a child process. We double-fork so it is not our
** child and we don't have to clean up after it.
** FALSE can be ignored if we have multiple queues.
** verbose -- if TRUE, print out status information.
**
** Returns:
** TRUE if the queue run successfully began.
**
** Side Effects:
** runs things in the mail queue.
*/
/*
** File-scope state shared by the queue-running routines.
*/
/* envelope installed as CurEnv by run_single_queue() for the whole run */
static ENVELOPE QueueEnvelope; /* the queue run envelope */
/* upper bound for the queue indices that runqueue() cycles through */
int NumQueues = 0; /* number of queues */
static time_t LastQueueTime = 0; /* last time a queue ID assigned */
static pid_t LastQueuePid = -1; /* last PID which had a queue ID */
/*
** One entry per queue directory; the table is indexed by the queue
** number (orderq() uses QPaths[queuedir]).
*/
struct qpaths_s
{
char *qp_name; /* name of queue dir */
short qp_subdirs; /* bit mask of QP_* flags: which subdirs are used */
};
typedef struct qpaths_s QPATHS;
/* values for qp_subdirs (tested with bitset(), so they may be combined) */
#define QP_NOSUB 0x0000 /* No subdirectories */
#define QP_SUBDF 0x0001 /* "df" subdirectory */
#define QP_SUBQF 0x0002 /* "qf" subdirectory; orderq() then scans <qp_name>/qf */
#define QP_SUBXF 0x0004 /* "xf" subdirectory */
static QPATHS *QPaths = NULL; /* list of queue directories */
bool
runqueue(forkflag, verbose)
	bool forkflag;
	bool verbose;
{
	/* queue index to start with; kept across calls */
	static int nextqueue = 0;
	bool started = TRUE;
	int tried = 0;

	/*
	**  With more than one queue and no verbose output requested,
	**  always scan in a child process.
	*/

	if (NumQueues > 1 && !(forkflag || verbose))
		forkflag = TRUE;

	/*
	**  Visit each queue at most once per call, resuming at the
	**  queue where the previous call stopped in case it ran out
	**  of children before finishing.
	*/

	while (tried < NumQueues)
	{
		started = run_single_queue(nextqueue, forkflag, verbose);

		/*
		**  A failure has already been reported (for ETRN) and
		**  the remaining queues would most likely fail too, so
		**  stop here; nextqueue is left alone so that the next
		**  call retries this queue.
		*/

		if (!started)
			break;

		/* advance, wrapping around at the end of the queue list */
		nextqueue++;
		if (nextqueue >= NumQueues)
			nextqueue = 0;
		tried++;
	}

	/* schedule the next periodic run, if an interval is configured */
	if (QueueIntvl != 0)
		(void) setevent(QueueIntvl, runqueueevent, 0);

	return started;
}
/*
** RUN_SINGLE_QUEUE -- run the jobs in a single queue.
**
** Gets the stuff out of the queue in some presumably logical
** order and processes them.
**
** Parameters:
** queuedir -- queue to process
** forkflag -- TRUE if the queue scanning should be done in
** a child process. We double-fork so it is not our
** child and we don't have to clean up after it.
** verbose -- if TRUE, print out status information.
**
** Returns:
** TRUE if the queue run successfully began.
**
** Side Effects:
** runs things in the mail queue.
*/
static bool
run_single_queue(queuedir, forkflag, verbose)
int queuedir;
bool forkflag;
bool verbose;
{
register ENVELOPE *e;
int njobs;
int sequenceno = 0;
time_t current_la_time;
extern ENVELOPE BlankEnvelope;
DoQueueRun = FALSE;
/*
** If no work will ever be selected, don't even bother reading
** the queue.
*/
CurrentLA = sm_getla(NULL); /* get load average */
current_la_time = curtime();
if (shouldqueue(WkRecipFact, current_la_time))
{
char *msg = "Skipping queue run -- load average too high";
if (verbose)
message("458 %s\n", msg);
if (LogLevel > 8)
sm_syslog(LOG_INFO, NOQID,
"runqueue: %s",
msg);
return FALSE;
}
/*
** See if we already have too many children.
*/
if (forkflag && QueueIntvl != 0 &&
MaxChildren > 0 && CurChildren >= MaxChildren)
{
char *msg = "Skipping queue run -- too many children";
if (verbose)
message("458 %s (%d)\n", msg, CurChildren);
if (LogLevel > 8)
sm_syslog(LOG_INFO, NOQID,
"runqueue: %s (%d)",
msg, CurChildren);
return FALSE;
}
/*
** See if we want to go off and do other useful work.
*/
if (forkflag)
{
pid_t pid;
(void) blocksignal(SIGCHLD);
(void) setsignal(SIGCHLD, reapchild);
pid = dofork();
if (pid == -1)
{
const char *msg = "Skipping queue run -- fork() failed";
const char *err = errstring(errno);
if (verbose)
message("458 %s: %s\n", msg, err);
if (LogLevel > 8)
sm_syslog(LOG_INFO, NOQID,
"runqueue: %s: %s",
msg, err);
(void) releasesignal(SIGCHLD);
return FALSE;
}
if (pid != 0)
{
/* parent -- pick up intermediate zombie */
(void) blocksignal(SIGALRM);
proc_list_add(pid, "Queue runner", PROC_QUEUE);
(void) releasesignal(SIGALRM);
(void) releasesignal(SIGCHLD);
return TRUE;
}
/* child -- clean up signals */
clrcontrol();
proc_list_clear();
/* Add parent process as first child item */
proc_list_add(getpid(), "Queue runner child process",
PROC_QUEUE_CHILD);
(void) releasesignal(SIGCHLD);
(void) setsignal(SIGCHLD, SIG_DFL);
(void) setsignal(SIGHUP, intsig);
}
sm_setproctitle(TRUE, CurEnv, "running queue: %s",
qid_printqueue(queuedir));
if (LogLevel > 69 || tTd(63, 99))
sm_syslog(LOG_DEBUG, NOQID,
"runqueue %s, pid=%d, forkflag=%d",
qid_printqueue(queuedir), getpid(), forkflag);
/*
** Release any resources used by the daemon code.
*/
# if DAEMON
clrdaemon();
# endif /* DAEMON */
/* force it to run expensive jobs */
NoConnect = FALSE;
/* drop privileges */
if (geteuid() == (uid_t) 0)
(void) drop_privileges(FALSE);
/*
** Create ourselves an envelope
*/
CurEnv = &QueueEnvelope;
e = newenvelope(&QueueEnvelope, CurEnv);
e->e_flags = BlankEnvelope.e_flags;
/* make sure we have disconnected from parent */
if (forkflag)
{
disconnect(1, e);
QuickAbort = FALSE;
}
/*
** If we are running part of the queue, always ignore stored
** host status.
*/
if (QueueLimitId != NULL || QueueLimitSender != NULL ||
QueueLimitRecipient != NULL)
{
IgnoreHostStatus = TRUE;
MinQueueAge = 0;
}
/*
** Start making passes through the queue.
** First, read and sort the entire queue.
** Then, process the work in that order.
** But if you take too long, start over.
*/
/* order the existing work requests */
njobs = orderq(queuedir, FALSE);
/* process them once at a time */
while (WorkQ != NULL)
{
WORK *w = WorkQ;
WorkQ = WorkQ->w_next;
e->e_to = NULL;
/*
** Ignore jobs that are too expensive for the moment.
**
** Get new load average every 30 seconds.
*/
if (current_la_time < curtime() - 30)
{
CurrentLA = sm_getla(e);
current_la_time = curtime();
}
if (shouldqueue(WkRecipFact, current_la_time))
{
char *msg = "Aborting queue run: load average too high";
if (Verbose)
message("%s", msg);
if (LogLevel > 8)
sm_syslog(LOG_INFO, NOQID,
"runqueue: %s",
msg);
break;
}
sequenceno++;
if (shouldqueue(w->w_pri, w->w_ctime))
{
if (Verbose)
message("");
if (QueueSortOrder == QSO_BYPRIORITY)
{
if (Verbose)
message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue",
qid_printqueue(queuedir),
w->w_name + 2,
sequenceno,
njobs);
if (LogLevel > 8)
sm_syslog(LOG_INFO, NOQID,
"runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)",
qid_printqueue(queuedir),
w->w_name + 2,
w->w_pri,
CurrentLA,
sequenceno,
njobs);
break;
}
else if (Verbose)
message("Skipping %s/%s (sequence %d of %d)",
qid_printqueue(queuedir),
w->w_name + 2,
sequenceno, njobs);
}
else
{
pid_t pid;
if (Verbose)
{
message("");
message("Running %s/%s (sequence %d of %d)",
qid_printqueue(queuedir),
w->w_name + 2,
sequenceno, njobs);
}
if (tTd(63, 100))
sm_syslog(LOG_DEBUG, NOQID,
"runqueue %s dowork(%s)",
qid_printqueue(queuedir),
w->w_name + 2);
pid = dowork(queuedir, w->w_name + 2,
ForkQueueRuns, FALSE, e);
errno = 0;
if (pid != 0)
(void) waitfor(pid);
}
free(w->w_name);
if (w->w_host)
free(w->w_host);
free((char *) w);
}
/* exit without the usual cleanup */
e->e_id = NULL;
if (forkflag)
finis(TRUE, ExitStat);
/* NOTREACHED */
return TRUE;
}
/*
** RUNQUEUEEVENT -- stub for use in setevent
**
** runqueue() registers this routine with setevent() using QueueIntvl
** as the interval. All it does is raise a flag; the queue is not
** run from here.
**
** Parameters:
** none.
**
** Returns:
** none.
**
** Side Effects:
** sets DoQueueRun to TRUE (run_single_queue() resets it to FALSE
** when it starts).
*/
static void
runqueueevent()
{
/* only note that a queue run has been requested */
DoQueueRun = TRUE;
}
/*
** ORDERQ -- order the work queue.
**
** Parameters:
** queuedir -- the index of the queue directory.
** doall -- if set, include everything in the queue (even
** the jobs that cannot be run because the load
** average is too high). Otherwise, exclude those
** jobs.
**
** Returns:
** The number of requests in the queue (not necessarily
** the number of requests in WorkQ however).
**
** Side Effects:
** Sets WorkQ to the queue of available work, in order.
*/
/*
** Octal single-bit flags, so they can be OR-ed together.
** NOTE(review): their use is not visible in this part of the file;
** presumably they mark which control-file line types (P, T, R, S)
** orderq() still needs to read -- confirm against the rest of orderq().
*/
# define NEED_P 001
# define NEED_T 002
# define NEED_R 004
# define NEED_S 010
/* array of work entries that orderq() fills in, indexed from 0 */
static WORK *WorkList = NULL;
/* current capacity of WorkList; orderq() calls grow_wlist() when it is reached */
static int WorkListSize = 0;
static int
orderq(queuedir, doall)
int queuedir;
bool doall;
{
register struct dirent *d;
register WORK *w;
register char *p;
DIR *f;
register int i;
int wn = -1;
int wc;
QUEUE_CHAR *check;
char qd[MAXPATHLEN];
char qf[MAXPATHLEN];
if (queuedir == NOQDIR)
(void) strlcpy(qd, ".", sizeof qd);
else
(void) snprintf(qd, sizeof qd, "%s%s",
QPaths[queuedir].qp_name,
(bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : ""));
if (tTd(41, 1))
{
dprintf("orderq:\n");
check = QueueLimitId;
while (check != NULL)
{
dprintf("\tQueueLimitId = %s\n",
check->queue_match);
check = check->queue_next;
}
check = QueueLimitSender;
while (check != NULL)
{
dprintf("\tQueueLimitSender = %s\n",
check->queue_match);
check = check->queue_next;
}
check = QueueLimitRecipient;
while (check != NULL)
{
dprintf("\tQueueLimitRecipient = %s\n",
check->queue_match);
check = check->queue_next;
}
}
/* clear out old WorkQ */
for (w = WorkQ; w != NULL; )
{
register WORK *nw = w->w_next;
WorkQ = nw;
free(w->w_name);
if (w->w_host != NULL)
free(w->w_host);
free((char *) w);
w = nw;
}
/* open the queue directory */
f = opendir(qd);
if (f == NULL)
{
syserr("orderq: cannot open \"%s\"", qid_printqueue(queuedir));
return 0;
}
/*
** Read the work directory.
*/
while ((d = readdir(f)) != NULL)
{
FILE *cf;
int qfver = 0;
char lbuf[MAXNAME + 1];
struct stat sbuf;
if (tTd(41, 50))
dprintf("orderq: checking %s\n", d->d_name);
/* is this an interesting entry? */
if (d->d_name[0] != 'q' || d->d_name[1] != 'f')
continue;
if (strlen(d->d_name) >= MAXQFNAME)
{
if (Verbose)
printf("orderq: %s too long, %d max characters\n",
d->d_name, MAXQFNAME);
if (LogLevel > 0)
sm_syslog(LOG_ALERT, NOQID,
"orderq: %s too long, %d max characters",
d->d_name, MAXQFNAME);
continue;
}
check = QueueLimitId;
while (check != NULL)
{
if (strcontainedin(check->queue_match, d->d_name))
break;
else
check = check->queue_next;
}
if (QueueLimitId != NULL && check == NULL)
continue;
/* grow work list if necessary */
if (++wn >= MaxQueueRun && MaxQueueRun > 0)
{
if (wn == MaxQueueRun && LogLevel > 0)
sm_syslog(LOG_WARNING, NOQID,
"WorkList for %s maxed out at %d",
qid_printqueue(queuedir),
MaxQueueRun);
continue;
}
if (wn >= WorkListSize)
{
grow_wlist(queuedir);
if (wn >= WorkListSize)
continue;
}
w = &WorkList[wn];
(void) snprintf(qf, sizeof qf, "%s/%s", qd, d->d_name);
if (stat(qf, &sbuf) < 0)
{
if (errno != ENOENT)
sm_syslog(LOG_INFO, NOQID,
"orderq: can't stat %s/%s",
qid_printqueue(queuedir), d->d_name);
wn--;
continue;
}
if (!bitset(S_IFREG, sbuf.st_mode))
{
/* Yikes! Skip it or we will hang on open! */
syserr("orderq: %s/%s is not a regular file",
qid_printqueue(queuedir), d->d_name);
wn--;
continue;
}
/* avoid work if possible */
if (QueueSortOrder == QSO_BYFILENAME &&
QueueLimitSender == NULL &&
QueueLimitRecipient == NULL)
{
w->w_name = newstr(d->d_name);
w->w_host = NULL;
w->w_lock = w->w_tooyoung = FALSE;
w->w_pri = 0;
w->w_ctime = 0;
continue;
}
/* open control file */
cf = fopen(qf, "r");
if (cf == NULL)
{
/* this may be some random person sending hir msgs */
/* syserr("orderq: cannot open %s", cbuf); */
if (tTd(41, 2))
dprintf("orderq: cannot open %s: %s\n",
d->d_name, errstring(errno));
errno = 0;
wn--;
continue;
}
w->w_name = newstr(d->d_name);
w->w_host = NULL;
w->w_lock = !lockfile(fileno(cf), w->w_name, NULL, LOCK_SH|LOCK_NB);
w->w_tooyoung = FALSE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -