📄 vthreadpool.cxx
字号:
/* ==================================================================== * The Vovida Software License, Version 1.0 * * Copyright (c) 2000 Vovida Networks, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The names "VOCAL", "Vovida Open Communication Application Library", * and "Vovida Open Communication Application Library (VOCAL)" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact vocal@vovida.org. * * 4. Products derived from this software may not be called "VOCAL", nor * may "VOCAL" appear in their name, without prior written * permission of Vovida Networks, Inc. * * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH * DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by Vovida * Networks, Inc. and many individuals on behalf of Vovida Networks, * Inc. For more information on Vovida Networks, Inc., please see * <http://www.vovida.org/>. * */static const char* const VThreadPool_cxx_Version = "$Id: VThreadPool.cxx,v 1.4 2001/08/10 04:02:09 icahoon Exp $";#ifndef __vxworks#include "global.h"#include <cerrno>#include <cstdio>#include "VThreadPool.hxx"#include "cpLog.h"extern "C" {static void* workerProc(void* args){ return VThreadPool::workerProc(args);}}VThreadPool::VThreadPool(int numThreads) : numThreads_(numThreads), queueClosed_(0), shutdown_(0), currQueueSize_(0){ init();}voidVThreadPool::init() throw (VThreadInitException&){ int ret; char buf[256]; if ((ret = pthread_mutex_init(&queueLock_, 0)) != 0) { sprintf(buf, "pthread_mutext_init %s", strerror(errno)); cpLog(LOG_ALERT, buf); throw VThreadInitException(buf, __FILE__, __LINE__, errno); } if ((ret = pthread_cond_init(&queueNotEmpty_, 0)) != 0) { sprintf(buf, "pthread_cond_init %s", strerror(errno)); cpLog(LOG_ALERT, buf); throw VThreadInitException(buf, __FILE__, __LINE__, errno); } if ((ret = pthread_cond_init(&queueNotFull_, 0)) != 0) { sprintf(buf, "pthread_cond_init %s", strerror(errno)); cpLog(LOG_ALERT, buf); throw VThreadInitException(buf, __FILE__, __LINE__, errno); } if ((ret = pthread_cond_init(&queueEmpty_, 0)) != 0) { sprintf(buf, "pthread_cond_init %s", strerror(errno)); cpLog(LOG_ALERT, buf); throw VThreadInitException(buf, __FILE__, __LINE__, errno); } ///create threads for (int i = 0 ; i < numThreads_; i++) { VThread* thread = new VThread(); workerThreads_.push_back(thread); ret = thread->spawn(::workerProc, this); if (ret != 0) { sprintf(buf, "pthread_create %s", strerror(errno)); cpLog(LOG_ALERT, buf); throw VThreadInitException(buf, __FILE__, __LINE__, errno); } }}voidVThreadPool::addFunctor(const VFunctor& functor) throw (VThreadInitException&){ pthread_mutex_lock(&queueLock_); if (shutdown_ || queueClosed_) { pthread_mutex_unlock(&queueLock_); cpLog(LOG_DEBUG, "Pool shutting down.."); throw VThreadInitException("Pool shutting down..", __FILE__, __LINE__, 0); } if (currQueueSize_ == 0) { queue_.push_back(functor); pthread_cond_signal(&queueNotEmpty_); } else { queue_.push_back(functor); pthread_cond_signal(&queueNotEmpty_); } currQueueSize_++; pthread_mutex_unlock(&queueLock_);}voidVThreadPool::shutdown(bool finish){ int ret; if ((ret = pthread_mutex_lock(&queueLock_)) != 0) { cpLog(LOG_DEBUG, "Failed to get the lock, still shutting down.."); assert(0); return; } if (queueClosed_ || shutdown_) { pthread_mutex_unlock(&queueLock_); return ; } queueClosed_ = 1; if (finish) { // Wait till the entire queue is finished while (currQueueSize_ != 0) { pthread_cond_wait(&queueEmpty_, &queueLock_); } } shutdown_ = 1; pthread_mutex_unlock(&queueLock_); // Wakeup all worker threads so that they can recheck shutdown flag pthread_cond_broadcast(&queueNotEmpty_); pthread_cond_broadcast(&queueNotFull_); // wait for workers to exit for (ListOfThreads::iterator itr = workerThreads_.begin(); itr != workerThreads_.end(); itr++) { Sptr < VThread > vth = (*itr); if ((ret = vth->join()) != 0) { cpLog(LOG_ALERT, "Failed to join thread (%d)", vth->getId()); } }}void*VThreadPool::workerProc(void* args){ VThreadPool& self = *(static_cast < VThreadPool* > (args)); cpLog(LOG_DEBUG, "Creating worker thread (%d)", pthread_self()); for (;;) { pthread_mutex_lock(&(self.queueLock_)); while ((self.currQueueSize_ == 0) && !(self.shutdown_)) { pthread_cond_wait(&(self.queueNotEmpty_), &(self.queueLock_)); } if (self.shutdown_) { pthread_mutex_unlock(&(self.queueLock_)); cpLog(LOG_DEBUG, "Thread (%d) exiting.." , pthread_self()); pthread_exit(0); } VFunctor work = (self.queue_.front()); self.queue_.pop_front(); self.currQueueSize_--; if (self.currQueueSize_ == 0) { pthread_cond_signal(&(self.queueEmpty_)); } pthread_mutex_unlock(&(self.queueLock_)); try {// if (work)// { cpLog(LOG_DEBUG, "Thread (%d) Doing work." , pthread_self()); work.doWork();// } } catch (VException& e) { cpLog(LOG_ALERT, "Work threw an exception: %s", e.getDescription().c_str()); } } return 0;}VThreadPool::~VThreadPool(){ cpLog(LOG_DEBUG, "VThreadPool::~VThreadPool"); shutdown();}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -