⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prtpool.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- *//*  * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ *  * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. *  * The Original Code is the Netscape Portable Runtime (NSPR). *  * The Initial Developer of the Original Code is Netscape * Communications Corporation.  Portions created by Netscape are  * Copyright (C) 1999-2000 Netscape Communications Corporation.  All * Rights Reserved. *  * Contributor(s): *  * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL"), in which case the provisions of the GPL are applicable  * instead of those above.  If you wish to allow use of your  * version of this file only under the terms of the GPL and not to * allow others to use your version of this file under the MPL, * indicate your decision by deleting the provisions above and * replace them with the notice and other provisions required by * the GPL.  If you do not delete the provisions above, a recipient * may use your version of this file under either the MPL or the * GPL. */#include "nspr.h"/* * Thread pools *	Thread pools create and manage threads to provide support for *	scheduling jobs onto one or more threads. * */#ifdef OPT_WINNT#include <windows.h>#endif/* * worker thread */typedef struct wthread {	PRCList		links;	PRThread	*thread;} wthread;/* * queue of timer jobs */typedef struct timer_jobq {	PRCList		list;	PRLock		*lock;	PRCondVar	*cv;	PRInt32		cnt;	PRCList 	wthreads;} timer_jobq;/* * queue of jobs */typedef struct tp_jobq {	PRCList		list;	PRInt32		cnt;	PRLock		*lock;	PRCondVar	*cv;	PRCList 	wthreads;#ifdef OPT_WINNT	HANDLE		nt_completion_port;#endif} tp_jobq;/* * queue of IO jobs */typedef struct io_jobq {	PRCList		list;	PRPollDesc  *pollfds;	PRInt32  	npollfds;	PRJob		**polljobs;	PRLock		*lock;	PRInt32		cnt;	PRFileDesc	*notify_fd;	PRCList 	wthreads;} io_jobq;/* * Threadpool */struct PRThreadPool {	PRInt32		init_threads;	PRInt32		max_threads;	PRInt32		current_threads;	PRInt32		idle_threads;	PRUint32	stacksize;	tp_jobq		jobq;	io_jobq		ioq;	timer_jobq	timerq;	PRLock		*join_lock;		/* used with jobp->join_cv */	PRCondVar	*shutdown_cv;	PRBool		shutdown;};typedef enum io_op_type	{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;#ifdef OPT_WINNTtypedef struct NT_notifier {	OVERLAPPED overlapped;		/* must be first */	PRJob	*jobp;} NT_notifier;#endifstruct PRJob {	PRCList			links;		/* 	for linking jobs */	PRBool			on_ioq;		/* job on ioq */	PRBool			on_timerq;	/* job on timerq */	PRJobFn			job_func;	void 			*job_arg;	PRCondVar		*join_cv;	PRBool			join_wait;	/* == PR_TRUE, when waiting to join */	PRCondVar		*cancel_cv;	/* for cancelling IO jobs */	PRBool			cancel_io;	/* for cancelling IO jobs */	PRThreadPool	*tpool;		/* back pointer to thread pool */	PRJobIoDesc		*iod;	io_op_type		io_op;	PRInt16			io_poll_flags;	PRNetAddr		*netaddr;	PRIntervalTime	timeout;	/* relative value */	PRIntervalTime	absolute;#ifdef OPT_WINNT	NT_notifier		nt_notifier;	#endif};#define JOB_LINKS_PTR(_qp) \    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))#define WTHREAD_LINKS_PTR(_qp) \    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)#define JOIN_NOTIFY(_jobp)								\				PR_BEGIN_MACRO							\				PR_Lock(_jobp->tpool->join_lock);		\				_jobp->join_wait = PR_FALSE;			\				PR_NotifyCondVar(_jobp->join_cv);		\				PR_Unlock(_jobp->tpool->join_lock);		\				PR_END_MACRO#define CANCEL_IO_JOB(jobp)								\				PR_BEGIN_MACRO							\				jobp->cancel_io = PR_FALSE;				\				jobp->on_ioq = PR_FALSE;				\				PR_REMOVE_AND_INIT_LINK(&jobp->links);	\				tp->ioq.cnt--;							\				PR_NotifyCondVar(jobp->cancel_cv);		\				PR_END_MACROstatic void delete_job(PRJob *jobp);static PRThreadPool * alloc_threadpool(void);static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);static void notify_ioq(PRThreadPool *tp);static void notify_timerq(PRThreadPool *tp);/* * locks are acquired in the following order * *	tp->ioq.lock,tp->timerq.lock *			| *			V *		tp->jobq->lock		 *//* * worker thread function */static void wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;PRCList *head;	/*	 * execute jobs until shutdown	 */	while (!tp->shutdown) {		PRJob *jobp;#ifdef OPT_WINNT		BOOL rv;		DWORD unused, shutdown;		LPOVERLAPPED olp;		PR_Lock(tp->jobq.lock);		tp->idle_threads++;		PR_Unlock(tp->jobq.lock);		rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,					&unused, &shutdown, &olp, INFINITE);				PR_ASSERT(rv);		if (shutdown)			break;		jobp = ((NT_notifier *) olp)->jobp;		PR_Lock(tp->jobq.lock);		tp->idle_threads--;		tp->jobq.cnt--;		PR_Unlock(tp->jobq.lock);#else		PR_Lock(tp->jobq.lock);		while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {			tp->idle_threads++;			PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);			tp->idle_threads--;		}			if (tp->shutdown) {			PR_Unlock(tp->jobq.lock);			break;		}		head = PR_LIST_HEAD(&tp->jobq.list);		/*		 * remove job from queue		 */		PR_REMOVE_AND_INIT_LINK(head);		tp->jobq.cnt--;		jobp = JOB_LINKS_PTR(head);		PR_Unlock(tp->jobq.lock);#endif		jobp->job_func(jobp->job_arg);		if (!JOINABLE_JOB(jobp)) {			delete_job(jobp);		} else {			JOIN_NOTIFY(jobp);		}	}	PR_Lock(tp->jobq.lock);	tp->current_threads--;	PR_Unlock(tp->jobq.lock);}/* * add a job to the work queue */static voidadd_to_jobq(PRThreadPool *tp, PRJob *jobp){	/*	 * add to jobq	 */#ifdef OPT_WINNT	PR_Lock(tp->jobq.lock);	tp->jobq.cnt++;	PR_Unlock(tp->jobq.lock);	/*	 * notify worker thread(s)	 */	PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,            FALSE, &jobp->nt_notifier.overlapped);#else	PR_Lock(tp->jobq.lock);	PR_APPEND_LINK(&jobp->links,&tp->jobq.list);	tp->jobq.cnt++;	if ((tp->idle_threads < tp->jobq.cnt) &&					(tp->current_threads < tp->max_threads)) {		wthread *wthrp;		/*		 * increment thread count and unlock the jobq lock		 */		tp->current_threads++;		PR_Unlock(tp->jobq.lock);		/* create new worker thread */		wthrp = PR_NEWZAP(wthread);		if (wthrp) {			wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,						tp, PR_PRIORITY_NORMAL,						PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);			if (NULL == wthrp->thread) {				PR_DELETE(wthrp);  /* this sets wthrp to NULL */			}		}		PR_Lock(tp->jobq.lock);		if (NULL == wthrp) {			tp->current_threads--;		} else {			PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);		}	}	/*	 * wakeup a worker thread	 */	PR_NotifyCondVar(tp->jobq.cv);	PR_Unlock(tp->jobq.lock);#endif}/* * io worker thread function */static void io_wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;int pollfd_cnt, pollfds_used;int rv;PRCList *qp;PRPollDesc *pollfds;PRJob **polljobs;int poll_timeout;PRIntervalTime now;	/*	 * scan io_jobq	 * construct poll list	 * call PR_Poll	 * for all fds, for which poll returns true, move the job to	 * jobq and wakeup worker thread.	 */	while (!tp->shutdown) {		PRJob *jobp;		pollfd_cnt = tp->ioq.cnt + 10;		if (pollfd_cnt > tp->ioq.npollfds) {			/*			 * re-allocate pollfd array if the current one is not large			 * enough			 */			if (NULL != tp->ioq.pollfds)				PR_Free(tp->ioq.pollfds);			tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *						(sizeof(PRPollDesc) + sizeof(PRJob *)));			PR_ASSERT(NULL != tp->ioq.pollfds);			/*			 * array of pollfds			 */			pollfds = tp->ioq.pollfds;			tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);			/*			 * parallel array of jobs			 */			polljobs = tp->ioq.polljobs;			tp->ioq.npollfds = pollfd_cnt;		}		pollfds_used = 0;		/*		 * add the notify fd; used for unblocking io thread(s)		 */		pollfds[pollfds_used].fd = tp->ioq.notify_fd;		pollfds[pollfds_used].in_flags = PR_POLL_READ;		pollfds[pollfds_used].out_flags = 0;		polljobs[pollfds_used] = NULL;		pollfds_used++;		/*		 * fill in the pollfd array		 */		PR_Lock(tp->ioq.lock);		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {			jobp = JOB_LINKS_PTR(qp);			if (jobp->cancel_io) {				CANCEL_IO_JOB(jobp);				continue;			}			if (pollfds_used == (pollfd_cnt))				break;			pollfds[pollfds_used].fd = jobp->iod->socket;			pollfds[pollfds_used].in_flags = jobp->io_poll_flags;			pollfds[pollfds_used].out_flags = 0;			polljobs[pollfds_used] = jobp;			pollfds_used++;		}		if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {			qp = tp->ioq.list.next;			jobp = JOB_LINKS_PTR(qp);			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)				poll_timeout = PR_INTERVAL_NO_TIMEOUT;			else if (PR_INTERVAL_NO_WAIT == jobp->timeout)				poll_timeout = PR_INTERVAL_NO_WAIT;			else {				poll_timeout = jobp->absolute - PR_IntervalNow();				if (poll_timeout <= 0) /* already timed out */					poll_timeout = PR_INTERVAL_NO_WAIT;			}		} else {			poll_timeout = PR_INTERVAL_NO_TIMEOUT;		}		PR_Unlock(tp->ioq.lock);		/*		 * XXXX		 * should retry if more jobs have been added to the queue?		 *		 */		PR_ASSERT(pollfds_used <= pollfd_cnt);		rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);		if (tp->shutdown) {			break;		}		if (rv > 0) {			/*			 * at least one io event is set			 */			PRStatus rval_status;			PRInt32 index;			PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);			/*			 * reset the pollable event, if notified			 */			if (pollfds[0].out_flags & PR_POLL_READ) {				rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);				PR_ASSERT(PR_SUCCESS == rval_status);			}			for(index = 1; index < (pollfds_used); index++) {                PRInt16 events = pollfds[index].in_flags;                PRInt16 revents = pollfds[index].out_flags;					jobp = polljobs[index];	                if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */                	(revents & PR_POLL_ERR) ||                			((events & PR_POLL_WRITE) &&							(revents & PR_POLL_HUP))) { /* write op & hup */					PR_Lock(tp->ioq.lock);					if (jobp->cancel_io) {						CANCEL_IO_JOB(jobp);						PR_Unlock(tp->ioq.lock);						continue;					}					PR_REMOVE_AND_INIT_LINK(&jobp->links);					tp->ioq.cnt--;					jobp->on_ioq = PR_FALSE;					PR_Unlock(tp->ioq.lock);					/* set error */                    if (PR_POLL_NVAL & revents)						jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;                    else if (PR_POLL_HUP & revents)						jobp->iod->error = PR_CONNECT_RESET_ERROR;                    else 						jobp->iod->error = PR_IO_ERROR;					/*					 * add to jobq					 */					add_to_jobq(tp, jobp);				} else if (revents) {					/*					 * add to jobq					 */					PR_Lock(tp->ioq.lock);					if (jobp->cancel_io) {						CANCEL_IO_JOB(jobp);						PR_Unlock(tp->ioq.lock);						continue;					}					PR_REMOVE_AND_INIT_LINK(&jobp->links);					tp->ioq.cnt--;					jobp->on_ioq = PR_FALSE;					PR_Unlock(tp->ioq.lock);					if (jobp->io_op == JOB_IO_CONNECT) {						if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)							jobp->iod->error = 0;						else							jobp->iod->error = PR_GetError();					} else						jobp->iod->error = 0;					add_to_jobq(tp, jobp);				}			}		}		/*		 * timeout processing		 */		now = PR_IntervalNow();		PR_Lock(tp->ioq.lock);		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {			jobp = JOB_LINKS_PTR(qp);			if (jobp->cancel_io) {				CANCEL_IO_JOB(jobp);				continue;			}			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)				break;			if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&								((PRInt32)(jobp->absolute - now) > 0))				break;			PR_REMOVE_AND_INIT_LINK(&jobp->links);			tp->ioq.cnt--;			jobp->on_ioq = PR_FALSE;			jobp->iod->error = PR_IO_TIMEOUT_ERROR;			add_to_jobq(tp, jobp);		}		PR_Unlock(tp->ioq.lock);	}}/* * timer worker thread function */static void timer_wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;PRCList *qp;PRIntervalTime timeout;PRIntervalTime now;	/*	 * call PR_WaitCondVar with minimum value of all timeouts	 */	while (!tp->shutdown) {		PRJob *jobp;		PR_Lock(tp->timerq.lock);		if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {			timeout = PR_INTERVAL_NO_TIMEOUT;		} else {			PRCList *qp;			qp = tp->timerq.list.next;			jobp = JOB_LINKS_PTR(qp);			timeout = jobp->absolute - PR_IntervalNow();            if (timeout <= 0)				timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */		}		if (PR_INTERVAL_NO_WAIT != timeout)			PR_WaitCondVar(tp->timerq.cv, timeout);		if (tp->shutdown) {			PR_Unlock(tp->timerq.lock);			break;		}		/*		 * move expired-timer jobs to jobq		 */		now = PR_IntervalNow();			while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {			qp = tp->timerq.list.next;			jobp = JOB_LINKS_PTR(qp);			if ((PRInt32)(jobp->absolute - now) > 0) {				break;			}			/*			 * job timed out			 */			PR_REMOVE_AND_INIT_LINK(&jobp->links);			tp->timerq.cnt--;			jobp->on_timerq = PR_FALSE;			add_to_jobq(tp, jobp);		}		PR_Unlock(tp->timerq.lock);	}}static voiddelete_threadpool(PRThreadPool *tp){	if (NULL != tp) {		if (NULL != tp->shutdown_cv)			PR_DestroyCondVar(tp->shutdown_cv);		if (NULL != tp->jobq.cv)			PR_DestroyCondVar(tp->jobq.cv);		if (NULL != tp->jobq.lock)			PR_DestroyLock(tp->jobq.lock);		if (NULL != tp->join_lock)			PR_DestroyLock(tp->join_lock);#ifdef OPT_WINNT		if (NULL != tp->jobq.nt_completion_port)			CloseHandle(tp->jobq.nt_completion_port);#endif		/* Timer queue */		if (NULL != tp->timerq.cv)			PR_DestroyCondVar(tp->timerq.cv);		if (NULL != tp->timerq.lock)			PR_DestroyLock(tp->timerq.lock);		if (NULL != tp->ioq.lock)			PR_DestroyLock(tp->ioq.lock);		if (NULL != tp->ioq.pollfds)			PR_Free(tp->ioq.pollfds);		if (NULL != tp->ioq.notify_fd)			PR_DestroyPollableEvent(tp->ioq.notify_fd);		PR_Free(tp);	}	return;}static PRThreadPool *alloc_threadpool(void){PRThreadPool *tp;	tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));	if (NULL == tp)		goto failed;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -