📄 threadpool.c
字号:
} else {
tp->persistentJob = NULL;
tp->lastJobId = 0;
tp->shutdown = 0;
tp->totalThreads = 0;
tp->persistentThreads = 0;
for( i = 0; i < tp->attr.minThreads; i++ ) {
if( ( retCode = CreateWorker( tp ) ) != 0 ) {
break;
}
}
}
ithread_mutex_unlock( &tp->mutex );
if( retCode != 0 ) {
//clean up if the min threads could
//not be created
ThreadPoolShutdown( tp );
}
return retCode;
}
/****************************************************************************
 * Function: ThreadPoolAddPersistent
 *
 *  Description:
 *      Adds a long term job to the thread pool.
 *      The job is published in tp->persistentJob, one thread waiting on
 *      tp->condition is signalled, and the call then blocks on
 *      tp->start_and_shutdown until tp->persistentJob has been cleared
 *      again (presumably by the worker that picks the job up -- the
 *      worker code is not part of this function).
 *  Parameters:
 *      tp - valid thread pool pointer
 *      job - valid ThreadPoolJob pointer with following fields
 *          func - ThreadFunction to run
 *          arg - argument to function.
 *          priority - priority of job (LOW_PRIORITY, MED_PRIORITY or
 *                     HIGH_PRIORITY; checked by assert only).
 *          free_function - function to use when freeing argument
 *      jobId - optional. Set to INVALID_JOB_ID on entry and to the id
 *              given to the job once it has been handed over.
 *              May be NULL if the id is not needed.
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL tp or job is NULL.
 *      EOUTOFMEM not enough memory to add job.
 *      EMAXTHREADS not enough threads to add persistent job.
 *      The error code of CreateWorker() when no new worker could be
 *      started and no non-persistent worker exists to take the job.
 *****************************************************************************/
int ThreadPoolAddPersistent( ThreadPool * tp,
                             ThreadPoolJob * job,
                             int *jobId )
{
    int rc = 0;
    int tempId = -1;
    ThreadPoolJob *temp = NULL;

    assert( tp != NULL );
    assert( job != NULL );
    if( ( tp == NULL ) || ( job == NULL ) ) {
        return EINVAL;
    }

    /* Let the rest of the function write through jobId unconditionally. */
    if( jobId == NULL ) {
        jobId = &tempId;
    }
    ( *jobId ) = INVALID_JOB_ID;

    ithread_mutex_lock( &tp->mutex );

    assert( ( job->priority == LOW_PRIORITY )
            || ( job->priority == MED_PRIORITY )
            || ( job->priority == HIGH_PRIORITY ) );

    if( tp->totalThreads < tp->attr.maxThreads ) {
        /* Below the thread limit: try to start a worker for this job.
         * NOTE(review): the original code carried the remark "this gets
         * stuck" on this call; CreateWorker() is not visible here, so
         * confirm whether it can block while tp->mutex is held. */
        rc = CreateWorker( tp );
        if( ( rc != 0 )
            && ( ( tp->totalThreads - tp->persistentThreads ) <= 0 ) ) {
            /* No worker was started and every existing thread is already
             * counted as persistent: nobody could take the job, so report
             * the failure instead of waiting below with no taker. */
            goto unlock;
        }
        /* Either a worker was started or an existing non-persistent
         * worker can take the job; a failed start is then not fatal. */
        rc = 0;
    } else if( ( tp->totalThreads - tp->persistentThreads ) - 1 <= 0 ) {
        /* At the thread limit the job is only scheduled if more than one
         * non-persistent worker is available; otherwise fail. "<= 0"
         * also covers the case where no such worker is left at all. */
        rc = EMAXTHREADS;
        goto unlock;
    }

    temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
    if( temp == NULL ) {
        rc = EOUTOFMEM;
        goto unlock;
    }

    tp->persistentJob = temp;

    /* Notify a waiting thread */
    ithread_cond_signal( &tp->condition );

    /* Wait until the long job has been picked up. The loop re-checks the
     * slot after every wake-up of start_and_shutdown. */
    while( tp->persistentJob != NULL ) {
        ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
    }

    ( *jobId ) = tp->lastJobId++;

unlock:
    ithread_mutex_unlock( &tp->mutex );
    return rc;
}
/****************************************************************************
 * Function: ThreadPoolAdd
 *
 *  Description:
 *      Adds a job to the thread pool.
 *      A job object is obtained from CreateThreadPoolJob() and appended
 *      to the tail of the high, medium or low priority queue according
 *      to job->priority. AddWorker() is then called and, if the job was
 *      queued, one thread waiting on tp->condition is signalled.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      job - valid ThreadPoolJob pointer describing the work; its
 *            priority must be LOW_PRIORITY, MED_PRIORITY or
 *            HIGH_PRIORITY (checked by assert only; any value other
 *            than HIGH_PRIORITY / MED_PRIORITY goes to the low queue).
 *      jobId - optional. Set to INVALID_JOB_ID on entry and to the id
 *              of the job only if the job was actually queued.
 *              May be NULL if the id is not needed.
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL if tp or job is NULL.
 *      EOUTOFMEM if the job object could not be created or could not
 *      be appended to its queue.
 *****************************************************************************/
int ThreadPoolAdd( ThreadPool * tp,
                   ThreadPoolJob * job,
                   int *jobId )
{
    int rc = EOUTOFMEM;     /* stays EOUTOFMEM until the job is queued */
    int tempId = -1;
    ThreadPoolJob *temp = NULL;

    assert( tp != NULL );
    assert( job != NULL );
    if( ( tp == NULL ) || ( job == NULL ) ) {
        return EINVAL;
    }

    ithread_mutex_lock( &tp->mutex );

    assert( ( job->priority == LOW_PRIORITY )
            || ( job->priority == MED_PRIORITY )
            || ( job->priority == HIGH_PRIORITY ) );

    /* Let the rest of the function write through jobId unconditionally. */
    if( jobId == NULL ) {
        jobId = &tempId;
    }
    ( *jobId ) = INVALID_JOB_ID;

    temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
    if( temp == NULL ) {
        ithread_mutex_unlock( &tp->mutex );
        return rc;
    }

    /* A non-NULL result from ListAddTail() is treated as success. */
    if( job->priority == HIGH_PRIORITY ) {
        if( ListAddTail( &tp->highJobQ, temp ) ) {
            rc = 0;
        }
    } else if( job->priority == MED_PRIORITY ) {
        if( ListAddTail( &tp->medJobQ, temp ) ) {
            rc = 0;
        }
    } else {
        if( ListAddTail( &tp->lowJobQ, temp ) ) {
            rc = 0;
        }
    }

    /* AddWorker if appropriate */
    AddWorker( tp );

    if( rc == 0 ) {
        /* Notify a waiting thread */
        ithread_cond_signal( &tp->condition );
        /* Publish and consume the id only for a job that really sits in
         * a queue, so a failed call leaves *jobId == INVALID_JOB_ID. */
        ( *jobId ) = tp->lastJobId++;
    } else {
        /* The job never made it into a queue; release it again. */
        FreeThreadPoolJob( tp, temp );
    }

    ithread_mutex_unlock( &tp->mutex );
    return rc;
}
/****************************************************************************
 * Function: ThreadPoolRemove
 *
 *  Description:
 *      Removes a job from the thread pool.
 *      The high, medium and low priority queues are searched in that
 *      order, followed by the pending persistent job slot
 *      (tp->persistentJob). The first job whose id matches is copied to
 *      *out, taken out of the pool and released with
 *      FreeThreadPoolJob(). Jobs that are no longer in one of these
 *      places (e.g. currently running) are not found.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      jobId - id of job
 *      ThreadPoolJob *out - space for a copy of the removed job.
 *                           Can be null if not needed.
 *
 *  Returns:
 *      0 on success.
 *      INVALID_JOB_ID if no job with that id was found.
 *      EINVAL if tp is NULL.
 *****************************************************************************/
int ThreadPoolRemove( ThreadPool * tp,
                      int jobId,
                      ThreadPoolJob * out )
{
    ThreadPoolJob key;              /* search key; only jobId is set */
    ThreadPoolJob *found = NULL;
    ListNode *node = NULL;
    int status = INVALID_JOB_ID;

    assert( tp != NULL );
    if( tp == NULL ) {
        return EINVAL;
    }

    /* Without a caller buffer the copy simply lands in the local key. */
    if( out == NULL ) {
        out = &key;
    }
    key.jobId = jobId;

    ithread_mutex_lock( &tp->mutex );

    /* High priority queue. */
    node = ListFind( &tp->highJobQ, NULL, &key );
    if( node != NULL ) {
        found = ( ThreadPoolJob * ) node->item;
        *out = *found;
        /* NOTE(review): the 0 presumably tells the list not to free the
         * item itself -- confirm against ListDelNode(). */
        ListDelNode( &tp->highJobQ, node, 0 );
        FreeThreadPoolJob( tp, found );
        status = 0;
        goto done;
    }

    /* Medium priority queue. */
    node = ListFind( &tp->medJobQ, NULL, &key );
    if( node != NULL ) {
        found = ( ThreadPoolJob * ) node->item;
        *out = *found;
        ListDelNode( &tp->medJobQ, node, 0 );
        FreeThreadPoolJob( tp, found );
        status = 0;
        goto done;
    }

    /* Low priority queue. */
    node = ListFind( &tp->lowJobQ, NULL, &key );
    if( node != NULL ) {
        found = ( ThreadPoolJob * ) node->item;
        *out = *found;
        ListDelNode( &tp->lowJobQ, node, 0 );
        FreeThreadPoolJob( tp, found );
        status = 0;
        goto done;
    }

    /* Persistent job that is still waiting in its slot. */
    if( ( tp->persistentJob != NULL )
        && ( tp->persistentJob->jobId == jobId ) ) {
        *out = *tp->persistentJob;
        FreeThreadPoolJob( tp, tp->persistentJob );
        tp->persistentJob = NULL;
        status = 0;
    }

done:
    ithread_mutex_unlock( &tp->mutex );
    return status;
}
/****************************************************************************
 * Function: ThreadPoolGetAttr
 *
 *  Description:
 *      Gets the current set of attributes
 *      associated with the thread pool.
 *      tp->mutex is held while copying unless the pool is flagged as
 *      shut down (presumably because the mutex must not be used any
 *      more at that point -- confirm against ThreadPoolShutdown).
 *  Parameters:
 *      tp - valid thread pool pointer
 *      out - non null pointer to store attributes
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL if tp or out is NULL.
 *****************************************************************************/
int ThreadPoolGetAttr( ThreadPool * tp,
                       ThreadPoolAttr * out )
{
    int useLock = 0;

    assert( tp != NULL );
    assert( out != NULL );
    if( ( tp == NULL ) || ( out == NULL ) ) {
        return EINVAL;
    }

    /* Sample the shutdown flag exactly once so that the lock and the
     * unlock decision can never disagree if the flag changes while the
     * attributes are being copied. */
    useLock = !tp->shutdown;

    if( useLock ) {
        ithread_mutex_lock( &tp->mutex );
    }

    ( *out ) = tp->attr;

    if( useLock ) {
        ithread_mutex_unlock( &tp->mutex );
    }

    return 0;
}
/****************************************************************************
 * Function: ThreadPoolSetAttr
 *
 *  Description:
 *      Sets the attributes for the thread pool.
 *      The new attributes are stored in tp->attr after the scheduling
 *      policy has been accepted by SetPolicyType(). If the pool then has
 *      fewer threads than attr.minThreads, workers are created until the
 *      minimum is reached or a creation fails. One thread waiting on
 *      tp->condition is signalled afterwards.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      attr - pointer to attributes, null sets attributes to the
 *             defaults produced by TPAttrInit().
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL if tp is NULL.
 *      INVALID_POLICY if policy can not be set (attributes unchanged).
 *      The error code of CreateWorker() if the minimum number of threads
 *      could not be created; the pool is shut down in that case.
 *****************************************************************************/
int ThreadPoolSetAttr( ThreadPool * tp,
                       ThreadPoolAttr * attr )
{
    ThreadPoolAttr newAttr;
    int status = 0;
    int threadCount = 0;

    assert( tp != NULL );
    if( tp == NULL ) {
        return EINVAL;
    }

    ithread_mutex_lock( &tp->mutex );

    /* Work on a private copy: either the caller's values or defaults. */
    if( attr == NULL ) {
        TPAttrInit( &newAttr );
    } else {
        newAttr = *attr;
    }

    if( SetPolicyType( newAttr.schedPolicy ) != 0 ) {
        ithread_mutex_unlock( &tp->mutex );
        return INVALID_POLICY;
    }

    tp->attr = newAttr;

    /* Top the pool up to the new minimum. The counter starts at the
     * thread count seen here and advances once per successful
     * CreateWorker() call; nothing happens if the pool is big enough. */
    threadCount = tp->totalThreads;
    while( threadCount < tp->attr.minThreads ) {
        status = CreateWorker( tp );
        if( status != 0 ) {
            break;
        }
        threadCount++;
    }

    /* signal changes */
    ithread_cond_signal( &tp->condition );
    ithread_mutex_unlock( &tp->mutex );

    if( status != 0 ) {
        /* clean up if the min threads could not be created */
        ThreadPoolShutdown( tp );
    }

    return status;
}
/****************************************************************************
* Function: ThreadPoolShutdown
*
* Description:
* Shuts the thread pool down.
* Waits for all threads to finish.
* May block indefinitely if jobs do not
* exit.
* Parameters:
* tp - must be valid tp
* Returns:
* 0 on success, nonzero on failure
* Always returns 0.
*****************************************************************************/
int ThreadPoolShutdown( ThreadPool * tp ) {
ListNode *head = NULL;
ThreadPoolJob *temp = NULL;
assert( tp != NULL );
if( tp == NULL ) {
return EINVAL;
}
ithread_mutex_lock( &tp->mutex );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -