⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 thread-pool.c

📁 Linux 下实现的线程库源代码
💻 C
字号:
#include "thread-pool.h"

static void *tp_work_thread(void *pthread);
static void *tp_manage_thread(void *pthread);

static TPBOOL tp_init(tp_thread_pool *this);
static void tp_close(tp_thread_pool *this);
static void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
static int  tp_get_thread_by_id(tp_thread_pool *this, int id);
static TPBOOL tp_add_thread(tp_thread_pool *this);
static TPBOOL tp_delete_thread(tp_thread_pool *this);
static int  tp_get_tp_status(tp_thread_pool *this);

/**
  * user interface. creat thread pool.
  * para:
  * 	num: min thread number to be created in the pool
  * return:
  * 	thread pool struct instance be created successfully
  */
tp_thread_pool *creat_thread_pool(int min_num, int max_num){
	tp_thread_pool *this;
	this = (tp_thread_pool*)malloc(sizeof(tp_thread_pool));	

	memset(this, 0, sizeof(tp_thread_pool));
	
	//init member function ponter
	this->init = tp_init;
	this->close = tp_close;
	this->process_job = tp_process_job;
	this->get_thread_by_id = tp_get_thread_by_id;
	this->add_thread = tp_add_thread;
	this->delete_thread = tp_delete_thread;
	this->get_tp_status = tp_get_tp_status;

	//init member var
	this->min_th_num = min_num;
	this->cur_th_num = this->min_th_num;
	this->max_th_num = max_num;
	pthread_mutex_init(&this->tp_lock, NULL);

	//malloc mem for num thread info struct
	if(NULL != this->thread_info)
		free(this->thread_info);
	this->thread_info = (tp_thread_info*)malloc(sizeof(tp_thread_info)*this->max_th_num);

	return this;
}


/**
  * member function reality. thread pool init function.
  * para:
  * 	this: thread pool struct instance ponter
  * return:
  * 	true: successful; false: failed
  */
TPBOOL tp_init(tp_thread_pool *this){
	int i;
	int err;
	
	//creat work thread and init work thread info
	for(i=0;i<this->min_th_num;i++){
		pthread_cond_init(&this->thread_info[i].thread_cond, NULL);
		pthread_mutex_init(&this->thread_info[i].thread_lock, NULL);
		
		err = pthread_create(&this->thread_info[i].thread_id, NULL, tp_work_thread, this);
		if(0 != err){
			printf("tp_init: creat work thread failed\n");
			return FALSE;
		}
		printf("tp_init: creat work thread %d\n", this->thread_info[i].thread_id);
	}

	//creat manage thread
	err = pthread_create(&this->manage_thread_id, NULL, tp_manage_thread, this);
	if(0 != err){
		printf("tp_init: creat manage thread failed\n");
		return FALSE;
	}
	printf("tp_init: creat manage thread %d\n", this->manage_thread_id);

	return TRUE;
}

/**
  * member function reality. thread pool entirely close function.
  * para:
  * 	this: thread pool struct instance ponter
  * return:
  */
void tp_close(tp_thread_pool *this){
	int i;
	
	//close work thread
	for(i=0;i<this->cur_th_num;i++){
		kill(this->thread_info[i].thread_id, SIGKILL);
		pthread_mutex_destroy(&this->thread_info[i].thread_lock);
		pthread_cond_destroy(&this->thread_info[i].thread_cond);
		printf("tp_close: kill work thread %d\n", this->thread_info[i].thread_id);
	}

	//close manage thread
	kill(this->manage_thread_id, SIGKILL);
	pthread_mutex_destroy(&this->tp_lock);
	printf("tp_close: kill manage thread %d\n", this->manage_thread_id);
	
	//free thread struct
	free(this->thread_info);
}

/**
  * member function reality. main interface opened. 
  * after getting own worker and job, user may use the function to process the task.
  * para:
  * 	this: thread pool struct instance ponter
  *	worker: user task reality.
  *	job: user task para
  * return:
  */
void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job){
	int i;
	int tmpid;

	//fill this->thread_info's relative work key
	for(i=0;i<this->cur_th_num;i++){
		pthread_mutex_lock(&this->thread_info[i].thread_lock);
		if(!this->thread_info[i].is_busy){
			printf("tp_process_job: %d thread idle, thread id is %d\n", i, this->thread_info[i].thread_id);
			//thread state be set busy before work
		  	this->thread_info[i].is_busy = TRUE;
			pthread_mutex_unlock(&this->thread_info[i].thread_lock);
			
			this->thread_info[i].th_work = worker;
			this->thread_info[i].th_job = job;
			
			printf("tp_process_job: informing idle working thread %d, thread id is %d\n", i, this->thread_info[i].thread_id);
			pthread_cond_signal(&this->thread_info[i].thread_cond);

			return;
		}
		else 
			pthread_mutex_unlock(&this->thread_info[i].thread_lock);		
	}//end of for

	//if all current thread are busy, new thread is created here
	pthread_mutex_lock(&this->tp_lock);
	if( this->add_thread(this) ){
		i = this->cur_th_num - 1;
		tmpid = this->thread_info[i].thread_id;
		this->thread_info[i].th_work = worker;
		this->thread_info[i].th_job = job;
	}
	pthread_mutex_unlock(&this->tp_lock);
	
	//send cond to work thread
	printf("tp_process_job: informing idle working thread %d, thread id is %d\n", i, this->thread_info[i].thread_id);
	pthread_cond_signal(&this->thread_info[i].thread_cond);
	return;	
}

/**
  * member function reality. get real thread by thread id num.
  * para:
  * 	this: thread pool struct instance ponter
  *	id: thread id num
  * return:
  * 	seq num in thread info struct array
  */
int tp_get_thread_by_id(tp_thread_pool *this, int id){
	int i;

	for(i=0;i<this->cur_th_num;i++){
		if(id == this->thread_info[i].thread_id)
			return i;
	}

	return -1;
}

/**
  * member function reality. add new thread into the pool.
  * para:
  * 	this: thread pool struct instance ponter
  * return:
  * 	true: successful; false: failed
  */
static TPBOOL tp_add_thread(tp_thread_pool *this){
	int err;
	tp_thread_info *new_thread;
	
	if( this->max_th_num <= this->cur_th_num )
		return FALSE;
		
	//malloc new thread info struct
	new_thread = &this->thread_info[this->cur_th_num];
	
	//init new thread's cond & mutex
	pthread_cond_init(&new_thread->thread_cond, NULL);
	pthread_mutex_init(&new_thread->thread_lock, NULL);

	//init status is busy
	new_thread->is_busy = TRUE;

	//add current thread number in the pool.
	this->cur_th_num++;
	
	err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, this);
	if(0 != err){
		free(new_thread);
		return FALSE;
	}
	printf("tp_add_thread: creat work thread %d\n", this->thread_info[this->cur_th_num-1].thread_id);
	
	return TRUE;
}

/**
  * member function reality. delete idle thread in the pool.
  * only delete last idle thread in the pool.
  * para:
  * 	this: thread pool struct instance ponter
  * return:
  * 	true: successful; false: failed
  */
static TPBOOL tp_delete_thread(tp_thread_pool *this){
	//current thread num can't < min thread num
	if(this->cur_th_num <= this->min_th_num) return FALSE;

	//if last thread is busy, do nothing
	if(this->thread_info[this->cur_th_num-1].is_busy) return FALSE;

	//kill the idle thread and free info struct
	kill(this->thread_info[this->cur_th_num-1].thread_id, SIGKILL);
	pthread_mutex_destroy(&this->thread_info[this->cur_th_num-1].thread_lock);
	pthread_cond_destroy(&this->thread_info[this->cur_th_num-1].thread_cond);

	//after deleting idle thread, current thread num -1
	this->cur_th_num--;

	return TRUE;
}

/**
  * member function reality. get current thread pool status:idle, normal, busy, .etc.
  * para:
  * 	this: thread pool struct instance ponter
  * return:
  * 	0: idle; 1: normal or busy(don't process)
  */
static int  tp_get_tp_status(tp_thread_pool *this){
	float busy_num = 0.0;
	int i;

	//get busy thread number
	for(i=0;i<this->cur_th_num;i++){
		if(this->thread_info[i].is_busy)
			busy_num++;
	}

	//0.2? or other num?
	if(busy_num/(this->cur_th_num) < BUSY_THRESHOLD)
		return 0;//idle status
	else
		return 1;//busy or normal status	
}

/**
  * internal interface. real work thread.
  * para:
  * 	pthread: thread pool struct ponter
  * return:
  */
static void *tp_work_thread(void *pthread){
	pthread_t curid;//current thread id
	int nseq;//current thread seq in the this->thread_info array
	tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance

	//get current thread id
	curid = pthread_self();
	
	//get current thread's seq in the thread info struct array.
	nseq = this->get_thread_by_id(this, curid);
	if(nseq < 0)
		return;
	printf("entering working thread %d, thread id is %d\n", nseq, curid);

	//wait cond for processing real job.
	while( TRUE ){
		pthread_mutex_lock(&this->thread_info[nseq].thread_lock);
		pthread_cond_wait(&this->thread_info[nseq].thread_cond, &this->thread_info[nseq].thread_lock);
		pthread_mutex_unlock(&this->thread_info[nseq].thread_lock);		
		
		printf("%d thread do work!\n", pthread_self());

		tp_work *work = this->thread_info[nseq].th_work;
		tp_work_desc *job = this->thread_info[nseq].th_job;

		//process
		work->process_job(work, job);

		//thread state be set idle after work
		pthread_mutex_lock(&this->thread_info[nseq].thread_lock);		
		this->thread_info[nseq].is_busy = FALSE;
		pthread_mutex_unlock(&this->thread_info[nseq].thread_lock);
		
		printf("%d do work over\n", pthread_self());
	}	
}

/**
  * internal interface. manage thread pool to delete idle thread.
  * para:
  * 	pthread: thread pool struct ponter
  * return:
  */
static void *tp_manage_thread(void *pthread){
	tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance

	//1?
	sleep(MANAGE_INTERVAL);

	do{
		if( this->get_tp_status(this) == 0 ){
			do{
				if( !this->delete_thread(this) )
					break;
			}while(TRUE);
		}//end for if

		//1?
		sleep(MANAGE_INTERVAL);
	}while(TRUE);
}
	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -