📄 threadpool.c
字号:
///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2003 Intel Corporation
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
// * Neither name of Intel Corporation nor the names of its contributors
// may be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////
#include "ThreadPool.h"
#include "FreeList.h"
#include <assert.h>
#include <stdlib.h>
#ifdef STATS
#include <stdio.h>
#endif
//#ifndef _WIN32
/*
 * Translates a small set of errno values into their symbolic names so that
 * diagnostics can print something readable.
 *
 * Recognized codes: EINVAL, EPERM, ESRCH, EFAULT.
 * Any other value yields "UNKNOWN".
 *
 * The returned pointer refers to a string literal; callers must neither
 * modify nor free it.
 */
static char * _GetErrorCode(int aCode)
{
    switch (aCode) {
    case EINVAL:
        return "EINVAL";
    case EPERM:
        return "EPERM";
    case ESRCH:
        return "ESRCH";
    case EFAULT:
        return "EFAULT";
    default:
        return "UNKNOWN";
    }
}
/*
 * Switches aThread to the SCHED_FIFO policy.
 *
 * aPriority == 0 selects the minimum SCHED_FIFO priority; any other value
 * selects the maximum.  If pthread_setschedparam() fails, the error code and
 * its symbolic name are printed to stdout and the function returns normally;
 * the failure is not reported to the caller.
 */
static void _SetSchedulingAndPriority(pthread_t aThread, int aPriority)
{
    struct sched_param fifoParam;
    int rc;

    fifoParam.sched_priority = (aPriority == 0)
        ? sched_get_priority_min(SCHED_FIFO)
        : sched_get_priority_max(SCHED_FIFO);

    rc = pthread_setschedparam(aThread, SCHED_FIFO, &fifoParam);
    if (rc != 0) {
        printf("pthread_setschedparam returned %d - %s\n", rc, _GetErrorCode(rc));
    }
}
//#endif
/****************************************************************************
 * Function: CmpThreadPoolJob
 *
 *  Description:
 *      Equality test for two thread pool jobs, based solely on jobId.
 *  Parameters:
 *      void *jobA - pointer to a ThreadPoolJob, must not be NULL
 *      void *jobB - pointer to a ThreadPoolJob, must not be NULL
 *  Returns:
 *      1 when both jobs carry the same jobId, 0 otherwise.
 *      (Note: this is an "equal?" predicate, not a three-way comparison.)
 *****************************************************************************/
static int
CmpThreadPoolJob( void *jobA,
                  void *jobB )
{
    ThreadPoolJob *lhs;
    ThreadPoolJob *rhs;

    assert( jobA != NULL );
    assert( jobB != NULL );

    lhs = ( ThreadPoolJob * ) jobA;
    rhs = ( ThreadPoolJob * ) jobB;

    return ( lhs->jobId == rhs->jobId ) ? 1 : 0;
}
/****************************************************************************
 * Function: FreeThreadPoolJob
 *
 *  Description:
 *      Hands a ThreadPoolJob back to the pool's job free list
 *      (tp->jobFreeList) via FreeListFree.
 *  Parameters:
 *      ThreadPool *tp     - owning pool, must not be NULL (asserted)
 *      ThreadPoolJob *tpj - job to release; must have been allocated
 *                           with CreateThreadPoolJob
 *****************************************************************************/
static void
FreeThreadPoolJob( ThreadPool * tp,
                   ThreadPoolJob * tpj )
{
    assert( tp != NULL );

    FreeListFree( &tp->jobFreeList,
                  tpj );
}
/****************************************************************************
 * Function: SetPolicyType
 *
 *  Description:
 *      Sets the scheduling policy of the current process.
 *      Internal only.
 *  Parameters:
 *      PolicyType in - scheduling policy, handed unchanged to
 *                      sched_setscheduler()
 *  Returns:
 *      Whatever sched_setscheduler() returns, unmodified.
 *      NOTE(review): callers in this project appear to treat 0 as success
 *      and nonzero as failure; confirm against the platform's
 *      sched_setscheduler() contract.
 *
 *****************************************************************************/
static int
SetPolicyType( PolicyType in )
{
#ifndef _WIN32
    struct sched_param current;

    /* Fetch the calling process's (pid 0) existing parameters so that only
     * the priority field is overridden below. */
    sched_getparam( 0, &current );
    current.sched_priority = DEFAULT_SCHED_PARAM;
    return sched_setscheduler( 0, in, &current );
#else
    /* The Windows build uses a two-argument sched_setscheduler(). */
    return sched_setscheduler( 0, in );
#endif
}
/****************************************************************************
 * Function: SetPriority
 *
 *  Description:
 *      Sets the priority of the currently running thread, keeping its
 *      current scheduling policy.
 *      LOW_PRIORITY maps to the policy's minimum priority, HIGH_PRIORITY
 *      to its maximum, and MED_PRIORITY to the midpoint of that range.
 *      Internal only.
 *  Parameters:
 *      ThreadPriority priority
 *  Returns:
 *      0 on success, nonzero on failure:
 *      EINVAL for an unrecognized priority value, otherwise the error
 *      code returned by pthread_getschedparam() or pthread_setschedparam().
 *
 *****************************************************************************/
static int
SetPriority( ThreadPriority priority )
{
    int currentPolicy = 0;
    int minPriority = 0;
    int maxPriority = 0;
    int actPriority = 0;
    int retVal = 0;
    struct sched_param newPriority;

    /* Without a valid policy the min/max queries below are meaningless,
     * so bail out if the current parameters cannot be read. */
    retVal = pthread_getschedparam( ithread_self( ), &currentPolicy,
                                    &newPriority );
    if( retVal != 0 ) {
        return retVal;
    }

    minPriority = sched_get_priority_min( currentPolicy );
    maxPriority = sched_get_priority_max( currentPolicy );

    switch ( priority ) {
        case LOW_PRIORITY:
            actPriority = minPriority;
            break;
        case MED_PRIORITY:
            /* True midpoint of [min, max]; a plain (max - min) / 2 would
             * fall below min whenever min > (max - min) / 2. */
            actPriority = minPriority + ( maxPriority - minPriority ) / 2;
            break;
        case HIGH_PRIORITY:
            actPriority = maxPriority;
            break;
        default:
            return EINVAL;
    }

    newPriority.sched_priority = actPriority;
    return pthread_setschedparam( ithread_self( ), currentPolicy,
                                  &newPriority );
}
/****************************************************************************
 * Function: DiffMillis
 *
 *  Description:
 *      Computes how many milliseconds separate two timeb timestamps.
 *      Internal Only.
 *  Parameters:
 *      struct timeb *time1 - minuend, must not be NULL
 *      struct timeb *time2 - subtrahend, must not be NULL
 *  Returns:
 *      time1 - time2 in milliseconds; negative when time1 is earlier
 *      than time2.
 *****************************************************************************/
static double
DiffMillis( struct timeb *time1,
            struct timeb *time2 )
{
    double elapsed;

    assert( time1 != NULL );
    assert( time2 != NULL );

    /* Whole-second part first, scaled to milliseconds ... */
    elapsed = ( double )( time1->time ) - time2->time;
    elapsed *= 1000;

    /* ... then the sub-second remainder (may be negative). */
    elapsed += ( time1->millitm - time2->millitm );

    return elapsed;
}
/****************************************************************************
 * Function: BumpPriority
 *
 *  Description:
 *      Promotes jobs that have been waiting too long.
 *      Only the job at the head of each queue is examined:
 *        - medium queue head moves to the tail of the high queue once it
 *          has waited at least tp->attr.starvationTime milliseconds;
 *        - low queue head moves to the tail of the medium queue once it
 *          has waited at least tp->attr.maxIdleTime milliseconds.
 *      The check repeats (medium queue first) until neither head
 *      qualifies.  All waits are measured against a single timestamp
 *      taken on entry.
 *
 *      tp->mutex must be locked.
 *      Internal Only.
 *  Parameters:
 *      ThreadPool *tp - must not be NULL
 *****************************************************************************/
static void
BumpPriority( ThreadPool * tp )
{
    struct timeb now;
    double waited = 0;
    ThreadPoolJob *job = NULL;

    assert( tp != NULL );

    ftime( &now );

    for( ;; ) {
        if( tp->medJobQ.size ) {
            job = ( ThreadPoolJob * ) tp->medJobQ.head.next->item;
            waited = DiffMillis( &now, &job->requestTime );

            if( waited >= ( tp->attr.starvationTime ) ) {
                /* Starved in the medium queue: account for the wait
                 * and promote to the high queue. */
                STATSONLY( tp->stats.totalJobsMQ++; );
                STATSONLY( tp->stats.totalTimeMQ += waited; );

                ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 );
                ListAddTail( &tp->highJobQ, job );
                continue;
            }
        }

        if( tp->lowJobQ.size ) {
            job = ( ThreadPoolJob * ) tp->lowJobQ.head.next->item;
            waited = DiffMillis( &now, &job->requestTime );

            /* NOTE(review): this threshold is maxIdleTime, whereas the
             * medium queue uses starvationTime - confirm this is
             * intentional. */
            if( waited >= ( tp->attr.maxIdleTime ) ) {
                /* Waited too long in the low queue: account for the
                 * wait and promote to the medium queue. */
                STATSONLY( tp->stats.totalJobsLQ++; );
                STATSONLY( tp->stats.totalTimeLQ += waited; );

                ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 );
                ListAddTail( &tp->medJobQ, job );
                continue;
            }
        }

        /* Neither queue head qualified for promotion on this pass. */
        break;
    }
}
/****************************************************************************
 * Function: SetRelTimeout
 *
 *  Description:
 *      Sets the fields of the passed in timespec to the absolute time
 *      that lies relMillis milliseconds after "now" (as reported by
 *      ftime).  The result is always normalized, i.e.
 *      0 <= tv_nsec < 1000000000, with any overflow or underflow of the
 *      millisecond part carried into tv_sec.
 *      Internal Only.
 *  Parameters:
 *      struct timespec *time - output, must not be NULL
 *      int relMillis - milliseconds in the future (a negative value
 *                      yields a time in the past)
 *****************************************************************************/
static void
SetRelTimeout( struct timespec *time,
               int relMillis )
{
    struct timeb now;
    int sec = relMillis / 1000;
    int milliSeconds = relMillis % 1000;

    assert( time != NULL );

    ftime( &now );

    /* now.millitm is 0..999 and the remainder is -999..999, so the sum
     * lies in -999..1998 and a single carry/borrow step normalizes it. */
    milliSeconds += now.millitm;
    if( milliSeconds >= 1000 ) {
        milliSeconds -= 1000;
        sec++;
    } else if( milliSeconds < 0 ) {
        milliSeconds += 1000;
        sec--;
    }

    time->tv_sec = now.time + sec;
    time->tv_nsec = ( long )milliSeconds * 1000000L;
}
/****************************************************************************
 * Function: StatsInit
 *
 *  Description:
 *      Resets every field of the statistics structure to zero.
 *      Only compiled when statistics are enabled (wrapped in STATSONLY).
 *      Internal Only.
 *  Parameters:
 *      ThreadPoolStats *stats - must be a valid, non-NULL stats structure
 *****************************************************************************/
STATSONLY( static void StatsInit( ThreadPoolStats * stats ) {
    assert( stats != NULL );

    /* Per-queue job counts and accumulated wait times. */
    stats->totalJobsHQ = 0;
    stats->totalJobsMQ = 0;
    stats->totalJobsLQ = 0;
    stats->totalTimeHQ = 0;
    stats->totalTimeMQ = 0;
    stats->totalTimeLQ = 0;

    /* Accumulated worker activity. */
    stats->totalWorkTime = 0;
    stats->totalIdleTime = 0;

    /* Average wait per queue. */
    stats->avgWaitHQ = 0;
    stats->avgWaitMQ = 0;
    stats->avgWaitLQ = 0;

    /* Thread counters. */
    stats->workerThreads = 0;
    stats->idleThreads = 0;
    stats->persistentThreads = 0;
    stats->maxThreads = 0;
    stats->totalThreads = 0;
}
)
/****************************************************************************
 * Function: CalcWaitTime
 *
 *  Description:
 *      Measures how long the job has been waiting (current time minus
 *      job->requestTime, in milliseconds) and adds it to the statistics
 *      of the queue identified by p: the matching totalJobs counter is
 *      incremented and the wait is added to the matching totalTime.
 *      Only compiled when statistics are enabled (wrapped in STATSONLY).
 *      Internal Only.
 *
 *  Parameters:
 *      ThreadPool *tp     - must not be NULL
 *      ThreadPriority p   - HIGH_PRIORITY, MED_PRIORITY or LOW_PRIORITY;
 *                           any other value trips an assertion
 *      ThreadPoolJob *job - must not be NULL
 *****************************************************************************/
STATSONLY( static void CalcWaitTime( ThreadPool * tp,
                                     ThreadPriority p,
                                     ThreadPoolJob * job ) {
    struct timeb now;
    double waited;

    assert( tp != NULL );
    assert( job != NULL );

    ftime( &now );
    waited = DiffMillis( &now, &job->requestTime );

    switch ( p ) {
        case HIGH_PRIORITY:
            tp->stats.totalJobsHQ++;
            tp->stats.totalTimeHQ += waited;
            break;
        case MED_PRIORITY:
            tp->stats.totalJobsMQ++;
            tp->stats.totalTimeMQ += waited;
            break;
        case LOW_PRIORITY:
            tp->stats.totalJobsLQ++;
            tp->stats.totalTimeLQ += waited;
            break;
        default:
            /* Unknown priority: programming error. */
            assert( 0 );
    }
}
)
/****************************************************************************
 * Function: SetSeed
 *
 *  Description:
 *      Seeds the C library random number generator (srand) with the
 *      millisecond part of the current time plus the calling thread's id,
 *      so that each thread calling it derives its own seed.
 *      Internal Only.
 *  Parameters:
 *      none
 *****************************************************************************/
void SetSeed( ) {
    struct timeb now;
    unsigned int seed;

    ftime( &now );

#ifndef _WIN32
    seed = ( unsigned int )now.millitm + ithread_get_current_thread_id( );
#else
    /* In the Windows build the thread id is a structure; its .p member
     * is used as the numeric part. */
    seed = ( unsigned int )now.millitm + (unsigned int)ithread_get_current_thread_id( ).p;
#endif

    srand( seed );
}
/****************************************************************************
* Function: WorkerThread
*
* Description:
* Implements a thread pool worker.
* Worker waits for a job to become available.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -