⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 基于LINUX/UNIX的UPnP库,是智能家居使用的底层库。
💻 C
📖 第 1 页 / 共 3 页
字号:
				} else {					// Should never get here					assert( 0 );					tp->stats.workerThreads--;					tp->totalThreads--;					ithread_cond_broadcast( &tp->start_and_shutdown );					ithread_mutex_unlock( &tp->mutex );					return NULL;				}			}		}		ithread_mutex_unlock( &tp->mutex );		if( SetPriority( job->priority ) != 0 ) {			// In the future can log			// info		} else {			// In the future can log			// info		}		// run the job		job->func( job->arg );		// return to Normal		SetPriority( DEFAULT_PRIORITY );	}}/**************************************************************************** * Function: CreateThreadPoolJob * *  Description: *      Creates a Thread Pool Job. (Dynamically allocated) *      Internal to thread pool. *  Parameters: *      ThreadPoolJob *job - job is copied *      id - id of job * *  Returns: *      ThreadPoolJob *on success, NULL on failure. *****************************************************************************/static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob *job, int id, ThreadPool *tp ){	ThreadPoolJob *newJob = NULL;	assert( job != NULL );	assert( tp != NULL );	newJob = (ThreadPoolJob *)FreeListAlloc( &tp->jobFreeList );	if( newJob ) {		*newJob = *job;		newJob->jobId = id;		gettimeofday( &newJob->requestTime, NULL );	}	return newJob;}/**************************************************************************** * Function: CreateWorker * *  Description: *      Creates a worker thread, if the thread pool *      does not already have max threads. *      Internal to thread pool. 
*  Parameters: *      ThreadPool *tp * *  Returns: *      0 on success, <0 on failure *      EMAXTHREADS if already max threads reached *      EAGAIN if system can not create thread * *****************************************************************************/static int CreateWorker( ThreadPool *tp ){	ithread_t temp;	int rc = 0;	int currentThreads = tp->totalThreads + 1;	assert( tp != NULL );	if ( tp->attr.maxThreads != INFINITE_THREADS &&	     currentThreads > tp->attr.maxThreads ) {		return EMAXTHREADS;	}	rc = ithread_create( &temp, NULL, WorkerThread, tp );	if( rc == 0 ) {		rc = ithread_detach( temp );		while( tp->totalThreads < currentThreads ) {			ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );		}	}	if( tp->stats.maxThreads < tp->totalThreads ) {		tp->stats.maxThreads = tp->totalThreads;	}	return rc;}/**************************************************************************** * Function: AddWorker * *  Description: *      Determines whether or not a thread should be added *      based on the jobsPerThread ratio. *      Adds a thread if appropriate. *      Internal to Thread Pool. *  Parameters: *      ThreadPool* tp * *****************************************************************************/static void AddWorker( ThreadPool *tp ){	int jobs = 0;	int threads = 0;	assert( tp != NULL );	jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;	threads = tp->totalThreads - tp->persistentThreads;	while( threads == 0 || (jobs / threads) >= tp->attr.jobsPerThread ) {		if( CreateWorker( tp ) != 0 ) {			return;		}		threads++;	}}/**************************************************************************** * Function: ThreadPoolInit * *  Description: *      Initializes and starts ThreadPool. Must be called first. *      And only once for ThreadPool. *  Parameters: *      tp  - must be valid, non null, pointer to ThreadPool. 
*      minWorkerThreads - minimum number of worker threads *                         thread pool will never have less than this *                         number of threads. *      maxWorkerThreads - maximum number of worker threads *                         thread pool will never have more than this *                         number of threads. *      maxIdleTime      - maximum time that a worker thread will spend *                         idle. If a worker is idle longer than this *                         time and there are more than the min *                         number of workers running, than the *                         worker thread exits. *      jobsPerThread    - ratio of jobs to thread to try and maintain *                         if a job is scheduled and the number of jobs per *                         thread is greater than this number,and *                         if less than the maximum number of *                         workers are running then a new thread is *                         started to help out with efficiency. *      schedPolicy      - scheduling policy to try and set (OS dependent) *  Returns: *      0 on success, nonzero on failure. *      EAGAIN if not enough system resources to create minimum threads. 
*      INVALID_POLICY if schedPolicy can't be set *      EMAXTHREADS if minimum threads is greater than maximum threads *****************************************************************************/int ThreadPoolInit( ThreadPool *tp, ThreadPoolAttr *attr ){	int retCode = 0;	int i = 0;	assert( tp != NULL );	if( tp == NULL ) {		return EINVAL;	}#ifdef WIN32#ifdef PTW32_STATIC_LIB	pthread_win32_process_attach_np();#endif#endif	retCode += ithread_mutex_init( &tp->mutex, NULL );	assert( retCode == 0 );	retCode += ithread_mutex_lock( &tp->mutex );	assert( retCode == 0 );	retCode += ithread_cond_init( &tp->condition, NULL );	assert( retCode == 0 );	retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );	assert( retCode == 0 );	if( retCode != 0 ) {		return EAGAIN;	}	if( attr ) {		tp->attr = ( *attr );	} else {		TPAttrInit( &tp->attr );	}	if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {		ithread_mutex_unlock( &tp->mutex );		ithread_mutex_destroy( &tp->mutex );		ithread_cond_destroy( &tp->condition );		ithread_cond_destroy( &tp->start_and_shutdown );		return INVALID_POLICY;	}	retCode += FreeListInit(		&tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE );	assert( retCode == 0 );	StatsInit( &tp->stats );	retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );	assert( retCode == 0 );	retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );	assert( retCode == 0 );	retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );	assert( retCode == 0 );	if( retCode != 0 ) {		retCode = EAGAIN;	} else {		tp->persistentJob = NULL;		tp->lastJobId = 0;		tp->shutdown = 0;		tp->totalThreads = 0;		tp->persistentThreads = 0;		for( i = 0; i < tp->attr.minThreads; ++i ) {			if( ( retCode = CreateWorker( tp ) ) != 0 ) {				break;			}		}	}	ithread_mutex_unlock( &tp->mutex );	if( retCode != 0 ) {		// clean up if the min threads could not be created		ThreadPoolShutdown( tp );	}	return retCode;}/**************************************************************************** * 
Function: ThreadPoolAddPersistent * *  Description: *      Adds a long term job to the thread pool. *      Job will be run as soon as possible. *      Call will block until job is scheduled. *  Parameters: *      tp - valid thread pool pointer *      job-> valid ThreadPoolJob pointer with following fields *          func - ThreadFunction to run *          arg - argument to function. *          priority - priority of job. *          free_function - function to use when freeing argument *  Returns: *      0 on success, nonzero on failure *      EOUTOFMEM not enough memory to add job. *      EMAXTHREADS not enough threads to add persistent job. *****************************************************************************/int ThreadPoolAddPersistent( ThreadPool *tp, ThreadPoolJob *job, int *jobId ){	int tempId = -1;	ThreadPoolJob *temp = NULL;	assert( tp != NULL );	assert( job != NULL );	if( ( tp == NULL ) || ( job == NULL ) ) {		return EINVAL;	}	if( jobId == NULL ) {		jobId = &tempId;	}	*jobId = INVALID_JOB_ID;	ithread_mutex_lock( &tp->mutex );	assert( job->priority == LOW_PRIORITY ||	        job->priority == MED_PRIORITY ||	        job->priority == HIGH_PRIORITY );	// Create A worker if less than max threads running	if( tp->totalThreads < tp->attr.maxThreads ) {		CreateWorker( tp );	} else {		// if there is more than one worker thread		// available then schedule job, otherwise fail		if( tp->totalThreads - tp->persistentThreads - 1 == 0 ) {			ithread_mutex_unlock( &tp->mutex );			return EMAXTHREADS;		}	}	temp = CreateThreadPoolJob( job, tp->lastJobId, tp );	if( temp == NULL ) {		ithread_mutex_unlock( &tp->mutex );		return EOUTOFMEM;	}	tp->persistentJob = temp;	// Notify a waiting thread	ithread_cond_signal( &tp->condition );	// wait until long job has been picked up	while( tp->persistentJob != NULL ) {		ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );	}	*jobId = tp->lastJobId++;	ithread_mutex_unlock( &tp->mutex );	return 
0;}/**************************************************************************** * Function: ThreadPoolAdd * *  Description: *      Adds a job to the thread pool. *      Job will be run as soon as possible. *  Parameters: *      tp - valid thread pool pointer *      func - ThreadFunction to run *      arg - argument to function. *      priority - priority of job. *      jobId - id of job *      duration - whether or not this is a persistent thread *      free_function - function to use when freeing argument *  Returns: *      0 on success, nonzero on failure *      EOUTOFMEM if not enough memory to add job. *****************************************************************************/int ThreadPoolAdd( ThreadPool *tp, ThreadPoolJob *job, int *jobId ){	int rc = EOUTOFMEM;	int tempId = -1;	int totalJobs;	ThreadPoolJob *temp = NULL;	assert( tp != NULL );	assert( job != NULL );	if( ( tp == NULL ) || ( job == NULL ) ) {		return EINVAL;	}	ithread_mutex_lock( &tp->mutex );	assert( job->priority == LOW_PRIORITY ||	job->priority == MED_PRIORITY ||	job->priority == HIGH_PRIORITY );	totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;	if (totalJobs >= tp->attr.maxJobsTotal) {		fprintf(stderr, "total jobs = %d, too many jobs", totalJobs);		ithread_mutex_unlock( &tp->mutex );		return rc;	}	if( jobId == NULL ) {		jobId = &tempId;	}	*jobId = INVALID_JOB_ID;	temp = CreateThreadPoolJob( job, tp->lastJobId, tp );	if( temp == NULL ) {		ithread_mutex_unlock( &tp->mutex );		return rc;	}	if( job->priority == HIGH_PRIORITY ) {		if( ListAddTail( &tp->highJobQ, temp ) ) {			rc = 0;		}	} else if( job->priority == MED_PRIORITY ) {		if( ListAddTail( &tp->medJobQ, temp ) ) {			rc = 0;		}	} else {		if( ListAddTail( &tp->lowJobQ, temp ) ) {			rc = 0;		}	}	// AddWorker if appropriate	AddWorker( tp );	// Notify a waiting thread	if( rc == 0 ) {		ithread_cond_signal( &tp->condition );	} else {		FreeThreadPoolJob( tp, temp );	}	*jobId = tp->lastJobId++;	ithread_mutex_unlock( 
&tp->mutex );	return rc;}/**************************************************************************** * Function: ThreadPoolRemove * *  Description: *      Removes a job from the thread pool. *      Can only remove jobs which are not *      currently running. *  Parameters: *      tp - valid thread pool pointer *      jobId - id of job *      ThreadPoolJob *out - space for removed job. *                           Can be null if not needed. * *  Returns: *      0 on success. INVALID_JOB_ID on failure. *****************************************************************************/int ThreadPoolRemove( ThreadPool *tp, int jobId, ThreadPoolJob *out ){	ThreadPoolJob *temp = NULL;	int ret = INVALID_JOB_ID;	ListNode *tempNode = NULL;	ThreadPoolJob dummy;	assert( tp != NULL );	if( tp == NULL ) {		return EINVAL;	}	if( out == NULL ) {		out = &dummy;	}	dummy.jobId = jobId;	ithread_mutex_lock( &tp->mutex );	tempNode = ListFind( &tp->highJobQ, NULL, &dummy );	if( tempNode ) {		temp = (ThreadPoolJob *)tempNode->item;		*out = *temp;		ListDelNode( &tp->highJobQ, tempNode, 0 );		FreeThreadPoolJob( tp, temp );		ithread_mutex_unlock( &tp->mutex );		return 0;	}	tempNode = ListFind( &tp->medJobQ, NULL, &dummy );	if( tempNode ) {		temp = (ThreadPoolJob *)tempNode->item;		*out = *temp;		ListDelNode( &tp->medJobQ, tempNode, 0 );		FreeThreadPoolJob( tp, temp );		ithread_mutex_unlock( &tp->mutex );		return 0;	}	tempNode = ListFind( &tp->lowJobQ, NULL, &dummy );	if( tempNode ) {		temp = (ThreadPoolJob *)tempNode->item;		*out = *temp;		ListDelNode( &tp->lowJobQ, tempNode, 0 );		FreeThreadPoolJob( tp, temp );		ithread_mutex_unlock( &tp->mutex );		return 0;	}	if( tp->persistentJob && tp->persistentJob->jobId == jobId ) {		*out = *tp->persistentJob;		FreeThreadPoolJob( tp, tp->persistentJob );		tp->persistentJob = NULL;		ithread_mutex_unlock( &tp->mutex );		return 0;	}	ithread_mutex_unlock( &tp->mutex );	return ret;}/**************************************************************************** * 
Function: ThreadPoolGetAttr
 *
 *  Description:
 *      Gets the current set of attributes
 *      associated with the thread pool.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      out - non null pointer to store attributes
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL if tp or out is NULL.
 *****************************************************************************/
int ThreadPoolGetAttr( ThreadPool *tp, ThreadPoolAttr *out )
{
	int locked = 0;

	assert( tp != NULL );
	assert( out != NULL );
	if( tp == NULL || out == NULL ) {
		return EINVAL;
	}

	// Decide once whether to take the mutex. Testing tp->shutdown a
	// second time for the unlock could leave lock and unlock unpaired
	// if the flag changed in between.
	locked = !tp->shutdown;

	if( locked ) {
		ithread_mutex_lock( &tp->mutex );
	}

	*out = tp->attr;

	if( locked ) {
		ithread_mutex_unlock( &tp->mutex );
	}

	return 0;
}

/****************************************************************************
 * Function: ThreadPoolSetAttr
 *
 *  Description:
 *      Sets the attributes for the thread pool.
 *      Only affects future calculations.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      attr - pointer to attributes, null sets attributes to default.
 *  Returns:
 *      0 on success, nonzero on failure
 *      Returns INVALID_POLICY if policy can not be set.
 *****************************************************************************/
int ThreadPoolSetAttr( ThreadPool *tp, ThreadPoolAttr *attr )
{
	int retCode = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -