📄 prtpool.c
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- *//* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code is the Netscape Portable Runtime (NSPR). * * The Initial Developer of the Original Code is Netscape * Communications Corporation. Portions created by Netscape are * Copyright (C) 1999-2000 Netscape Communications Corporation. All * Rights Reserved. * * Contributor(s): * * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL"), in which case the provisions of the GPL are applicable * instead of those above. If you wish to allow use of your * version of this file only under the terms of the GPL and not to * allow others to use your version of this file under the MPL, * indicate your decision by deleting the provisions above and * replace them with the notice and other provisions required by * the GPL. If you do not delete the provisions above, a recipient * may use your version of this file under either the MPL or the * GPL. */#include "nspr.h"/* * Thread pools * Thread pools create and manage threads to provide support for * scheduling jobs onto one or more threads. * */#ifdef OPT_WINNT#include <windows.h>#endif/* * worker thread */typedef struct wthread { PRCList links; PRThread *thread;} wthread;/* * queue of timer jobs */typedef struct timer_jobq { PRCList list; PRLock *lock; PRCondVar *cv; PRInt32 cnt; PRCList wthreads;} timer_jobq;/* * queue of jobs */typedef struct tp_jobq { PRCList list; PRInt32 cnt; PRLock *lock; PRCondVar *cv; PRCList wthreads;#ifdef OPT_WINNT HANDLE nt_completion_port;#endif} tp_jobq;/* * queue of IO jobs */typedef struct io_jobq { PRCList list; PRPollDesc *pollfds; PRInt32 npollfds; PRJob **polljobs; PRLock *lock; PRInt32 cnt; PRFileDesc *notify_fd; PRCList wthreads;} io_jobq;/* * Threadpool */struct PRThreadPool { PRInt32 init_threads; PRInt32 max_threads; PRInt32 current_threads; PRInt32 idle_threads; PRUint32 stacksize; tp_jobq jobq; io_jobq ioq; timer_jobq timerq; PRLock *join_lock; /* used with jobp->join_cv */ PRCondVar *shutdown_cv; PRBool shutdown;};typedef enum io_op_type { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;#ifdef OPT_WINNTtypedef struct NT_notifier { OVERLAPPED overlapped; /* must be first */ PRJob *jobp;} NT_notifier;#endifstruct PRJob { PRCList links; /* for linking jobs */ PRBool on_ioq; /* job on ioq */ PRBool on_timerq; /* job on timerq */ PRJobFn job_func; void *job_arg; PRCondVar *join_cv; PRBool join_wait; /* == PR_TRUE, when waiting to join */ PRCondVar *cancel_cv; /* for cancelling IO jobs */ PRBool cancel_io; /* for cancelling IO jobs */ PRThreadPool *tpool; /* back pointer to thread pool */ PRJobIoDesc *iod; io_op_type io_op; PRInt16 io_poll_flags; PRNetAddr *netaddr; PRIntervalTime timeout; /* relative value */ PRIntervalTime absolute;#ifdef OPT_WINNT NT_notifier nt_notifier; #endif};#define JOB_LINKS_PTR(_qp) \ ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))#define WTHREAD_LINKS_PTR(_qp) \ ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)#define JOIN_NOTIFY(_jobp) \ PR_BEGIN_MACRO \ PR_Lock(_jobp->tpool->join_lock); \ _jobp->join_wait = PR_FALSE; \ PR_NotifyCondVar(_jobp->join_cv); \ PR_Unlock(_jobp->tpool->join_lock); \ PR_END_MACRO#define CANCEL_IO_JOB(jobp) \ PR_BEGIN_MACRO \ jobp->cancel_io = PR_FALSE; \ jobp->on_ioq = PR_FALSE; \ PR_REMOVE_AND_INIT_LINK(&jobp->links); \ tp->ioq.cnt--; \ PR_NotifyCondVar(jobp->cancel_cv); \ PR_END_MACROstatic void delete_job(PRJob *jobp);static PRThreadPool * alloc_threadpool(void);static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);static void notify_ioq(PRThreadPool *tp);static void notify_timerq(PRThreadPool *tp);/* * locks are acquired in the following order * * tp->ioq.lock,tp->timerq.lock * | * V * tp->jobq->lock *//* * worker thread function */static void wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;PRCList *head; /* * execute jobs until shutdown */ while (!tp->shutdown) { PRJob *jobp;#ifdef OPT_WINNT BOOL rv; DWORD unused, shutdown; LPOVERLAPPED olp; PR_Lock(tp->jobq.lock); tp->idle_threads++; PR_Unlock(tp->jobq.lock); rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused, &shutdown, &olp, INFINITE); PR_ASSERT(rv); if (shutdown) break; jobp = ((NT_notifier *) olp)->jobp; PR_Lock(tp->jobq.lock); tp->idle_threads--; tp->jobq.cnt--; PR_Unlock(tp->jobq.lock);#else PR_Lock(tp->jobq.lock); while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { tp->idle_threads++; PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); tp->idle_threads--; } if (tp->shutdown) { PR_Unlock(tp->jobq.lock); break; } head = PR_LIST_HEAD(&tp->jobq.list); /* * remove job from queue */ PR_REMOVE_AND_INIT_LINK(head); tp->jobq.cnt--; jobp = JOB_LINKS_PTR(head); PR_Unlock(tp->jobq.lock);#endif jobp->job_func(jobp->job_arg); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } } PR_Lock(tp->jobq.lock); tp->current_threads--; PR_Unlock(tp->jobq.lock);}/* * add a job to the work queue */static voidadd_to_jobq(PRThreadPool *tp, PRJob *jobp){ /* * add to jobq */#ifdef OPT_WINNT PR_Lock(tp->jobq.lock); tp->jobq.cnt++; PR_Unlock(tp->jobq.lock); /* * notify worker thread(s) */ PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE, &jobp->nt_notifier.overlapped);#else PR_Lock(tp->jobq.lock); PR_APPEND_LINK(&jobp->links,&tp->jobq.list); tp->jobq.cnt++; if ((tp->idle_threads < tp->jobq.cnt) && (tp->current_threads < tp->max_threads)) { wthread *wthrp; /* * increment thread count and unlock the jobq lock */ tp->current_threads++; PR_Unlock(tp->jobq.lock); /* create new worker thread */ wthrp = PR_NEWZAP(wthread); if (wthrp) { wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); if (NULL == wthrp->thread) { PR_DELETE(wthrp); /* this sets wthrp to NULL */ } } PR_Lock(tp->jobq.lock); if (NULL == wthrp) { tp->current_threads--; } else { PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); } } /* * wakeup a worker thread */ PR_NotifyCondVar(tp->jobq.cv); PR_Unlock(tp->jobq.lock);#endif}/* * io worker thread function */static void io_wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;int pollfd_cnt, pollfds_used;int rv;PRCList *qp;PRPollDesc *pollfds;PRJob **polljobs;int poll_timeout;PRIntervalTime now; /* * scan io_jobq * construct poll list * call PR_Poll * for all fds, for which poll returns true, move the job to * jobq and wakeup worker thread. */ while (!tp->shutdown) { PRJob *jobp; pollfd_cnt = tp->ioq.cnt + 10; if (pollfd_cnt > tp->ioq.npollfds) { /* * re-allocate pollfd array if the current one is not large * enough */ if (NULL != tp->ioq.pollfds) PR_Free(tp->ioq.pollfds); tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob *))); PR_ASSERT(NULL != tp->ioq.pollfds); /* * array of pollfds */ pollfds = tp->ioq.pollfds; tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); /* * parallel array of jobs */ polljobs = tp->ioq.polljobs; tp->ioq.npollfds = pollfd_cnt; } pollfds_used = 0; /* * add the notify fd; used for unblocking io thread(s) */ pollfds[pollfds_used].fd = tp->ioq.notify_fd; pollfds[pollfds_used].in_flags = PR_POLL_READ; pollfds[pollfds_used].out_flags = 0; polljobs[pollfds_used] = NULL; pollfds_used++; /* * fill in the pollfd array */ PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { jobp = JOB_LINKS_PTR(qp); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); continue; } if (pollfds_used == (pollfd_cnt)) break; pollfds[pollfds_used].fd = jobp->iod->socket; pollfds[pollfds_used].in_flags = jobp->io_poll_flags; pollfds[pollfds_used].out_flags = 0; polljobs[pollfds_used] = jobp; pollfds_used++; } if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { qp = tp->ioq.list.next; jobp = JOB_LINKS_PTR(qp); if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) poll_timeout = PR_INTERVAL_NO_TIMEOUT; else if (PR_INTERVAL_NO_WAIT == jobp->timeout) poll_timeout = PR_INTERVAL_NO_WAIT; else { poll_timeout = jobp->absolute - PR_IntervalNow(); if (poll_timeout <= 0) /* already timed out */ poll_timeout = PR_INTERVAL_NO_WAIT; } } else { poll_timeout = PR_INTERVAL_NO_TIMEOUT; } PR_Unlock(tp->ioq.lock); /* * XXXX * should retry if more jobs have been added to the queue? * */ PR_ASSERT(pollfds_used <= pollfd_cnt); rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); if (tp->shutdown) { break; } if (rv > 0) { /* * at least one io event is set */ PRStatus rval_status; PRInt32 index; PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); /* * reset the pollable event, if notified */ if (pollfds[0].out_flags & PR_POLL_READ) { rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); PR_ASSERT(PR_SUCCESS == rval_status); } for(index = 1; index < (pollfds_used); index++) { PRInt16 events = pollfds[index].in_flags; PRInt16 revents = pollfds[index].out_flags; jobp = polljobs[index]; if ((revents & PR_POLL_NVAL) || /* busted in all cases */ (revents & PR_POLL_ERR) || ((events & PR_POLL_WRITE) && (revents & PR_POLL_HUP))) { /* write op & hup */ PR_Lock(tp->ioq.lock); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); PR_Unlock(tp->ioq.lock); continue; } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); /* set error */ if (PR_POLL_NVAL & revents) jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; else if (PR_POLL_HUP & revents) jobp->iod->error = PR_CONNECT_RESET_ERROR; else jobp->iod->error = PR_IO_ERROR; /* * add to jobq */ add_to_jobq(tp, jobp); } else if (revents) { /* * add to jobq */ PR_Lock(tp->ioq.lock); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); PR_Unlock(tp->ioq.lock); continue; } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); if (jobp->io_op == JOB_IO_CONNECT) { if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) jobp->iod->error = 0; else jobp->iod->error = PR_GetError(); } else jobp->iod->error = 0; add_to_jobq(tp, jobp); } } } /* * timeout processing */ now = PR_IntervalNow(); PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { jobp = JOB_LINKS_PTR(qp); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); continue; } if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) break; if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && ((PRInt32)(jobp->absolute - now) > 0)) break; PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; jobp->iod->error = PR_IO_TIMEOUT_ERROR; add_to_jobq(tp, jobp); } PR_Unlock(tp->ioq.lock); }}/* * timer worker thread function */static void timer_wstart(void *arg){PRThreadPool *tp = (PRThreadPool *) arg;PRCList *qp;PRIntervalTime timeout;PRIntervalTime now; /* * call PR_WaitCondVar with minimum value of all timeouts */ while (!tp->shutdown) { PRJob *jobp; PR_Lock(tp->timerq.lock); if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { timeout = PR_INTERVAL_NO_TIMEOUT; } else { PRCList *qp; qp = tp->timerq.list.next; jobp = JOB_LINKS_PTR(qp); timeout = jobp->absolute - PR_IntervalNow(); if (timeout <= 0) timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ } if (PR_INTERVAL_NO_WAIT != timeout) PR_WaitCondVar(tp->timerq.cv, timeout); if (tp->shutdown) { PR_Unlock(tp->timerq.lock); break; } /* * move expired-timer jobs to jobq */ now = PR_IntervalNow(); while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { qp = tp->timerq.list.next; jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - now) > 0) { break; } /* * job timed out */ PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->timerq.cnt--; jobp->on_timerq = PR_FALSE; add_to_jobq(tp, jobp); } PR_Unlock(tp->timerq.lock); }}static voiddelete_threadpool(PRThreadPool *tp){ if (NULL != tp) { if (NULL != tp->shutdown_cv) PR_DestroyCondVar(tp->shutdown_cv); if (NULL != tp->jobq.cv) PR_DestroyCondVar(tp->jobq.cv); if (NULL != tp->jobq.lock) PR_DestroyLock(tp->jobq.lock); if (NULL != tp->join_lock) PR_DestroyLock(tp->join_lock);#ifdef OPT_WINNT if (NULL != tp->jobq.nt_completion_port) CloseHandle(tp->jobq.nt_completion_port);#endif /* Timer queue */ if (NULL != tp->timerq.cv) PR_DestroyCondVar(tp->timerq.cv); if (NULL != tp->timerq.lock) PR_DestroyLock(tp->timerq.lock); if (NULL != tp->ioq.lock) PR_DestroyLock(tp->ioq.lock); if (NULL != tp->ioq.pollfds) PR_Free(tp->ioq.pollfds); if (NULL != tp->ioq.notify_fd) PR_DestroyPollableEvent(tp->ioq.notify_fd); PR_Free(tp); } return;}static PRThreadPool *alloc_threadpool(void){PRThreadPool *tp; tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); if (NULL == tp) goto failed;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -