⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 This software aims to create an applet and panel tools to manage a wireless interface card, such as
💻 CPP
字号:
//
// ThreadPool.cpp
//
// $Id: //poco/Main/Foundation/src/ThreadPool.cpp#5 $
//
// Copyright (c) 2004, Guenter Obiltschnig/Applied Informatics.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright
//    notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
//    notice, this list of conditions and the following disclaimer in the
//    documentation and/or other materials provided with the distribution.
//
// 3. Redistributions in any form must be accompanied by information on
//    how to obtain complete source code for this software and any
//    accompanying software that uses this software.  The source code
//    must either be included in the distribution or be available for no
//    more than the cost of distribution plus a nominal fee, and must be
//    freely redistributable under reasonable conditions.  For an
//    executable file, complete source code means the source code for all
//    modules it contains.  It does not include source code for modules or
//    files that typically accompany the major components of the operating
//    system on which the executable file runs.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
// FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
// COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//


#include "Foundation/ThreadPool.h"
#include "Foundation/Runnable.h"
#include "Foundation/Thread.h"
#include "Foundation/Event.h"
#include "Foundation/ThreadLocal.h"
#include <sstream>
#include <time.h>


Foundation_BEGIN


// A Runnable that owns a Thread and uses it to run, one after another,
// the targets handed over through start(Runnable&). The object deletes
// itself: either at the end of run(), or in release() when the thread
// is found not to be running.
class PooledThread: public Runnable
{
public:
	PooledThread(const std::string& name);
	~PooledThread();

	// Starts the underlying thread and waits until run() has signalled
	// the _started event.
	void start();
	// Stores the target and signals _targetReady so run() picks it up.
	void start(Runnable& target);
	// Returns the current value of the _idle flag (read without locking).
	bool idle();
	// Returns the seconds elapsed since _idleTime was recorded while idle,
	// or 0 when not idle.
	int idleTime();
	// Waits for _targetCompleted if a target is currently assigned.
	void join();
	// Marks the thread as not idle and resets _targetCompleted.
	void activate();
	// Clears the target and wakes run() so that it exits; deletes the
	// object directly if the thread is not running.
	void release();
	// Thread body: waits for targets and runs them until it sees a NULL
	// target, then deletes the object.
	void run();

private:
	// Set to false by activate(), back to true by run() once a target has finished.
	volatile bool   _idle;
	// time() value taken at construction and each time a target finishes.
	volatile time_t _idleTime;
	// Target to execute; a NULL value seen by run() tells it to terminate.
	Runnable* _pTarget;
	Thread    _thread;
	// Signalled by start(Runnable&) and release() to wake up run().
	Event     _targetReady;
	// Signalled by run() after a target finishes; reset by activate().
	Event     _targetCompleted;
	// Signalled at the very beginning of run().
	Event     _started;
	// Guards _pTarget and the updates of _idle/_idleTime.
	FastMutex _mutex;
};


// Creates an idle pooled thread without a target. The given name is
// passed on to the underlying Thread, and the idle timestamp is set
// to the current time.
PooledThread::PooledThread(const std::string& name):
	_idle(true),
	_idleTime(0),
	_pTarget(0),
	_thread(name)
{
	const time_t now = time(NULL);
	_idleTime = now;
}


// Nothing to do explicitly; the members clean up after themselves.
PooledThread::~PooledThread()
{
}


void PooledThread::start()
{
	_thread.start(*this);
	_started.wait();
}


// Hands a target over to the pooled thread. No target may be pending
// (asserted). The target is stored under the mutex and run() is woken
// up through the _targetReady event.
void PooledThread::start(Runnable& target)
{
	FastMutex::ScopedLock guard(_mutex);

	poco_assert (_pTarget == 0);

	Runnable* const pNewTarget = &target;
	_pTarget = pNewTarget;
	_targetReady.set();
}


// Reports whether the thread is currently idle. The flag is read
// without taking the mutex.
inline bool PooledThread::idle()
{
	const bool isIdle = _idle;
	return isIdle;
}


// Returns for how many seconds the thread has been idle, or 0 if it
// is not idle at the moment.
int PooledThread::idleTime()
{
	FastMutex::ScopedLock guard(_mutex);

	if (!_idle)
		return 0;

	return (int) (time(NULL) - _idleTime);
}


void PooledThread::join()
{
	_mutex.lock();
	Runnable* pTarget = _pTarget;
	_mutex.unlock();
	if (pTarget)
		_targetCompleted.wait();
}


// Claims the thread for a new target: it must be idle (asserted), is
// flagged as busy, and the completion event is reset so that a later
// join() waits for the upcoming target.
void PooledThread::activate()
{
	FastMutex::ScopedLock guard(_mutex);

	poco_assert (_idle);

	// Order kept as is: idle() reads the flag without locking.
	_idle = false;
	_targetCompleted.reset();
}


void PooledThread::release()
{
	_mutex.lock();
	_pTarget = 0;
	_mutex.unlock();
	// In case of a statically allocated thread pool (such
	// as the default thread pool), Windows may have already
	// terminated the thread before we got here.
	if (_thread.isRunning()) 
		_targetReady.set();
	else
		delete this;
}


void PooledThread::run()
{
	_started.set();
	for (;;)
	{
		_targetReady.wait();
		_mutex.lock();
		if (_pTarget) // a NULL target means kill yourself
		{
			_mutex.unlock();
			try
			{
				_pTarget->run();
			}
			catch (std::exception& e)
			{
				poco_bugcheck_msg(e.what());
			}
			catch (...)
			{
				poco_bugcheck();
			}
			_mutex.lock();
			_idle = true;
			_idleTime = time(NULL);
			_pTarget = 0;
			_mutex.unlock();
			_targetCompleted.set();
			ThreadLocalStorage::clear();
		}
		else
		{
			_mutex.unlock();
			break;
		}
	}
	delete this;
}


// Creates an unnamed thread pool and immediately creates and starts
// minCapacity threads. Requires minCapacity >= 1,
// maxCapacity >= minCapacity and idleTime > 0 (asserted).
ThreadPool::ThreadPool(int minCapacity, int maxCapacity, int idleTime): 
	_minCapacity(minCapacity), 
	_maxCapacity(maxCapacity), 
	_idleTime(idleTime),
	_serial(0)
{
	poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);

	int created = 0;
	while (created < _minCapacity)
	{
		PooledThread* pWorker = createThread();
		_threads.push_back(pWorker);
		pWorker->start();
		++created;
	}
}


// Creates a named thread pool and immediately creates and starts
// minCapacity threads. The pool name is used as prefix of the names
// of its threads. Requires minCapacity >= 1,
// maxCapacity >= minCapacity and idleTime > 0 (asserted).
ThreadPool::ThreadPool(const std::string& name, int minCapacity, int maxCapacity, int idleTime):
	_name(name),
	_minCapacity(minCapacity), 
	_maxCapacity(maxCapacity), 
	_idleTime(idleTime),
	_serial(0)
{
	poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);

	int created = 0;
	while (created < _minCapacity)
	{
		PooledThread* pWorker = createThread();
		_threads.push_back(pWorker);
		pWorker->start();
		++created;
	}
}


// Releases all threads of the pool via stopAll().
ThreadPool::~ThreadPool()
{
	stopAll();
}


// Changes the maximum capacity of the pool by n (n may be negative).
// The resulting capacity must not drop below the minimum capacity
// (asserted). Housekeeping runs afterwards.
void ThreadPool::addCapacity(int n)
{
	FastMutex::ScopedLock guard(_mutex);

	poco_assert (_maxCapacity + n >= _minCapacity);

	_maxCapacity = _maxCapacity + n;
	housekeep();
}


// Returns the maximum number of threads the pool may hold.
int ThreadPool::capacity() const
{
	FastMutex::ScopedLock guard(_mutex);

	const int maxThreads = _maxCapacity;
	return maxThreads;
}


int ThreadPool::available() const
{
	FastMutex::ScopedLock lock(_mutex);

	int count = 0;
	for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
	{
		if ((*it)->idle()) ++count;
	}
	return (int) (count + _maxCapacity - _threads.size());
}


int ThreadPool::used() const
{
	FastMutex::ScopedLock lock(_mutex);

	int count = 0;
	for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
	{
		if (!(*it)->idle()) ++count;
	}
	return count;
}


// Returns the number of threads currently held by the pool, idle or not.
int ThreadPool::allocated() const
{
	FastMutex::ScopedLock guard(_mutex);

	return static_cast<int>(_threads.size());
}


// Obtains a thread from the pool and lets it run the given target.
// getThread() throws NoThreadAvailableException if no thread is idle
// and the pool is already at its maximum capacity.
void ThreadPool::start(Runnable& target)
{
	PooledThread* pWorker = getThread();
	pWorker->start(target);
}


void ThreadPool::stopAll()
{
	FastMutex::ScopedLock lock(_mutex);

	for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
	{
		(*it)->release();
	}
	_threads.clear();
}


void ThreadPool::joinAll()
{
	FastMutex::ScopedLock lock(_mutex);

	for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
	{
		(*it)->join();
	}
	housekeep();
}


// Runs housekeeping under the pool mutex, releasing threads that
// have been idle for too long.
void ThreadPool::collect()
{
	FastMutex::ScopedLock guard(_mutex);

	housekeep();
}


// Releases threads that have been idle for at least _idleTime seconds,
// as long as the pool holds more than _minCapacity threads.
//
// Does not lock; every caller in this file holds _mutex already.
//
// The list is walked once, continuing from the iterator returned by
// erase(), instead of restarting the scan from the beginning after
// every removal.
void ThreadPool::housekeep()
{
	ThreadVec::iterator it = _threads.begin();
	while (it != _threads.end() && static_cast<int>(_threads.size()) > _minCapacity)
	{
		if ((*it)->idleTime() >= _idleTime)
		{
			// release() makes the PooledThread delete itself, so only
			// the pointer has to be dropped from the list here.
			(*it)->release();
			it = _threads.erase(it);
		}
		else
		{
			++it;
		}
	}
}


// Returns an activated thread ready to take a target.
//
// After housekeeping, the first idle thread is used. If there is
// none, a new thread is created and started, provided the pool is
// below its maximum capacity; otherwise NoThreadAvailableException
// is thrown.
PooledThread* ThreadPool::getThread()
{
	FastMutex::ScopedLock guard(_mutex);

	housekeep();

	PooledThread* pChosen = 0;
	for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
	{
		if ((*it)->idle())
		{
			pChosen = *it;
			break;
		}
	}

	if (pChosen == 0)
	{
		if (!(_threads.size() < _maxCapacity))
			throw NoThreadAvailableException();

		pChosen = createThread();
		_threads.push_back(pChosen);
		pChosen->start();
	}

	pChosen->activate();
	return pChosen;
}


// Allocates a new PooledThread named "<pool name>[#<serial>]", where
// the serial number is incremented for every thread created. The
// thread is not started here.
PooledThread* ThreadPool::createThread()
{
	++_serial;

	std::ostringstream label;
	label << _name << "[#" << _serial << "]";

	const std::string threadName = label.str();
	return new PooledThread(threadName);
}


class ThreadPoolSingletonHolder
{
public:
	ThreadPoolSingletonHolder()
	{
		_pPool = 0;
	}
	~ThreadPoolSingletonHolder()
	{
		delete _pPool;
	}
	ThreadPool* pool()
	{
		FastMutex::ScopedLock lock(_mutex);
		
		if (!_pPool)
		{
			_pPool = new ThreadPool("default");
		}
		return _pPool;
	}
	
private:
	ThreadPool* _pPool;
	FastMutex   _mutex;
};


// Returns the default thread pool, which is kept by a function-local
// static holder and created on first use.
ThreadPool& ThreadPool::defaultPool()
{
	static ThreadPoolSingletonHolder holder;

	ThreadPool* pDefault = holder.pool();
	return *pDefault;
}


Foundation_END

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -