📄 prtpool.c
字号:
tp->jobq.lock = PR_NewLock(); if (NULL == tp->jobq.lock) goto failed; tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); if (NULL == tp->jobq.cv) goto failed; tp->join_lock = PR_NewLock(); if (NULL == tp->join_lock) goto failed;#ifdef OPT_WINNT tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (NULL == tp->jobq.nt_completion_port) goto failed;#endif tp->ioq.lock = PR_NewLock(); if (NULL == tp->ioq.lock) goto failed; /* Timer queue */ tp->timerq.lock = PR_NewLock(); if (NULL == tp->timerq.lock) goto failed; tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); if (NULL == tp->timerq.cv) goto failed; tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); if (NULL == tp->shutdown_cv) goto failed; tp->ioq.notify_fd = PR_NewPollableEvent(); if (NULL == tp->ioq.notify_fd) goto failed; return tp;failed: delete_threadpool(tp); PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return NULL;}/* Create thread pool */PR_IMPLEMENT(PRThreadPool *)PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, PRUint32 stacksize){PRThreadPool *tp;PRThread *thr;int i;wthread *wthrp; tp = alloc_threadpool(); if (NULL == tp) return NULL; tp->init_threads = initial_threads; tp->max_threads = max_threads; tp->stacksize = stacksize; PR_INIT_CLIST(&tp->jobq.list); PR_INIT_CLIST(&tp->ioq.list); PR_INIT_CLIST(&tp->timerq.list); PR_INIT_CLIST(&tp->jobq.wthreads); PR_INIT_CLIST(&tp->ioq.wthreads); PR_INIT_CLIST(&tp->timerq.wthreads); tp->shutdown = PR_FALSE; PR_Lock(tp->jobq.lock); for(i=0; i < initial_threads; ++i) { thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); } tp->current_threads = initial_threads; thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); PR_Unlock(tp->jobq.lock); return tp;}static voiddelete_job(PRJob *jobp){ if (NULL != jobp) { if (NULL != jobp->join_cv) { PR_DestroyCondVar(jobp->join_cv); jobp->join_cv = NULL; } if (NULL != jobp->cancel_cv) { PR_DestroyCondVar(jobp->cancel_cv); jobp->cancel_cv = NULL; } PR_DELETE(jobp); }}static PRJob *alloc_job(PRBool joinable, PRThreadPool *tp){ PRJob *jobp; jobp = PR_NEWZAP(PRJob); if (NULL == jobp) goto failed; if (joinable) { jobp->join_cv = PR_NewCondVar(tp->join_lock); jobp->join_wait = PR_TRUE; if (NULL == jobp->join_cv) goto failed; } else { jobp->join_cv = NULL; }#ifdef OPT_WINNT jobp->nt_notifier.jobp = jobp;#endif return jobp;failed: delete_job(jobp); PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return NULL;}/* queue a job */PR_IMPLEMENT(PRJob *)PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable){ PRJob *jobp; jobp = alloc_job(joinable, tpool); if (NULL == jobp) return NULL; jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; add_to_jobq(tpool, jobp); return jobp;}/* queue a job, when a socket is readable or writeable */static PRJob *queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable, io_op_type op){ PRJob *jobp; PRIntervalTime now; jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } /* * Add a new job to io_jobq * wakeup io worker thread */ jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; jobp->iod = iod; if (JOB_IO_READ == op) { jobp->io_op = JOB_IO_READ; jobp->io_poll_flags = PR_POLL_READ; } else if (JOB_IO_WRITE == op) { jobp->io_op = JOB_IO_WRITE; jobp->io_poll_flags = PR_POLL_WRITE; } else if (JOB_IO_ACCEPT == op) { jobp->io_op = JOB_IO_ACCEPT; jobp->io_poll_flags = PR_POLL_READ; } else if (JOB_IO_CONNECT == op) { jobp->io_op = JOB_IO_CONNECT; jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; } else { delete_job(jobp); PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; } jobp->timeout = iod->timeout; if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || (PR_INTERVAL_NO_WAIT == iod->timeout)) { jobp->absolute = iod->timeout; } else { now = PR_IntervalNow(); jobp->absolute = now + iod->timeout; } PR_Lock(tpool->ioq.lock); if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); } else { PRCList *qp; PRJob *tmp_jobp; /* * insert into the timeout-sorted ioq */ for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) { tmp_jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { break; } } PR_INSERT_AFTER(&jobp->links,qp); } jobp->on_ioq = PR_TRUE; tpool->ioq.cnt++; /* * notify io worker thread(s) */ PR_Unlock(tpool->ioq.lock); notify_ioq(tpool); return jobp;}/* queue a job, when a socket is readable */PR_IMPLEMENT(PRJob *)PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable){ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));}/* queue a job, when a socket is writeable */PR_IMPLEMENT(PRJob *)PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, PRBool joinable){ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));}/* queue a job, when a socket has a pending connection */PR_IMPLEMENT(PRJob *)PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable){ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));}/* queue a job, when a socket can be connected */PR_IMPLEMENT(PRJob *)PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable){ PRStatus rv; PRErrorCode err; rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ /* connection pending */ return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); } else { /* * connection succeeded or failed; add to jobq right away */ if (rv == PR_FAILURE) iod->error = err; else iod->error = 0; return(PR_QueueJob(tpool, fn, arg, joinable)); }}/* queue a job, when a timer expires */PR_IMPLEMENT(PRJob *)PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, PRJobFn fn, void * arg, PRBool joinable){ PRIntervalTime now; PRJob *jobp; if (PR_INTERVAL_NO_TIMEOUT == timeout) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; } if (PR_INTERVAL_NO_WAIT == timeout) { /* * no waiting; add to jobq right away */ return(PR_QueueJob(tpool, fn, arg, joinable)); } jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } /* * Add a new job to timer_jobq * wakeup timer worker thread */ jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; jobp->timeout = timeout; now = PR_IntervalNow(); jobp->absolute = now + timeout; PR_Lock(tpool->timerq.lock); jobp->on_timerq = PR_TRUE; if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); else { PRCList *qp; PRJob *tmp_jobp; /* * insert into the sorted timer jobq */ for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; qp = qp->prev) { tmp_jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { break; } } PR_INSERT_AFTER(&jobp->links,qp); } tpool->timerq.cnt++; /* * notify timer worker thread(s) */ notify_timerq(tpool); PR_Unlock(tpool->timerq.lock); return jobp;}static voidnotify_timerq(PRThreadPool *tp){ /* * wakeup the timer thread(s) */ PR_NotifyCondVar(tp->timerq.cv);}static voidnotify_ioq(PRThreadPool *tp){PRStatus rval_status; /* * wakeup the io thread(s) */ rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); PR_ASSERT(PR_SUCCESS == rval_status);}/* * cancel a job * * XXXX: is this needed? likely to be removed */PR_IMPLEMENT(PRStatus)PR_CancelJob(PRJob *jobp) { PRStatus rval = PR_FAILURE; PRThreadPool *tp; if (jobp->on_timerq) { /* * now, check again while holding the timerq lock */ tp = jobp->tpool; PR_Lock(tp->timerq.lock); if (jobp->on_timerq) { jobp->on_timerq = PR_FALSE; PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->timerq.cnt--; PR_Unlock(tp->timerq.lock); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } rval = PR_SUCCESS; } else PR_Unlock(tp->timerq.lock); } else if (jobp->on_ioq) { /* * now, check again while holding the ioq lock */ tp = jobp->tpool; PR_Lock(tp->ioq.lock); if (jobp->on_ioq) { jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); if (NULL == jobp->cancel_cv) { PR_Unlock(tp->ioq.lock); PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); return PR_FAILURE; } /* * mark job 'cancelled' and notify io thread(s) * XXXX: * this assumes there is only one io thread; when there * are multiple threads, the io thread processing this job * must be notified. */ jobp->cancel_io = PR_TRUE; PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ notify_ioq(tp); PR_Lock(tp->ioq.lock); while (jobp->cancel_io) PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(tp->ioq.lock); PR_ASSERT(!jobp->on_ioq); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } rval = PR_SUCCESS; } else PR_Unlock(tp->ioq.lock); } if (PR_FAILURE == rval) PR_SetError(PR_INVALID_STATE_ERROR, 0); return rval;}/* join a job, wait until completion */PR_IMPLEMENT(PRStatus)PR_JoinJob(PRJob *jobp){ if (!JOINABLE_JOB(jobp)) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } PR_Lock(jobp->tpool->join_lock); while(jobp->join_wait) PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(jobp->tpool->join_lock); delete_job(jobp); return PR_SUCCESS;}/* shutdown threadpool */PR_IMPLEMENT(PRStatus)PR_ShutdownThreadPool(PRThreadPool *tpool){PRStatus rval = PR_SUCCESS; PR_Lock(tpool->jobq.lock); tpool->shutdown = PR_TRUE; PR_NotifyAllCondVar(tpool->shutdown_cv); PR_Unlock(tpool->jobq.lock); return rval;}/* * join thread pool * wait for termination of worker threads * reclaim threadpool resources */PR_IMPLEMENT(PRStatus)PR_JoinThreadPool(PRThreadPool *tpool){PRStatus rval = PR_SUCCESS;PRCList *head;PRStatus rval_status; PR_Lock(tpool->jobq.lock); while (!tpool->shutdown) PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); /* * wakeup worker threads */#ifdef OPT_WINNT /* * post shutdown notification for all threads */ { int i; for(i=0; i < tpool->current_threads; i++) { PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL); } }#else PR_NotifyAllCondVar(tpool->jobq.cv);#endif /* * wakeup io thread(s) */ notify_ioq(tpool); /* * wakeup timer thread(s) */ PR_Lock(tpool->timerq.lock); notify_timerq(tpool); PR_Unlock(tpool->timerq.lock); while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->jobq.wthreads); PR_REMOVE_AND_INIT_LINK(head); PR_Unlock(tpool->jobq.lock); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); PR_Lock(tpool->jobq.lock); } PR_Unlock(tpool->jobq.lock); while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->ioq.wthreads); PR_REMOVE_AND_INIT_LINK(head); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); } while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->timerq.wthreads); PR_REMOVE_AND_INIT_LINK(head); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); } /* * Delete queued jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->jobq.list); PR_REMOVE_AND_INIT_LINK(head); jobp = JOB_LINKS_PTR(head); tpool->jobq.cnt--; delete_job(jobp); } /* delete io jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->ioq.list); PR_REMOVE_AND_INIT_LINK(head); tpool->ioq.cnt--; jobp = JOB_LINKS_PTR(head); delete_job(jobp); } /* delete timer jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->timerq.list); PR_REMOVE_AND_INIT_LINK(head); tpool->timerq.cnt--; jobp = JOB_LINKS_PTR(head); delete_job(jobp); } PR_ASSERT(0 == tpool->jobq.cnt); PR_ASSERT(0 == tpool->ioq.cnt); PR_ASSERT(0 == tpool->timerq.cnt); delete_threadpool(tpool); return rval;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -