⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 基于LINUX/UNIX的UPN库,是智能家具的用的底层库.
💻 C
📖 第 1 页 / 共 3 页
字号:
/////////////////////////////////////////////////////////////////////////////// Copyright (c) 2000-2003 Intel Corporation // All rights reserved. //// Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: //// * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // * Neither name of Intel Corporation nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission.// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY // OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./////////////////////////////////////////////////////////////////////////////#include "ThreadPool.h"#include "FreeList.h"#include <assert.h>#include <stdlib.h>#include <stdio.h>/**************************************************************************** * Function: DiffMillis * *  Description: *      Returns the difference in milliseconds between two *      timeval structures. *      Internal Only. *  Parameters: *      struct timeval *time1, *      struct timeval *time2, *  Returns: *       the difference in milliseconds, time1-time2. *****************************************************************************/static unsigned long DiffMillis( struct timeval *time1, struct timeval *time2 ){	double temp = 0;	assert( time1 != NULL );	assert( time2 != NULL );	temp = time1->tv_sec - time2->tv_sec;	/* convert to milliseconds */	temp *= 1000;	/* convert microseconds to milliseconds and add to temp */	/* implicit flooring of unsigned long data type */	temp += (time1->tv_usec - time2->tv_usec) / 1000;	return temp;}#ifdef STATS/**************************************************************************** * Function: StatsInit * *  Description: *      Initializes the statistics structure. *      Internal Only. *  Parameters: *      ThreadPoolStats *stats must be valid non null stats structure *****************************************************************************/static void StatsInit( ThreadPoolStats *stats ){	assert( stats != NULL );	stats->totalIdleTime = 0;	stats->totalJobsHQ = 0;	stats->totalJobsLQ = 0;	stats->totalJobsMQ = 0;	stats->totalTimeHQ = 0;	stats->totalTimeMQ = 0;	stats->totalTimeLQ = 0;	stats->totalWorkTime = 0;	stats->totalIdleTime = 0;	stats->avgWaitHQ = 0;	stats->avgWaitMQ = 0;	stats->avgWaitLQ = 0;	stats->workerThreads = 0;	stats->idleThreads = 0;	stats->persistentThreads = 0;	stats->maxThreads = 0; stats->totalThreads = 0;}static void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime ){	tp->stats.totalJobsLQ++;	tp->stats.totalTimeLQ += diffTime;}static void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime ){	tp->stats.totalJobsMQ++;	tp->stats.totalTimeMQ += diffTime;}static void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime ){	tp->stats.totalJobsHQ++;	tp->stats.totalTimeHQ += diffTime;}/**************************************************************************** * Function: CalcWaitTime * *  Description: *      Calculates the time the job has been waiting at the specified *      priority. Adds to the totalTime and totalJobs kept in the *      thread pool statistics structure. *      Internal Only. * *  Parameters: *      ThreadPool *tp *      ThreadPriority p *      ThreadPoolJob *job *****************************************************************************/static void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job ){	struct timeval now;	unsigned long diff;	assert( tp != NULL );	assert( job != NULL );	gettimeofday( &now, NULL );	diff = DiffMillis( &now, &job->requestTime );	switch ( p ) {	case LOW_PRIORITY:		StatsAccountLQ( tp, diff );		break;	case MED_PRIORITY:		StatsAccountMQ( tp, diff );		break;	case HIGH_PRIORITY:		StatsAccountHQ( tp, diff );		break;	default:		assert( 0 );	}}static time_t StatsTime( time_t *t ){	struct timeval tv;	gettimeofday( &tv, NULL );	if (t) {		*t = tv.tv_sec;	}	return tv.tv_sec;}#else /* STATS */static UPNP_INLINE void StatsInit( ThreadPoolStats *stats ) {}static UPNP_INLINE void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job ) {}static UPNP_INLINE time_t StatsTime( time_t *t ) { return 0; }#endif /* STATS *//**************************************************************************** * Function: CmpThreadPoolJob * *  Description: *      Compares thread pool jobs. *  Parameters: *      void * - job A *      void * - job B *****************************************************************************/static int CmpThreadPoolJob( void *jobA, void *jobB ){	ThreadPoolJob *a = ( ThreadPoolJob *) jobA;	ThreadPoolJob *b = ( ThreadPoolJob *) jobB;	assert( jobA != NULL );	assert( jobB != NULL );	return ( a->jobId == b->jobId );}/**************************************************************************** * Function: FreeThreadPoolJob * *  Description: *      Deallocates a dynamically allocated ThreadPoolJob. *  Parameters: *      ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob *****************************************************************************/static void FreeThreadPoolJob( ThreadPool *tp, ThreadPoolJob *tpj ){	assert( tp != NULL );	FreeListFree( &tp->jobFreeList, tpj );}/**************************************************************************** * Function: SetPolicyType * *  Description: *      Sets the scheduling policy of the current process. *      Internal only. *  Parameters: *      PolocyType in *  Returns: *      0 on success, nonzero on failure *      Returns result of GetLastError() on failure. * *****************************************************************************/static int SetPolicyType( PolicyType in ){#ifdef __CYGWIN__	/* TODO not currently working... */	return 0;#elif defined(__OSX__) || defined(__APPLE__)	setpriority( PRIO_PROCESS, 0, 0 );	return 0;#elif defined(WIN32)	return sched_setscheduler( 0, in );#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0	struct sched_param current;	sched_getparam( 0, &current );	current.sched_priority = DEFAULT_SCHED_PARAM;	return sched_setscheduler( 0, in, &current );#else	return 0;#endif}/**************************************************************************** * Function: SetPriority * *  Description: *      Sets the priority of the currently running thread. *      Internal only. *  Parameters: *      ThreadPriority priority *  Returns: *      0 on success, nonzero on failure *      EINVAL invalid priority *      Returns result of GerLastError on failure. * *****************************************************************************/static int SetPriority( ThreadPriority priority ){#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0	int currentPolicy;	int minPriority = 0;	int maxPriority = 0;	int actPriority = 0;	int midPriority = 0;	struct sched_param newPriority;	pthread_getschedparam( ithread_self(), &currentPolicy, &newPriority );	minPriority = sched_get_priority_min( currentPolicy );	maxPriority = sched_get_priority_max( currentPolicy );	midPriority = ( maxPriority - minPriority ) / 2;	switch ( priority ) {	case LOW_PRIORITY:		actPriority = minPriority;		break;	case MED_PRIORITY:		actPriority = midPriority;		break;	case HIGH_PRIORITY:		actPriority = maxPriority;		break;	default:		return EINVAL;	};	newPriority.sched_priority = actPriority;	return pthread_setschedparam(ithread_self(), currentPolicy, &newPriority );#else	return 0;#endif}/**************************************************************************** * Function: BumpPriority * *  Description: *      Determines whether any jobs *      need to be bumped to a higher priority Q and bumps them. * *      tp->mutex must be locked. *      Internal Only. *  Parameters: *      ThreadPool *tp *****************************************************************************/static void BumpPriority( ThreadPool *tp ){    int done = 0;    struct timeval now;    unsigned long diffTime = 0;    ThreadPoolJob *tempJob = NULL;    assert( tp != NULL );    gettimeofday(&now, NULL);	    while( !done ) {        if( tp->medJobQ.size ) {            tempJob = ( ThreadPoolJob *) tp->medJobQ.head.next->item;            diffTime = DiffMillis( &now, &tempJob->requestTime );            if( diffTime >= ( tp->attr.starvationTime ) ) {                // If job has waited longer than the starvation time                // bump priority (add to higher priority Q)                StatsAccountMQ( tp, diffTime );                ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 );                ListAddTail( &tp->highJobQ, tempJob );                continue;            }        }        if( tp->lowJobQ.size ) {            tempJob = ( ThreadPoolJob *) tp->lowJobQ.head.next->item;            diffTime = DiffMillis( &now, &tempJob->requestTime );            if( diffTime >= ( tp->attr.maxIdleTime ) ) {                // If job has waited longer than the starvation time                // bump priority (add to higher priority Q)                StatsAccountLQ( tp, diffTime );                ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 );                ListAddTail( &tp->medJobQ, tempJob );                continue;            }        }        done = 1;    }}/**************************************************************************** * Function: SetRelTimeout * *  Description: *      Sets the fields of the *      passed in timespec to be relMillis milliseconds in the future. *      Internal Only. *  Parameters: *      struct timespec *time *      int relMillis - milliseconds in the future *****************************************************************************/static void SetRelTimeout( struct timespec *time, int relMillis ){	struct timeval now;	int sec = relMillis / 1000;	int milliSeconds = relMillis % 1000;	assert( time != NULL );	gettimeofday( &now, NULL );	time->tv_sec = now.tv_sec + sec;	time->tv_nsec = ( (now.tv_usec/1000) + milliSeconds ) * 1000000;}/**************************************************************************** * Function: SetSeed * *  Description: *      Sets seed for random number generator. *      Each thread sets the seed random number generator. *      Internal Only. *  Parameters: *       *****************************************************************************/static void SetSeed(){	struct timeval t;  	gettimeofday(&t, NULL);#if defined(WIN32) 	srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id().p );#elif defined(__FreeBSD__) || defined(__OSX__) || defined(__APPLE__) 	srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id() );#elif defined(__linux__) || defined(__sun) 	srand( ( unsigned int )t.tv_usec + ithread_get_current_thread_id() );#else	{		volatile union { volatile pthread_t tid; volatile unsigned i; } idu;		idu.tid = ithread_get_current_thread_id();		srand( ( unsigned int )t.millitm + idu.i );	}#endif}/**************************************************************************** * Function: WorkerThread * *  Description: *      Implements a thread pool worker. *      Worker waits for a job to become available. *      Worker picks up persistent jobs first, high priority, med priority, *             then low priority. *      If worker remains idle for more than specified max, the worker *      is released. *      Internal Only. *  Parameters: *      void * arg -> is cast to ThreadPool * *****************************************************************************/static void *WorkerThread( void *arg ){	time_t start = 0;	ThreadPoolJob *job = NULL;	ListNode *head = NULL;	struct timespec timeout;	int retCode = 0;	int persistent = -1;	ThreadPool *tp = ( ThreadPool *) arg;	// allow static linking#ifdef WIN32#ifdef PTW32_STATIC_LIB	pthread_win32_thread_attach_np();#endif#endif	assert( tp != NULL );	// Increment total thread count	ithread_mutex_lock( &tp->mutex );	tp->totalThreads++;	ithread_cond_broadcast( &tp->start_and_shutdown );	ithread_mutex_unlock( &tp->mutex );	SetSeed();	StatsTime( &start );	while( 1 ) {		ithread_mutex_lock( &tp->mutex );		if( job ) {			FreeThreadPoolJob( tp, job );			job = NULL;		}		retCode = 0;		tp->stats.idleThreads++;		tp->stats.totalWorkTime += ( StatsTime( NULL ) - start ); // work time		StatsTime( &start ); // idle time		if( persistent == 1 ) {			// Persistent thread			// becomes a regular thread			tp->persistentThreads--;		}		if( persistent == 0 ) {			tp->stats.workerThreads--;		}		// Check for a job or shutdown		while( tp->lowJobQ.size  == 0 &&		       tp->medJobQ.size  == 0 &&		       tp->highJobQ.size == 0 &&		       !tp->persistentJob     &&		       !tp->shutdown ) {			// If wait timed out			// and we currently have more than the			// min threads, or if we have more than the max threads			// (only possible if the attributes have been reset)			// let this thread die.			if( ( retCode == ETIMEDOUT &&			      tp->totalThreads > tp->attr.minThreads ) ||			    ( tp->attr.maxThreads != -1 &&			      tp->totalThreads > tp->attr.maxThreads ) ) {				tp->stats.idleThreads--;				tp->totalThreads--;				ithread_cond_broadcast( &tp->start_and_shutdown );				ithread_mutex_unlock( &tp->mutex );#ifdef WIN32#ifdef PTW32_STATIC_LIB				// allow static linking				pthread_win32_thread_detach_np ();#endif#endif				return NULL;			}			SetRelTimeout( &timeout, tp->attr.maxIdleTime );			// wait for a job up to the specified max time			retCode = ithread_cond_timedwait(				&tp->condition, &tp->mutex, &timeout );		}		tp->stats.idleThreads--;		tp->stats.totalIdleTime += ( StatsTime( NULL ) - start ); // idle time		StatsTime( &start ); // work time		// bump priority of starved jobs		BumpPriority( tp );		// if shutdown then stop		if( tp->shutdown ) {			tp->totalThreads--;			ithread_cond_broadcast( &tp->start_and_shutdown );			ithread_mutex_unlock( &tp->mutex );#ifdef WIN32#ifdef PTW32_STATIC_LIB			// allow static linking			pthread_win32_thread_detach_np ();#endif#endif			return NULL;		} else {			// Pick up persistent job if available			if( tp->persistentJob ) {				job = tp->persistentJob;				tp->persistentJob = NULL;				tp->persistentThreads++;				persistent = 1;				ithread_cond_broadcast( &tp->start_and_shutdown );			} else {				tp->stats.workerThreads++;				persistent = 0;				// Pick the highest priority job				if( tp->highJobQ.size > 0 ) {					head = ListHead( &tp->highJobQ );					job = ( ThreadPoolJob *) head->item;					CalcWaitTime( tp, HIGH_PRIORITY, job );					ListDelNode( &tp->highJobQ, head, 0 );				} else if( tp->medJobQ.size > 0 ) {					head = ListHead( &tp->medJobQ );					job = ( ThreadPoolJob *) head->item;					CalcWaitTime( tp, MED_PRIORITY, job );					ListDelNode( &tp->medJobQ, head, 0 );				} else if( tp->lowJobQ.size > 0 ) {					head = ListHead( &tp->lowJobQ );					job = ( ThreadPoolJob *) head->item;					CalcWaitTime( tp, LOW_PRIORITY, job );					ListDelNode( &tp->lowJobQ, head, 0 );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -