⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 五行MMORPG引擎系统V1.0
💻 CPP
字号:
/*///////////////////////////////////////////////////////////////////////////////////
线程管理
	包括线程池管理
	线程队列管理
	线程封装

	李亦
	2006.6.15
/*///////////////////////////////////////////////////////////////////////////////////

#include "console/console.h"
#include "server/sys/threadPool.h"


// Value handed to Platform::sleep() by ThreadQueueThread::run() while it polls the queue
// (presumably milliseconds -- confirm against Platform::sleep).
#define QUEUETHREAD_IDLE	100

//namespace RPGServer
//{

//////////////////////////////////////////////////////////////
//线程队列
/// Builds a worker thread object bound to the queue it will serve.
///
/// @param q  Queue whose calls this worker dispatches; stored as-is, not owned.
ThreadQueue::ThreadQueueThread::ThreadQueueThread(ThreadQueue *q)
{
	// NOTE(review): setAutoDel(TRUE) presumably makes the Thread base class
	// delete this object once run() returns -- confirm against Thread.
	setAutoDel(TRUE);

	// Remember the owning queue so run() can pull work from it.
	mThreadQueue = q;
}

/// Worker loop: repeatedly asks the owning queue to dispatch one call and
/// retires this thread once it has been idle for longer than the queue's
/// life time.
///
/// The loop only sleeps when no call was dispatched. Sleeping after every
/// successful dispatch (as the previous version did) throttled each worker to
/// one task per QUEUETHREAD_IDLE interval even while the queue was backed up.
///
/// @param arg  Unused.
void ThreadQueue::ThreadQueueThread::run(S32 arg)
{
	mThreadQueue->threadStart();

	// m_uExpire holds the time stamp of the last useful activity; the idle
	// period is measured from it.
	m_uExpire = Platform::getRealMilliseconds();

	for(;;)
	{
		if(mThreadQueue->dispatchNextCall())
		{
			// A call was executed: restart the idle timer and immediately
			// look for more work instead of idling.
			m_uExpire = Platform::getRealMilliseconds();
			continue;
		}

		// Nothing to do right now: back off before polling again.
		Platform::sleep(QUEUETHREAD_IDLE);

		// Queues configured with NODEAD never retire their workers.
		if(mThreadQueue->mLifeTime == ThreadQueue::NODEAD)
			continue;

		// Unsigned subtraction keeps the elapsed time correct even if the
		// millisecond counter wraps around.
		const U32 uNow = Platform::getRealMilliseconds();
		if(uNow - m_uExpire >= mThreadQueue->mLifeTime)
		{
			// The queue refuses removal when it is already at its minimum
			// size; in that case start a fresh idle period.
			if(mThreadQueue->removeThread(this))
				break;

			m_uExpire = Platform::getRealMilliseconds();
		}
	}

#ifdef TORQUE_DEBUG
	Con::printf("线程退出(生命周期结束)...");
#endif
}


////////////////////////////////////////////////////////////////////

/// Creates the queue and starts its initial worker threads.
///
/// The limits are normalised before use so that
/// 1 <= mMinAmount <= mMaxAmount and 1 <= initial thread count <= mMaxAmount.
/// Previously nMax could be 0 or smaller than nMin, which left the pool with
/// more threads than its own maximum and a minimum above the maximum.
///
/// @param uInit  Number of worker threads to start immediately (0 is raised to 1).
/// @param nMin   Minimum number of threads kept alive (0 is raised to 1).
/// @param nMax   Maximum number of threads (raised to nMin when smaller).
/// @param uLife  Idle life time stored in mLifeTime and compared by the
///               workers against elapsed Platform::getRealMilliseconds() time.
ThreadQueue::ThreadQueue(U32 uInit,U32 nMin, U32 nMax,U32 uLife)
:mTaskSemaphore(0)
{
	// The message now matches the code: a zero request is raised to one thread.
	AssertWarn(uInit,"至少指定1个线程,自动设为1");

	if(nMin == 0)
		nMin = 1;
	if(nMax < nMin)
		nMax = nMin;
	if(uInit > nMax)
		uInit = nMax;
	if(uInit == 0)
		uInit = 1;

	// All bookkeeping members are set before any worker is started, because
	// the workers read them from their own threads.
	mMaxAmount	= nMax;
	mMinAmount	= nMin;
	mLifeTime	= uLife;
	mWorkingNum	= 0;

	for(U32 i = 0; i < uInit; i++)
	{
		Thread *theThread = new ThreadQueueThread(this);
		mThreads.push_back(theThread);
		theThread->start();
	}
}




// Destructor. The body is currently empty: it contains only the disabled
// mutex tear-down below.
// NOTE(review): nothing here stops the worker threads held in mThreads or
// frees Functors still queued in mThreadCalls / mResponseCalls -- confirm
// that this is handled elsewhere before destroying a live queue.
ThreadQueue::~ThreadQueue()
{
   //Mutex::destroyMutex(mMutex);
   //mMutex = NULL;
}


/// Removes a worker from the thread list, provided the pool is above its
/// minimum size.
///
/// @param pThread  Worker to remove; NULL is rejected.
/// @return true if pThread was found and erased from mThreads, false if it
///         was NULL, not in the list, or the pool is already at mMinAmount.
bool ThreadQueue::removeThread(ThreadQueueThread* pThread)
{
	if(pThread == NULL)
		return false;

	bool bRemoved = false;

	lock();
	if(mThreads.size() > mMinAmount)
	{
		// Walk backwards with "n-- > 0": the index is unsigned, so the former
		// "n >= 0" test was always true and a thread that was not in the list
		// made the loop wrap around and index far out of bounds.
		for(U32 n = mThreads.size(); n-- > 0; )
		{
			if(mThreads[n] == pThread)
			{
				mThreads.erase_fast(n);
				bRemoved = true;
				break;
			}
		}
	}
	unlock();

	return bRemoved;
}

bool ThreadQueue::newThread()
{
	//lock()  由调用者来 lock信号

	bool bRet(false);
	//线程已满
   if(mThreads.size() < mMaxAmount)
	{
		Thread *theThread = new ThreadQueueThread(this);
		mThreads.push_back(theThread);
		theThread->start();
		bRet = true;

#ifdef TORQUE_DEBUG
	Con::printf("新建线程...");
#endif

	}

	return bRet;
}


/// Takes the next queued call, if any, and executes it on the calling thread.
///
/// @return true if a call was dispatched and deleted; false if no task was
///         signalled, the call list was empty, or no worker slot could be
///         obtained.
bool ThreadQueue::dispatchNextCall()
{
	// Poll the task counter.
	// NOTE(review): wait(false) is assumed to be a non-blocking acquire that
	// consumes one count on success -- confirm against the Semaphore class.
	if(!mTaskSemaphore.wait(false))
		return false;

	lock();
	if(mThreadCalls.size() == 0)
	{
		unlock();
		return false;
	}

	// Every busy worker needs a slot in mThreads; try to grow the pool when
	// all existing threads are already working.
	if(mWorkingNum >= mThreads.size() && !newThread())
	{
		unlock();

		// The call stays in mThreadCalls, so give back the count taken above.
		// Without this the queued call would be stranded until some later
		// postCall() happened to raise the counter again.
		mTaskSemaphore.increment();
		return false;
	}
	mWorkingNum++;

	Functor *c = mThreadCalls.first();
	mThreadCalls.pop_front();
	unlock();

	// Run the task outside the lock so other workers can proceed.
	c->dispatch(this);
	delete c;

	lock();
	mWorkingNum--;
	unlock();

	return true;
}


/// Queues a call for execution by a worker thread.
///
/// The functor is appended to mThreadCalls under the queue lock, then the
/// task semaphore is incremented after the lock has been released.
/// (A disabled alternative path that routed calls from non-main threads to
/// mResponseCalls used to live here.)
///
/// @param theCall  Functor to enqueue; dispatchNextCall() deletes it after dispatch.
void ThreadQueue::postCall(Functor *theCall)
{
	lock();
	mThreadCalls.push_back(theCall);
	unlock();

	// Signal that one more task is available.
	mTaskSemaphore.increment();
}

void ThreadQueue::dispatchResponseCalls()
{
   lock();
   for(S32 i = 0; i < mResponseCalls.size(); i++)
   {
      Functor *c = mResponseCalls[i];
      c->dispatch(this);
      delete c;
   }
   mResponseCalls.clear();
   unlock();
}

//};

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -