⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workthreadpool.cpp

📁 在linux下基于UDP通讯的程序,包括客户端与服务端.
💻 CPP
字号:
#include "WorkThreadPool.h"
#include "TaskQueue.h"
#include "LogHelper.h"
#include <string>
using namespace std;
extern CTaskQueue  theQueue;
extern CLogHelper   theLogger;
CWorkThreadPool    thePool(1);
LockType g_ThreadPoolLock;
//线程队列和任务队列调度线程
//主要的工作是,清除线程组中的无用线程,然后从
//任务队列中取出来一个任务对象,接着尝试从
//线程组中找出一个空闲的线程,如果不能找到的话
//就创建一个新的线程,并把新线程加入到线程组中
#ifdef __LINUX__
void* 
#else
DWORD WINAPI
#endif
TaskScheduleThread(
#ifdef _WIN32
LPVOID
#else
void*
#endif
data);
CWorkThreadPool::CWorkThreadPool(unsigned int nSize)
{
	InitLock(&g_ThreadPoolLock);
	m_nThread = 0;
	m_nThread = nSize;
	for(int i = 0; i < nSize; i++)
	{
	  StartWorkThread(i + 1);
	}
	//再启动调度线程,负责消费任务对列中的任务,
	//将之分配给线程池中的工作线程进行执行
	if(!StartScheduleThread())
	{
		string  str = "调度线程启动失败,系统退出[CWorkThreadPool::CWorkThreadPool(const unsigned int nSize)]";
		theLogger.LogMessage(str, true);
		exit(0);
	}
}

CWorkThreadPool::~CWorkThreadPool()
{

}

#ifdef __LINUX__
void* 
#else
DWORD WINAPI
#endif
TaskScheduleThread(
#ifdef _WIN32
LPVOID
#else
void*
#endif
data)
{
	
	//得到当前线程池指针
	CWorkThreadPool* wp = (CWorkThreadPool*)data;
	//判断是否有效
	if(wp == NULL)
	{
		string str = "无效的线程池参数[TaskScheduleThread]";
		theLogger.LogMessage(str, true);
		exit(0);
	}
	//循环查找合适的任务和合适的线程
	while(1)
	{
		//任务队列空闲时,工作线程不需要运行
		if(theQueue.IsEmpty())
		{
			SLEEP(2, 50);
			continue;
		}

		//当任务队列不空时,取出一个任务进行执行
		CTask* task = theQueue.FetchTask();
		if(task != NULL)
		{
			CWorkThread* wth = wp->FetchFreeThread();
			while(wth == NULL)
			{
                string str = "调度线程: 无空闲线程[TaskScheduleThread]";
				theLogger.LogMessage(str, true);
				int nID = wp->ThreadSize() + 1;
                wp->StartWorkThread(nID);
				SLEEP(0, 100);
				wth = wp->FetchFreeThread();
				
			}
			string str = string(wth->GetName()) + "分配到新的任务[TaskScheduleThread]";
            theLogger.LogMessage(str, true);
			wth->AssigTask(task);
		}
		else
		{
          continue;
		}
		
		
	}
	return 0;
}

int CWorkThreadPool::StartScheduleThread()
{
#ifdef _WIN32
	DWORD dwThreadID;
	if(CreateThread(NULL, 0, TaskScheduleThread, this, 0, &dwThreadID) == NULL)
	{
		ThreadSafePrint("调度线程启动失败");
		return 0;
	}
#else
	pthread_t thread;
	if(pthread_create(&thread, NULL, TaskScheduleThread, this))
	{
		
		ThreadSafePrint("调度线程启动失败");
		return 0;
	} 
	
	if(pthread_detach(thread))
	{
		ThreadSafePrint("调度线程分离失败");
		return 0;
	}
#endif 
	return 1;
}

void CWorkThreadPool::AddThread(CWorkThread *th)
{
  if(th != NULL)
  {
	  LockOn(&g_ThreadPoolLock);
      m_pThreadQueue.push(th);
	  LockOff(&g_ThreadPoolLock);
  }
}

CWorkThread* CWorkThreadPool::FetchFreeThread( )
{
   CWorkThread* th = NULL;
   
   LockOn(&g_ThreadPoolLock);
   m_nThread = m_pThreadQueue.size();
   LockOff(&g_ThreadPoolLock);
   
   if(m_nThread <= 0)
   {
	   return NULL;
   }
   //循环检测队列中是否有空闲线程存在
   LockOn(&g_ThreadPoolLock);
   int i = m_nThread;
   while(i-- > 0)
   {
		 //取出头元素
	     th = m_pThreadQueue.front();
		 //将头元素吐出来
		 m_pThreadQueue.pop();
		 //然后再加到队尾
		 m_pThreadQueue.push(th);
		 //如果有效的话则改变他的状态
		 if(th && th->IsFree())
		 {
             string str = "取到一个空闲线程:" + string(th->GetName()) + "[CWorkThread* CWorkThreadPool::FetchFreeThread( )]";
             theLogger.LogMessage(str, true);
			 th->ChangeStatus(false);
			 break;
		 }
		 th = NULL;
   }
	 LockOff(&g_ThreadPoolLock);
     return th;
}

int CWorkThreadPool::ThreadSize()
{
	int nRet = 0;
    LockOn(&g_ThreadPoolLock);
	nRet = m_pThreadQueue.size();
	LockOff(&g_ThreadPoolLock);
	return nRet;
}

int CWorkThreadPool::FreeThreads() 
{
	
	int nRet = 0;
	LockOn(&g_ThreadPoolLock);
	int i = m_pThreadQueue.size();
	while(i-- >0)
	{
		CWorkThread* wth = m_pThreadQueue.front();
		m_pThreadQueue.pop();
		m_pThreadQueue.push(wth);
		if(wth && wth->IsFree())
		{
			nRet++;
		}
	}
	LockOff(&g_ThreadPoolLock);
	return nRet;
}

int CWorkThreadPool::StartWorkThread(int nID)
{
  	    int nRet = 0;
	    //创建一个新的线程对象
		CWorkThread* wl = new CWorkThread;
		//设置线程的ID
		wl->SetID(nID);
		//启动线程
		nRet =  wl->Start(wl);
		//并将线程添加到线程队列中
		AddThread(wl);
		m_nThread += nRet;
		return nRet;
}

int CWorkThreadPool::CleanDeadThreads()
{
	int nRet = 0;
	LockOn(&g_ThreadPoolLock);
	int i = m_pThreadQueue.size();
	while(i-- >0)
	{
		CWorkThread* wth = m_pThreadQueue.front();
		m_pThreadQueue.pop();
		if(wth->ShouldExit())
		{
			nRet++;
		}else
		{
			m_pThreadQueue.push(wth);
		}
	}
	LockOff(&g_ThreadPoolLock);
	return nRet;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -