⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prtpool.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 2 页
字号:
	tp->jobq.lock = PR_NewLock();	if (NULL == tp->jobq.lock)		goto failed;	tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);	if (NULL == tp->jobq.cv)		goto failed;	tp->join_lock = PR_NewLock();	if (NULL == tp->join_lock)		goto failed;#ifdef OPT_WINNT	tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,									NULL, 0, 0);	if (NULL == tp->jobq.nt_completion_port)		goto failed;#endif	tp->ioq.lock = PR_NewLock();	if (NULL == tp->ioq.lock)		goto failed;	/* Timer queue */	tp->timerq.lock = PR_NewLock();	if (NULL == tp->timerq.lock)		goto failed;	tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);	if (NULL == tp->timerq.cv)		goto failed;	tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);	if (NULL == tp->shutdown_cv)		goto failed;	tp->ioq.notify_fd = PR_NewPollableEvent();	if (NULL == tp->ioq.notify_fd)		goto failed;	return tp;failed:	delete_threadpool(tp);	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);	return NULL;}/* Create thread pool */PR_IMPLEMENT(PRThreadPool *)PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,                                PRUint32 stacksize){PRThreadPool *tp;PRThread *thr;int i;wthread *wthrp;	tp = alloc_threadpool();	if (NULL == tp)		return NULL;	tp->init_threads = initial_threads;	tp->max_threads = max_threads;	tp->stacksize = stacksize;	PR_INIT_CLIST(&tp->jobq.list);	PR_INIT_CLIST(&tp->ioq.list);	PR_INIT_CLIST(&tp->timerq.list);	PR_INIT_CLIST(&tp->jobq.wthreads);	PR_INIT_CLIST(&tp->ioq.wthreads);	PR_INIT_CLIST(&tp->timerq.wthreads);	tp->shutdown = PR_FALSE;	PR_Lock(tp->jobq.lock);	for(i=0; i < initial_threads; ++i) {		thr = PR_CreateThread(PR_USER_THREAD, wstart,						tp, PR_PRIORITY_NORMAL,						PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);		PR_ASSERT(thr);		wthrp = PR_NEWZAP(wthread);		PR_ASSERT(wthrp);		wthrp->thread = thr;		PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);	}	tp->current_threads = initial_threads;	thr = PR_CreateThread(PR_USER_THREAD, io_wstart,					tp, PR_PRIORITY_NORMAL,					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);	PR_ASSERT(thr);	wthrp = PR_NEWZAP(wthread);	PR_ASSERT(wthrp);	wthrp->thread = thr;	PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);	thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,					tp, PR_PRIORITY_NORMAL,					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);	PR_ASSERT(thr);	wthrp = PR_NEWZAP(wthread);	PR_ASSERT(wthrp);	wthrp->thread = thr;	PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);	PR_Unlock(tp->jobq.lock);	return tp;}static voiddelete_job(PRJob *jobp){	if (NULL != jobp) {		if (NULL != jobp->join_cv) {			PR_DestroyCondVar(jobp->join_cv);			jobp->join_cv = NULL;		}		if (NULL != jobp->cancel_cv) {			PR_DestroyCondVar(jobp->cancel_cv);			jobp->cancel_cv = NULL;		}		PR_DELETE(jobp);	}}static PRJob *alloc_job(PRBool joinable, PRThreadPool *tp){	PRJob *jobp;	jobp = PR_NEWZAP(PRJob);	if (NULL == jobp) 		goto failed;	if (joinable) {		jobp->join_cv = PR_NewCondVar(tp->join_lock);		jobp->join_wait = PR_TRUE;		if (NULL == jobp->join_cv)			goto failed;	} else {		jobp->join_cv = NULL;	}#ifdef OPT_WINNT	jobp->nt_notifier.jobp = jobp;#endif	return jobp;failed:	delete_job(jobp);	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);	return NULL;}/* queue a job */PR_IMPLEMENT(PRJob *)PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable){	PRJob *jobp;	jobp = alloc_job(joinable, tpool);	if (NULL == jobp)		return NULL;	jobp->job_func = fn;	jobp->job_arg = arg;	jobp->tpool = tpool;	add_to_jobq(tpool, jobp);	return jobp;}/* queue a job, when a socket is readable or writeable */static PRJob *queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,				PRBool joinable, io_op_type op){	PRJob *jobp;	PRIntervalTime now;	jobp = alloc_job(joinable, tpool);	if (NULL == jobp) {		return NULL;	}	/*	 * Add a new job to io_jobq	 * wakeup io worker thread	 */	jobp->job_func = fn;	jobp->job_arg = arg;	jobp->tpool = tpool;	jobp->iod = iod;	if (JOB_IO_READ == op) {		jobp->io_op = JOB_IO_READ;		jobp->io_poll_flags = PR_POLL_READ;	} else if (JOB_IO_WRITE == op) {		jobp->io_op = JOB_IO_WRITE;		jobp->io_poll_flags = PR_POLL_WRITE;	} else if (JOB_IO_ACCEPT == op) {		jobp->io_op = JOB_IO_ACCEPT;		jobp->io_poll_flags = PR_POLL_READ;	} else if (JOB_IO_CONNECT == op) {		jobp->io_op = JOB_IO_CONNECT;		jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;	} else {		delete_job(jobp);		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);		return NULL;	}	jobp->timeout = iod->timeout;	if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||			(PR_INTERVAL_NO_WAIT == iod->timeout)) {		jobp->absolute = iod->timeout;	} else {		now = PR_IntervalNow();		jobp->absolute = now + iod->timeout;	}	PR_Lock(tpool->ioq.lock);	if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||			(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {		PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);	} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {		PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);	} else {		PRCList *qp;		PRJob *tmp_jobp;		/*		 * insert into the timeout-sorted ioq		 */		for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;							qp = qp->prev) {			tmp_jobp = JOB_LINKS_PTR(qp);			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {				break;			}		}		PR_INSERT_AFTER(&jobp->links,qp);	}	jobp->on_ioq = PR_TRUE;	tpool->ioq.cnt++;	/*	 * notify io worker thread(s)	 */	PR_Unlock(tpool->ioq.lock);	notify_ioq(tpool);	return jobp;}/* queue a job, when a socket is readable */PR_IMPLEMENT(PRJob *)PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,											PRBool joinable){	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));}/* queue a job, when a socket is writeable */PR_IMPLEMENT(PRJob *)PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,										PRBool joinable){	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));}/* queue a job, when a socket has a pending connection */PR_IMPLEMENT(PRJob *)PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,								void * arg, PRBool joinable){	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));}/* queue a job, when a socket can be connected */PR_IMPLEMENT(PRJob *)PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,			const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable){	PRStatus rv;	PRErrorCode err;	rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);	if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){		/* connection pending */		return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));	} else {		/*		 * connection succeeded or failed; add to jobq right away		 */		if (rv == PR_FAILURE)			iod->error = err;		else			iod->error = 0;		return(PR_QueueJob(tpool, fn, arg, joinable));	}}/* queue a job, when a timer expires */PR_IMPLEMENT(PRJob *)PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,							PRJobFn fn, void * arg, PRBool joinable){	PRIntervalTime now;	PRJob *jobp;	if (PR_INTERVAL_NO_TIMEOUT == timeout) {		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);		return NULL;	}	if (PR_INTERVAL_NO_WAIT == timeout) {		/*		 * no waiting; add to jobq right away		 */		return(PR_QueueJob(tpool, fn, arg, joinable));	}	jobp = alloc_job(joinable, tpool);	if (NULL == jobp) {		return NULL;	}	/*	 * Add a new job to timer_jobq	 * wakeup timer worker thread	 */	jobp->job_func = fn;	jobp->job_arg = arg;	jobp->tpool = tpool;	jobp->timeout = timeout;	now = PR_IntervalNow();	jobp->absolute = now + timeout;	PR_Lock(tpool->timerq.lock);	jobp->on_timerq = PR_TRUE;	if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))		PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);	else {		PRCList *qp;		PRJob *tmp_jobp;		/*		 * insert into the sorted timer jobq		 */		for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;							qp = qp->prev) {			tmp_jobp = JOB_LINKS_PTR(qp);			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {				break;			}		}		PR_INSERT_AFTER(&jobp->links,qp);	}	tpool->timerq.cnt++;	/*	 * notify timer worker thread(s)	 */	notify_timerq(tpool);	PR_Unlock(tpool->timerq.lock);	return jobp;}static voidnotify_timerq(PRThreadPool *tp){	/*	 * wakeup the timer thread(s)	 */	PR_NotifyCondVar(tp->timerq.cv);}static voidnotify_ioq(PRThreadPool *tp){PRStatus rval_status;	/*	 * wakeup the io thread(s)	 */	rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);	PR_ASSERT(PR_SUCCESS == rval_status);}/* * cancel a job * *	XXXX: is this needed? likely to be removed */PR_IMPLEMENT(PRStatus)PR_CancelJob(PRJob *jobp) {	PRStatus rval = PR_FAILURE;	PRThreadPool *tp;	if (jobp->on_timerq) {		/*		 * now, check again while holding the timerq lock		 */		tp = jobp->tpool;		PR_Lock(tp->timerq.lock);		if (jobp->on_timerq) {			jobp->on_timerq = PR_FALSE;			PR_REMOVE_AND_INIT_LINK(&jobp->links);			tp->timerq.cnt--;			PR_Unlock(tp->timerq.lock);			if (!JOINABLE_JOB(jobp)) {				delete_job(jobp);			} else {				JOIN_NOTIFY(jobp);			}			rval = PR_SUCCESS;		} else			PR_Unlock(tp->timerq.lock);	} else if (jobp->on_ioq) {		/*		 * now, check again while holding the ioq lock		 */		tp = jobp->tpool;		PR_Lock(tp->ioq.lock);		if (jobp->on_ioq) {			jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);			if (NULL == jobp->cancel_cv) {				PR_Unlock(tp->ioq.lock);				PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);				return PR_FAILURE;			}			/*			 * mark job 'cancelled' and notify io thread(s)			 * XXXX:			 *		this assumes there is only one io thread; when there			 * 		are multiple threads, the io thread processing this job			 * 		must be notified.			 */			jobp->cancel_io = PR_TRUE;			PR_Unlock(tp->ioq.lock);	/* release, reacquire ioq lock */			notify_ioq(tp);			PR_Lock(tp->ioq.lock);			while (jobp->cancel_io)				PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);			PR_Unlock(tp->ioq.lock);			PR_ASSERT(!jobp->on_ioq);			if (!JOINABLE_JOB(jobp)) {				delete_job(jobp);			} else {				JOIN_NOTIFY(jobp);			}			rval = PR_SUCCESS;		} else			PR_Unlock(tp->ioq.lock);	}	if (PR_FAILURE == rval)		PR_SetError(PR_INVALID_STATE_ERROR, 0);	return rval;}/* join a job, wait until completion */PR_IMPLEMENT(PRStatus)PR_JoinJob(PRJob *jobp){	if (!JOINABLE_JOB(jobp)) {		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);		return PR_FAILURE;	}	PR_Lock(jobp->tpool->join_lock);	while(jobp->join_wait)		PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);	PR_Unlock(jobp->tpool->join_lock);	delete_job(jobp);	return PR_SUCCESS;}/* shutdown threadpool */PR_IMPLEMENT(PRStatus)PR_ShutdownThreadPool(PRThreadPool *tpool){PRStatus rval = PR_SUCCESS;	PR_Lock(tpool->jobq.lock);	tpool->shutdown = PR_TRUE;	PR_NotifyAllCondVar(tpool->shutdown_cv);	PR_Unlock(tpool->jobq.lock);	return rval;}/* * join thread pool * 	wait for termination of worker threads *	reclaim threadpool resources */PR_IMPLEMENT(PRStatus)PR_JoinThreadPool(PRThreadPool *tpool){PRStatus rval = PR_SUCCESS;PRCList *head;PRStatus rval_status;	PR_Lock(tpool->jobq.lock);	while (!tpool->shutdown)		PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);	/*	 * wakeup worker threads	 */#ifdef OPT_WINNT	/*	 * post shutdown notification for all threads	 */	{		int i;		for(i=0; i < tpool->current_threads; i++) {			PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,												TRUE, NULL);		}	}#else	PR_NotifyAllCondVar(tpool->jobq.cv);#endif	/*	 * wakeup io thread(s)	 */	notify_ioq(tpool);	/*	 * wakeup timer thread(s)	 */	PR_Lock(tpool->timerq.lock);	notify_timerq(tpool);	PR_Unlock(tpool->timerq.lock);	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {		wthread *wthrp;		head = PR_LIST_HEAD(&tpool->jobq.wthreads);		PR_REMOVE_AND_INIT_LINK(head);		PR_Unlock(tpool->jobq.lock);		wthrp = WTHREAD_LINKS_PTR(head);		rval_status = PR_JoinThread(wthrp->thread);		PR_ASSERT(PR_SUCCESS == rval_status);		PR_DELETE(wthrp);		PR_Lock(tpool->jobq.lock);	}	PR_Unlock(tpool->jobq.lock);	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {		wthread *wthrp;		head = PR_LIST_HEAD(&tpool->ioq.wthreads);		PR_REMOVE_AND_INIT_LINK(head);		wthrp = WTHREAD_LINKS_PTR(head);		rval_status = PR_JoinThread(wthrp->thread);		PR_ASSERT(PR_SUCCESS == rval_status);		PR_DELETE(wthrp);	}	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {		wthread *wthrp;		head = PR_LIST_HEAD(&tpool->timerq.wthreads);		PR_REMOVE_AND_INIT_LINK(head);		wthrp = WTHREAD_LINKS_PTR(head);		rval_status = PR_JoinThread(wthrp->thread);		PR_ASSERT(PR_SUCCESS == rval_status);		PR_DELETE(wthrp);	}	/*	 * Delete queued jobs	 */	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {		PRJob *jobp;		head = PR_LIST_HEAD(&tpool->jobq.list);		PR_REMOVE_AND_INIT_LINK(head);		jobp = JOB_LINKS_PTR(head);		tpool->jobq.cnt--;		delete_job(jobp);	}	/* delete io jobs */	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {		PRJob *jobp;		head = PR_LIST_HEAD(&tpool->ioq.list);		PR_REMOVE_AND_INIT_LINK(head);		tpool->ioq.cnt--;		jobp = JOB_LINKS_PTR(head);		delete_job(jobp);	}	/* delete timer jobs */	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {		PRJob *jobp;		head = PR_LIST_HEAD(&tpool->timerq.list);		PR_REMOVE_AND_INIT_LINK(head);		tpool->timerq.cnt--;		jobp = JOB_LINKS_PTR(head);		delete_job(jobp);	}	PR_ASSERT(0 == tpool->jobq.cnt);	PR_ASSERT(0 == tpool->ioq.cnt);	PR_ASSERT(0 == tpool->timerq.cnt);	delete_threadpool(tpool);	return rval;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -