📄 threadpool.cpp
字号:
/*///////////////////////////////////////////////////////////////////////////////////
线程管理
包括线程池管理
线程队列管理
线程封装
李亦
2006.6.15
/*///////////////////////////////////////////////////////////////////////////////////
#include "console/console.h"
#include "server/sys/threadPool.h"
#define QUEUETHREAD_IDLE 100
//namespace RPGServer
//{
//////////////////////////////////////////////////////////////
//线程队列
ThreadQueue::ThreadQueueThread::ThreadQueueThread(ThreadQueue *q)
{
mThreadQueue = q;
setAutoDel(TRUE);
//m_bStop = FALSE;
}
void ThreadQueue::ThreadQueueThread::run(S32 arg)
{
mThreadQueue->threadStart();
//mThreadQueue->lock();
//ThreadStorage &sto = mThreadQueue->getStorage();
//sto.set((void *) 0);
//mThreadQueue->unlock();
U32 uTimer;
m_uExpire = Platform::getRealMilliseconds();
for(;;)
{
if(mThreadQueue->dispatchNextCall())
m_uExpire = Platform::getRealMilliseconds();
Platform::sleep(QUEUETHREAD_IDLE);
if(mThreadQueue->mLifeTime == ThreadQueue::NODEAD)
continue;
uTimer = Platform::getRealMilliseconds();
if(uTimer - m_uExpire >= mThreadQueue->mLifeTime)
{
if(mThreadQueue->removeThread(this))
break;
else
m_uExpire = Platform::getRealMilliseconds();
}
}
#ifdef TORQUE_DEBUG
Con::printf("线程退出(生命周期结束)...");
#endif
//return 0;
//delete this;
}
////////////////////////////////////////////////////////////////////
ThreadQueue::ThreadQueue(U32 uInit,U32 nMin, U32 nMax,U32 uLife)
:mTaskSemaphore(0)
{
AssertWarn(uInit,"至少指定1个线程,自动设为0");
if(uInit > nMax)
uInit = nMax;
if(uInit == 0)
uInit = 1;
if(nMin == 0)
nMin = 1;
mMaxAmount = nMax;
mMinAmount = nMin;
mLifeTime = uLife;
mWorkingNum = 0;
//mStorage.set((void *) 1);
for(U32 i = 0; i < uInit; i++)
{
Thread *theThread = new ThreadQueueThread(this);
mThreads.push_back(theThread);
theThread->start();
}
//mMutex = Mutex::createMutex();
}
ThreadQueue::~ThreadQueue()
{
//Mutex::destroyMutex(mMutex);
//mMutex = NULL;
}
bool ThreadQueue::removeThread(ThreadQueueThread* pThread)
{
if(pThread == NULL)
return false;
bool bRemoved = false;
lock();
if(mThreads.size() > mMinAmount)
{
for(U32 n = mThreads.size()-1; n >= 0; n--)
{
if(mThreads[n] == pThread)
{
mThreads.erase_fast(n);
bRemoved = true;
break;
}
}
}
unlock();
return bRemoved;
}
bool ThreadQueue::newThread()
{
//lock() 由调用者来 lock信号
bool bRet(false);
//线程已满
if(mThreads.size() < mMaxAmount)
{
Thread *theThread = new ThreadQueueThread(this);
mThreads.push_back(theThread);
theThread->start();
bRet = true;
#ifdef TORQUE_DEBUG
Con::printf("新建线程...");
#endif
}
return bRet;
}
bool ThreadQueue::dispatchNextCall()
{
//newThread();
//等待任务
if(!mTaskSemaphore.wait(false))
return false;
lock();
if(mThreadCalls.size() == 0)
{
unlock();
return false;
}
//申请新的工作线程数量失败时,则返回
if(mWorkingNum >= mThreads.size() && !newThread())
{
unlock();
return false;
}
mWorkingNum++;
Functor *c = mThreadCalls.first();
mThreadCalls.pop_front();
unlock();
//进入任务处理状态...
c->dispatch(this);
delete c;
lock();
mWorkingNum--;
unlock();
return true;
}
void ThreadQueue::postCall(Functor *theCall)
{
lock();
//if(isMainThread())
{
mThreadCalls.push_back(theCall);
unlock();
mTaskSemaphore.increment();
}
//else
//{
// mResponseCalls.push_back(theCall);
// unlock();
//}
}
void ThreadQueue::dispatchResponseCalls()
{
lock();
for(S32 i = 0; i < mResponseCalls.size(); i++)
{
Functor *c = mResponseCalls[i];
c->dispatch(this);
delete c;
}
mResponseCalls.clear();
unlock();
}
//};
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -