⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.c

📁 < linux网络编程工具>>配套源码
💻 C
📖 第 1 页 / 共 5 页
字号:

	/* check to see if this is the same as last time */
	if (lastctladdr != NULL && uid == lastuid &&
	    strcmp(lastctladdr->q_paddr, a->q_paddr) == 0)
		return;
	lastuid = uid;
	lastctladdr = a;

	if (uid == 0 || user == NULL || user[0] == '\0')
		fprintf(tfp, "C");
	else
		fprintf(tfp, "C%s:%ld:%ld",
			denlstring(user, TRUE, FALSE), (long) uid, (long) gid);
	fprintf(tfp, ":%s\n", denlstring(a->q_paddr, TRUE, FALSE));
}
/*
**  RUNQUEUE -- run the jobs in the queue.
**
**	Gets the stuff out of the queue in some presumably logical
**	order and processes them.
**
**	Parameters:
**		forkflag -- TRUE if the queue scanning should be done in
**			a child process.  We double-fork so it is not our
**			child and we don't have to clean up after it.
**			FALSE can be ignored if we have multiple queues.
**		verbose -- if TRUE, print out status information.
**
**	Returns:
**		TRUE if the queue run successfully began.
**
**	Side Effects:
**		runs things in the mail queue.
*/

/*
**  File-scope state for queue runs.
**
**	QueueEnvelope is the envelope run_single_queue() points CurEnv
**	at before it starts working through the queue.
**	NumQueues bounds the loop over queues in runqueue().
*/

static ENVELOPE	QueueEnvelope;		/* the queue run envelope */
int		NumQueues = 0;		/* number of queues */
static time_t	LastQueueTime = 0;	/* last time a queue ID assigned */
static pid_t	LastQueuePid = -1;	/* last PID which had a queue ID */

/*
**  Flag bits kept in qp_subdirs: which per-type subdirectories a
**  queue directory uses.  The bits may be or'ed together.
*/

#define QP_NOSUB	0x0000	/* No subdirectories */
#define QP_SUBDF	0x0001	/* "df" subdirectory */
#define QP_SUBQF	0x0002	/* "qf" subdirectory */
#define QP_SUBXF	0x0004	/* "xf" subdirectory */

/* description of a single queue directory */
typedef struct qpaths_s
{
	char	*qp_name;	/* name of queue dir */
	short	qp_subdirs;	/* use subdirs? (QP_* bits above) */
} QPATHS;

/* list of queue directories, indexed by queue number */
static QPATHS	*QPaths = NULL;

bool
runqueue(forkflag, verbose)
	bool forkflag;
	bool verbose;
{
	static int curnum = 0;	/* queue to run next; kept between calls */
	bool started = TRUE;
	int pass = 0;

	/*
	**  With more than one queue and no verbose output requested,
	**  always do the runs in child processes.
	*/

	if (!(forkflag || verbose || NumQueues <= 1))
		forkflag = TRUE;

	/*
	**  Make at most one pass over all the queues, starting at the
	**  queue remembered in curnum, in case we used up all the
	**  children last time without finishing.
	*/

	while (pass < NumQueues)
	{
		started = run_single_queue(curnum, forkflag, verbose);

		/*
		**  Failure means a message was printed for ETRN
		**  and subsequent queues are likely to fail as well.
		**  curnum is left alone so this queue is tried first
		**  on the next call.
		*/

		if (!started)
			break;

		/* move on to the following queue, wrapping around */
		curnum++;
		if (curnum >= NumQueues)
			curnum = 0;
		pass++;
	}

	/* with a queue interval configured, register runqueueevent() again */
	if (QueueIntvl != 0)
		(void) setevent(QueueIntvl, runqueueevent, 0);

	return started;
}
/*
**  RUN_SINGLE_QUEUE -- run the jobs in a single queue.
**
**	Gets the stuff out of the queue in some presumably logical
**	order and processes them.
**
**	Parameters:
**		queuedir -- queue to process
**		forkflag -- TRUE if the queue scanning should be done in
**			a child process.  We double-fork so it is not our
**			child and we don't have to clean up after it.
**		verbose -- if TRUE, print out status information.
**
**	Returns:
**		TRUE if the queue run successfully began.
**
**	Side Effects:
**		runs things in the mail queue.
*/

static bool
run_single_queue(queuedir, forkflag, verbose)
	int queuedir;
	bool forkflag;
	bool verbose;
{
	register ENVELOPE *e;
	int njobs;
	int sequenceno = 0;
	time_t current_la_time;
	extern ENVELOPE BlankEnvelope;

	/*
	**  Overview:
	**	1. Give up early (returning FALSE) if the load average is
	**	   too high, too many children exist, or fork() fails.
	**	2. With forkflag set, the parent registers the child and
	**	   returns TRUE at once; the child carries on below.
	**	3. Read and order the queue, then work through WorkQ one
	**	   entry at a time, freeing each entry once handled.
	*/

	/* a run is starting: clear the flag raised by runqueueevent() */
	DoQueueRun = FALSE;

	/*
	**  If no work will ever be selected, don't even bother reading
	**  the queue.
	*/

	CurrentLA = sm_getla(NULL);	/* get load average */
	current_la_time = curtime();

	if (shouldqueue(WkRecipFact, current_la_time))
	{
		char *msg = "Skipping queue run -- load average too high";

		if (verbose)
			message("458 %s\n", msg);
		if (LogLevel > 8)
			sm_syslog(LOG_INFO, NOQID,
				  "runqueue: %s",
				  msg);
		return FALSE;
	}

	/*
	**  See if we already have too many children.
	*/

	if (forkflag && QueueIntvl != 0 &&
	    MaxChildren > 0 && CurChildren >= MaxChildren)
	{
		char *msg = "Skipping queue run -- too many children";

		if (verbose)
			message("458 %s (%d)\n", msg, CurChildren);
		if (LogLevel > 8)
			sm_syslog(LOG_INFO, NOQID,
				  "runqueue: %s (%d)",
				  msg, CurChildren);
		return FALSE;
	}

	/*
	**  See if we want to go off and do other useful work.
	*/

	if (forkflag)
	{
		pid_t pid;

		/* hold off SIGCHLD until the child has been registered */
		(void) blocksignal(SIGCHLD);
		(void) setsignal(SIGCHLD, reapchild);

		pid = dofork();
		if (pid == -1)
		{
			const char *msg = "Skipping queue run -- fork() failed";
			const char *err = errstring(errno);

			if (verbose)
				message("458 %s: %s\n", msg, err);
			if (LogLevel > 8)
				sm_syslog(LOG_INFO, NOQID,
					  "runqueue: %s: %s",
					  msg, err);
			(void) releasesignal(SIGCHLD);
			return FALSE;
		}
		if (pid != 0)
		{
			/* parent -- pick up intermediate zombie */
			(void) blocksignal(SIGALRM);
			proc_list_add(pid, "Queue runner", PROC_QUEUE);
			(void) releasesignal(SIGALRM);
			(void) releasesignal(SIGCHLD);
			return TRUE;
		}
		/* child -- clean up signals */
		clrcontrol();
		proc_list_clear();

		/* Add parent process as first child item */
		proc_list_add(getpid(), "Queue runner child process",
			      PROC_QUEUE_CHILD);
		(void) releasesignal(SIGCHLD);
		(void) setsignal(SIGCHLD, SIG_DFL);
		(void) setsignal(SIGHUP, intsig);

	}

	sm_setproctitle(TRUE, CurEnv, "running queue: %s",
			qid_printqueue(queuedir));

	/* pid_t need not be int, so cast it for the %d conversion */
	if (LogLevel > 69 || tTd(63, 99))
		sm_syslog(LOG_DEBUG, NOQID,
			  "runqueue %s, pid=%d, forkflag=%d",
			  qid_printqueue(queuedir), (int) getpid(), forkflag);

	/*
	**  Release any resources used by the daemon code.
	*/

# if DAEMON
	clrdaemon();
# endif /* DAEMON */

	/* force it to run expensive jobs */
	NoConnect = FALSE;

	/* drop privileges */
	if (geteuid() == (uid_t) 0)
		(void) drop_privileges(FALSE);

	/*
	**  Create ourselves an envelope
	*/

	CurEnv = &QueueEnvelope;
	e = newenvelope(&QueueEnvelope, CurEnv);
	e->e_flags = BlankEnvelope.e_flags;

	/* make sure we have disconnected from parent */
	if (forkflag)
	{
		disconnect(1, e);
		QuickAbort = FALSE;
	}

	/*
	**  If we are running part of the queue, always ignore stored
	**  host status.
	*/

	if (QueueLimitId != NULL || QueueLimitSender != NULL ||
	    QueueLimitRecipient != NULL)
	{
		IgnoreHostStatus = TRUE;
		MinQueueAge = 0;
	}

	/*
	**  Start making passes through the queue.
	**	First, read and sort the entire queue.
	**	Then, process the work in that order.
	**		But if you take too long, start over.
	*/

	/* order the existing work requests */
	njobs = orderq(queuedir, FALSE);


	/*
	**  Process them one at a time.
	**	Note that the loop tests the global Verbose, not the
	**	verbose parameter.
	*/

	while (WorkQ != NULL)
	{
		WORK *w = WorkQ;
		bool stop = FALSE;	/* leave the loop once w is freed */

		/* w is unlinked here, so from now on we own it */
		WorkQ = WorkQ->w_next;
		e->e_to = NULL;

		/*
		**  Ignore jobs that are too expensive for the moment.
		**
		**	Get new load average every 30 seconds.
		*/

		if (current_la_time < curtime() - 30)
		{
			CurrentLA = sm_getla(e);
			current_la_time = curtime();
		}

		/*
		**  sequenceno is only used in the messages of the two
		**  branches that look at this particular job, so it can
		**  be counted up front.
		*/

		sequenceno++;
		if (shouldqueue(WkRecipFact, current_la_time))
		{
			char *msg = "Aborting queue run: load average too high";

			if (Verbose)
				message("%s", msg);
			if (LogLevel > 8)
				sm_syslog(LOG_INFO, NOQID,
					  "runqueue: %s",
					  msg);
			stop = TRUE;
		}
		else if (shouldqueue(w->w_pri, w->w_ctime))
		{
			if (Verbose)
				message("");
			if (QueueSortOrder == QSO_BYPRIORITY)
			{
				if (Verbose)
					message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue",
						qid_printqueue(queuedir),
						w->w_name + 2,
						sequenceno,
						njobs);
				if (LogLevel > 8)
					sm_syslog(LOG_INFO, NOQID,
						  "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)",
						  qid_printqueue(queuedir),
						  w->w_name + 2,
						  w->w_pri,
						  CurrentLA,
						  sequenceno,
						  njobs);
				stop = TRUE;
			}
			else if (Verbose)
				message("Skipping %s/%s (sequence %d of %d)",
					qid_printqueue(queuedir),
					w->w_name + 2,
					sequenceno, njobs);
		}
		else
		{
			pid_t pid;

			if (Verbose)
			{
				message("");
				message("Running %s/%s (sequence %d of %d)",
					qid_printqueue(queuedir),
					w->w_name + 2,
					sequenceno, njobs);
			}
			if (tTd(63, 100))
				sm_syslog(LOG_DEBUG, NOQID,
					  "runqueue %s dowork(%s)",
					  qid_printqueue(queuedir),
					  w->w_name + 2);

			/* w_name + 2 skips the leading "qf" of the file name */
			pid = dowork(queuedir, w->w_name + 2,
				     ForkQueueRuns, FALSE, e);
			errno = 0;
			if (pid != 0)
				(void) waitfor(pid);
		}

		/*
		**  Release the entry on every path, including the two
		**  that end the run early: it is no longer on WorkQ, so
		**  nothing else would free it.
		*/

		free(w->w_name);
		if (w->w_host)
			free(w->w_host);
		free((char *) w);

		if (stop)
			break;
	}

	/* exit without the usual cleanup */
	e->e_id = NULL;
	if (forkflag)
		finis(TRUE, ExitStat);

	/*
	**  finis() is not expected to return, so this point should only
	**  be reached when the run was done without forking.
	*/

	return TRUE;
}

/*
**  RUNQUEUEEVENT -- stub for use in setevent
*/

static void
runqueueevent()
{
	/*
	**  Registered with setevent() by runqueue().  It only raises
	**  the flag; no queue work is done here.  run_single_queue()
	**  sets DoQueueRun back to FALSE when a run starts.
	*/

	DoQueueRun = TRUE;
}
/*
**  ORDERQ -- order the work queue.
**
**	Parameters:
**		queuedir -- the index of the queue directory.
**		doall -- if set, include everything in the queue (even
**			the jobs that cannot be run because the load
**			average is too high).  Otherwise, exclude those
**			jobs.
**
**	Returns:
**		The number of requests in the queue (not necessarily
**		the number of requests in WorkQ, however).
**
**	Side Effects:
**		Sets WorkQ to the queue of available work, in order.
*/

/*
**  NEED_* are single-bit flags written in octal.  Their users are not
**  in this part of the file.
**	NOTE(review): presumably they record which control-file lines
**	orderq() still has to read for an entry -- confirm.
*/

# define NEED_P		001
# define NEED_T		002
# define NEED_R		004
# define NEED_S		010

/*
**  WorkList is the array orderq() fills in while it scans the queue
**  directory; WorkListSize is the number of entries it has room for.
**  orderq() calls grow_wlist() when it needs a slot beyond that size.
*/

static WORK	*WorkList = NULL;
static int	WorkListSize = 0;

static int
orderq(queuedir, doall)
	int queuedir;
	bool doall;
{
	register struct dirent *d;
	register WORK *w;
	register char *p;
	DIR *f;
	register int i;
	int wn = -1;
	int wc;
	QUEUE_CHAR *check;
	char qd[MAXPATHLEN];
	char qf[MAXPATHLEN];

	if (queuedir == NOQDIR)
		(void) strlcpy(qd, ".", sizeof qd);
	else
		(void) snprintf(qd, sizeof qd, "%s%s",
				QPaths[queuedir].qp_name,
				(bitset(QP_SUBQF, QPaths[queuedir].qp_subdirs) ? "/qf" : ""));

	if (tTd(41, 1))
	{
		dprintf("orderq:\n");

		check = QueueLimitId;
		while (check != NULL)
		{
			dprintf("\tQueueLimitId = %s\n",
				check->queue_match);
			check = check->queue_next;
		}

		check = QueueLimitSender;
		while (check != NULL)
		{
			dprintf("\tQueueLimitSender = %s\n",
				check->queue_match);
			check = check->queue_next;
		}

		check = QueueLimitRecipient;
		while (check != NULL)
		{
			dprintf("\tQueueLimitRecipient = %s\n",
				check->queue_match);
			check = check->queue_next;
		}
	}

	/* clear out old WorkQ */
	for (w = WorkQ; w != NULL; )
	{
		register WORK *nw = w->w_next;

		WorkQ = nw;
		free(w->w_name);
		if (w->w_host != NULL)
			free(w->w_host);
		free((char *) w);
		w = nw;
	}

	/* open the queue directory */
	f = opendir(qd);
	if (f == NULL)
	{
		syserr("orderq: cannot open \"%s\"", qid_printqueue(queuedir));
		return 0;
	}

	/*
	**  Read the work directory.
	*/

	while ((d = readdir(f)) != NULL)
	{
		FILE *cf;
		int qfver = 0;
		char lbuf[MAXNAME + 1];
		struct stat sbuf;

		if (tTd(41, 50))
			dprintf("orderq: checking %s\n", d->d_name);

		/* is this an interesting entry? */
		if (d->d_name[0] != 'q' || d->d_name[1] != 'f')
			continue;

		if (strlen(d->d_name) >= MAXQFNAME)
		{
			if (Verbose)
				printf("orderq: %s too long, %d max characters\n",
					d->d_name, MAXQFNAME);
			if (LogLevel > 0)
				sm_syslog(LOG_ALERT, NOQID,
					  "orderq: %s too long, %d max characters",
					  d->d_name, MAXQFNAME);
			continue;
		}

		check = QueueLimitId;
		while (check != NULL)
		{
			if (strcontainedin(check->queue_match, d->d_name))
				break;
			else
				check = check->queue_next;
		}
		if (QueueLimitId != NULL && check == NULL)
			continue;

		/* grow work list if necessary */
		if (++wn >= MaxQueueRun && MaxQueueRun > 0)
		{
			if (wn == MaxQueueRun && LogLevel > 0)
				sm_syslog(LOG_WARNING, NOQID,
					  "WorkList for %s maxed out at %d",
					  qid_printqueue(queuedir),
					  MaxQueueRun);
			continue;
		}
		if (wn >= WorkListSize)
		{
			grow_wlist(queuedir);
			if (wn >= WorkListSize)
				continue;
		}
		w = &WorkList[wn];

		(void) snprintf(qf, sizeof qf, "%s/%s", qd, d->d_name);
		if (stat(qf, &sbuf) < 0)
		{
			if (errno != ENOENT)
				sm_syslog(LOG_INFO, NOQID,
					  "orderq: can't stat %s/%s",
					  qid_printqueue(queuedir), d->d_name);
			wn--;
			continue;
		}
		if (!bitset(S_IFREG, sbuf.st_mode))
		{
			/* Yikes!  Skip it or we will hang on open! */
			syserr("orderq: %s/%s is not a regular file",
			       qid_printqueue(queuedir), d->d_name);
			wn--;
			continue;
		}

		/* avoid work if possible */
		if (QueueSortOrder == QSO_BYFILENAME &&
		    QueueLimitSender == NULL &&
		    QueueLimitRecipient == NULL)
		{
			w->w_name = newstr(d->d_name);
			w->w_host = NULL;
			w->w_lock = w->w_tooyoung = FALSE;
			w->w_pri = 0;
			w->w_ctime = 0;
			continue;
		}

		/* open control file */
		cf = fopen(qf, "r");
		if (cf == NULL)
		{
			/* this may be some random person sending hir msgs */
			/* syserr("orderq: cannot open %s", cbuf); */
			if (tTd(41, 2))
				dprintf("orderq: cannot open %s: %s\n",
					d->d_name, errstring(errno));
			errno = 0;
			wn--;
			continue;
		}
		w->w_name = newstr(d->d_name);
		w->w_host = NULL;
		w->w_lock = !lockfile(fileno(cf), w->w_name, NULL, LOCK_SH|LOCK_NB);
		w->w_tooyoung = FALSE;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -