📄 threadpool.c
字号:
STATSONLY( time( &start ); ); while( 1 ) { ithread_mutex_lock( &tp->mutex ); if( job ) { FreeThreadPoolJob( tp, job ); job = NULL; } retCode = 0; STATSONLY( tp->stats.idleThreads++; ); STATSONLY( tp->stats.totalWorkTime += ( time( NULL ) - start ); ); //work time STATSONLY( time( &start ); ); //idle time if( persistent == 1 ) { //Persistent thread //becomes a regular thread tp->persistentThreads--; } STATSONLY( if( persistent == 0 ) tp->stats.workerThreads--; ); //Check for a job or shutdown while( ( tp->lowJobQ.size == 0 ) && ( tp->medJobQ.size == 0 ) && ( tp->highJobQ.size == 0 ) && ( !tp->persistentJob ) && ( !tp->shutdown ) ) { //If wait timed out //and we currently have more than the //min threads, or if we have more than the max threads // (only possible if the attributes have been reset) //let this thread die. if( ( ( retCode == ETIMEDOUT ) && ( ( tp->totalThreads ) > tp->attr.minThreads ) ) || ( ( tp->attr.maxThreads != -1 ) && ( ( tp->totalThreads ) > tp->attr.maxThreads ) ) ) { STATSONLY( tp->stats.idleThreads-- ); tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); return NULL; } SetRelTimeout( &timeout, tp->attr.maxIdleTime ); //wait for a job up to the specified max time retCode = ithread_cond_timedwait( &tp->condition, &tp->mutex, &timeout ); } STATSONLY( tp->stats.idleThreads--; ); STATSONLY( tp->stats.totalIdleTime += ( time( NULL ) - start ); ); //idle time STATSONLY( time( &start ); ); //work time //bump priority of starved jobs BumpPriority( tp ); //if shutdown then stop if( tp->shutdown ) { tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); return NULL; } else { //Pick up persistent job if available if( tp->persistentJob ) { job = tp->persistentJob; tp->persistentJob = NULL; tp->persistentThreads++; persistent = 1; ithread_cond_broadcast( &tp->start_and_shutdown ); } else { STATSONLY( tp->stats.workerThreads++ ); persistent = 0; //Pick 
the highest priority job if( tp->highJobQ.size > 0 ) { head = ListHead( &tp->highJobQ ); job = ( ThreadPoolJob * ) head->item; STATSONLY( CalcWaitTime ( tp, HIGH_PRIORITY, job ) ); ListDelNode( &tp->highJobQ, head, 0 ); } else if( tp->medJobQ.size > 0 ) { head = ListHead( &tp->medJobQ ); job = ( ThreadPoolJob * ) head->item; STATSONLY( CalcWaitTime( tp, MED_PRIORITY, job ) ); ListDelNode( &tp->medJobQ, head, 0 ); } else if( tp->lowJobQ.size > 0 ) { head = ListHead( &tp->lowJobQ ); job = ( ThreadPoolJob * ) head->item; STATSONLY( CalcWaitTime( tp, LOW_PRIORITY, job ) ); ListDelNode( &tp->lowJobQ, head, 0 ); } else { // Should never get here assert( 0 ); STATSONLY( tp->stats.workerThreads-- ); tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); return NULL; } } } ithread_mutex_unlock( &tp->mutex ); if( SetPriority( job->priority ) != 0 ) { // In the future can log // info } else { // In the future can log // info } //run the job job->func( job->arg ); //return to Normal SetPriority( DEFAULT_PRIORITY ); } }/**************************************************************************** * Function: CreateThreadPoolJob * * Description: * Creates a Thread Pool Job. (Dynamically allocated) * Internal to thread pool. * Parameters: * ThreadPoolJob * job - job is copied * id - id of job * * Returns: * ThreadPoolJob * on success, NULL on failure. 
*****************************************************************************/
/*
 * Additional note on CreateThreadPoolJob's third parameter, which the
 * header above does not list:
 *      tp - thread pool whose jobFreeList supplies the storage for the copy.
 * The copy receives the given id and a request timestamp taken with ftime().
 */
static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob * job,
                                           int id,
                                           ThreadPool * tp )
{
    ThreadPoolJob *newJob = NULL;

    assert( job != NULL );
    assert( tp != NULL );

    newJob = ( ThreadPoolJob * ) FreeListAlloc( &tp->jobFreeList );
    if( newJob == NULL ) {
        //free list could not supply a node: report failure to the caller
        return NULL;
    }

    ( *newJob ) = ( *job );     //copy the caller's job by value
    newJob->jobId = id;
    ftime( &newJob->requestTime );      //stamp the time of the request

    return newJob;
}

/****************************************************************************
 * Function: CreateWorker
 *
 *  Description:
 *      Creates a worker thread, if the thread pool
 *      does not already have max threads.
 *      After a successful ithread_create the thread is detached and this
 *      call waits on the start_and_shutdown condition (with tp->mutex)
 *      until tp->totalThreads has reached the expected count.
 *      Internal to thread pool.
 *  Parameters:
 *      ThreadPool *tp - must be non-NULL.
 *      NOTE(review): the condition wait uses tp->mutex and this function
 *      never locks it, so the caller presumably has to hold tp->mutex
 *      already (ThreadPoolInit does) - confirm for other callers.
 *  Returns:
 *      0 on success, nonzero on failure
 *      EMAXTHREADS if already max threads reached
 *      otherwise the result of ithread_create, or of ithread_detach when
 *      the create succeeded (e.g. EAGAIN if system can not create thread)
 *
 *****************************************************************************/
static int CreateWorker( ThreadPool * tp )
{
    ithread_t temp;
    int rc = 0;
    int currentThreads = 0;

    //check the pointer BEFORE it is dereferenced for the first time
    assert( tp != NULL );

    //thread count we expect to see once the new worker is running
    currentThreads = tp->totalThreads + 1;

    if( ( tp->attr.maxThreads != INFINITE_THREADS )
        && ( currentThreads > tp->attr.maxThreads ) ) {
        return EMAXTHREADS;
    }

    rc = ithread_create( &temp, NULL, WorkerThread, tp );

    if( rc == 0 ) {
        rc = ithread_detach( temp );

        //wait until totalThreads reflects the new worker
        //NOTE(review): presumably incremented by the started thread itself
        while( tp->totalThreads < currentThreads ) {
            ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
        }
    }

    //keep the high-water mark of the thread count (stats builds only)
    STATSONLY( if( tp->stats.maxThreads < tp->totalThreads ) {
               tp->stats.maxThreads = tp->totalThreads;}
     )

    return rc;
}

/****************************************************************************
 * Function: AddWorker
 *
 *  Description:
 *      Determines whether or not a thread should be added
 *      based on the jobsPerThread ratio.
 *      Adds a thread if appropriate.
 *      Internal to Thread Pool.
* Parameters:
 *      ThreadPool* tp
 *
 *****************************************************************************/
/*
 * The queued-job total (high + low + medium queues) is sampled once.
 * Workers are then created one at a time until the number of
 * non-persistent threads is non-zero and jobs-per-thread no longer exceeds
 * tp->attr.jobsPerThread, or until CreateWorker reports a failure.
 */
static void AddWorker( ThreadPool * tp )
{
    int pendingJobs = 0;
    int workers = 0;

    assert( tp != NULL );

    pendingJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
    workers = tp->totalThreads - tp->persistentThreads;

    for( ;; ) {
        //enough workers for the current load: nothing (more) to add
        //(the non-zero check comes first so the division is safe)
        if( ( workers != 0 )
            && ( ( pendingJobs / workers ) <= tp->attr.jobsPerThread ) ) {
            break;
        }

        //stop quietly when no further thread can be created
        if( CreateWorker( tp ) != 0 ) {
            break;
        }

        workers++;
    }
}

/****************************************************************************
 * Function: ThreadPoolInit
 *
 *  Description:
 *      Initializes and starts ThreadPool. Must be called first.
 *      And only once for ThreadPool.
 *  Parameters:
 *      tp  - must be valid, non null, pointer to ThreadPool.
 *      minWorkerThreads - minimum number of worker threads
 *                                 thread pool will never have less than this
 *                                  number of threads.
 *      maxWorkerThreads - maximum number of worker threads
 *                         thread pool will never have more than this
 *                         number of threads.
 *      maxIdleTime      - maximum time that a worker thread will spend
 *                         idle. If a worker is idle longer than this
 *                         time and there are more than the min
 *                         number of workers running, than the
 *                         worker thread exits.
 *      jobsPerThread    - ratio of jobs to thread to try and maintain
 *                         if a job is scheduled and the number of jobs per
 *                         thread is greater than this number,and
 *                         if less than the maximum number of
 *                         workers are running then a new thread is
 *                         started to help out with efficiency.
 *      schedPolicy      - scheduling policy to try and set (OS dependent)
 *  Returns:
 *      0 on success, nonzero on failure.
 *      EAGAIN if not enough system resources to create minimum threads.
* INVALID_POLICY if schedPolicy can't be set * EMAXTHREADS if minimum threads is greater than maximum threads *****************************************************************************/ int ThreadPoolInit( ThreadPool * tp, ThreadPoolAttr * attr ) { int retCode = 0; int i = 0; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } retCode += ithread_mutex_init( &tp->mutex, NULL ); assert( retCode == 0 ); retCode += ithread_mutex_lock( &tp->mutex ); assert( retCode == 0 ); retCode += ithread_cond_init( &tp->condition, NULL ); assert( retCode == 0 ); retCode += ithread_cond_init( &tp->start_and_shutdown, NULL ); assert( retCode == 0 ); if( retCode != 0 ) { return EAGAIN; } if( attr ) { tp->attr = ( *attr ); } else { TPAttrInit( &tp->attr ); } if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) { ithread_mutex_unlock( &tp->mutex ); ithread_mutex_destroy( &tp->mutex ); ithread_cond_destroy( &tp->condition ); ithread_cond_destroy( &tp->start_and_shutdown ); return INVALID_POLICY; } retCode += FreeListInit( &tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE ); assert( retCode == 0 ); STATSONLY( StatsInit( &tp->stats ) ); retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); if( retCode != 0 ) { retCode = EAGAIN; } else { tp->persistentJob = NULL; tp->lastJobId = 0; tp->shutdown = 0; tp->totalThreads = 0; tp->persistentThreads = 0; for( i = 0; i < tp->attr.minThreads; i++ ) { if( ( retCode = CreateWorker( tp ) ) != 0 ) { break; } } } ithread_mutex_unlock( &tp->mutex ); if( retCode != 0 ) { //clean up if the min threads could //not be created ThreadPoolShutdown( tp ); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -