/// File: threadpool.h
/////////////////////////////////////////////////////////////////////////////////////////////////////////
/// ThreadPool.h: interface for the ThreadPool class.
// Author - Alex.C.P (alexcpn@gmail.com)
/// This class models the ThreadPool
/// To limit the overhead caused by thread creation and deletion for
/// mml task creation and task sending, especially in the case of too many SRCHALARM
/// commands to be fired, MMLADC should use a thread pool to manage the threads.
/////////////////////////////////////////////////////////////////////////////////////////////////////////
#if !defined(THREADPOOLCLASS__INCLUDED)
#define THREADPOOLCLASS__INCLUDED
#if _MSC_VER > 1000
#pragma once
#endif /// _MSC_VER > 1000
#include "Command.h"
#include "ThreadChain.h"
#include "utility.h"
#include <list>
#include <queue>
#include "RequestQueue.h"
/// Forward declaration of the ThreadChain node template; ThreadPool (below)
/// links ThreadChain<T> objects into a chain and dispatches work through them.
template<class T>
class ThreadChain;
namespace PluginThreadPool
{
/// Timing constants for the pool. All values are in milliseconds.
const int THREAD_CHECKTIME =1000; /// 1 second: period between two hung-thread checks
const int MAX_WAITTIMEOUT =2000;/// 2 seconds: longest the dispatcher waits for the pending event before re-checking the queue
const int MAX_IDLETIME = 30*1000; // 30 seconds as written (the previous comment claimed "5 minutes", which would be 5*60*1000). NOTE(review): not referenced in this header - confirm the intended value with its users.
template <typename T>
/// This class models the ThreadPool
/// The class that manages the Threads
/// This creates a chain of ThreadChain objects and manages them
class ThreadPool
{
private:
///there is no meaning in copying a thread pool,
///so making copy ctor and copy assignment private
ThreadPool(const ThreadPool&){};
ThreadPool& operator=(const ThreadPool&){};
public:
//Constructor
explicit ThreadPool(int _minthreadcount,int _maxthreadcount,unsigned long maxtimeout)
:root(0),minthreadCount(_minthreadcount),maxthreadCount(_maxthreadcount),
m_nPendingCount(0),m_threadCount(0)
{
InitializeCriticalSection(&critsec);
///There is at least one thread in the thread pool
ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
root = prev;
///Initially only create the minimum threads needed
for(int i=0;i<minthreadCount-1;++i)
{
ThreadChain<T> *temp = new ThreadChain<T>(prev,this);
prev=temp;
root = temp;
}
///Create a thread that will monitor the threads to see if they are hanging
m_winthread = AfxBeginThread( ThreadPool::ThreadCheckerProxy,this,
0,0,CREATE_SUSPENDED);
m_winthread->m_bAutoDelete = FALSE;
m_winthread->ResumeThread();
///Create an event in the non signalled state
///which is used to check if there are any pending tasks
m_pending=CreateEvent(0,1,0,0); ///Manually reset
///Create a thread that will monitor the pending jobs
m_winthread = AfxBeginThread(ThreadPool::ProcessListProxy,this,0,0,CREATE_SUSPENDED);
m_winthread->m_bAutoDelete = FALSE;
m_winthread->ResumeThread();
}
///dtor
~ThreadPool()
{
CloseHandle(m_pending);
///Note the ownership of the objects in the queue
///is not inside this
};
///This is the proxy thread method to 'ThreadChecker' method
static UINT ThreadCheckerProxy(LPVOID param)
{
ThreadPool* temp = (ThreadPool*)param;
temp->ThreadChecker();
return 1;
}
///This method initates the root to propogates the
///check of each ThreadChain object to see if it is in a hung state
bool ThreadChecker()
{
while(true)
{
Sleep(THREAD_CHECKTIME);///minimum time
if(root)
root->HandleHungThreads(root);
}
}
void HandleRequest()
{
/// todo put an assert here root should not be null
///This call chains through the ThreadChain objects,the first free object
///can handle the tasks
if(root)
root->canHandle();
else
{
printf("No ROOT!!\n");
//can happen if all threads are hanging
///There is at least one thread in the thread pool
ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
root = prev;
}
return ;
}
///New Method used to Queue the request in a list.
///This can be called from multiple threads
void QueueRequest(Command<T>* cmdPtr)
{
try
{
if(cmdPtr) /// If this is a valid pointer
{
m_RequestQueue.QueueRequest(cmdPtr);
///printf("Q Size=%d\n",m_RequestQueue.size());
SetEvent(m_pending); ///signal the ProcessList to process the
return ;
}
}
catch(...)
{
printf("Exception Caught in QueueRequest.....\n");
throw;
}
}
/// This method is called by a thread when it is free to procees
///a request. It deques the request -fifo
Command<T>* GetRequest()
{
return m_RequestQueue.GetRequest();
}
/// This is the proxy thread method that calls the real ProcessList method
static UINT ProcessListProxy(LPVOID param)
{
ThreadPool* temp = (ThreadPool*)param;
temp->ProcessList();
return 1;
}
///This waits on a event that is set when a new request is pushed to the queue
///to be processes
void ProcessList()
{
while(true)
{
try
{
WaitForSingleObject(m_pending,MAX_WAITTIMEOUT);/// or a maximum two seconds
CLock lock(&critsec); ///the queue is a resource under contention
if(m_RequestQueue.empty())
{
printf(".");//m_RequestQueue is Empty\n");
ResetEvent(m_pending);///If empty wait for the event to occur
}
else
{
HandleRequest();///Call to the ThreadChain method to handle a requesr
///printf("m_RequestQueue is NOT Empty\n");
SetEvent(m_pending);
Sleep(0);/// To avoid busy-looping
}
}
catch(...)
{
printf("Exception Caught in ProcessList\n");
throw;
}
}
}
/// This method increments the thread count of the thread pool
/// in a thread safe manner
void IncrementThreadCount()
{
InterlockedIncrement(&m_threadCount);
}
/// This method decrements the thread count of the thread pool
/// in a thread safe manner
void DecrementThreadCount()
{
InterlockedDecrement(&m_threadCount);
}
// Private member variables
private:
CWinThread* m_winthread;
CRITICAL_SECTION critsec;
public:
ThreadChain<T>* root;
///std::queue<Command<T>*,std::list<Command<T>*> > m_RequestQueue;
long m_nPendingCount;
int minthreadCount;// the mimimum threads in thread pool
int maxthreadCount;// the maximum thread count in threadpool
long m_threadCount; // the current dynamic thread count
HANDLE m_pending;
HANDLE freeThreadEvent;
CRequestQueue<T> m_RequestQueue; //the container to hold the request object
};
}
#endif /// !defined(THREADPOOLCLASS__INCLUDED)
/// (Web code-viewer keyboard-shortcut help text removed here; it was page residue, not part of threadpool.h.)