⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tpool.cpp.svn-base

📁 sigmadesign smp8623 gui source code ,bingo
💻 SVN-BASE
字号:
/* ------------------------------------------------------------------------- * tpool.cpp - thread pool functions * Copyright (C) 2008 Dimitar Atanasov <datanasov@deisytechbg.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA * ------------------------------------------------------------------------- *//* * most of the theory and implementation of the thread pool was taken * from the o'reilly pthreads programming book. */#include  <stdio.h>#include  <stdlib.h>#include  <string.h> /* strerror() */#include  <pthread.h>#include "common.h"#include "tpool.h"#include "log.h"extern log_t *logn;/* the worker thread */void *tpool_thread(void *tpool);tpool_t::tpool_t(int threads_count, int queue){    int i, rtn;    /* set the desired thread pool values */    num_threads = threads_count;    max_queue_size = queue;    do_not_block_when_full = POOL_BLOCK_ONFULL;    if((threads = new pthread_t[num_threads]) == NULL)    {        logger( FATAL,"Unable to allocate thread info array\n");    }    /* initialize the work queue */    cur_queue_size = 0;    queue_head = NULL;    queue_tail = NULL;    queue_closed = 0;    shutdown = 0;    threads_awake = 0;    /* create the mutexs and cond vars */    if((rtn = pthread_mutex_init(&(queue_lock),NULL)) != 0) {        logger(FATAL,"pthread_mutex_init %s",strerror(rtn));    }    if((rtn = pthread_cond_init(&(queue_not_empty),NULL)) != 0) {        logger(FATAL,"pthread_cond_init %s",strerror(rtn));    }    if((rtn = pthread_cond_init(&(queue_not_full),NULL)) != 0) {        logger(FATAL,"pthread_cond_init %s",strerror(rtn));    }    if((rtn = pthread_cond_init(&(queue_empty),NULL)) != 0) {        logger(FATAL,"pthread_cond_init %s",strerror(rtn));    }    if((rtn = pthread_cond_init(&(all_work_done),NULL)) != 0) {		logger(FATAL,"pthread_cond_init %s",strerror(rtn));	}    /**     * from "man 3c pthread_attr_init"     * Define the scheduling contention scope for the created thread.  The only     * value     supported    in    the    LinuxThreads    implementation    is     * !PTHREAD_SCOPE_SYSTEM!, meaning that the threads contend  for  CPU  time     * with all processes running on the machine.     *     * so no need to explicitly set the SCOPE     */    /* create the individual worker threads */    for(i = 0; i < num_threads; i++)    {        if( (rtn=pthread_create(&(threads[i]),NULL, tpool_thread,(void*)this)) != 0)        {            logger(FATAL,"pthread_create %s\n",strerror(rtn)), exit(1);        }        threads_awake++;    }}int tpool_t::tpool_add_work(void (*routine)(void*), void *arg){    int rtn;    tpool_work_t *workp;    if((rtn = pthread_mutex_lock(&queue_lock)) != 0)    {        logger(FATAL,"pthread mutex lock failure\n");        exit(1);    }    /* now we have exclusive access to the work queue ! */    if((cur_queue_size == max_queue_size) && (do_not_block_when_full))    {        if((rtn = pthread_mutex_unlock(&queue_lock)) != 0)        {            logger(FATAL,"pthread mutex lock failure\n");            exit(1);        }        return -1;    }    /* wait for the queue to have an open space for new work, while     * waiting the queue_lock will be released */    while((cur_queue_size == max_queue_size) && (!(shutdown || queue_closed)))    {        if((rtn = pthread_cond_wait(&(queue_not_full), &(queue_lock)) ) != 0)        {            logger(FATAL,"pthread cond wait failure\n");            exit(1);        }    }    if(shutdown || queue_closed)    {        if((rtn = pthread_mutex_unlock(&queue_lock)) != 0)        {            logger(FATAL,"pthread mutex lock failure\n");            exit(1);        }        return -1;    }    /* allocate the work structure */    if((workp = new tpool_work_t) == NULL)    {        logger(FATAL,"unable to create work struct\n");        exit(1);    }    /* set the function/routine which will handle the work,     * (note: it must be reenterant) */    workp->handler_routine = routine;    workp->arg = arg;    workp->next = NULL;    if(cur_queue_size == 0)    {        queue_tail = queue_head = workp;        if((rtn = pthread_cond_broadcast(&(queue_not_empty))) != 0)        {            logger(FATAL,"pthread broadcast error\n");            exit(1);        }    }    else    {        queue_tail->next = workp;        queue_tail = workp;    }    cur_queue_size++;    /* relinquish control of the queue */    if((rtn = pthread_mutex_unlock(&queue_lock)) != 0)    {        logger(FATAL,"pthread mutex lock failure\n");        exit(1);    }    return 1;}tpool_t::~tpool_t(){    int i, rtn;    tpool_work_t *cur;    /* relinquish control of the queue */    if((rtn = pthread_mutex_lock(&(queue_lock))) != 0)    {        logger(FATAL,"pthread mutex lock failure\n");        exit(1);    }    /* is a shutdown already going on ? */    if(queue_closed || shutdown)    {        if((rtn = pthread_mutex_unlock(&(queue_lock))) != 0)        {            logger(FATAL,"pthread mutex lock failure\n");            exit(1);        }        return;    }    /* close the queue to any new work */    queue_closed = 1;    /* if the finish flag is set, drain the queue */    if(POOL_THREADS_FINISH)    {        while(cur_queue_size != 0)        {            /* wait for the queue to become empty,             * while waiting queue lock will be released */            if((rtn = pthread_cond_wait(&(queue_empty), &(queue_lock))) != 0)            {                logger(FATAL,"pthread_cond_wait %d\n",rtn),exit(1);            }        }    }    /* set the shutdown flag */    shutdown = 1;    if((rtn = pthread_mutex_unlock(&(queue_lock))) != 0)    {        logger(FATAL,"pthread mutex unlock failure\n"),exit(1);    }    /* wake up all workers to rechedk the shutdown flag */    if((rtn = pthread_cond_broadcast(&(queue_not_empty))) != 0)    {        logger(FATAL,"pthread_cond_boradcast %d\n",rtn),exit(1);    }    if((rtn = pthread_cond_broadcast(&(queue_not_full))) != 0)    {        logger(FATAL,"pthread_cond_boradcast %d\n",rtn),exit(1);    }    /* wait for workers to exit */    for(i = 0; i < num_threads; i++)    {        if((rtn = pthread_join(threads[i],NULL)) != 0)        {            logger(FATAL,"pthread_join error %d %d\n",rtn, i), exit(1);        }    }    /* clean up memory */ 	delete[] threads;    while(queue_head != NULL)    {        cur = queue_head->next;        queue_head = queue_head->next;		delete cur;    }}void *tpool_thread(void *tpool){    tpool_work_t *my_work;    tpool_t *pool = (tpool_t *)tpool;    sigset_t mask;    sigfillset(&mask); /* Mask all allowed signals */    pthread_sigmask(SIG_BLOCK, &mask, NULL);    for(;;) /* go forever */    {        pthread_mutex_lock(&(pool->queue_lock));        /* sleep until there is work,         * while asleep the queue_lock is relinquished */        while((pool->cur_queue_size == 0) && (!pool->shutdown))        {        	pool->threads_awake --;        	if(pool->threads_awake == 0 && (!pool->shutdown))        		pthread_cond_signal(&(pool->all_work_done));            pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));            pool->threads_awake ++;        }        /* are we shutting down ? */        if(pool->shutdown)        {        	pool->threads_awake --;            pthread_mutex_unlock(&(pool->queue_lock));            pthread_exit(NULL);        }        /* process the work */        my_work = pool->queue_head;        pool->cur_queue_size--;        if(pool->cur_queue_size == 0)            pool->queue_head = pool->queue_tail = NULL;        else            pool->queue_head = my_work->next;        /* broadcast that the queue is not full */        if((!pool->do_not_block_when_full) &&                (pool->cur_queue_size == (pool->max_queue_size - 1)))        {            pthread_cond_broadcast(&(pool->queue_not_full));        }        if(pool->cur_queue_size == 0)        {            pthread_cond_signal(&(pool->queue_empty));        }        pthread_mutex_unlock(&(pool->queue_lock));        logger(INFO,"pool %p threads awake %d\n", pool, pool->threads_awake);        /* perform the work */        (*(my_work->handler_routine))(my_work->arg);        logger(INFO,"pool %p threads done\n", pool); 		delete my_work;    }    return(NULL);}mysqlpp::Connection* DBPool::create(){	 logger(INFO,"Create DB conn %d\n",size() + 1);	return new mysqlpp::Connection(ctxt.conf->database.c_str(),				ctxt.conf->hostname.c_str(), ctxt.conf->username.c_str(), ctxt.conf->password.c_str());}void DBPool::destroy(mysqlpp::Connection* con){	logger(INFO,"Destroy DB conn %d\n",size());	delete con;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -