📄 threadpool.c
字号:
} else { // Should never get here assert( 0 ); tp->stats.workerThreads--; tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); return NULL; } } } ithread_mutex_unlock( &tp->mutex ); if( SetPriority( job->priority ) != 0 ) { // In the future can log // info } else { // In the future can log // info } // run the job job->func( job->arg ); // return to Normal SetPriority( DEFAULT_PRIORITY ); }}/**************************************************************************** * Function: CreateThreadPoolJob * * Description: * Creates a Thread Pool Job. (Dynamically allocated) * Internal to thread pool. * Parameters: * ThreadPoolJob *job - job is copied * id - id of job * * Returns: * ThreadPoolJob *on success, NULL on failure. *****************************************************************************/static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob *job, int id, ThreadPool *tp ){ ThreadPoolJob *newJob = NULL; assert( job != NULL ); assert( tp != NULL ); newJob = (ThreadPoolJob *)FreeListAlloc( &tp->jobFreeList ); if( newJob ) { *newJob = *job; newJob->jobId = id; gettimeofday( &newJob->requestTime, NULL ); } return newJob;}/**************************************************************************** * Function: CreateWorker * * Description: * Creates a worker thread, if the thread pool * does not already have max threads. * Internal to thread pool. * Parameters: * ThreadPool *tp * * Returns: * 0 on success, <0 on failure * EMAXTHREADS if already max threads reached * EAGAIN if system can not create thread * *****************************************************************************/static int CreateWorker( ThreadPool *tp ){ ithread_t temp; int rc = 0; int currentThreads = tp->totalThreads + 1; assert( tp != NULL ); if ( tp->attr.maxThreads != INFINITE_THREADS && currentThreads > tp->attr.maxThreads ) { return EMAXTHREADS; } rc = ithread_create( &temp, NULL, WorkerThread, tp ); if( rc == 0 ) { rc = ithread_detach( temp ); while( tp->totalThreads < currentThreads ) { ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex ); } } if( tp->stats.maxThreads < tp->totalThreads ) { tp->stats.maxThreads = tp->totalThreads; } return rc;}/**************************************************************************** * Function: AddWorker * * Description: * Determines whether or not a thread should be added * based on the jobsPerThread ratio. * Adds a thread if appropriate. * Internal to Thread Pool. * Parameters: * ThreadPool* tp * *****************************************************************************/static void AddWorker( ThreadPool *tp ){ int jobs = 0; int threads = 0; assert( tp != NULL ); jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size; threads = tp->totalThreads - tp->persistentThreads; while( threads == 0 || (jobs / threads) >= tp->attr.jobsPerThread ) { if( CreateWorker( tp ) != 0 ) { return; } threads++; }}/**************************************************************************** * Function: ThreadPoolInit * * Description: * Initializes and starts ThreadPool. Must be called first. * And only once for ThreadPool. * Parameters: * tp - must be valid, non null, pointer to ThreadPool. * minWorkerThreads - minimum number of worker threads * thread pool will never have less than this * number of threads. * maxWorkerThreads - maximum number of worker threads * thread pool will never have more than this * number of threads. * maxIdleTime - maximum time that a worker thread will spend * idle. If a worker is idle longer than this * time and there are more than the min * number of workers running, than the * worker thread exits. * jobsPerThread - ratio of jobs to thread to try and maintain * if a job is scheduled and the number of jobs per * thread is greater than this number,and * if less than the maximum number of * workers are running then a new thread is * started to help out with efficiency. * schedPolicy - scheduling policy to try and set (OS dependent) * Returns: * 0 on success, nonzero on failure. * EAGAIN if not enough system resources to create minimum threads. * INVALID_POLICY if schedPolicy can't be set * EMAXTHREADS if minimum threads is greater than maximum threads *****************************************************************************/int ThreadPoolInit( ThreadPool *tp, ThreadPoolAttr *attr ){ int retCode = 0; int i = 0; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; }#ifdef WIN32#ifdef PTW32_STATIC_LIB pthread_win32_process_attach_np();#endif#endif retCode += ithread_mutex_init( &tp->mutex, NULL ); assert( retCode == 0 ); retCode += ithread_mutex_lock( &tp->mutex ); assert( retCode == 0 ); retCode += ithread_cond_init( &tp->condition, NULL ); assert( retCode == 0 ); retCode += ithread_cond_init( &tp->start_and_shutdown, NULL ); assert( retCode == 0 ); if( retCode != 0 ) { return EAGAIN; } if( attr ) { tp->attr = ( *attr ); } else { TPAttrInit( &tp->attr ); } if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) { ithread_mutex_unlock( &tp->mutex ); ithread_mutex_destroy( &tp->mutex ); ithread_cond_destroy( &tp->condition ); ithread_cond_destroy( &tp->start_and_shutdown ); return INVALID_POLICY; } retCode += FreeListInit( &tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE ); assert( retCode == 0 ); StatsInit( &tp->stats ); retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL ); assert( retCode == 0 ); if( retCode != 0 ) { retCode = EAGAIN; } else { tp->persistentJob = NULL; tp->lastJobId = 0; tp->shutdown = 0; tp->totalThreads = 0; tp->persistentThreads = 0; for( i = 0; i < tp->attr.minThreads; ++i ) { if( ( retCode = CreateWorker( tp ) ) != 0 ) { break; } } } ithread_mutex_unlock( &tp->mutex ); if( retCode != 0 ) { // clean up if the min threads could not be created ThreadPoolShutdown( tp ); } return retCode;}/**************************************************************************** * Function: ThreadPoolAddPersistent * * Description: * Adds a long term job to the thread pool. * Job will be run as soon as possible. * Call will block until job is scheduled. * Parameters: * tp - valid thread pool pointer * job-> valid ThreadPoolJob pointer with following fields * func - ThreadFunction to run * arg - argument to function. * priority - priority of job. * free_function - function to use when freeing argument * Returns: * 0 on success, nonzero on failure * EOUTOFMEM not enough memory to add job. * EMAXTHREADS not enough threads to add persistent job. *****************************************************************************/int ThreadPoolAddPersistent( ThreadPool *tp, ThreadPoolJob *job, int *jobId ){ int tempId = -1; ThreadPoolJob *temp = NULL; assert( tp != NULL ); assert( job != NULL ); if( ( tp == NULL ) || ( job == NULL ) ) { return EINVAL; } if( jobId == NULL ) { jobId = &tempId; } *jobId = INVALID_JOB_ID; ithread_mutex_lock( &tp->mutex ); assert( job->priority == LOW_PRIORITY || job->priority == MED_PRIORITY || job->priority == HIGH_PRIORITY ); // Create A worker if less than max threads running if( tp->totalThreads < tp->attr.maxThreads ) { CreateWorker( tp ); } else { // if there is more than one worker thread // available then schedule job, otherwise fail if( tp->totalThreads - tp->persistentThreads - 1 == 0 ) { ithread_mutex_unlock( &tp->mutex ); return EMAXTHREADS; } } temp = CreateThreadPoolJob( job, tp->lastJobId, tp ); if( temp == NULL ) { ithread_mutex_unlock( &tp->mutex ); return EOUTOFMEM; } tp->persistentJob = temp; // Notify a waiting thread ithread_cond_signal( &tp->condition ); // wait until long job has been picked up while( tp->persistentJob != NULL ) { ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex ); } *jobId = tp->lastJobId++; ithread_mutex_unlock( &tp->mutex ); return 0;}/**************************************************************************** * Function: ThreadPoolAdd * * Description: * Adds a job to the thread pool. * Job will be run as soon as possible. * Parameters: * tp - valid thread pool pointer * func - ThreadFunction to run * arg - argument to function. * priority - priority of job. * jobId - id of job * duration - whether or not this is a persistent thread * free_function - function to use when freeing argument * Returns: * 0 on success, nonzero on failure * EOUTOFMEM if not enough memory to add job. *****************************************************************************/int ThreadPoolAdd( ThreadPool *tp, ThreadPoolJob *job, int *jobId ){ int rc = EOUTOFMEM; int tempId = -1; int totalJobs; ThreadPoolJob *temp = NULL; assert( tp != NULL ); assert( job != NULL ); if( ( tp == NULL ) || ( job == NULL ) ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); assert( job->priority == LOW_PRIORITY || job->priority == MED_PRIORITY || job->priority == HIGH_PRIORITY ); totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size; if (totalJobs >= tp->attr.maxJobsTotal) { fprintf(stderr, "total jobs = %d, too many jobs", totalJobs); ithread_mutex_unlock( &tp->mutex ); return rc; } if( jobId == NULL ) { jobId = &tempId; } *jobId = INVALID_JOB_ID; temp = CreateThreadPoolJob( job, tp->lastJobId, tp ); if( temp == NULL ) { ithread_mutex_unlock( &tp->mutex ); return rc; } if( job->priority == HIGH_PRIORITY ) { if( ListAddTail( &tp->highJobQ, temp ) ) { rc = 0; } } else if( job->priority == MED_PRIORITY ) { if( ListAddTail( &tp->medJobQ, temp ) ) { rc = 0; } } else { if( ListAddTail( &tp->lowJobQ, temp ) ) { rc = 0; } } // AddWorker if appropriate AddWorker( tp ); // Notify a waiting thread if( rc == 0 ) { ithread_cond_signal( &tp->condition ); } else { FreeThreadPoolJob( tp, temp ); } *jobId = tp->lastJobId++; ithread_mutex_unlock( &tp->mutex ); return rc;}/**************************************************************************** * Function: ThreadPoolRemove * * Description: * Removes a job from the thread pool. * Can only remove jobs which are not * currently running. * Parameters: * tp - valid thread pool pointer * jobId - id of job * ThreadPoolJob *out - space for removed job. * Can be null if not needed. * * Returns: * 0 on success. INVALID_JOB_ID on failure. *****************************************************************************/int ThreadPoolRemove( ThreadPool *tp, int jobId, ThreadPoolJob *out ){ ThreadPoolJob *temp = NULL; int ret = INVALID_JOB_ID; ListNode *tempNode = NULL; ThreadPoolJob dummy; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } if( out == NULL ) { out = &dummy; } dummy.jobId = jobId; ithread_mutex_lock( &tp->mutex ); tempNode = ListFind( &tp->highJobQ, NULL, &dummy ); if( tempNode ) { temp = (ThreadPoolJob *)tempNode->item; *out = *temp; ListDelNode( &tp->highJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } tempNode = ListFind( &tp->medJobQ, NULL, &dummy ); if( tempNode ) { temp = (ThreadPoolJob *)tempNode->item; *out = *temp; ListDelNode( &tp->medJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } tempNode = ListFind( &tp->lowJobQ, NULL, &dummy ); if( tempNode ) { temp = (ThreadPoolJob *)tempNode->item; *out = *temp; ListDelNode( &tp->lowJobQ, tempNode, 0 ); FreeThreadPoolJob( tp, temp ); ithread_mutex_unlock( &tp->mutex ); return 0; } if( tp->persistentJob && tp->persistentJob->jobId == jobId ) { *out = *tp->persistentJob; FreeThreadPoolJob( tp, tp->persistentJob ); tp->persistentJob = NULL; ithread_mutex_unlock( &tp->mutex ); return 0; } ithread_mutex_unlock( &tp->mutex ); return ret;}/**************************************************************************** * Function: ThreadPoolGetAttr * * Description: * Gets the current set of attributes * associated with the thread pool. * Parameters: * tp - valid thread pool pointer * out - non null pointer to store attributes * Returns: * 0 on success, nonzero on failure * Always returns 0. *****************************************************************************/int ThreadPoolGetAttr( ThreadPool *tp, ThreadPoolAttr *out ){ assert( tp != NULL ); assert( out != NULL ); if( tp == NULL || out == NULL ) { return EINVAL; } if( !tp->shutdown ) { ithread_mutex_lock( &tp->mutex ); } *out = tp->attr; if( !tp->shutdown ) { ithread_mutex_unlock( &tp->mutex ); } return 0;}/**************************************************************************** * Function: ThreadPoolSetAttr * * Description: * Sets the attributes for the thread pool. * Only affects future calculations. * Parameters: * tp - valid thread pool pointer * attr - pointer to attributes, null sets attributes to default. * Returns: * 0 on success, nonzero on failure * Returns INVALID_POLICY if policy can not be set. *****************************************************************************/int ThreadPoolSetAttr( ThreadPool *tp, ThreadPoolAttr *attr ){ int retCode = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -