⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.h

📁 用vc++写的一个简单的线程池的代码
💻 H
字号:
/////////////////////////////////////////////////////////////////////////////////////////////////////////
/// ThreadPool.h: interface for the ThreadPoolclass.
// Author - Alex.C.P (alexcpn@gmail.com)
/// This class models the ThreadPool
/// To limit the overhead caused by thread creation and deletion for 
/// mml task creation and task sending, especially in the case of too many SRCHALARM 
/// commands to be fired, MMLADC should use a thread pool to manage the threads.
/////////////////////////////////////////////////////////////////////////////////////////////////////////

#if !defined(THREADPOOLCLASS__INCLUDED)
#define THREADPOOLCLASS__INCLUDED

#if _MSC_VER > 1000
#pragma once
#endif /// _MSC_VER > 1000

#include "Command.h"
#include "ThreadChain.h"
#include "utility.h"
#include <list>
#include <queue>
#include "RequestQueue.h"

template<class T>
class ThreadChain;



namespace PluginThreadPool
{
	const int THREAD_CHECKTIME =1000; /// 1 second
	const int MAX_WAITTIMEOUT =2000;/// 2 Seconds
	const int MAX_IDLETIME = 30*1000; // 5 minutes

template <typename T>

/// This class models the ThreadPool
/// The class that manages the Threads 
/// This creates a chain of ThreadChain objects and manages them

class ThreadPool
{
	
private:
		///there is no meaning in copying a thread pool,
	///so making copy ctor and copy assignment private
	ThreadPool(const ThreadPool&){};
	ThreadPool& operator=(const ThreadPool&){};

public:
	//Constructor
	explicit ThreadPool(int _minthreadcount,int _maxthreadcount,unsigned long maxtimeout)
		:root(0),minthreadCount(_minthreadcount),maxthreadCount(_maxthreadcount),
		m_nPendingCount(0),m_threadCount(0)
	{

		InitializeCriticalSection(&critsec);
		
		///There is at least one thread in the thread pool
		ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
		root = prev;

		///Initially only create the minimum threads needed
		for(int i=0;i<minthreadCount-1;++i)
		{
			ThreadChain<T> *temp = new ThreadChain<T>(prev,this);
			prev=temp;
			root = temp;
		}

		///Create a thread that will monitor the threads to see if they are hanging
		m_winthread = AfxBeginThread( ThreadPool::ThreadCheckerProxy,this,
			0,0,CREATE_SUSPENDED);
		m_winthread->m_bAutoDelete = FALSE;
		m_winthread->ResumeThread();

		///Create an event in the non signalled state
		///which is used to check if there are any pending tasks
		m_pending=CreateEvent(0,1,0,0); ///Manually  reset

		///Create a thread that will monitor the pending jobs
		m_winthread = AfxBeginThread(ThreadPool::ProcessListProxy,this,0,0,CREATE_SUSPENDED);
		m_winthread->m_bAutoDelete = FALSE;
		m_winthread->ResumeThread();

	}

	///dtor
	~ThreadPool()
	{
		CloseHandle(m_pending);
		///Note the ownership of the objects in the queue
		///is not inside this
	};

	///This is the proxy thread method to 'ThreadChecker' method
 	static UINT ThreadCheckerProxy(LPVOID param)
	{
		ThreadPool* temp = (ThreadPool*)param;
		temp->ThreadChecker();
		return 1;
	}
		
	///This method initates the root to propogates the 
	///check of each ThreadChain object to see if it is in a hung state
	bool ThreadChecker()
	{
		while(true)
		{
			Sleep(THREAD_CHECKTIME);///minimum time
			if(root)
			root->HandleHungThreads(root); 
	
			
		}
	}


	void HandleRequest()
	{
		/// todo put an assert here root should not be null	
		///This call chains through the ThreadChain objects,the first free object
		///can handle the tasks
		if(root)
			root->canHandle();
		else
		{
			printf("No ROOT!!\n");
			//can happen if all threads are hanging
			///There is at least one thread in the thread pool
			ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
			root = prev;
		}
		return ;

	}

	
	///New Method used to Queue the request in a list.
	///This can be called from multiple threads
	void QueueRequest(Command<T>* cmdPtr)
	{
		try
		{
			if(cmdPtr) /// If this is a valid pointer
			{
				m_RequestQueue.QueueRequest(cmdPtr);		
				///printf("Q Size=%d\n",m_RequestQueue.size());	
				SetEvent(m_pending); ///signal the ProcessList to process the
				return ;
			}

		}
		catch(...)
		{
			printf("Exception Caught in QueueRequest.....\n");
			throw;
		}

	}

	/// This method is called by a thread when it is free to procees
	///a request. It deques the request -fifo
	Command<T>* GetRequest()
	{
		return m_RequestQueue.GetRequest();
	}
	


	/// This is the proxy thread method that calls the real ProcessList method
	static UINT ProcessListProxy(LPVOID param)
	{
		ThreadPool* temp = (ThreadPool*)param;
		temp->ProcessList();
		return 1;
	}

	///This waits on a event that is set when a new request is pushed to the queue
	///to be processes
	void ProcessList()
	{
		while(true)
		{
			try
			{

				WaitForSingleObject(m_pending,MAX_WAITTIMEOUT);/// or a maximum two seconds

				
				CLock lock(&critsec); ///the queue is a resource under contention

				if(m_RequestQueue.empty()) 
				{
					printf(".");//m_RequestQueue is Empty\n");
					ResetEvent(m_pending);///If empty wait for the event to occur
				}
				else 
				{
					HandleRequest();///Call to the ThreadChain method to handle a requesr
					///printf("m_RequestQueue is NOT Empty\n");
					SetEvent(m_pending);
					Sleep(0);/// To avoid busy-looping
				}

			}
			catch(...)
			{
				printf("Exception Caught in ProcessList\n");
				throw;
			}
			
		}
	}


	/// This method increments the thread count of the thread pool
	/// in a thread safe manner
	void IncrementThreadCount()
	{
		InterlockedIncrement(&m_threadCount);
	}

	/// This method decrements the thread count of the thread pool
	/// in a thread safe manner
	void DecrementThreadCount()
	{
		InterlockedDecrement(&m_threadCount);
	}

// Private member variables
private:

	CWinThread* m_winthread;
	CRITICAL_SECTION critsec;


public:
	ThreadChain<T>* root; 
	///std::queue<Command<T>*,std::list<Command<T>*> > m_RequestQueue; 
	long m_nPendingCount;
	int minthreadCount;// the mimimum threads in thread pool	
	int maxthreadCount;// the maximum thread count in threadpool
	long m_threadCount; // the current dynamic thread count
	HANDLE m_pending;
	HANDLE freeThreadEvent;
	CRequestQueue<T> m_RequestQueue;  //the container to hold the request object
	
};

}


#endif /// !defined(THREADPOOLCLASS__INCLUDED)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -