⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 看到联通的接口协议的dll没
💻 CPP
字号:



#include "threadpool.h"
#include <assert.h>

//#include <Wxutil.h>


// Debug builds only: drop any earlier THIS_FILE macro and define a file-local
// THIS_FILE array holding this source file's name.
// NOTE(review): presumably intended for MFC's DEBUG_NEW allocation tracking;
// the `#define new DEBUG_NEW` line below is disabled - confirm it is still wanted.
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
//#define new DEBUG_NEW
#endif


/*
  管理线程
*/
unsigned int CThreadPool::ManagerProc(void* p)
{
 	CThreadPool* pServer = (CThreadPool*)p;
	HANDLE       IoPort = pServer->GetMgrIoPort();
	unsigned long      pN1, pN2; 
	OVERLAPPED*       pOverLapped;
	
LABEL_MANAGER_PROCESSING:
	while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2, &pOverLapped, pServer->GetMgrWaitTime()))
	{
		if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
		{
			return 0;
		}
		//  else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
		//  {
		//	   if(pN1 != 0)
		//	   {
		//			DWORD rc=::WaitForSingleObject((HANDLE)pN1,INFINITE);
		//			if(rc=WAIT_OBJECT_0)
		//			{
		//				CloseHandle((HANDLE)pN1);//关闭该线程句柄
		//			}
		//			ReportDebug("Wait a Thread Removed!\n");
		//		 }
		//  }
		else 
		{  //zc
			//ReportDebug("mgr events comes in!\n");
		}
	}

	//time out processing
	if (::GetLastError() == WAIT_TIMEOUT)
	{
		//ReportDebug("Time out processing!\n");
		if (pServer->GetThreadPoolStatus() == CThreadPool::BUSY)
			pServer->AddThreads();
		if (pServer->GetThreadPoolStatus() == CThreadPool::IDLE)
			pServer->RemoveThreads();

		goto LABEL_MANAGER_PROCESSING;
	}
	return 0;
}


/*
  线程池工作体  
*/
unsigned int CThreadPool::WorkerProc(void* p)
{	
	CThreadPool* pServer=(CThreadPool*)p;
	HANDLE  IoPort = pServer->GetWorkerIoPort();

	unsigned long      pN1, pN2; 
	OVERLAPPED*       pOverLapped;
	
	DWORD threadId = ::GetCurrentThreadId();
	//ReportDebug("worker thread id is %d.\n", threadId);
	
	while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2, 
		&pOverLapped, INFINITE ))
	{
		if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
		{			
			//   CThreadPool::Iterator_ThreadInfoMap it = pServer->m_threadMap.find(threadId);
			//   if(it!=pServer->m_threadMap.end())
			//   {
			//     ::PostQueuedCompletionStatus(pServer->m_hMgrIoPort,
			//			(unsigned long)it->second.m_hThread,
			//			   0, 
			//		   (OVERLAPPED*)0xFFFFFFFE);
			pServer->RemoveThread(threadId);			
			//   }
			break;
		}
		else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
		{
			break;
		}
		else
		{			
			//before processing, we need to change the status to busy.
			pServer->ChangeStatus(threadId, true);	
			
			IWorker* pIWorker = reinterpret_cast<IWorker*>(pN1);
			IJobDesc* pIJob= reinterpret_cast<IJobDesc*>(pN2);
			
			//开始调用工作
			pIWorker->ProcessJob(pIJob);

			pServer->ChangeStatus(threadId, false);
		}
	}
	return 0;
}


/*
   nStatic   运行线程数
   nMax      最大线程数
*/
void CThreadPool::Start(unsigned short nStatic, unsigned short nMax)
{
	assert(nMax >= nStatic);
	
	HANDLE  hThread;
	DWORD nThreadId;
	m_nNumberOfStaticThreads = nStatic;
	m_nNumberOfTotalThreads = nMax;
	
	
	//自动加锁状态?
	//CAutoLock AutoLock(m_arrayCs);	
	//CSingleLock lock(&m_arrayCs);
	//lock.Lock();
	
	m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);
	m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);

	//创建管理线程的线程,
	m_hMgrThread = CreateThread(
        NULL, // SD
        0,                        // initial stack size
        (LPTHREAD_START_ROUTINE)ManagerProc,    // thread function
        (LPVOID)this,                       // thread argument
        0,                    
        &nThreadId );                       // thread identifier
		
	
	//创建工作线程,并保存在map中
	for(long n = 0; n < nStatic; n++)
	{
		hThread = CreateThread(
			NULL, // SD
			0,                        // initial stack size
			(LPTHREAD_START_ROUTINE)WorkerProc,    // thread function
			(LPVOID)this,                       // thread argument
			0,                    // creation option
			&nThreadId );    
		//ReportDebug("generate a worker thread handle id is %d.\n", nThreadId);
		//m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false)));
		m_threadMap.SetAt(nThreadId, ThreadInfo(hThread, false));
	}
//	lock.Unlock();
}


/*
  关闭线程池
*/
void CThreadPool::Stop(bool bHash)
{
	//CAutoLock Lock(m_arrayCs);
	//CSingleLock lock(&m_arrayCs);
	//lock.Lock();
	::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);

	//在那里释放?
	//lock.Unlock();
	WaitForSingleObject(m_hMgrThread, INFINITE);
	CloseHandle(m_hMgrThread);
	CloseHandle(m_hMgrIoPort);
	
	//shut down all the worker threads
	//UINT nCount = m_threadMap.size();
	UINT nCount = m_threadMap.GetCount();
	HANDLE* pThread = new HANDLE[nCount];
	
	long n = 0;
	ThreadInfo info;
	/*
	for(Iterator_ThreadInfoMap i = m_threadMap.begin();i != m_threadMap.end(); i++)
	{
		::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
		info = i->second;
		pThread[n++] = info.m_hThread;	
	}
	*/
	
	//wait for 0.5 minutes, then start to kill threads
	DWORD rc = WaitForMultipleObjects(nCount, pThread, TRUE, 30000);
	CloseHandle(m_hWorkerIoPort);
	if(rc >= WAIT_OBJECT_0 && rc < WAIT_OBJECT_0+nCount)
	{
		for(unsigned int n = 0;n < nCount;n++)
		{
			CloseHandle(pThread[n]);
		}
	}
	else if(rc == WAIT_TIMEOUT && bHash)
	{
		//some threads not terminated, we have to stop them.
		DWORD exitCode;
		for(unsigned int i=0; i<nCount; i++)
		{
			if (::GetExitCodeThread(pThread[i], &exitCode) == STILL_ACTIVE)
			{
				TerminateThread(pThread[i], 99);
			}
			CloseHandle(pThread[i]);
		}
	}
	delete[] pThread;
}

/*
  Queue a job for the worker threads.

  pJob    : job description, handed to pWorker->ProcessJob() by the worker
            thread that dequeues the packet.
  pWorker : object that will process the job.

  Both pointers are packed into the two integer fields of a completion packet
  posted to the worker port; the OVERLAPPED pointer is NULL, which marks the
  packet as a job rather than a control sentinel. The result of the post is
  not checked.

  NOTE(review): the pointers are cast to DWORD, which looks like it assumes
  32-bit pointers - confirm before building for Win64.
*/
void CThreadPool::ProcessJob(IJobDesc* pJob, IWorker* pWorker) const
{
	const DWORD workerBits = reinterpret_cast<DWORD>(pWorker);
	const DWORD jobBits = reinterpret_cast<DWORD>(pJob);

	::PostQueuedCompletionStatus(m_hWorkerIoPort, workerBits, jobBits, NULL);
}

/*
  Construct an idle pool.

  Puts the handles and thread limits that Start() later fills in into a
  defined "not started" state (NULL handles, zero limits), so that they never
  hold indeterminate values if another member is called before Start().
*/
CThreadPool::CThreadPool()
{
	m_hMgrIoPort = NULL;
	m_hWorkerIoPort = NULL;
	m_hMgrThread = NULL;
	m_nNumberOfStaticThreads = 0;
	m_nNumberOfTotalThreads = 0;
}


/*
  Destructor. Performs no cleanup of its own: it does not call Stop() and
  does not close any handle.
*/
CThreadPool::~CThreadPool()
{
}



/*
  Grow the pool.

  Starts up to two additional worker threads (WorkerProc), never taking the
  number of threads recorded in m_threadMap above m_nNumberOfTotalThreads.
  Each new thread is recorded in the map under its thread id, initially not
  busy.

  Does nothing when the map already holds at least the maximum number of
  threads; the count is computed so that it cannot wrap around in that case.
  A thread whose CreateThread call fails is skipped rather than recorded.
*/
void CThreadPool::AddThreads()
{
	const unsigned int nCount = (unsigned int)m_threadMap.GetCount();
	const unsigned int nLimit = m_nNumberOfTotalThreads;
	if (nCount >= nLimit)
		return;

	// At most two threads per call, and never beyond the limit.
	unsigned int nToAdd = nLimit - nCount;
	if (nToAdd > 2)
		nToAdd = 2;

	for(unsigned int i = 0; i < nToAdd; i++)
	{
		DWORD nThreadId = 0;
		HANDLE hThread = CreateThread(
			NULL,                                   // SD
			0,                                      // initial stack size
			(LPTHREAD_START_ROUTINE)WorkerProc,     // thread function
			(LPVOID)this,                           // thread argument
			0,                                      // creation option
			&nThreadId );                           // thread identifier
		if (hThread == NULL)
		{
			// No thread was created; recording a NULL handle under a
			// meaningless id would corrupt the bookkeeping.
			continue;
		}
		//ReportDebug("generate a worker thread handle id is %d.\n", nThreadId);
		m_threadMap.SetAt(nThreadId, ThreadInfo(hThread, false));
	}
}


//void CThreadPool::RemoveThread(DWORD threadId)
//{
	//CAutoLock lock(m_arrayCs);	
	//m_threadMap.erase(threadId);
//}


/*
  Shrink the pool.

  Posts up to two 0xFFFFFFFE "retire" packets to the worker port, never
  asking for more retirements than would take the number of threads recorded
  in m_threadMap below m_nNumberOfStaticThreads. Each packet makes the worker
  that dequeues it deregister itself and exit (see WorkerProc).

  Does nothing when the map holds no more than the static number of threads;
  the count is computed so that it cannot wrap around for small pools.
*/
void CThreadPool::RemoveThreads()
{
	const unsigned int nCount = (unsigned int)m_threadMap.GetCount();
	const unsigned int nFloor = m_nNumberOfStaticThreads;
	if (nCount <= nFloor)
		return;

	// At most two threads per call, and never below the static count.
	unsigned int nToRemove = nCount - nFloor;
	if (nToRemove > 2)
		nToRemove = 2;

	for(unsigned int i = 0; i < nToRemove; i++)
	{
		::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFE);
	}
}


/*
  Report how loaded the pool is.

  Counts the entries of m_threadMap whose m_bBusyWorking flag is set and
  compares that to the total number of entries:
    - more than 80% busy  -> BUSY
    - fewer than 20% busy -> IDLE
    - otherwise           -> NORMAL
  An empty map yields NORMAL.

  NOTE(review): assumes m_threadMap offers GetStartPosition()/GetNextAssoc()
  with a DWORD key, as MFC CMap / ATL CAtlMap do - confirm against the header.
  The map is read without locking, as elsewhere in this file.
*/
CThreadPool::ThreadPoolStatus CThreadPool::GetThreadPoolStatus()
{
	const int nTotal = (int)m_threadMap.GetCount();
	if (nTotal <= 0)
		return NORMAL;   // nothing to measure; also avoids dividing by zero

	int nBusy = 0;
	DWORD threadId = 0;
	ThreadInfo info;
	POSITION pos = m_threadMap.GetStartPosition();
	while (pos != NULL)
	{
		m_threadMap.GetNextAssoc(pos, threadId, info);
		if (info.m_bBusyWorking)
			nBusy++;
	}

	const double busyRatio = nBusy / (1.0 * nTotal);
	if (busyRatio > 0.8)
		return BUSY;
	if (busyRatio < 0.2)
		return IDLE;
	return NORMAL;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -