📄 threadpool.c
字号:
/////////////////////////////////////////////////////////////////////////////// Copyright (c) 2000-2003 Intel Corporation // All rights reserved. //// Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: //// * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // * Neither name of Intel Corporation nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission.// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY // OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./////////////////////////////////////////////////////////////////////////////#include "ThreadPool.h"#include "FreeList.h"#include <assert.h>#include <stdlib.h>#ifdef STATS#include <stdio.h>#endif/**************************************************************************** * Function: CmpThreadPoolJob * * Description: * Compares thread pool jobs. * Parameters: * void * - job A * void * - job B *****************************************************************************/static intCmpThreadPoolJob( void *jobA, void *jobB ){ ThreadPoolJob *a = ( ThreadPoolJob * ) jobA; ThreadPoolJob *b = ( ThreadPoolJob * ) jobB; assert( jobA != NULL ); assert( jobB != NULL ); return ( a->jobId == b->jobId );}/**************************************************************************** * Function: FreeThreadPoolJob * * Description: * Deallocates a dynamically allocated ThreadPoolJob. * Parameters: * ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob *****************************************************************************/static voidFreeThreadPoolJob( ThreadPool * tp, ThreadPoolJob * tpj ){ assert( tp != NULL ); FreeListFree( &tp->jobFreeList, tpj );}/**************************************************************************** * Function: SetPolicyType * * Description: * Sets the scheduling policy of the current process. * Internal only. * Parameters: * PolocyType in * Returns: * 0 on success, nonzero on failure * Returns result of GetLastError() on failure. * *****************************************************************************/static intSetPolicyType( PolicyType in ){ struct sched_param current; sched_getparam( 0, ¤t ); current.sched_priority = DEFAULT_SCHED_PARAM; return sched_setscheduler( 0, in, ¤t );}/**************************************************************************** * Function: SetPriority * * Description: * Sets the priority of the currently running thread. * Internal only. * Parameters: * ThreadPriority priority * Returns: * 0 on success, nonzero on failure * EINVAL invalid priority * Returns result of GerLastError on failure. * *****************************************************************************/static intSetPriority( ThreadPriority priority ){ int currentPolicy; int minPriority = 0; int maxPriority = 0; int actPriority = 0; int midPriority = 0; struct sched_param newPriority; pthread_getschedparam( ithread_self( ), ¤tPolicy, &newPriority ); minPriority = sched_get_priority_min( currentPolicy ); maxPriority = sched_get_priority_max( currentPolicy ); midPriority = ( maxPriority - minPriority ) / 2; switch ( priority ) { case LOW_PRIORITY: actPriority = minPriority; break; case MED_PRIORITY: actPriority = midPriority; break; case HIGH_PRIORITY: actPriority = maxPriority; break; default: return EINVAL; }; newPriority.sched_priority = actPriority; return pthread_setschedparam( ithread_self( ), currentPolicy, &newPriority );}/**************************************************************************** * Function: DiffMillis * * Description: * Returns the difference in milliseconds between two * timeb structures. * Internal Only. * Parameters: * struct timeb *time1, * struct timeb *time2, * Returns: * the difference in milliseconds, time1-time2. *****************************************************************************/static doubleDiffMillis( struct timeb *time1, struct timeb *time2 ){ double temp = 0; assert( time1 != NULL ); assert( time2 != NULL ); temp = ( ( double )( time1->time ) - time2->time ); temp = temp * 1000; temp += ( time1->millitm - time2->millitm ); return temp;}/**************************************************************************** * Function: BumpPriority * * Description: * Determines whether any jobs * need to be bumped to a higher priority Q and bumps them. * * tp->mutex must be locked. * Internal Only. * Parameters: * ThreadPool *tp *****************************************************************************/static voidBumpPriority( ThreadPool * tp ){ int done = 0; struct timeb now; double diffTime = 0; ThreadPoolJob *tempJob = NULL; assert( tp != NULL ); ftime( &now ); while( !done ) { if( tp->medJobQ.size ) { tempJob = ( ThreadPoolJob * ) tp->medJobQ.head.next->item; diffTime = DiffMillis( &now, &tempJob->requestTime ); if( diffTime >= ( tp->attr.starvationTime ) ) { //If job has waited longer than the //starvation time //bump priority (add to higher priority Q) STATSONLY( tp->stats.totalJobsMQ++; ); STATSONLY( tp->stats.totalTimeMQ += diffTime; ); ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 ); ListAddTail( &tp->highJobQ, tempJob ); continue; } } if( tp->lowJobQ.size ) { tempJob = ( ThreadPoolJob * ) tp->lowJobQ.head.next->item; diffTime = DiffMillis( &now, &tempJob->requestTime ); if( diffTime >= ( tp->attr.maxIdleTime ) ) { //If job has waited longer than the //starvation time //bump priority (add to higher priority Q) STATSONLY( tp->stats.totalJobsLQ++; ); STATSONLY( tp->stats.totalTimeLQ += diffTime; ); ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 ); ListAddTail( &tp->medJobQ, tempJob ); continue; } } done = 1; }}/**************************************************************************** * Function: SetRelTimeout * * Description: * Sets the fields of the * passed in timespec to be relMillis milliseconds in the future. * Internal Only. * Parameters: * struct timespec *time * int relMillis - milliseconds in the future *****************************************************************************/static voidSetRelTimeout( struct timespec *time, int relMillis ){ struct timeb now; int sec = relMillis / 1000; int milliSeconds = relMillis % 1000; assert( time != NULL ); ftime( &now ); time->tv_sec = now.time + sec; time->tv_nsec = ( now.millitm + milliSeconds ) * 1000000;}/**************************************************************************** * Function: StatsInit * * Description: * Initializes the statistics structure. * Internal Only. * Parameters: * ThreadPoolStats *stats must be valid non null stats structure *****************************************************************************/STATSONLY( static void StatsInit( ThreadPoolStats * stats ) { assert( stats != NULL ); stats->totalIdleTime = 0; stats->totalJobsHQ = 0; stats->totalJobsLQ = 0; stats->totalJobsMQ = 0; stats->totalTimeHQ = 0; stats->totalTimeMQ = 0; stats->totalTimeLQ = 0; stats->totalWorkTime = 0; stats->totalIdleTime = 0; stats->avgWaitHQ = 0; //average wait in HQ stats->avgWaitMQ = 0; //average wait in MQ stats->avgWaitLQ = 0; stats->workerThreads = 0; stats->idleThreads = 0; stats->persistentThreads = 0; stats->maxThreads = 0; stats->totalThreads = 0;} )/**************************************************************************** * Function: CalcWaitTime * * Description: * Calculates the time the job has been waiting at the specified * priority. Adds to the totalTime and totalJobs kept in the * thread pool statistics structure. * Internal Only. * * Parameters: * ThreadPool *tp * ThreadPriority p * ThreadPoolJob *job *****************************************************************************/STATSONLY( static void CalcWaitTime( ThreadPool * tp, ThreadPriority p, ThreadPoolJob * job ) { struct timeb now; double diff; assert( tp != NULL ); assert( job != NULL ); ftime( &now ); diff = DiffMillis( &now, &job->requestTime ); switch ( p ) {case HIGH_PRIORITY:tp->stats.totalJobsHQ++; tp->stats.totalTimeHQ += diff; break; case MED_PRIORITY:tp->stats.totalJobsMQ++; tp->stats.totalTimeMQ += diff; break; case LOW_PRIORITY:tp->stats.totalJobsLQ++; tp->stats.totalTimeLQ += diff; break; default: assert( 0 );} } )/**************************************************************************** * Function: SetSeed * * Description: * Sets seed for random number generator. * Each thread sets the seed random number generator. * Internal Only. * Parameters: * *****************************************************************************/ void SetSeed( ) { struct timeb t; ftime( &t ); srand( ( unsigned int )t.millitm + ithread_get_current_thread_id( ) ); }/**************************************************************************** * Function: WorkerThread * * Description: * Implements a thread pool worker. * Worker waits for a job to become available. * Worker picks up persistent jobs first, high priority, med priority, * then low priority. * If worker remains idle for more than specified max, the worker * is released. * Internal Only. * Parameters: * void * arg -> is cast to ThreadPool * *****************************************************************************/ static void *WorkerThread( void *arg ) { STATSONLY( time_t start = 0; ) ThreadPoolJob *job = NULL; ListNode *head = NULL; struct timespec timeout; int retCode = 0; int persistent = -1; ThreadPool *tp = ( ThreadPool * ) arg; assert( tp != NULL ); //Increment total thread count ithread_mutex_lock( &tp->mutex ); tp->totalThreads++; ithread_cond_broadcast( &tp->start_and_shutdown ); ithread_mutex_unlock( &tp->mutex ); SetSeed( );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -