📄 threadpool.c
字号:
* Worker picks up persistent jobs first, high priority, med priority,
* then low priority.
* If worker remains idle for more than specified max, the worker
* is released.
* Internal Only.
* Parameters:
* void * arg -> is cast to ThreadPool *
*****************************************************************************/
static void *WorkerThread( void *arg ) {
// Thread entry point for a pool worker. Registers itself in
// tp->totalThreads, then loops: wait for work, take one job (the
// persistent job first, then the high, medium and low queues), release
// the pool mutex, run the job, repeat. Returns NULL on every exit path.
STATSONLY( time_t start = 0;
)
ThreadPoolJob *job = NULL;
ListNode *head = NULL;
struct timespec timeout;
int retCode = 0;
// -1: no job picked up yet; 1: the last job was the persistent job;
// 0: the last job came from one of the priority queues
int persistent = -1;
ThreadPool *tp = ( ThreadPool * ) arg;
assert( tp != NULL );
//Increment total thread count
//and wake waiters on start_and_shutdown (CreateWorker blocks on that
//condition until totalThreads reaches the value it expects)
ithread_mutex_lock( &tp->mutex );
tp->totalThreads++;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
// NOTE(review): nothing happens between this lock and unlock. It may be
// meant to hold the worker back until the creating thread releases the
// mutex, or it may simply be redundant -- confirm before removing.
ithread_mutex_lock( &tp->mutex );
ithread_mutex_unlock( &tp->mutex );
// NOTE(review): SetSeed is defined elsewhere; presumably seeds a
// per-thread random generator -- confirm.
SetSeed( );
STATSONLY( time( &start );
);
while( 1 ) {
// Each iteration starts by taking the pool mutex; it is held until just
// before the job is executed (or until the thread exits).
ithread_mutex_lock( &tp->mutex );
// Release the job that was run in the previous iteration, if any
if( job ) {
FreeThreadPoolJob( tp, job );
job = NULL;
}
// Clear the previous wait result so a stale ETIMEDOUT cannot trigger
// the idle-exit check below
retCode = 0;
STATSONLY( tp->stats.idleThreads++;
);
STATSONLY( tp->stats.totalWorkTime += ( time( NULL ) - start );
); //work time
STATSONLY( time( &start );
); //idle time
if( persistent == 1 ) {
//Persistent thread
//becomes a regular thread
tp->persistentThreads--;
}
// Statistics only: a thread whose last job came from a queue stops
// counting as a busy worker
STATSONLY( if( persistent == 0 )
tp->stats.workerThreads--; );
//Check for a job or shutdown
//(sleep while all three queues are empty, there is no persistent job
//and shutdown has not been requested)
while( ( tp->lowJobQ.size == 0 )
&& ( tp->medJobQ.size == 0 )
&& ( tp->highJobQ.size == 0 )
&& ( !tp->persistentJob )
&& ( !tp->shutdown ) ) {
//If wait timed out
//and we currently have more than the
//min threads, or if we have more than the max threads
// (only possible if the attributes have been reset)
//let this thread die.
if( ( ( retCode == ETIMEDOUT )
&& ( ( tp->totalThreads ) > tp->attr.minThreads ) )
|| ( ( tp->attr.maxThreads != -1 )
&& ( ( tp->totalThreads ) >
tp->attr.maxThreads ) ) ) {
// Deregister this thread, notify waiters on start_and_shutdown,
// release the mutex and exit
STATSONLY( tp->stats.idleThreads-- );
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
return NULL;
}
// NOTE(review): SetRelTimeout is defined elsewhere; presumably it turns
// attr.maxIdleTime into a deadline relative to now -- confirm units.
SetRelTimeout( &timeout, tp->attr.maxIdleTime );
//wait for a job up to the specified max time
//(the result is examined at the top of the next pass of this loop)
retCode = ithread_cond_timedwait( &tp->condition,
&tp->mutex, &timeout );
}
STATSONLY( tp->stats.idleThreads--;
);
STATSONLY( tp->stats.totalIdleTime += ( time( NULL ) - start );
); //idle time
STATSONLY( time( &start );
); //work time
//bump priority of starved jobs
BumpPriority( tp );
//if shutdown then stop
//(deregister, notify waiters on start_and_shutdown, unlock, exit)
if( tp->shutdown ) {
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
return NULL;
} else {
//Pick up persistent job if available
if( tp->persistentJob ) {
// Take the job out of the single persistent slot and count this
// thread as persistent until the job returns
job = tp->persistentJob;
tp->persistentJob = NULL;
tp->persistentThreads++;
persistent = 1;
// NOTE(review): presumably wakes whoever posted the persistent job and
// is waiting for it to be taken -- confirm against the caller.
ithread_cond_broadcast( &tp->start_and_shutdown );
} else {
STATSONLY( tp->stats.workerThreads++ );
persistent = 0;
//Pick the highest priority job
//(head of the first non-empty queue: high, then medium, then low)
if( tp->highJobQ.size > 0 ) {
head = ListHead( &tp->highJobQ );
job = ( ThreadPoolJob * ) head->item;
STATSONLY( CalcWaitTime
( tp, HIGH_PRIORITY, job ) );
// NOTE(review): the third argument 0 presumably tells ListDelNode not
// to free the item, since job is still used below -- confirm.
ListDelNode( &tp->highJobQ, head, 0 );
} else if( tp->medJobQ.size > 0 ) {
head = ListHead( &tp->medJobQ );
job = ( ThreadPoolJob * ) head->item;
STATSONLY( CalcWaitTime( tp, MED_PRIORITY, job ) );
ListDelNode( &tp->medJobQ, head, 0 );
} else if( tp->lowJobQ.size > 0 ) {
head = ListHead( &tp->lowJobQ );
job = ( ThreadPoolJob * ) head->item;
STATSONLY( CalcWaitTime( tp, LOW_PRIORITY, job ) );
ListDelNode( &tp->lowJobQ, head, 0 );
} else {
// Should never get here
// (the wait loop above only ends when there is work or shutdown).
// If assert is compiled out, undo the bookkeeping and exit the thread.
assert( 0 );
STATSONLY( tp->stats.workerThreads-- );
tp->totalThreads--;
ithread_cond_broadcast( &tp->start_and_shutdown );
ithread_mutex_unlock( &tp->mutex );
return NULL;
}
}
}
// The job is executed with the pool mutex released
ithread_mutex_unlock( &tp->mutex );
// Both branches are empty, so the result of SetPriority is currently
// ignored
if( SetPriority( job->priority ) != 0 ) {
// In the future can log
// info
} else {
// In the future can log
// info
}
//run the job
job->func( job->arg );
//return to Normal
SetPriority( DEFAULT_PRIORITY );
}
}
/****************************************************************************
 * Function: CreateThreadPoolJob
 *
 *  Description:
 *      Obtains storage for a job from the pool's job free list and fills
 *      it with a copy of the supplied job, tagged with the given id and
 *      the current time.
 *      Internal to thread pool.
 *  Parameters:
 *      ThreadPoolJob *job - job to copy (copied by value)
 *      int id - value stored in the copy's jobId field
 *      ThreadPool *tp - pool whose jobFreeList supplies the storage
 *
 *  Returns:
 *      ThreadPoolJob * on success, NULL if FreeListAlloc returned NULL.
 *****************************************************************************/
static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob * job,
                                           int id,
                                           ThreadPool * tp )
{
    ThreadPoolJob *copy;

    assert( job != NULL );
    assert( tp != NULL );

    copy = ( ThreadPoolJob * ) FreeListAlloc( &tp->jobFreeList );
    if( copy == NULL ) {
        return NULL;
    }

    // Duplicate the caller's job, then overwrite the fields owned by
    // the pool: the id and the time the request was made.
    *copy = *job;
    copy->jobId = id;
    ftime( &copy->requestTime );

    return copy;
}
/****************************************************************************
 * Function: CreateWorker
 *
 *  Description:
 *      Creates a worker thread, if the thread pool
 *      does not already have max threads, detaches it, and blocks on
 *      start_and_shutdown until the new thread has registered itself in
 *      tp->totalThreads.
 *      Internal to thread pool.
 *      tp->mutex is handed to ithread_cond_wait, so the caller is
 *      expected to hold it when calling this function.
 *  Parameters:
 *      ThreadPool *tp
 *
 *  Returns:
 *      0 on success, nonzero on failure:
 *      EMAXTHREADS if already max threads reached
 *      otherwise the code returned by ithread_create (e.g. EAGAIN if the
 *      system can not create a thread) or by ithread_detach
 *
 *****************************************************************************/
static int CreateWorker( ThreadPool * tp )
{
    ithread_t temp;
    int rc = 0;
    int currentThreads = 0;

    // Validate the pool before it is dereferenced.
    assert( tp != NULL );

    // Thread count expected once the new worker has started.
    currentThreads = tp->totalThreads + 1;

    if( ( tp->attr.maxThreads != INFINITE_THREADS )
        && ( currentThreads > tp->attr.maxThreads ) ) {
        return EMAXTHREADS;
    }

    rc = ithread_create( &temp, NULL, WorkerThread, tp );

    if( rc == 0 ) {
        // The thread handle is only meaningful when creation succeeded,
        // so everything that uses it lives inside this branch.
#ifndef _WIN32
        _SetSchedulingAndPriority( temp, 0 );   //MTY turn this on?
#endif
        rc = ithread_detach( temp );

        // Wait for the worker to increment totalThreads; it broadcasts
        // start_and_shutdown right after doing so.
        // NOTE(review): if another worker exits while we wait, the count
        // may not reach currentThreads -- confirm whether that can occur.
        while( tp->totalThreads < currentThreads ) {
            ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
        }
    }

    // Statistics only: remember the highest thread count seen so far.
    STATSONLY( if( tp->stats.maxThreads < tp->totalThreads ) {
               tp->stats.maxThreads = tp->totalThreads;}
     )

    return rc;
}
/****************************************************************************
 * Function: AddWorker
 *
 *  Description:
 *      Compares the number of queued jobs with the number of threads not
 *      occupied by persistent jobs and creates workers until there is at
 *      least one such thread and the jobs-per-thread ratio no longer
 *      exceeds attr.jobsPerThread. Stops early if a worker can not be
 *      created.
 *      Internal to Thread Pool.
 *  Parameters:
 *      ThreadPool* tp
 *
 *****************************************************************************/
static void AddWorker( ThreadPool * tp )
{
    int queuedJobs;
    int availableThreads;

    assert( tp != NULL );

    queuedJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
    availableThreads = tp->totalThreads - tp->persistentThreads;

    for( ;; ) {
        // Done once at least one thread exists and the ratio is within
        // bounds; the zero test also guards the division.
        if( ( availableThreads != 0 )
            && ( ( queuedJobs / availableThreads ) <=
                 tp->attr.jobsPerThread ) ) {
            break;
        }

        // Give up silently when no further worker can be created.
        if( CreateWorker( tp ) != 0 ) {
            return;
        }

        availableThreads++;
    }
}
/****************************************************************************
* Function: ThreadPoolInit
*
* Description:
* Initializes and starts ThreadPool. Must be called first.
* And only once for ThreadPool.
* Parameters:
* tp - must be valid, non null, pointer to ThreadPool.
* minWorkerThreads - minimum number of worker threads
* thread pool will never have less than this
* number of threads.
* maxWorkerThreads - maximum number of worker threads
* thread pool will never have more than this
* number of threads.
* maxIdleTime - maximum time that a worker thread will spend
* idle. If a worker is idle longer than this
* time and there are more than the min
* number of workers running, then the
* worker thread exits.
* jobsPerThread - ratio of jobs to threads to try and maintain.
* If a job is scheduled and the number of jobs per
* thread is greater than this number, and
* if less than the maximum number of
* workers are running, then a new thread is
* started to help out with efficiency.
* schedPolicy - scheduling policy to try and set (OS dependent)
* Returns:
* 0 on success, nonzero on failure.
* EAGAIN if not enough system resources to create minimum threads.
* INVALID_POLICY if schedPolicy can't be set
* EMAXTHREADS if minimum threads is greater than maximum threads
*****************************************************************************/
int ThreadPoolInit( ThreadPool * tp,
ThreadPoolAttr * attr ) {
int retCode = 0;
int i = 0;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
retCode += ithread_mutex_init( &tp->mutex, NULL );
assert( retCode == 0 );
retCode += ithread_mutex_lock( &tp->mutex );
assert( retCode == 0 );
retCode += ithread_cond_init( &tp->condition, NULL );
assert( retCode == 0 );
retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );
assert( retCode == 0 );
if( retCode != 0 ) {
return EAGAIN;
}
if( attr ) {
tp->attr = ( *attr );
} else {
TPAttrInit( &tp->attr );
}
if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {
ithread_mutex_unlock( &tp->mutex );
ithread_mutex_destroy( &tp->mutex );
ithread_cond_destroy( &tp->condition );
ithread_cond_destroy( &tp->start_and_shutdown );
return INVALID_POLICY;
}
retCode += FreeListInit( &tp->jobFreeList, sizeof( ThreadPoolJob ),
JOBFREELISTSIZE );
assert( retCode == 0 );
STATSONLY( StatsInit( &tp->stats ) );
retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );
assert( retCode == 0 );
if( retCode != 0 ) {
retCode = EAGAIN;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -