📄 threadpool.c
字号:
ThreadPoolAttr temp; int i = 0; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); if( attr != NULL ) { temp = ( *attr ); } else { TPAttrInit( &temp ); } if( SetPolicyType( temp.schedPolicy ) != 0 ) { ithread_mutex_unlock( &tp->mutex ); return INVALID_POLICY; } tp->attr = ( temp ); // add threads if( tp->totalThreads < tp->attr.minThreads ) { for( i = tp->totalThreads; i < tp->attr.minThreads; i++ ) { if( ( retCode = CreateWorker( tp ) ) != 0 ) { break; } } } // signal changes ithread_cond_signal( &tp->condition ); ithread_mutex_unlock( &tp->mutex ); if( retCode != 0 ) { // clean up if the min threads could not be created ThreadPoolShutdown( tp ); } return retCode;}/**************************************************************************** * Function: ThreadPoolShutdown * * Description: * Shuts the thread pool down. * Waits for all threads to finish. * May block indefinitely if jobs do not * exit. * Parameters: * tp - must be valid tp * Returns: * 0 on success, nonzero on failure * Always returns 0. 
*****************************************************************************/int ThreadPoolShutdown( ThreadPool *tp ){ ListNode *head = NULL; ThreadPoolJob *temp = NULL; assert( tp != NULL ); if( tp == NULL ) { return EINVAL; } ithread_mutex_lock( &tp->mutex ); // clean up high priority jobs while( tp->highJobQ.size ) { head = ListHead( &tp->highJobQ ); temp = ( ThreadPoolJob *) head->item; if( temp->free_func ) { temp->free_func( temp->arg ); } FreeThreadPoolJob( tp, temp ); ListDelNode( &tp->highJobQ, head, 0 ); } ListDestroy( &tp->highJobQ, 0 ); // clean up med priority jobs while( tp->medJobQ.size ) { head = ListHead( &tp->medJobQ ); temp = ( ThreadPoolJob *) head->item; if( temp->free_func ) { temp->free_func( temp->arg ); } FreeThreadPoolJob( tp, temp ); ListDelNode( &tp->medJobQ, head, 0 ); } ListDestroy( &tp->medJobQ, 0 ); // clean up low priority jobs while( tp->lowJobQ.size ) { head = ListHead( &tp->lowJobQ ); temp = ( ThreadPoolJob *) head->item; if( temp->free_func ) { temp->free_func( temp->arg ); } FreeThreadPoolJob( tp, temp ); ListDelNode( &tp->lowJobQ, head, 0 ); } ListDestroy( &tp->lowJobQ, 0 ); // clean up long term job if( tp->persistentJob ) { temp = tp->persistentJob; if( temp->free_func ) { temp->free_func( temp->arg ); } FreeThreadPoolJob( tp, temp ); tp->persistentJob = NULL; } // signal shutdown tp->shutdown = 1; ithread_cond_broadcast( &tp->condition ); // wait for all threads to finish while( tp->totalThreads > 0 ) { ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex ); } // destroy condition while( ithread_cond_destroy( &tp->condition ) != 0 ) { } while( ithread_cond_destroy( &tp->start_and_shutdown ) != 0 ) { } FreeListDestroy( &tp->jobFreeList ); ithread_mutex_unlock( &tp->mutex ); // destroy mutex while( ithread_mutex_destroy( &tp->mutex ) != 0 ) { } return 0;}/**************************************************************************** * Function: TPAttrInit * * Description: * Initializes thread pool attributes. 
* Sets values to defaults defined in ThreadPool.h. * Parameters: * attr - must be valid thread pool attributes. * Returns: * Always returns 0. *****************************************************************************/int TPAttrInit( ThreadPoolAttr *attr ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->jobsPerThread = DEFAULT_JOBS_PER_THREAD; attr->maxIdleTime = DEFAULT_IDLE_TIME; attr->maxThreads = DEFAULT_MAX_THREADS; attr->minThreads = DEFAULT_MIN_THREADS; attr->schedPolicy = DEFAULT_POLICY; attr->starvationTime = DEFAULT_STARVATION_TIME; attr->maxJobsTotal = DEFAULT_MAX_JOBS_TOTAL; return 0;}/**************************************************************************** * Function: TPJobInit * * Description: * Initializes thread pool job. * Sets the priority to default defined in ThreadPool.h. * Sets the free_routine to default defined in ThreadPool.h * Parameters: * ThreadPoolJob *job - must be valid thread pool attributes. * start_routine func - function to run, must be valid * void * arg - argument to pass to function. * Returns: * Always returns 0. *****************************************************************************/int TPJobInit( ThreadPoolJob *job, start_routine func, void *arg ){ assert( job != NULL ); assert( func != NULL ); if( job == NULL || func == NULL ) { return EINVAL; } job->func = func; job->arg = arg; job->priority = DEFAULT_PRIORITY; job->free_func = DEFAULT_FREE_ROUTINE; return 0;}/**************************************************************************** * Function: TPJobSetPriority * * Description: * Sets the max threads for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * maxThreads - value to set * Returns: * Returns 0 on success nonzero on failure. * Returns EINVAL if invalid priority. 
*****************************************************************************/int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority ){ assert( job != NULL ); if( job == NULL ) { return EINVAL; } if( priority == LOW_PRIORITY || priority == MED_PRIORITY || priority == HIGH_PRIORITY ) { job->priority = priority; return 0; } else { return EINVAL; }}/**************************************************************************** * Function: TPJobSetFreeFunction * * Description: * Sets the max threads for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * maxThreads - value to set * Returns: * Always returns 0. *****************************************************************************/int TPJobSetFreeFunction( ThreadPoolJob *job, free_routine func ){ assert( job != NULL ); if( job == NULL ) { return EINVAL; } job->free_func = func; return 0;}/**************************************************************************** * Function: TPAttrSetMaxThreads * * Description: * Sets the max threads for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * maxThreads - value to set * Returns: * Always returns 0. *****************************************************************************/int TPAttrSetMaxThreads( ThreadPoolAttr *attr, int maxThreads ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->maxThreads = maxThreads; return 0;}/**************************************************************************** * Function: TPAttrSetMinThreads * * Description: * Sets the min threads for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * minThreads - value to set * Returns: * Always returns 0. 
*****************************************************************************/int TPAttrSetMinThreads( ThreadPoolAttr *attr, int minThreads ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->minThreads = minThreads; return 0;}/**************************************************************************** * Function: TPAttrSetIdleTime * * Description: * Sets the idle time for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * Returns: * Always returns 0. *****************************************************************************/int TPAttrSetIdleTime( ThreadPoolAttr *attr, int idleTime ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->maxIdleTime = idleTime; return 0;}/**************************************************************************** * Function: TPAttrSetJobsPerThread * * Description: * Sets the max thre * Parameters: * attr - must be valid thread pool attributes. * Returns: * Always returns 0. *****************************************************************************/int TPAttrSetJobsPerThread( ThreadPoolAttr *attr, int jobsPerThread ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->jobsPerThread = jobsPerThread; return 0;}/**************************************************************************** * Function: TPAttrSetStarvationTime * * Description: * Sets the starvation time for the thread pool attributes. * Parameters: * attr - must be valid thread pool attributes. * Returns: * Always returns 0. *****************************************************************************/int TPAttrSetStarvationTime( ThreadPoolAttr *attr, int starvationTime ){ assert( attr != NULL ); if( attr == NULL ) { return EINVAL; } attr->starvationTime = starvationTime; return 0;}/**************************************************************************** * Function: TPAttrSetSchedPolicy * * Description: * Sets the scheduling policy for the thread pool attributes. 
* Parameters:
 *      attr - must be valid thread pool attributes.
 *      PolicyType schedPolicy - must be a valid policy type.
 * Returns:
 *      0 on success, EINVAL if attr is NULL.
 *****************************************************************************/
int TPAttrSetSchedPolicy( ThreadPoolAttr *attr, PolicyType schedPolicy )
{
    assert( attr != NULL );
    if( attr == NULL ) {
        return EINVAL;
    }

    /* The policy is stored as given; it is not validated here. */
    attr->schedPolicy = schedPolicy;
    return 0;
}

/****************************************************************************
 * Function: TPAttrSetMaxJobsTotal
 *
 * Description:
 *      Sets the maximum number of jobs that can be queued in total.
 * Parameters:
 *      attr - must be valid thread pool attributes.
 *      maxJobsTotal - maximum number of jobs
 * Returns:
 *      0 on success, EINVAL if attr is NULL.
 *****************************************************************************/
int TPAttrSetMaxJobsTotal( ThreadPoolAttr *attr, int maxJobsTotal )
{
    assert( attr != NULL );
    if( attr == NULL ) {
        return EINVAL;
    }

    attr->maxJobsTotal = maxJobsTotal;
    return 0;
}

#ifdef STATS
/****************************************************************************
 * Function: ThreadPoolPrintStats
 *
 * Description:
 *      Prints the given statistics to standard output, one value per line,
 *      preceded by the current StatsTime() value.
 *      Only available if STATS has been defined.
 * Parameters:
 *      ThreadPoolStats *stats - statistics to print; if NULL nothing is
 *                               printed.
 * Returns:
 *      Nothing.
 *****************************************************************************/
void ThreadPoolPrintStats( ThreadPoolStats *stats )
{
    assert( stats != NULL );
    if( stats == NULL ) {
        return;
    }

    /* Cast so the argument always matches %ld, whatever integer type
       StatsTime() returns on this platform. This replaces a per-platform
       choice of conversion specifier. */
    printf( "ThreadPoolStats at Time: %ld\n", ( long )StatsTime( NULL ) );

    printf( "High Jobs pending: %d\n", stats->currentJobsHQ );
    printf( "Med Jobs Pending: %d\n", stats->currentJobsMQ );
    printf( "Low Jobs Pending: %d\n", stats->currentJobsLQ );

    printf( "Average Wait in High Priority Q in milliseconds: %f\n",
            stats->avgWaitHQ );
    printf( "Average Wait in Med Priority Q in milliseconds: %f\n",
            stats->avgWaitMQ );
    /* Label spelling fixed: it used to read "Averate". */
    printf( "Average Wait in Low Priority Q in milliseconds: %f\n",
            stats->avgWaitLQ );

    printf( "Max Threads Active: %d\n", stats->maxThreads );
    printf( "Current Worker Threads: %d\n", stats->workerThreads );
    printf( "Current Persistent Threads: %d\n", stats->persistentThreads );
    printf( "Current Idle Threads: %d\n", stats->idleThreads );
    printf( "Total Threads : %d\n", stats->totalThreads );

    printf( "Total Time spent Working in seconds: %f\n",
            stats->totalWorkTime );
    printf( "Total Time spent Idle in seconds : %f\n",
            stats->totalIdleTime );
}
#endif /* STATS */

/****************************************************************************
 * Function: ThreadPoolGetStats
 *
 * Description:
 *      Returns various statistics about the
 *      thread pool.
 *      Only valid if STATS has been defined.
 * Parameters:
 *      ThreadPool *tp - valid initialized threadpool
 *      ThreadPoolStats *stats - valid stats, out parameter
 * Returns:
 *      0 on success, EINVAL if tp or stats is NULL.
 *****************************************************************************/
#ifdef STATS
int ThreadPoolGetStats( ThreadPool *tp, ThreadPoolStats *stats )
{
    /* Remembers whether this call took the mutex, so the unlock below is
       tied to the lock rather than to a second read of tp->shutdown. */
    int locked = 0;

    assert( tp != NULL );
    assert( stats != NULL );
    if( tp == NULL || stats == NULL ) {
        return EINVAL;
    }

    /* The mutex is only taken while the pool has not been shut down. */
    if( !tp->shutdown ) {
        ithread_mutex_lock( &tp->mutex );
        locked = 1;
    }

    /* Start from a copy of the pool's running totals. */
    *stats = tp->stats;

    /* Average wait per queue; 0 when no job has been counted, which also
       avoids dividing by zero. */
    if( stats->totalJobsHQ > 0 ) {
        stats->avgWaitHQ = stats->totalTimeHQ / stats->totalJobsHQ;
    } else {
        stats->avgWaitHQ = 0;
    }

    if( stats->totalJobsMQ > 0 ) {
        stats->avgWaitMQ = stats->totalTimeMQ / stats->totalJobsMQ;
    } else {
        stats->avgWaitMQ = 0;
    }

    if( stats->totalJobsLQ > 0 ) {
        stats->avgWaitLQ = stats->totalTimeLQ / stats->totalJobsLQ;
    } else {
        stats->avgWaitLQ = 0;
    }

    /* Values read from the live pool state. */
    stats->totalThreads = tp->totalThreads;
    stats->persistentThreads = tp->persistentThreads;
    stats->currentJobsHQ = ListSize( &tp->highJobQ );
    stats->currentJobsLQ = ListSize( &tp->lowJobQ );
    stats->currentJobsMQ = ListSize( &tp->medJobQ );

    if( locked ) {
        ithread_mutex_unlock( &tp->mutex );
    }

    return 0;
}
#endif /* STATS */

#ifdef WIN32
#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
    #define DELTA_EPOCH_IN_MICROSECS  11644473600000000Ui64
#else
    #define DELTA_EPOCH_IN_MICROSECS  11644473600000000ULL
#endif

/****************************************************************************
 * Function: gettimeofday (WIN32 only)
 *
 * Description:
 *      Replacement for gettimeofday built on GetSystemTimeAsFileTime.
 * Parameters:
 *      tv - if not NULL, receives the current time as seconds and
 *           microseconds relative to the Unix epoch.
 *      tz - if not NULL, receives minutes west (from _timezone) and the
 *           daylight flag (from _daylight).
 * Returns:
 *      Always returns 0.
 *****************************************************************************/
int gettimeofday( struct timeval *tv, struct timezone *tz )
{
    FILETIME ft;
    unsigned __int64 tmpres = 0;
    static int tzflag;  /* nonzero once _tzset() has been called */

    if( NULL != tv ) {
        GetSystemTimeAsFileTime( &ft );

        /* Combine the two 32-bit halves into one 64-bit value. */
        tmpres = ( ( unsigned __int64 )ft.dwHighDateTime << 32 ) |
                 ft.dwLowDateTime;

        /* converting file time to unix epoch */
        tmpres /= 10;   /* convert into microseconds */
        tmpres -= DELTA_EPOCH_IN_MICROSECS;

        tv->tv_sec = ( long )( tmpres / 1000000UL );
        tv->tv_usec = ( long )( tmpres % 1000000UL );
    }

    if( NULL != tz ) {
        /* _tzset() is only called the first time a timezone is requested. */
        if( !tzflag ) {
            _tzset();
            tzflag = 1;
        }
        tz->tz_minuteswest = _timezone / 60;
        tz->tz_dsttime = _daylight;
    }

    return 0;
}
#endif /* WIN32 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -