📄 threadpool.c
字号:
/////////////////////////////////////////////////////////////////////////////// Copyright (c) 2000-2003 Intel Corporation // All rights reserved. //// Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: //// * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // * Neither name of Intel Corporation nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission.// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY // OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./////////////////////////////////////////////////////////////////////////////#include "ThreadPool.h"#include "FreeList.h"#include <assert.h>#include <stdlib.h>#include <stdio.h>/**************************************************************************** * Function: DiffMillis * * Description: * Returns the difference in milliseconds between two * timeval structures. * Internal Only. * Parameters: * struct timeval *time1, * struct timeval *time2, * Returns: * the difference in milliseconds, time1-time2. *****************************************************************************/static unsigned long DiffMillis( struct timeval *time1, struct timeval *time2 ){ double temp = 0; assert( time1 != NULL ); assert( time2 != NULL ); temp = time1->tv_sec - time2->tv_sec; /* convert to milliseconds */ temp *= 1000; /* convert microseconds to milliseconds and add to temp */ /* implicit flooring of unsigned long data type */ temp += (time1->tv_usec - time2->tv_usec) / 1000; return temp;}#ifdef STATS/**************************************************************************** * Function: StatsInit * * Description: * Initializes the statistics structure. * Internal Only. * Parameters: * ThreadPoolStats *stats must be valid non null stats structure *****************************************************************************/static void StatsInit( ThreadPoolStats *stats ){ assert( stats != NULL ); stats->totalIdleTime = 0; stats->totalJobsHQ = 0; stats->totalJobsLQ = 0; stats->totalJobsMQ = 0; stats->totalTimeHQ = 0; stats->totalTimeMQ = 0; stats->totalTimeLQ = 0; stats->totalWorkTime = 0; stats->totalIdleTime = 0; stats->avgWaitHQ = 0; stats->avgWaitMQ = 0; stats->avgWaitLQ = 0; stats->workerThreads = 0; stats->idleThreads = 0; stats->persistentThreads = 0; stats->maxThreads = 0; stats->totalThreads = 0;}static void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime ){ tp->stats.totalJobsLQ++; tp->stats.totalTimeLQ += diffTime;}static void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime ){ tp->stats.totalJobsMQ++; tp->stats.totalTimeMQ += diffTime;}static void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime ){ tp->stats.totalJobsHQ++; tp->stats.totalTimeHQ += diffTime;}/**************************************************************************** * Function: CalcWaitTime * * Description: * Calculates the time the job has been waiting at the specified * priority. Adds to the totalTime and totalJobs kept in the * thread pool statistics structure. * Internal Only. * * Parameters: * ThreadPool *tp * ThreadPriority p * ThreadPoolJob *job *****************************************************************************/static void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job ){ struct timeval now; unsigned long diff; assert( tp != NULL ); assert( job != NULL ); gettimeofday( &now, NULL ); diff = DiffMillis( &now, &job->requestTime ); switch ( p ) { case LOW_PRIORITY: StatsAccountLQ( tp, diff ); break; case MED_PRIORITY: StatsAccountMQ( tp, diff ); break; case HIGH_PRIORITY: StatsAccountHQ( tp, diff ); break; default: assert( 0 ); }}static time_t StatsTime( time_t *t ){ struct timeval tv; gettimeofday( &tv, NULL ); if (t) { *t = tv.tv_sec; } return tv.tv_sec;}#else /* STATS */static UPNP_INLINE void StatsInit( ThreadPoolStats *stats ) {}static UPNP_INLINE void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime ) {}static UPNP_INLINE void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job ) {}static UPNP_INLINE time_t StatsTime( time_t *t ) { return 0; }#endif /* STATS *//**************************************************************************** * Function: CmpThreadPoolJob * * Description: * Compares thread pool jobs. * Parameters: * void * - job A * void * - job B *****************************************************************************/static int CmpThreadPoolJob( void *jobA, void *jobB ){ ThreadPoolJob *a = ( ThreadPoolJob *) jobA; ThreadPoolJob *b = ( ThreadPoolJob *) jobB; assert( jobA != NULL ); assert( jobB != NULL ); return ( a->jobId == b->jobId );}/**************************************************************************** * Function: FreeThreadPoolJob * * Description: * Deallocates a dynamically allocated ThreadPoolJob. * Parameters: * ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob *****************************************************************************/static void FreeThreadPoolJob( ThreadPool *tp, ThreadPoolJob *tpj ){ assert( tp != NULL ); FreeListFree( &tp->jobFreeList, tpj );}/**************************************************************************** * Function: SetPolicyType * * Description: * Sets the scheduling policy of the current process. * Internal only. * Parameters: * PolocyType in * Returns: * 0 on success, nonzero on failure * Returns result of GetLastError() on failure. * *****************************************************************************/static int SetPolicyType( PolicyType in ){#ifdef __CYGWIN__ /* TODO not currently working... */ return 0;#elif defined(__OSX__) || defined(__APPLE__) setpriority( PRIO_PROCESS, 0, 0 ); return 0;#elif defined(WIN32) return sched_setscheduler( 0, in );#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0 struct sched_param current; sched_getparam( 0, ¤t ); current.sched_priority = DEFAULT_SCHED_PARAM; return sched_setscheduler( 0, in, ¤t );#else return 0;#endif}/**************************************************************************** * Function: SetPriority * * Description: * Sets the priority of the currently running thread. * Internal only. * Parameters: * ThreadPriority priority * Returns: * 0 on success, nonzero on failure * EINVAL invalid priority * Returns result of GerLastError on failure. * *****************************************************************************/static int SetPriority( ThreadPriority priority ){#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0 int currentPolicy; int minPriority = 0; int maxPriority = 0; int actPriority = 0; int midPriority = 0; struct sched_param newPriority; pthread_getschedparam( ithread_self(), ¤tPolicy, &newPriority ); minPriority = sched_get_priority_min( currentPolicy ); maxPriority = sched_get_priority_max( currentPolicy ); midPriority = ( maxPriority - minPriority ) / 2; switch ( priority ) { case LOW_PRIORITY: actPriority = minPriority; break; case MED_PRIORITY: actPriority = midPriority; break; case HIGH_PRIORITY: actPriority = maxPriority; break; default: return EINVAL; }; newPriority.sched_priority = actPriority; return pthread_setschedparam(ithread_self(), currentPolicy, &newPriority );#else return 0;#endif}/**************************************************************************** * Function: BumpPriority * * Description: * Determines whether any jobs * need to be bumped to a higher priority Q and bumps them. * * tp->mutex must be locked. * Internal Only. * Parameters: * ThreadPool *tp *****************************************************************************/static void BumpPriority( ThreadPool *tp ){ int done = 0; struct timeval now; unsigned long diffTime = 0; ThreadPoolJob *tempJob = NULL; assert( tp != NULL ); gettimeofday(&now, NULL); while( !done ) { if( tp->medJobQ.size ) { tempJob = ( ThreadPoolJob *) tp->medJobQ.head.next->item; diffTime = DiffMillis( &now, &tempJob->requestTime ); if( diffTime >= ( tp->attr.starvationTime ) ) { // If job has waited longer than the starvation time // bump priority (add to higher priority Q) StatsAccountMQ( tp, diffTime ); ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 ); ListAddTail( &tp->highJobQ, tempJob ); continue; } } if( tp->lowJobQ.size ) { tempJob = ( ThreadPoolJob *) tp->lowJobQ.head.next->item; diffTime = DiffMillis( &now, &tempJob->requestTime ); if( diffTime >= ( tp->attr.maxIdleTime ) ) { // If job has waited longer than the starvation time // bump priority (add to higher priority Q) StatsAccountLQ( tp, diffTime ); ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 ); ListAddTail( &tp->medJobQ, tempJob ); continue; } } done = 1; }}/**************************************************************************** * Function: SetRelTimeout * * Description: * Sets the fields of the * passed in timespec to be relMillis milliseconds in the future. * Internal Only. * Parameters: * struct timespec *time * int relMillis - milliseconds in the future *****************************************************************************/static void SetRelTimeout( struct timespec *time, int relMillis ){ struct timeval now; int sec = relMillis / 1000; int milliSeconds = relMillis % 1000; assert( time != NULL ); gettimeofday( &now, NULL ); time->tv_sec = now.tv_sec + sec; time->tv_nsec = ( (now.tv_usec/1000) + milliSeconds ) * 1000000;}/**************************************************************************** * Function: SetSeed * * Description: * Sets seed for random number generator. * Each thread sets the seed random number generator. * Internal Only. * Parameters: * *****************************************************************************/static void SetSeed(){ struct timeval t; gettimeofday(&t, NULL);#if defined(WIN32) srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id().p );#elif defined(__FreeBSD__) || defined(__OSX__) || defined(__APPLE__) srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id() );#elif defined(__linux__) || defined(__sun) srand( ( unsigned int )t.tv_usec + ithread_get_current_thread_id() );#else { volatile union { volatile pthread_t tid; volatile unsigned i; } idu; idu.tid = ithread_get_current_thread_id(); srand( ( unsigned int )t.millitm + idu.i ); }#endif}/**************************************************************************** * Function: WorkerThread * * Description: * Implements a thread pool worker. * Worker waits for a job to become available. * Worker picks up persistent jobs first, high priority, med priority, * then low priority. * If worker remains idle for more than specified max, the worker * is released. * Internal Only. * Parameters: * void * arg -> is cast to ThreadPool * *****************************************************************************/static void *WorkerThread( void *arg ){ time_t start = 0; ThreadPoolJob *job = NULL; ListNode *head = NULL; struct timespec timeout; int retCode = 0; int persistent = -1; ThreadPool *tp = ( ThreadPool *) arg; // allow static linking#ifdef WIN32#ifdef PTW32_STATIC_LIB pthread_win32_thread_attach_np();#endif#endif assert( tp != NULL ); // Increment total thread count ithread_mutex_lock( &tp->mutex ); tp->totalThreads++; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); SetSeed(); StatsTime( &start ); while( 1 ) { ithread_mutex_lock( &tp->mutex ); if( job ) { FreeThreadPoolJob( tp, job ); job = NULL; } retCode = 0; tp->stats.idleThreads++; tp->stats.totalWorkTime += ( StatsTime( NULL ) - start ); // work time StatsTime( &start ); // idle time if( persistent == 1 ) { // Persistent thread // becomes a regular thread tp->persistentThreads--; } if( persistent == 0 ) { tp->stats.workerThreads--; } // Check for a job or shutdown while( tp->lowJobQ.size == 0 && tp->medJobQ.size == 0 && tp->highJobQ.size == 0 && !tp->persistentJob && !tp->shutdown ) { // If wait timed out // and we currently have more than the // min threads, or if we have more than the max threads // (only possible if the attributes have been reset) // let this thread die. if( ( retCode == ETIMEDOUT && tp->totalThreads > tp->attr.minThreads ) || ( tp->attr.maxThreads != -1 && tp->totalThreads > tp->attr.maxThreads ) ) { tp->stats.idleThreads--; tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex );#ifdef WIN32#ifdef PTW32_STATIC_LIB // allow static linking pthread_win32_thread_detach_np ();#endif#endif return NULL; } SetRelTimeout( &timeout, tp->attr.maxIdleTime ); // wait for a job up to the specified max time retCode = ithread_cond_timedwait( &tp->condition, &tp->mutex, &timeout ); } tp->stats.idleThreads--; tp->stats.totalIdleTime += ( StatsTime( NULL ) - start ); // idle time StatsTime( &start ); // work time // bump priority of starved jobs BumpPriority( tp ); // if shutdown then stop if( tp->shutdown ) { tp->totalThreads--; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex );#ifdef WIN32#ifdef PTW32_STATIC_LIB // allow static linking pthread_win32_thread_detach_np ();#endif#endif return NULL; } else { // Pick up persistent job if available if( tp->persistentJob ) { job = tp->persistentJob; tp->persistentJob = NULL; tp->persistentThreads++; persistent = 1; ithread_cond_broadcast( &tp->start_and_shutdown ); } else { tp->stats.workerThreads++; persistent = 0; // Pick the highest priority job if( tp->highJobQ.size > 0 ) { head = ListHead( &tp->highJobQ ); job = ( ThreadPoolJob *) head->item; CalcWaitTime( tp, HIGH_PRIORITY, job ); ListDelNode( &tp->highJobQ, head, 0 ); } else if( tp->medJobQ.size > 0 ) { head = ListHead( &tp->medJobQ ); job = ( ThreadPoolJob *) head->item; CalcWaitTime( tp, MED_PRIORITY, job ); ListDelNode( &tp->medJobQ, head, 0 ); } else if( tp->lowJobQ.size > 0 ) { head = ListHead( &tp->lowJobQ ); job = ( ThreadPoolJob *) head->item; CalcWaitTime( tp, LOW_PRIORITY, job ); ListDelNode( &tp->lowJobQ, head, 0 );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -