⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 Upnp开发包文件
💻 C
📖 第 1 页 / 共 4 页
字号:
        STATSONLY( time( &start );             );        while( 1 ) {            ithread_mutex_lock( &tp->mutex );            if( job ) {                FreeThreadPoolJob( tp, job );                job = NULL;            }            retCode = 0;            STATSONLY( tp->stats.idleThreads++;                 );            STATSONLY( tp->stats.totalWorkTime += ( time( NULL ) - start );                 );             //work time            STATSONLY( time( &start );                 );             //idle time            if( persistent == 1 ) {                //Persistent thread                //becomes a regular thread                tp->persistentThreads--;            }            STATSONLY( if( persistent == 0 )                       tp->stats.workerThreads--; );            //Check for a job or shutdown            while( ( tp->lowJobQ.size == 0 )                   && ( tp->medJobQ.size == 0 )                   && ( tp->highJobQ.size == 0 )                   && ( !tp->persistentJob )                   && ( !tp->shutdown ) ) {                //If wait timed out                //and we currently have more than the                //min threads, or if we have more than the max threads                // (only possible if the attributes have been reset)                //let this thread die.                if( ( ( retCode == ETIMEDOUT )                      && ( ( tp->totalThreads ) > tp->attr.minThreads ) )                    || ( ( tp->attr.maxThreads != -1 )                         && ( ( tp->totalThreads ) >                              tp->attr.maxThreads ) ) ) {                    STATSONLY( tp->stats.idleThreads-- );                    tp->totalThreads--;                    ithread_cond_broadcast( &tp->start_and_shutdown );                    ithread_mutex_unlock( &tp->mutex );                    return NULL;                }                SetRelTimeout( &timeout, tp->attr.maxIdleTime );                //wait for a job up to the specified max time                retCode = ithread_cond_timedwait( &tp->condition,                                                  &tp->mutex, &timeout );            }            STATSONLY( tp->stats.idleThreads--;                 );            STATSONLY( tp->stats.totalIdleTime += ( time( NULL ) - start );                 );             //idle time            STATSONLY( time( &start );                 );             //work time            //bump priority of starved jobs            BumpPriority( tp );            //if shutdown then stop            if( tp->shutdown ) {                tp->totalThreads--;                ithread_cond_broadcast( &tp->start_and_shutdown );                ithread_mutex_unlock( &tp->mutex );                return NULL;            } else {                //Pick up persistent job if available                if( tp->persistentJob ) {                    job = tp->persistentJob;                    tp->persistentJob = NULL;                    tp->persistentThreads++;                    persistent = 1;                    ithread_cond_broadcast( &tp->start_and_shutdown );                } else {                    STATSONLY( tp->stats.workerThreads++ );                    persistent = 0;                    //Pick the highest priority job                    if( tp->highJobQ.size > 0 ) {                        head = ListHead( &tp->highJobQ );                        job = ( ThreadPoolJob * ) head->item;                        STATSONLY( CalcWaitTime                                   ( tp, HIGH_PRIORITY, job ) );                        ListDelNode( &tp->highJobQ, head, 0 );                    } else if( tp->medJobQ.size > 0 ) {                        head = ListHead( &tp->medJobQ );                        job = ( ThreadPoolJob * ) head->item;                        STATSONLY( CalcWaitTime( tp, MED_PRIORITY, job ) );                        ListDelNode( &tp->medJobQ, head, 0 );                    } else if( tp->lowJobQ.size > 0 ) {                        head = ListHead( &tp->lowJobQ );                        job = ( ThreadPoolJob * ) head->item;                        STATSONLY( CalcWaitTime( tp, LOW_PRIORITY, job ) );                        ListDelNode( &tp->lowJobQ, head, 0 );                    } else {                        // Should never get here                        assert( 0 );                        STATSONLY( tp->stats.workerThreads-- );                        tp->totalThreads--;                        ithread_cond_broadcast( &tp->start_and_shutdown );                        ithread_mutex_unlock( &tp->mutex );                        return NULL;                    }                }            }            ithread_mutex_unlock( &tp->mutex );            if( SetPriority( job->priority ) != 0 ) {                // In the future can log                // info            } else {                // In the future can log                // info            }            //run the job            job->func( job->arg );            //return to Normal            SetPriority( DEFAULT_PRIORITY );        }    }/**************************************************************************** * Function: CreateThreadPoolJob * *  Description: *      Creates a Thread Pool Job. (Dynamically allocated) *      Internal to thread pool. *  Parameters: *      ThreadPoolJob * job - job is copied *      id - id of job * *  Returns: *      ThreadPoolJob * on success, NULL on failure. *****************************************************************************/    static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob * job,                                               int id,                                               ThreadPool * tp ) {        ThreadPoolJob *newJob = NULL;        assert( job != NULL );        assert( tp != NULL );        newJob = ( ThreadPoolJob * ) FreeListAlloc( &tp->jobFreeList );        if( newJob ) {            ( *newJob ) = ( *job );            newJob->jobId = id;            ftime( &newJob->requestTime );        }        return newJob;    }/**************************************************************************** * Function: CreateWorker * *  Description: *      Creates a worker thread, if the thread pool *      does not already have max threads. *      Internal to thread pool. *  Parameters: *      ThreadPool *tp * *  Returns: *      0 on success, <0 on failure *      EMAXTHREADS if already max threads reached *      EAGAIN if system can not create thread * *****************************************************************************/    static int CreateWorker( ThreadPool * tp ) {        ithread_t temp;        int rc = 0;        int currentThreads = tp->totalThreads + 1;        assert( tp != NULL );        if( ( tp->attr.maxThreads != INFINITE_THREADS )            && ( currentThreads > tp->attr.maxThreads ) ) {            return EMAXTHREADS;        }        rc = ithread_create( &temp, NULL, WorkerThread, tp );        if( rc == 0 ) {            rc = ithread_detach( temp );            while( tp->totalThreads < currentThreads ) {                ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );            }        }        STATSONLY( if( tp->stats.maxThreads < tp->totalThreads ) {                   tp->stats.maxThreads = tp->totalThreads;}         )            return rc;    }/**************************************************************************** * Function: AddWorker * *  Description: *      Determines whether or not a thread should be added *      based on the jobsPerThread ratio. *      Adds a thread if appropriate. *      Internal to Thread Pool. *  Parameters: *      ThreadPool* tp * *****************************************************************************/    static void AddWorker( ThreadPool * tp ) {        int jobs = 0;        int threads = 0;        assert( tp != NULL );        jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;        threads = tp->totalThreads - tp->persistentThreads;        while( ( threads == 0 )               || ( ( jobs / threads ) > tp->attr.jobsPerThread ) ) {            if( CreateWorker( tp ) != 0 )                return;            threads++;        }    }/**************************************************************************** * Function: ThreadPoolInit * *  Description: *      Initializes and starts ThreadPool. Must be called first. *      And only once for ThreadPool. *  Parameters: *      tp  - must be valid, non null, pointer to ThreadPool. *      minWorkerThreads - minimum number of worker threads *                         thread pool will never have less than this *                         number of threads. *      maxWorkerThreads - maximum number of worker threads *                         thread pool will never have more than this *                         number of threads. *      maxIdleTime      - maximum time that a worker thread will spend *                         idle. If a worker is idle longer than this *                         time and there are more than the min *                         number of workers running, than the *                         worker thread exits. *      jobsPerThread    - ratio of jobs to thread to try and maintain *                         if a job is scheduled and the number of jobs per *                         thread is greater than this number,and *                         if less than the maximum number of *                         workers are running then a new thread is *                         started to help out with efficiency. *      schedPolicy      - scheduling policy to try and set (OS dependent) *  Returns: *      0 on success, nonzero on failure. *      EAGAIN if not enough system resources to create minimum threads. *      INVALID_POLICY if schedPolicy can't be set *      EMAXTHREADS if minimum threads is greater than maximum threads *****************************************************************************/    int ThreadPoolInit( ThreadPool * tp,                        ThreadPoolAttr * attr ) {        int retCode = 0;        int i = 0;        assert( tp != NULL );        if( tp == NULL ) {            return EINVAL;        }        retCode += ithread_mutex_init( &tp->mutex, NULL );        assert( retCode == 0 );        retCode += ithread_mutex_lock( &tp->mutex );        assert( retCode == 0 );        retCode += ithread_cond_init( &tp->condition, NULL );        assert( retCode == 0 );        retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );        assert( retCode == 0 );        if( retCode != 0 ) {            return EAGAIN;        }        if( attr ) {            tp->attr = ( *attr );        } else {            TPAttrInit( &tp->attr );        }        if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {            ithread_mutex_unlock( &tp->mutex );            ithread_mutex_destroy( &tp->mutex );            ithread_cond_destroy( &tp->condition );            ithread_cond_destroy( &tp->start_and_shutdown );            return INVALID_POLICY;        }        retCode += FreeListInit( &tp->jobFreeList, sizeof( ThreadPoolJob ),                                 JOBFREELISTSIZE );        assert( retCode == 0 );        STATSONLY( StatsInit( &tp->stats ) );        retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );        assert( retCode == 0 );        retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );        assert( retCode == 0 );        retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );        assert( retCode == 0 );        if( retCode != 0 ) {            retCode = EAGAIN;        } else {            tp->persistentJob = NULL;            tp->lastJobId = 0;            tp->shutdown = 0;            tp->totalThreads = 0;            tp->persistentThreads = 0;            for( i = 0; i < tp->attr.minThreads; i++ ) {                if( ( retCode = CreateWorker( tp ) ) != 0 ) {                    break;                }            }        }        ithread_mutex_unlock( &tp->mutex );        if( retCode != 0 ) {            //clean up if the min threads could            //not be created            ThreadPoolShutdown( tp );        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -