⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tpool.c

📁 linux thread programe
💻 C
字号:
/******************************************************** * An example source module to accompany... * * "Using POSIX Threads: Programming with Pthreads" *     by Brad nichols, Dick Buttlar, Jackie Farrell *     O'Reilly & Associates, Inc. * ******************************************************** * tpool.c --  *  * Example thread pooling library */#include <stdlib.h>#include <stdio.h>#include <unistd.h>#include <sys/types.h>#include <string.h>#include <pthread.h>#include "tpool.h"void *tpool_thread(void *);void tpool_init(tpool_t   *tpoolp,		int       num_worker_threads, 		int       max_queue_size,		int       do_not_block_when_full){  int i, rtn;  tpool_t tpool;     /* allocate a pool data structure */   if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)    perror("malloc"), exit(1);  /* initialize th fields */  tpool->num_threads = num_worker_threads;  tpool->max_queue_size = max_queue_size;  tpool->do_not_block_when_full = do_not_block_when_full;  if ((tpool->threads =        (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads))       == NULL)    perror("malloc"), exit(1);  tpool->cur_queue_size = 0;  tpool->queue_head = NULL;   tpool->queue_tail = NULL;  tpool->queue_closed = 0;    tpool->shutdown = 0;   if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)    fprintf(stderr,"pthread_mutex_init %s",strerror(rtn)), exit(1);  if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)    fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);  if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)    fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);  if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)    fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);  /* create threads */  for (i = 0; i != num_worker_threads; i++) {    if ((rtn = pthread_create( &(tpool->threads[i]),			      NULL,			      tpool_thread,			      (void *)tpool)) != 0)      fprintf(stderr,"pthread_create %d",rtn), exit(1);  }  *tpoolp = tpool;}int tpool_add_work(		   tpool_t          tpool,		   void             (*routine)(void *),		   void             *arg){  int rtn;  tpool_work_t *workp;  if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)    fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);  /* no space and this caller doesn't want to wait */  if ((tpool->cur_queue_size == tpool->max_queue_size) &&      tpool->do_not_block_when_full) {    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);    return -1;  }  while( (tpool->cur_queue_size == tpool->max_queue_size) &&	(!(tpool->shutdown || tpool->queue_closed))  ) {    if ((rtn = pthread_cond_wait(&(tpool->queue_not_full),				 &(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);  }  /* the pool is in the process of being destroyed */  if (tpool->shutdown || tpool->queue_closed) {    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);     return -1;  }  /* allocate work structure */  if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)    perror("malloc"), exit(1);  workp->routine = routine;  workp->arg = arg;  workp->next = NULL;  printf("adder: adding an item %d\n", workp->routine);  if (tpool->cur_queue_size == 0) {    tpool->queue_tail = tpool->queue_head = workp;     printf("adder: queue == 0, waking all workers\n");    if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)      fprintf(stderr,"pthread_cond_signal %d",rtn), exit(1);;  } else {    tpool->queue_tail->next = workp;    tpool->queue_tail = workp;  }  tpool->cur_queue_size++;   if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)    fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);  return 1;}int tpool_destroy(tpool_t          tpool,		  int              finish){  int          i,rtn;  tpool_work_t *cur_nodep;    if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)    fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);  /* Is a shutdown already in progress? */  if (tpool->queue_closed || tpool->shutdown) {    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);    return 0;  }  tpool->queue_closed = 1;  /* If the finish flag is set, wait for workers to      drain queue */   if (finish == 1) {    while (tpool->cur_queue_size != 0) {      if ((rtn = pthread_cond_wait(&(tpool->queue_empty),				   &(tpool->queue_lock))) != 0)	fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);    }  }  tpool->shutdown = 1;  if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)    fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);  /* Wake up any workers so they recheck shutdown flag */  if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)    fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);  if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)    fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);  /* Wait for workers to exit */  for(i=0; i < tpool->num_threads; i++) {    if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)      fprintf(stderr,"pthread_join %d",rtn), exit(1);  }  /* Now free pool structures */  free(tpool->threads);  while(tpool->queue_head != NULL) {    cur_nodep = tpool->queue_head->next;     tpool->queue_head = tpool->queue_head->next;    free(cur_nodep);  }  free(tpool); }void *tpool_thread(void *arg){  tpool_t tpool = (tpool_t)arg;   int rtn;  tpool_work_t	*my_workp;	  for(;;) {    /* Check queue for work */     if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);    while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) {      printf("worker %d: I'm sleeping again\n", pthread_self());      if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty),				   &(tpool->queue_lock))) != 0)	fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);    }    sleep(5);      printf("worker %d: I'm awake\n", pthread_self());    /* Has a shutdown started while i was sleeping? */    if (tpool->shutdown == 1) {      if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)	fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);      pthread_exit(NULL);    }    /* Get to work, dequeue the next item */     my_workp = tpool->queue_head;    tpool->cur_queue_size--;    if (tpool->cur_queue_size == 0)      tpool->queue_head = tpool->queue_tail = NULL;    else      tpool->queue_head = my_workp->next;     printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next);    /* Handle waiting add_work threads */    if ((!tpool->do_not_block_when_full) &&	(tpool->cur_queue_size ==  (tpool->max_queue_size - 1)))       if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)	fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);    /* Handle waiting destroyer threads */    if (tpool->cur_queue_size == 0)      if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)	fprintf(stderr,"pthread_cond_signal %d",rtn), exit(1);    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);          /* Do this work item */    (*(my_workp->routine))(my_workp->arg);    free(my_workp);  }   return(NULL);            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -