📄 threadpool.c
字号:
return retCode; }/**************************************************************************** * Function: ThreadPoolAddPersistent * * Description: * Adds a long term job to the thread pool. * Job will be run as soon as possible. * Call will block until job is scheduled. * Parameters: * tp - valid thread pool pointer * job-> valid ThreadPoolJob pointer with following fields * func - ThreadFunction to run * arg - argument to function. * priority - priority of job. * free_function - function to use when freeing argument * Returns: * 0 on success, nonzero on failure * EOUTOFMEM not enough memory to add job. * EMAXTHREADS not enough threads to add persistent job. *****************************************************************************/ int ThreadPoolAddPersistent( ThreadPool * tp, ThreadPoolJob * job, int *jobId ) { int tempId = -1; ThreadPoolJob *temp = NULL; assert( tp != NULL ); assert( job != NULL ); if( ( tp == NULL ) || ( job == NULL ) ) { return EINVAL; } if( jobId == NULL ) jobId = &tempId; ( *jobId ) = INVALID_JOB_ID; ithread_mutex_lock( &tp->mutex ); assert( ( job->priority == LOW_PRIORITY ) || ( job->priority == MED_PRIORITY ) || ( job->priority == HIGH_PRIORITY ) ); //Create A worker if less than max threads running if( tp->totalThreads < tp->attr.maxThreads ) { CreateWorker( tp ); } else { //if there is more than one worker thread //available then schedule job, otherwise fail if( ( tp->totalThreads - tp->persistentThreads ) - 1 == 0 ) { ithread_mutex_unlock( &tp->mutex ); return EMAXTHREADS; } } temp = CreateThreadPoolJob( job, tp->lastJobId, tp ); if( temp == NULL ) { ithread_mutex_unlock( &tp->mutex ); return EOUTOFMEM; } tp->persistentJob = temp; //Notify a waiting thread ithread_cond_signal( &tp->condition ); //wait until long job has been picked up while( tp->persistentJob != NULL ) { ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex ); } ( *jobId ) = tp->lastJobId++; ithread_mutex_unlock( &tp->mutex ); return 0; }/**************************************************************************** * Function: ThreadPoolAdd * * Description: * Adds a job to the thread pool. * Job will be run as soon as possible. * Parameters: * tp - valid thread pool pointer * func - ThreadFunction to run * arg - argument to function. * priority - priority of job. * jobId - id of job * duration - whether or not this is a persistent thread * free_function - function to use when freeing argument * Returns: * 0 on success, nonzero on failure * EOUTOFMEM if not enough memory to add job. *****************************************************************************/ int ThreadPoolAdd( ThreadPool * tp, ThreadPoolJob * job, int *jobId ) { int rc = EOUTOFMEM; int tempId = -1; ThreadPoolJob *temp = NULL; assert( tp != NULL ); assert( job != NULL ); if( ( tp == NULL ) || ( job == NULL ) ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); assert( ( job->priority == LOW_PRIORITY ) || ( job->priority == MED_PRIORITY ) || ( job->priority == HIGH_PRIORITY ) ); if( jobId == NULL ) jobId = &tempId; ( *jobId ) = INVALID_JOB_ID; temp = CreateThreadPoolJob( job, tp->lastJobId, tp ); if( temp == NULL ) { ithread_mutex_unlock( &tp->mutex ); return rc; } if( job->priority == HIGH_PRIORITY ) { if( ListAddTail( &tp->highJobQ, temp ) ) rc = 0; } else if( job->priority == MED_PRIORITY ) { if( ListAddTail( &tp->medJobQ, temp ) ) rc = 0; } else { if( ListAddTail( &tp->lowJobQ, temp ) ) rc = 0; } //AddWorker if appropriate AddWorker( tp ); //Notify a waiting thread if( rc == 0 ) { ithread_cond_signal( &tp->condition ); } else { FreeThreadPoolJob( tp, temp ); } ( *jobId ) = tp->lastJobId++; ithread_mutex_unlock( &tp->mutex ); return rc; }/**************************************************************************** * Function: ThreadPoolRemove * * Description: * Removes a job from the thread pool. * Can only remove jobs which are not * currently running. * Parameters: * tp - valid thread pool pointer * jobId - id of job * ThreadPoolJob *out - space for removed job. * Can be null if not needed. * * Returns: * 0 on success. INVALID_JOB_ID on failure. *****************************************************************************/ int ThreadPoolRemove( ThreadPool * tp, int jobId, ThreadPoolJob * out ) { ThreadPoolJob *temp = NULL; int ret = INVALID_JOB_ID; ListNode *tempNode = NULL; ThreadPoolJob dummy; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } if( out == NULL ) { out = &dummy; } dummy.jobId = jobId; ithread_mutex_lock( &tp->mutex ); tempNode = ListFind( &tp->highJobQ, NULL, &dummy ); if( tempNode ) { temp = ( ThreadPoolJob * ) tempNode->item; ( *out ) = ( *temp ); ListDelNode( &tp->highJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } tempNode = ListFind( &tp->medJobQ, NULL, &dummy ); if( tempNode ) { temp = ( ThreadPoolJob * ) tempNode->item; ( *out ) = ( *temp ); ListDelNode( &tp->medJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } tempNode = ListFind( &tp->lowJobQ, NULL, &dummy ); if( tempNode ) { temp = ( ThreadPoolJob * ) tempNode->item; ( *out ) = ( *temp ); ListDelNode( &tp->lowJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } if( ( tp->persistentJob ) && ( tp->persistentJob->jobId == jobId ) ) { ( *out ) = ( *tp->persistentJob ); FreeThreadPoolJob( tp, tp->persistentJob ); tp->persistentJob = NULL; ithread_mutex_unlock( &tp->mutex ); return 0; } ithread_mutex_unlock( &tp->mutex ); return ret; }/**************************************************************************** * Function: ThreadPoolGetAttr * * Description: * Gets the current set of attributes * associated with the thread pool. * Parameters: * tp - valid thread pool pointer * out - non null pointer to store attributes * Returns: * 0 on success, nonzero on failure * Always returns 0. *****************************************************************************/ int ThreadPoolGetAttr( ThreadPool * tp, ThreadPoolAttr * out ) { assert( tp != NULL ); assert( out != NULL ); if( ( tp == NULL ) || ( out == NULL ) ) { return EINVAL; } if( !tp->shutdown ) { ithread_mutex_lock( &tp->mutex ); } ( *out ) = tp->attr; if( !tp->shutdown ) { ithread_mutex_unlock( &tp->mutex ); } return 0; }/**************************************************************************** * Function: ThreadPoolSetAttr * * Description: * Sets the attributes for the thread pool. * Only affects future calculations. * Parameters: * tp - valid thread pool pointer * attr - pointer to attributes, null sets attributes to default. * Returns: * 0 on success, nonzero on failure * Returns INVALID_POLICY if policy can not be set. *****************************************************************************/ int ThreadPoolSetAttr( ThreadPool * tp, ThreadPoolAttr * attr ) { int retCode = 0; ThreadPoolAttr temp; int i = 0; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); if( attr != NULL ) { temp = ( *attr ); } else { TPAttrInit( &temp ); } if( SetPolicyType( temp.schedPolicy ) != 0 ) { ithread_mutex_unlock( &tp->mutex ); return INVALID_POLICY; } tp->attr = ( temp ); if( tp->totalThreads < tp->attr.minThreads ) //add threads { for( i = tp->totalThreads; i < tp->attr.minThreads; i++ ) { if( ( retCode = CreateWorker( tp ) ) != 0 ) { break; } } } ithread_cond_signal( &tp->condition ); //signal changes ithread_mutex_unlock( &tp->mutex ); if( retCode != 0 ) { //clean up if the min threads could //not be created ThreadPoolShutdown( tp ); } return retCode; }/**************************************************************************** * Function: ThreadPoolShutdown * * Description: * Shuts the thread pool down. * Waits for all threads to finish. * May block indefinitely if jobs do not * exit. * Parameters: * tp - must be valid tp * Returns: * 0 on success, nonzero on failure * Always returns 0. *****************************************************************************/ int ThreadPoolShutdown( ThreadPool * tp ) { ListNode *head = NULL; ThreadPoolJob *temp = NULL; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); //clean up high priority jobs while( tp->highJobQ.size ) { head = ListHead( &tp->highJobQ ); temp = ( ThreadPoolJob * ) head->item; if( temp->free_func ) temp->free_func( temp->arg ); FreeThreadPoolJob( tp, temp ); ListDelNode( &tp->highJobQ, head, 0 ); } ListDestroy( &tp->highJobQ, 0 );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -