📄 threadpool.cpp
字号:
/*
* Thread Pool Class
* Copyright (C) Burlex <burlex@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include "ThreadPool.h"
#include "../NGLog.h"
#ifdef WIN32
#include <process.h>
#else
volatile int threadid_count = 0;
int GenerateThreadId()
{
int i = ++threadid_count;
return i;
}
#endif
#define THREAD_RESERVE 5
SERVER_DECL CThreadPool ThreadPool;
CThreadPool::CThreadPool()
{
_threadsExitedSinceLastCheck = 0;
_threadsRequestedSinceLastCheck = 0;
_threadsEaten = 0;
_threadsFreedSinceLastCheck = 0;
}
bool CThreadPool::ThreadExit(Thread * t)
{
_mutex.Acquire();
// we're definitely no longer active
m_activeThreads.erase(t);
// do we have to kill off some threads?
if(_threadsToExit > 0)
{
// kill us.
--_threadsToExit;
++_threadsExitedSinceLastCheck;
if(t->DeleteAfterExit)
m_freeThreads.erase(t);
_mutex.Release();
delete t;
return false;
}
// enter the "suspended" pool
++_threadsExitedSinceLastCheck;
++_threadsEaten;
std::set<Thread*>::iterator itr = m_freeThreads.find(t);
if(itr != m_freeThreads.end())
{
printf("线程 %u 重复线程 %u\n", (*itr)->ControlInterface.GetId(), t->ControlInterface.GetId());
}
m_freeThreads.insert(t);
Log.Debug(" 线程池 ", "线程 %u 进入线程池.", t->ControlInterface.GetId());
_mutex.Release();
return true;
}
void CThreadPool::ExecuteTask(ThreadBase * ExecutionTarget)
{
Thread * t;
_mutex.Acquire();
++_threadsRequestedSinceLastCheck;
--_threadsEaten;
// grab one from the pool, if we have any.
if(m_freeThreads.size())
{
t = *m_freeThreads.begin();
m_freeThreads.erase(m_freeThreads.begin());
// execute the task on this thread.
t->ExecutionTarget = ExecutionTarget;
// resume the thread, and it should start working.
t->ControlInterface.Resume();
Log.Debug(" 线程池 ", "线程 %u 左线程池.", t->ControlInterface.GetId());
}
else
{
// creating a new thread means it heads straight to its task.
// no need to resume it :)
t = StartThread(ExecutionTarget);
}
// add the thread to the active set
#ifdef WIN32
Log.Debug(" 线程池 ", "线程 %u 现正执行任务 0x%p.", t->ControlInterface.GetId(), ExecutionTarget);
#else
Log.Debug(" 线程池 ", "线程 %u 现正执行任务 %p.", t->ControlInterface.GetId(), ExecutionTarget);
#endif
m_activeThreads.insert(t);
_mutex.Release();
}
void CThreadPool::Startup()
{
int i;
int tcount = THREAD_RESERVE;
for(i=0; i < tcount; ++i)
StartThread(NULL);
Log.Debug(" 线程池 ", "启动, 展开 %u 线程.", tcount);
}
void CThreadPool::ShowStats()
{
_mutex.Acquire();
Log.Debug(" 线程池 ", "============ 线程状态 =============");
Log.Debug(" 线程池 ", "积极线程: %u", m_activeThreads.size());
Log.Debug(" 线程池 ", "暂停线程: %u", m_freeThreads.size());
Log.Debug(" 线程池 ", "要求到释放的比例: %.3f%% (%u/%u)", float( float(_threadsRequestedSinceLastCheck+1) / float(_threadsExitedSinceLastCheck+1) * 100.0f ), _threadsRequestedSinceLastCheck, _threadsExitedSinceLastCheck);
Log.Debug(" 线程池 ", "Eaten Count: %d (negative is bad!)", _threadsEaten);
Log.Debug(" 线程池 ", "============================================");
_mutex.Release();
}
void CThreadPool::IntegrityCheck()
{
_mutex.Acquire();
int32 gobbled = _threadsEaten;
if(gobbled < 0)
{
// this means we requested more threads than we had in the pool last time.
// spawn "gobbled" + THREAD_RESERVE extra threads.
uint32 new_threads = abs(gobbled) + THREAD_RESERVE;
_threadsEaten=0;
for(uint32 i = 0; i < new_threads; ++i)
StartThread(NULL);
Log.Debug(" 线程池 ", "完整性检查: (gobbled < 0) 产生 %u 线程.", new_threads);
}
else if(gobbled < THREAD_RESERVE)
{
// this means while we didn't run out of threads, we were getting damn low.
// spawn enough threads to keep the reserve amount up.
uint32 new_threads = (THREAD_RESERVE - gobbled);
for(uint32 i = 0; i < new_threads; ++i)
StartThread(NULL);
Log.Debug(" 线程池 ", "完整性检查: (gobbled <= 5) 产生 %u 线程.", new_threads);
}
else if(gobbled > THREAD_RESERVE)
{
// this means we had "excess" threads sitting around doing nothing.
// lets kill some of them off.
uint32 kill_count = (gobbled - THREAD_RESERVE);
KillFreeThreads(kill_count);
_threadsEaten -= kill_count;
Log.Debug(" 线程池 ", "完整性检查: (gobbled > 5) 杀死 %u 线程.", kill_count);
}
else
{
// perfect! we have the ideal number of free threads.
Log.Debug(" 线程池 ", "完整性检查: 完整!");
}
/*if(m_freeThreads.size() < 5)
{
uint32 j = 5 - m_freeThreads.size();
Log.Debug(" 线程池 ", "Spawning %u threads.", j);
for(uint32 i = 0; i < j; ++i)
StartThread(NULL);
}*/
_threadsExitedSinceLastCheck = 0;
_threadsRequestedSinceLastCheck = 0;
_threadsFreedSinceLastCheck = 0;
_mutex.Release();
}
void CThreadPool::KillFreeThreads(uint32 count)
{
Log.Debug(" 线程池 ", "杀死 %u 多余线程.", count);
_mutex.Acquire();
Thread * t;
ThreadSet::iterator itr;
uint32 i;
for(i = 0, itr = m_freeThreads.begin(); i < count && itr != m_freeThreads.end(); ++i, ++itr)
{
t = *itr;
t->ExecutionTarget = NULL;
t->DeleteAfterExit = true;
++_threadsToExit;
t->ControlInterface.Resume();
}
_mutex.Release();
}
void CThreadPool::Shutdown()
{
_mutex.Acquire();
size_t tcount = m_activeThreads.size() + m_freeThreads.size(); // exit all
Log.Debug(" 线程池 ", "关闭 %u 线程.", tcount);
KillFreeThreads((uint32)m_freeThreads.size());
_threadsToExit += (uint32)m_activeThreads.size();
for(ThreadSet::iterator itr = m_activeThreads.begin(); itr != m_activeThreads.end(); ++itr)
{
if((*itr)->ExecutionTarget)
(*itr)->ExecutionTarget->OnShutdown();
}
_mutex.Release();
for(;;)
{
_mutex.Acquire();
if(m_activeThreads.size() || m_freeThreads.size())
{
Log.Debug(" 线程池 ", "%u 其余线程...",m_activeThreads.size() + m_freeThreads.size() );
_mutex.Release();
Sleep(1000);
continue;
}
break;
}
}
/* this is the only platform-specific code. neat, huh! */
#ifdef WIN32
static unsigned long WINAPI thread_proc(void* param)
{
Thread * t = (Thread*)param;
t->SetupMutex.Acquire();
uint32 tid = t->ControlInterface.GetId();
bool ht = (t->ExecutionTarget != NULL);
t->SetupMutex.Release();
//Log.Debug(" 线程池 ", "Thread %u started.", t->ControlInterface.GetId());
for(;;)
{
if(t->ExecutionTarget != NULL)
{
if(t->ExecutionTarget->run())
delete t->ExecutionTarget;
t->ExecutionTarget = NULL;
}
if(!ThreadPool.ThreadExit(t))
{
Log.Debug(" 线程池 ", "线程 %u 退出.", tid);
break;
}
else
{
if(ht)
Log.Debug(" 线程池 ", "线程 %u 等待的一个新任务.", tid);
// enter "suspended" state. when we return, the threadpool will either tell us to fuk off, or to execute a new task.
t->ControlInterface.Suspend();
// after resuming, this is where we will end up. start the loop again, check for tasks, then go back to the threadpool.
}
}
// at this point the t pointer has already been freed, so we can just cleanly exit.
ExitThread(0);
// not reached
return 0;
}
Thread * CThreadPool::StartThread(ThreadBase * ExecutionTarget)
{
HANDLE h;
Thread * t = new Thread;
t->DeleteAfterExit = false;
t->ExecutionTarget = ExecutionTarget;
//h = (HANDLE)_beginthreadex(NULL, 0, &thread_proc, (void*)t, 0, NULL);
t->SetupMutex.Acquire();
h = CreateThread(NULL, 0, &thread_proc, (LPVOID)t, 0, (LPDWORD)&t->ControlInterface.thread_id);
t->ControlInterface.Setup(h);
t->SetupMutex.Release();
return t;
}
#else
static void * thread_proc(void * param)
{
Thread * t = (Thread*)param;
t->SetupMutex.Acquire();
Log.Debug(" 线程池 ", "线程 %u 开始.", t->ControlInterface.GetId());
t->SetupMutex.Release();
for(;;)
{
if(t->ExecutionTarget != NULL)
{
if(t->ExecutionTarget->run())
delete t->ExecutionTarget;
t->ExecutionTarget = NULL;
}
if(!ThreadPool.ThreadExit(t))
break;
else
{
// enter "suspended" state. when we return, the threadpool will either tell us to fuk off, or to execute a new task.
t->ControlInterface.Suspend();
// after resuming, this is where we will end up. start the loop again, check for tasks, then go back to the threadpool.
}
}
pthread_exit(0);
}
Thread * CThreadPool::StartThread(ThreadBase * ExecutionTarget)
{
pthread_t target;
Thread * t = new Thread;
t->ExecutionTarget = ExecutionTarget;
t->DeleteAfterExit = false;
// lock the main mutex, to make sure id generation doesn't get messed up
_mutex.Acquire();
t->SetupMutex.Acquire();
pthread_create(&target, NULL, &thread_proc, (void*)t);
t->ControlInterface.Setup(target);
t->SetupMutex.Release();
_mutex.Release();
return t;
}
#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -