⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 电驴下载工具eMule0.47aVeryCD的源代码,可作分析测试也可用于P2P软件的开发研究.
💻 C
📖 第 1 页 / 共 4 页
字号:
///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000-2003 Intel Corporation 
// All rights reserved. 
//
// Redistribution and use in source and binary forms, with or without 
// modification, are permitted provided that the following conditions are met: 
//
// * Redistributions of source code must retain the above copyright notice, 
// this list of conditions and the following disclaimer. 
// * Redistributions in binary form must reproduce the above copyright notice, 
// this list of conditions and the following disclaimer in the documentation 
// and/or other materials provided with the distribution. 
// * Neither name of Intel Corporation nor the names of its contributors 
// may be used to endorse or promote products derived from this software 
// without specific prior written permission.
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR 
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 
// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////

#include "ThreadPool.h"
#include "FreeList.h"
#include <assert.h>
#include <stdlib.h>

#ifdef STATS
#include <stdio.h>
#endif

//#ifndef _WIN32
/****************************************************************************
 * Function: _GetErrorCode
 *
 *  Description:
 *      Translates an errno-style code into its symbolic name so that it
 *      can be printed in diagnostics.
 *      Internal only.
 *  Parameters:
 *      int aCode - error code to translate
 *  Returns:
 *      "EINVAL", "EPERM", "ESRCH" or "EFAULT" for the matching code,
 *      "UNKNOWN" for every other value. The result is a string literal
 *      and must not be modified or freed.
 *****************************************************************************/
static char * _GetErrorCode(int aCode)
{
    switch (aCode) {
    case EINVAL:
	return "EINVAL";
    case EPERM:
	return "EPERM";
    case ESRCH:
	return "ESRCH";
    case EFAULT:
	return "EFAULT";
    default:
	return "UNKNOWN";
    }
}


/****************************************************************************
 * Function: _SetSchedulingAndPriority
 *
 *  Description:
 *      Switches the given thread to the SCHED_FIFO policy, using either
 *      the lowest or the highest priority that policy offers.
 *      A failure is only reported on stdout; it is not returned.
 *      Internal only.
 *  Parameters:
 *      pthread_t aThread - thread to reconfigure
 *      int aPriority     - 0 selects the minimum SCHED_FIFO priority,
 *                          any other value selects the maximum
 *****************************************************************************/
static void _SetSchedulingAndPriority(pthread_t aThread, int aPriority)
{
    struct sched_param param;
    int rc;

    param.sched_priority = (aPriority == 0)
	? sched_get_priority_min(SCHED_FIFO)
	: sched_get_priority_max(SCHED_FIFO);

    rc = pthread_setschedparam(aThread, SCHED_FIFO, &param);
    if (rc != 0) {
	printf("pthread_setschedparam returned %d - %s\n", rc, _GetErrorCode(rc));
    }
}
//#endif

/****************************************************************************
 * Function: CmpThreadPoolJob
 *
 *  Description:
 *      Equality test for two thread pool jobs: the jobs are considered
 *      the same when their jobId fields match.
 *  Parameters:
 *      void * - job A (a ThreadPoolJob *, must not be NULL)
 *      void * - job B (a ThreadPoolJob *, must not be NULL)
 *  Returns:
 *      nonzero if both jobs carry the same jobId, 0 otherwise.
 *****************************************************************************/
static int
CmpThreadPoolJob( void *jobA,
                  void *jobB )
{
    assert( jobA != NULL );
    assert( jobB != NULL );

    return ( ( ( ThreadPoolJob * ) jobA )->jobId ==
             ( ( ThreadPoolJob * ) jobB )->jobId );
}

/****************************************************************************
 * Function: FreeThreadPoolJob
 *
 *  Description:
 *      Hands a ThreadPoolJob back to the job free list of the pool
 *      via FreeListFree.
 *  Parameters:
 *      ThreadPool *tp     - pool whose jobFreeList receives the job,
 *                           must not be NULL
 *      ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob
 *****************************************************************************/
static void
FreeThreadPoolJob( ThreadPool * tp,
                   ThreadPoolJob * tpj )
{
    assert( tp != NULL );

    /* the result of FreeListFree is deliberately not inspected */
    ( void )FreeListFree( &tp->jobFreeList, tpj );
}

/****************************************************************************
 * Function: SetPolicyType
 *
 *  Description:
 *      Sets the scheduling policy of the current process (pid 0) by
 *      calling sched_setscheduler.
 *      On non-Windows builds the priority passed along with the policy
 *      is DEFAULT_SCHED_PARAM; the Windows build uses the two-argument
 *      form of sched_setscheduler and passes no priority.
 *      Internal only.
 *  Parameters:
 *      PolicyType in - policy to switch to
 *  Returns:
 *      The value returned by sched_setscheduler, unchanged.
 *
 *****************************************************************************/
static int
SetPolicyType( PolicyType in )
{
#ifdef _WIN32
    return sched_setscheduler( 0, in );
#else
    struct sched_param param;

    /* start from the current parameters, then override the priority */
    sched_getparam( 0, &param );
    param.sched_priority = DEFAULT_SCHED_PARAM;

    return sched_setscheduler( 0, in, &param );
#endif
}

/****************************************************************************
 * Function: SetPriority
 *
 *  Description:
 *      Sets the priority of the currently running thread, keeping its
 *      current scheduling policy. The requested level is mapped onto the
 *      priority range that the current policy reports:
 *          LOW_PRIORITY  -> minimum of the range
 *          MED_PRIORITY  -> midpoint of the range
 *          HIGH_PRIORITY -> maximum of the range
 *      Internal only.
 *  Parameters:
 *      ThreadPriority priority
 *  Returns:
 *      0 on success, nonzero on failure
 *      EINVAL invalid priority
 *      Otherwise the error code returned by pthread_getschedparam or
 *      pthread_setschedparam.
 *
 *****************************************************************************/
static int
SetPriority( ThreadPriority priority )
{

    int currentPolicy;
    int minPriority = 0;
    int maxPriority = 0;
    int actPriority = 0;
    int retVal = 0;
    struct sched_param newPriority;

    // Without a valid policy the range queries below would read an
    // uninitialized value, so stop here if the lookup fails.
    retVal = pthread_getschedparam( ithread_self(  ), &currentPolicy,
                                    &newPriority );
    if( retVal != 0 ) {
        return retVal;
    }

    minPriority = sched_get_priority_min( currentPolicy );
    maxPriority = sched_get_priority_max( currentPolicy );

    switch ( priority ) {

        case LOW_PRIORITY:
            actPriority = minPriority;
            break;
        case MED_PRIORITY:
            // The midpoint must be offset by the lower bound. Half of the
            // span alone is only correct when the range starts at 0: for a
            // range of -15..15 it would give 15 (same as HIGH), for
            // 10..20 it would give 5 (outside the range).
            actPriority = minPriority + ( maxPriority - minPriority ) / 2;
            break;
        case HIGH_PRIORITY:
            actPriority = maxPriority;
            break;
        default:
            return EINVAL;
    }

    newPriority.sched_priority = actPriority;

    return pthread_setschedparam( ithread_self(  ), currentPolicy,
                                  &newPriority );

}

/****************************************************************************
 * Function: DiffMillis
 *
 *  Description:
 *      Computes how many milliseconds lie between two timeb values.
 *      The result is negative when time1 is earlier than time2.
 *      Internal Only.
 *  Parameters:
 *      struct timeb *time1 - minuend, must not be NULL
 *      struct timeb *time2 - subtrahend, must not be NULL
 *  Returns:
 *       the difference in milliseconds, time1-time2.
 *****************************************************************************/
static double
DiffMillis( struct timeb *time1,
            struct timeb *time2 )
{
    double millis;
    int fraction;

    assert( time1 != NULL );
    assert( time2 != NULL );

    // whole seconds first, scaled to milliseconds
    millis = ( double )( time1->time ) - time2->time;
    millis *= 1000;

    // then the sub-second part, which may be negative
    fraction = time1->millitm - time2->millitm;
    millis += fraction;

    return millis;
}

/****************************************************************************
 * Function: BumpPriority
 *
 *  Description:
 *      Promotes jobs that have been waiting too long: the job at the
 *      front of the medium queue moves to the tail of the high queue,
 *      and the job at the front of the low queue moves to the tail of
 *      the medium queue. Repeats until neither front job is over its
 *      threshold. Only the front job of each queue is examined per pass.
 *
 *      tp->mutex must be locked.
 *      Internal Only.
 *  Parameters:
 *      ThreadPool *tp
 *****************************************************************************/
static void
BumpPriority( ThreadPool * tp )
{
    struct timeb currentTime;
    double waited = 0;
    ThreadPoolJob *frontJob = NULL;

    assert( tp != NULL );

    // One timestamp is used for every comparison in this call.
    ftime( &currentTime );

    for( ;; ) {
        if( tp->medJobQ.size ) {
            frontJob = ( ThreadPoolJob * ) tp->medJobQ.head.next->item;
            waited = DiffMillis( &currentTime, &frontJob->requestTime );

            if( waited >= ( tp->attr.starvationTime ) ) {
                // Waited at least the starvation time:
                // move the job to the high priority queue.

                STATSONLY( tp->stats.totalJobsMQ++;
                     );
                STATSONLY( tp->stats.totalTimeMQ += waited;
                     );

                ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 );
                ListAddTail( &tp->highJobQ, frontJob );
                // Re-check from the top, the next front job may be due too.
                continue;
            }
        }

        if( tp->lowJobQ.size ) {
            frontJob = ( ThreadPoolJob * ) tp->lowJobQ.head.next->item;
            waited = DiffMillis( &currentTime, &frontJob->requestTime );

            // NOTE(review): the low queue is compared against
            // attr.maxIdleTime, not attr.starvationTime as for the
            // medium queue - confirm this is intended.
            if( waited >= ( tp->attr.maxIdleTime ) ) {
                // Waited long enough:
                // move the job to the medium priority queue.

                STATSONLY( tp->stats.totalJobsLQ++;
                     );
                STATSONLY( tp->stats.totalTimeLQ += waited;
                     );

                ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 );
                ListAddTail( &tp->medJobQ, frontJob );
                continue;
            }
        }

        // Nothing was promoted in this pass.
        break;
    }
}

/****************************************************************************
 * Function: SetRelTimeout
 *
 *  Description:
 *      Sets the fields of the
 *      passed in timespec to be relMillis milliseconds in the future,
 *      measured from the current time as reported by ftime.
 *      The result is normalized: tv_nsec always ends up in the range
 *      0 .. 999999999 and any overflow is carried into tv_sec.
 *      Internal Only.
 *  Parameters:
 *      struct timespec *time - receives the absolute time, must not be NULL
 *      int relMillis - milliseconds in the future
 *****************************************************************************/
static void
SetRelTimeout( struct timespec *time,
               int relMillis )
{
    struct timeb now;
    int sec = relMillis / 1000;
    int milliSeconds = relMillis % 1000;

    assert( time != NULL );

    ftime( &now );

    // now.millitm (0..999) plus the remainder (-999..999) can leave the
    // 0..999 range. A timespec with tv_nsec outside 0..999999999 is
    // invalid, so carry or borrow one second as needed.
    milliSeconds += now.millitm;
    if( milliSeconds >= 1000 ) {
        milliSeconds -= 1000;
        sec++;
    } else if( milliSeconds < 0 ) {
        milliSeconds += 1000;
        sec--;
    }

    time->tv_sec = now.time + sec;
    time->tv_nsec = ( long )milliSeconds * 1000000L;
}

/****************************************************************************
 * Function: StatsInit
 *
 *  Description:
 *      Resets every counter of the statistics structure to zero.
 *      The definition is wrapped in STATSONLY.
 *      Internal Only.
 *  Parameters:
 *      ThreadPoolStats *stats must be valid non null stats structure
 *****************************************************************************/
STATSONLY( static void StatsInit( ThreadPoolStats * stats ) {
           assert( stats != NULL );

           /* accumulated idle and work time */
           stats->totalIdleTime = 0;
           stats->totalWorkTime = 0;

           /* per-queue job counts */
           stats->totalJobsHQ = 0;
           stats->totalJobsMQ = 0;
           stats->totalJobsLQ = 0;

           /* per-queue accumulated wait time */
           stats->totalTimeHQ = 0;
           stats->totalTimeMQ = 0;
           stats->totalTimeLQ = 0;

           /* per-queue average wait */
           stats->avgWaitHQ = 0;
           stats->avgWaitMQ = 0;
           stats->avgWaitLQ = 0;

           /* thread counters */
           stats->workerThreads = 0;
           stats->idleThreads = 0;
           stats->persistentThreads = 0;
           stats->maxThreads = 0;
           stats->totalThreads = 0;
           }
 )

/****************************************************************************
 * Function: CalcWaitTime
 *
 *  Description:
 *      Measures how long the job has been waiting since its requestTime
 *      and adds that to the statistics of the given priority level:
 *      the matching totalJobs counter is incremented and the matching
 *      totalTime is increased by the wait in milliseconds.
 *      An unknown priority trips an assertion.
 *      The definition is wrapped in STATSONLY.
 *      Internal Only.
 *
 *  Parameters:
 *      ThreadPool *tp     - pool owning the statistics, must not be NULL
 *      ThreadPriority p   - priority level to account the wait to
 *      ThreadPoolJob *job - job whose wait is measured, must not be NULL
 *****************************************************************************/
STATSONLY( static void CalcWaitTime( ThreadPool * tp,
                                     ThreadPriority p,
                                     ThreadPoolJob * job ) {
           struct timeb currentTime;
           double waited;

           assert( tp != NULL );
           assert( job != NULL );

           ftime( &currentTime );
           waited = DiffMillis( &currentTime, &job->requestTime );

           switch ( p ) {
               case HIGH_PRIORITY:
                   tp->stats.totalJobsHQ++;
                   tp->stats.totalTimeHQ += waited;
                   break;
               case MED_PRIORITY:
                   tp->stats.totalJobsMQ++;
                   tp->stats.totalTimeMQ += waited;
                   break;
               case LOW_PRIORITY:
                   tp->stats.totalJobsLQ++;
                   tp->stats.totalTimeLQ += waited;
                   break;
               default:
                   assert( 0 );
           }
           }

 )

/****************************************************************************
 * Function: SetSeed
 *
 *  Description:
 *      Seeds the random number generator (srand) with the millisecond
 *      part of the current time plus the id of the calling thread.
 *      On Windows builds the .p member of the thread id is used.
 *      Internal Only.
 *  Parameters:
 *      none
 *****************************************************************************/
    void SetSeed(  ) {
    struct timeb now;
    unsigned int seed;

    ftime( &now );
#ifdef _WIN32
    seed = ( unsigned int )now.millitm + (unsigned int)ithread_get_current_thread_id(  ).p;
#else
    seed = ( unsigned int )now.millitm + ithread_get_current_thread_id(  );
#endif
    srand( seed );
    }

/****************************************************************************
 * Function: WorkerThread
 *
 *  Description:
 *      Implements a thread pool worker.
 *      Worker waits for a job to become available.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -