⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 电驴下载工具eMule0.47aVeryCD的源代码,可作分析测试也可用于P2P软件的开发研究.
💻 C
📖 第 1 页 / 共 4 页
字号:
 *      Worker picks up persistent jobs first, high priority, med priority,
 *             then low priority.
 *      If worker remains idle for more than specified max, the worker
 *      is released.
 *      Internal Only.
 *  Parameters:
 *      void * arg -> is cast to ThreadPool *
 *****************************************************************************/
    static void *WorkerThread( void *arg ) {

        // Worker loop: registers itself in tp->totalThreads, then repeatedly
        // waits (under tp->mutex) for a job or shutdown, runs the job with
        // the mutex released, and returns NULL when the pool shuts down or
        // the thread has been idle too long.
        // STATSONLY( ... ) wraps statistics bookkeeping; presumably it is
        // compiled out when statistics are disabled -- confirm against the
        // macro definition.

        STATSONLY( time_t start = 0;
             )

        ThreadPoolJob *job = NULL;
        ListNode *head = NULL;

        struct timespec timeout;
        int retCode = 0;
        // Kind of the job run in the previous iteration:
        // -1 = none yet, 1 = the persistent job, 0 = a job from a queue.
        int persistent = -1;
        ThreadPool *tp = ( ThreadPool * ) arg;

        assert( tp != NULL );

        //Increment total thread count
        // The broadcast wakes CreateWorker, which waits on
        // start_and_shutdown until totalThreads reaches the expected count.
        ithread_mutex_lock( &tp->mutex );
        tp->totalThreads++;
        ithread_cond_broadcast( &tp->start_and_shutdown );
        ithread_mutex_unlock( &tp->mutex );
        // NOTE(review): this second lock/unlock pair changes no state here;
        // presumably it only delays the worker until the mutex is free
        // again -- confirm before removing.
        ithread_mutex_lock( &tp->mutex );
        ithread_mutex_unlock( &tp->mutex );

        SetSeed(  );

        STATSONLY( time( &start );
             );

        while( 1 ) {

            ithread_mutex_lock( &tp->mutex );

            // Release the job finished in the previous iteration; this is
            // done while tp->mutex is held.
            if( job ) {

                FreeThreadPoolJob( tp, job );
                job = NULL;

            }

            // Forget any timeout from an earlier wait so that only a wait
            // in this iteration can trigger the idle exit below.
            retCode = 0;

            STATSONLY( tp->stats.idleThreads++;
                 );
            STATSONLY( tp->stats.totalWorkTime += ( time( NULL ) - start );
                 );             //work time
            STATSONLY( time( &start );
                 );             //idle time

            if( persistent == 1 ) {
                //Persistent thread
                //becomes a regular thread
                tp->persistentThreads--;
            }

            STATSONLY( if( persistent == 0 )
                       tp->stats.workerThreads--; );

            //Check for a job or shutdown
            // Loops while all three queues are empty, no persistent job is
            // pending and shutdown has not been requested.
            while( ( tp->lowJobQ.size == 0 )
                   && ( tp->medJobQ.size == 0 )
                   && ( tp->highJobQ.size == 0 )
                   && ( !tp->persistentJob )
                   && ( !tp->shutdown ) ) {

                //If wait timed out
                //and we currently have more than the
                //min threads, or if we have more than the max threads
                // (only possible if the attributes have been reset)
                //let this thread die.

                if( ( ( retCode == ETIMEDOUT )
                      && ( ( tp->totalThreads ) > tp->attr.minThreads ) )
                    || ( ( tp->attr.maxThreads != -1 )
                         && ( ( tp->totalThreads ) >
                              tp->attr.maxThreads ) ) ) {

                    STATSONLY( tp->stats.idleThreads-- );

                    // Deregister this thread and notify anyone waiting on
                    // start_and_shutdown before exiting.
                    tp->totalThreads--;
                    ithread_cond_broadcast( &tp->start_and_shutdown );
                    ithread_mutex_unlock( &tp->mutex );

                    return NULL;
                }

                SetRelTimeout( &timeout, tp->attr.maxIdleTime );

                //wait for a job up to the specified max time
                // retCode is examined at the top of the next pass of this
                // inner loop.
                retCode = ithread_cond_timedwait( &tp->condition,
                                                  &tp->mutex, &timeout );

            }

            STATSONLY( tp->stats.idleThreads--;
                 );
            STATSONLY( tp->stats.totalIdleTime += ( time( NULL ) - start );
                 );             //idle time
            STATSONLY( time( &start );
                 );             //work time

            //bump priority of starved jobs
            // Called with tp->mutex still held.
            BumpPriority( tp );

            //if shutdown then stop
            if( tp->shutdown ) {
                tp->totalThreads--;

                ithread_cond_broadcast( &tp->start_and_shutdown );

                ithread_mutex_unlock( &tp->mutex );

                return NULL;
            } else {

                //Pick up persistent job if available
                if( tp->persistentJob ) {

                    job = tp->persistentJob;
                    tp->persistentJob = NULL;
                    tp->persistentThreads++;
                    persistent = 1;
                    // Presumably wakes a caller waiting for the persistent
                    // job slot to be taken -- confirm against the code that
                    // sets tp->persistentJob.
                    ithread_cond_broadcast( &tp->start_and_shutdown );

                } else {
                    STATSONLY( tp->stats.workerThreads++ );
                    persistent = 0;

                    //Pick the highest priority job
                    // In each branch the head node is unlinked from its
                    // queue while the job it carried stays in use below.
                    // NOTE(review): the third ListDelNode argument (0)
                    // presumably means "do not free the item" -- confirm.
                    if( tp->highJobQ.size > 0 ) {
                        head = ListHead( &tp->highJobQ );
                        job = ( ThreadPoolJob * ) head->item;
                        STATSONLY( CalcWaitTime
                                   ( tp, HIGH_PRIORITY, job ) );
                        ListDelNode( &tp->highJobQ, head, 0 );

                    } else if( tp->medJobQ.size > 0 ) {
                        head = ListHead( &tp->medJobQ );
                        job = ( ThreadPoolJob * ) head->item;
                        STATSONLY( CalcWaitTime( tp, MED_PRIORITY, job ) );
                        ListDelNode( &tp->medJobQ, head, 0 );

                    } else if( tp->lowJobQ.size > 0 ) {
                        head = ListHead( &tp->lowJobQ );
                        job = ( ThreadPoolJob * ) head->item;
                        STATSONLY( CalcWaitTime( tp, LOW_PRIORITY, job ) );
                        ListDelNode( &tp->lowJobQ, head, 0 );

                    } else {

                        // Should never get here
                        // (the wait loop above only exits when a queue is
                        // non-empty, a persistent job is pending, or
                        // shutdown is set, and the latter two were handled)
                        assert( 0 );
                        STATSONLY( tp->stats.workerThreads-- );
                        tp->totalThreads--;
                        ithread_cond_broadcast( &tp->start_and_shutdown );
                        ithread_mutex_unlock( &tp->mutex );

                        return NULL;
                    }

                }
            }

            // The job runs with tp->mutex released.
            ithread_mutex_unlock( &tp->mutex );

            // The result of SetPriority is currently ignored: both branches
            // are empty placeholders.
            if( SetPriority( job->priority ) != 0 ) {
                // In the future can log
                // info
            } else {
                // In the future can log
                // info
            }

            //run the job

            job->func( job->arg );

            //return to Normal
            SetPriority( DEFAULT_PRIORITY );

        }
    }

/****************************************************************************
 * Function: CreateThreadPoolJob
 *
 *  Description:
 *      Takes a job record from the pool's job free list and fills it
 *      with a copy of the caller's job, a new id and the request time.
 *      Internal to thread pool.
 *  Parameters:
 *      ThreadPoolJob * job - source job; its contents are copied
 *      int id              - value stored in the copy's jobId field
 *      ThreadPool * tp     - pool whose jobFreeList supplies the record
 *
 *  Returns:
 *      ThreadPoolJob * on success, NULL if the free list allocation fails.
 *****************************************************************************/
    static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob * job,
                                               int id,
                                               ThreadPool * tp ) {
        ThreadPoolJob *copy;

        assert( job != NULL );
        assert( tp != NULL );

        copy = ( ThreadPoolJob * ) FreeListAlloc( &tp->jobFreeList );
        if( copy == NULL ) {
            //Nothing to fill in
            return NULL;
        }

        //Start from the caller's job, then override the id
        //and record when the request was made
        *copy = *job;
        copy->jobId = id;
        ftime( &copy->requestTime );

        return copy;
    }

/****************************************************************************
 * Function: CreateWorker
 *
 *  Description:
 *      Creates a worker thread, if the thread pool
 *      does not already have max threads, detaches it, and waits on
 *      tp->start_and_shutdown until the new thread has been counted in
 *      tp->totalThreads.
 *      Internal to thread pool.
 *      The wait uses tp->mutex, so the caller is expected to hold
 *      tp->mutex when calling this function.
 *  Parameters:
 *      ThreadPool *tp
 *
 *  Returns:
 *      0 on success, an error code on failure
 *      EMAXTHREADS if already max threads reached
 *      otherwise the code returned by ithread_create or ithread_detach
 *      (e.g. EAGAIN if system can not create thread)
 *
 *****************************************************************************/
    static int CreateWorker( ThreadPool * tp ) {
        ithread_t temp;
        int rc = 0;
        int currentThreads = 0;

        //Validate tp before it is dereferenced
        assert( tp != NULL );

        //Thread count expected once the new worker has registered itself
        currentThreads = tp->totalThreads + 1;

        if( ( tp->attr.maxThreads != INFINITE_THREADS )
            && ( currentThreads > tp->attr.maxThreads ) ) {
            return EMAXTHREADS;
        }

        rc = ithread_create( &temp, NULL, WorkerThread, tp );

        if( rc == 0 ) {

#ifndef _WIN32
            //Only touch the thread handle when creation succeeded;
            //after a failed create, temp holds no valid thread id
            _SetSchedulingAndPriority( temp, 0 ); //MTY turn this on?
#endif

            rc = ithread_detach( temp );

            //WorkerThread increments totalThreads and broadcasts
            //start_and_shutdown once it is running
            while( tp->totalThreads < currentThreads ) {

                ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );

            }

        }

        STATSONLY( if( tp->stats.maxThreads < tp->totalThreads ) {
                   tp->stats.maxThreads = tp->totalThreads;}
         )

        return rc;
    }

/****************************************************************************
 * Function: AddWorker
 *
 *  Description:
 *      Keeps creating worker threads while the pool has no
 *      non-persistent threads, or while the number of queued jobs per
 *      non-persistent thread exceeds tp->attr.jobsPerThread.
 *      Gives up as soon as CreateWorker reports a failure.
 *      Internal to Thread Pool.
 *  Parameters:
 *      ThreadPool* tp
 *
 *****************************************************************************/
    static void AddWorker( ThreadPool * tp ) {
        int queued;
        int available;

        assert( tp != NULL );

        //Jobs waiting in any of the three priority queues
        queued = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;

        //Threads not currently tied up by a persistent job
        available = tp->totalThreads - tp->persistentThreads;

        for( ;; ) {

            //Zero is tested first so the division never divides by zero
            if( ( available != 0 )
                && ( ( queued / available ) <= tp->attr.jobsPerThread ) ) {
                break;
            }

            if( CreateWorker( tp ) != 0 ) {
                break;
            }

            available++;
        }

    }

/****************************************************************************
 * Function: ThreadPoolInit
 *
 *  Description:
 *      Initializes and starts ThreadPool. Must be called first.
 *      And only once for ThreadPool.
 *  Parameters:
 *      tp  - must be valid, non null, pointer to ThreadPool.
 *      minWorkerThreads - minimum number of worker threads
 *                         thread pool will never have less than this
 *                         number of threads.
 *      maxWorkerThreads - maximum number of worker threads
 *                         thread pool will never have more than this
 *                         number of threads.
 *      maxIdleTime      - maximum time that a worker thread will spend
 *                         idle. If a worker is idle longer than this
 *                         time and there are more than the min
 *                         number of workers running, then the
 *                         worker thread exits.
 *      jobsPerThread    - ratio of jobs to thread to try and maintain
 *                         if a job is scheduled and the number of jobs per
 *                         thread is greater than this number,and
 *                         if less than the maximum number of
 *                         workers are running then a new thread is
 *                         started to help out with efficiency.
 *      schedPolicy      - scheduling policy to try and set (OS dependent)
 *  Returns:
 *      0 on success, nonzero on failure.
 *      EAGAIN if not enough system resources to create minimum threads.
 *      INVALID_POLICY if schedPolicy can't be set
 *      EMAXTHREADS if minimum threads is greater than maximum threads
 *****************************************************************************/
    int ThreadPoolInit( ThreadPool * tp,
                        ThreadPoolAttr * attr ) {
        int retCode = 0;
        int i = 0;

        assert( tp != NULL );

        if( tp == NULL ) {
            return EINVAL;
        }

        retCode += ithread_mutex_init( &tp->mutex, NULL );
        assert( retCode == 0 );

        retCode += ithread_mutex_lock( &tp->mutex );
        assert( retCode == 0 );

        retCode += ithread_cond_init( &tp->condition, NULL );
        assert( retCode == 0 );

        retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );
        assert( retCode == 0 );

        if( retCode != 0 ) {
            return EAGAIN;
        }

        if( attr ) {
            tp->attr = ( *attr );
        } else {
            TPAttrInit( &tp->attr );
        }

        if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {
            ithread_mutex_unlock( &tp->mutex );
            ithread_mutex_destroy( &tp->mutex );
            ithread_cond_destroy( &tp->condition );
            ithread_cond_destroy( &tp->start_and_shutdown );
            return INVALID_POLICY;
        }

        retCode += FreeListInit( &tp->jobFreeList, sizeof( ThreadPoolJob ),
                                 JOBFREELISTSIZE );
        assert( retCode == 0 );

        STATSONLY( StatsInit( &tp->stats ) );

        retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );
        assert( retCode == 0 );

        retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );
        assert( retCode == 0 );

        retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );
        assert( retCode == 0 );

        if( retCode != 0 ) {
            retCode = EAGAIN;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -