📄 osthread.cpp
字号:
/*
File: OSThread.cpp
Contains: Thread abstraction implementation
*/
#include "OSThread.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#ifndef __Win32__
#include <pthread.h>
#endif
#include "MyAssert.h"
#include "OS.h"
#include "LogInterface.h"
#include "Globle.h"
static Bool bDebug = FALSE;
OSMutex OSTask::m_mtxIDCalclator;
UInt32 OSTask::m_IDCalclator = 0;
OSTask::OSTask()
: CEvent(NULL)
{
m_mtxIDCalclator.Lock();
m_TaskID = ++m_IDCalclator;
m_mtxIDCalclator.Unlock();
m_isRunning = FALSE;
}
OSTask::~OSTask()
{
while(!m_MsgQueue.empty())
{
m_MsgQueue.pop();
}
}
//发送消息
Bool OSTask::PostMessage(UInt32 v_MsgID,CEvent *v_MsgRecvObj,Int32 v_Value,UInt32 v_PrioID)
{
OSMutexLocker MsgQueueLocker(&m_Mutex_MsgQueue);
MsgData msgData;
msgData.m_MsgID = v_MsgID;
msgData.m_MsgRecvObj = v_MsgRecvObj;
msgData.m_Value = v_Value;
msgData.m_PrioID = v_PrioID;
m_MsgQueue.push(msgData);
return TRUE;
}
//从消息队列中弹出消息
Bool OSTask::PopMessage(MsgData* v_msgData)
{
Assert(v_msgData);
OSMutexLocker MsgQueueLocker(&m_Mutex_MsgQueue);
if (!m_MsgQueue.empty())
{
*v_msgData = m_MsgQueue.top();
m_MsgQueue.pop();
return TRUE;
}
return FALSE;
}
UInt32 OSTask::GetMsgQueueSize( )
{
m_Mutex_MsgQueue.Lock();
UInt32 iCount = m_MsgQueue.size();
m_Mutex_MsgQueue.Unlock();
return iCount;
}
Bool OSTask::RunTask()
{
m_isRunningMutex.Lock();
Bool Ret = OSThreadPool::RegistTask(this);
if(Ret)
{
m_isRunning = TRUE;
}
else
{
ERR_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,-1,"RegistTask failed, maybe threadpool is full");
m_isRunningMutex.Unlock();
return FALSE;
}
m_isRunningMutex.Unlock();
return Ret;
}
void OSTask::StopTask()
{
m_isRunningMutex.Lock();
if(m_isRunning)
{
PostMessage(RFS_MSG_NORNALQUIT,NULL,0,k_prio_high);
while(m_isRunning)
m_isRunningCond.Wait(&m_isRunningMutex, 100);//等待结束
}
m_isRunningMutex.Unlock();
}
void OSTask::DeleteTask()
{
StopTask();
delete this;
}
void OSTask::DetachFromThread()
{
Reset();//复位
m_isRunningMutex.Lock();
m_isRunning = FALSE;
m_isRunningCond.Signal();
m_isRunningMutex.Unlock();
}
OSThread::OSThread(UInt32 v_StackSize)
{
m_run = FALSE;
m_busy = FALSE;
m_StackSize = v_StackSize;
m_Task = NULL;
fJoined = false;
m_IsTaskRun = FALSE;
}
OSThread::~OSThread()
{
Stop();
#ifdef __Win32__
::CloseHandle(m_EventHandle);
#elif __linux__
pthread_attr_destroy(&m_ThreadAttr);
#endif
}
Bool OSThread::Start()
{
m_run=TRUE;
#ifdef __Win32__
unsigned int theId = 0; // We don't care about the identifier
m_ThreadID = (HANDLE)_beginthreadex( NULL, // Inherit security
m_StackSize*1024, // Inherit stack size
OSThread::_Entry, // Entry function
(LPVOID)this, // Entry arg
0, // Begin executing immediately
&theId);
Assert(m_ThreadID != NULL);
if( NULL == m_ThreadID)
m_run=FALSE;
m_EventHandle = CreateEvent(NULL,FALSE,FALSE,NULL);
#else
pthread_attr_init(&m_ThreadAttr);
pthread_attr_setstacksize(&m_ThreadAttr,m_StackSize*1024);
int err = pthread_create((pthread_t*)&m_ThreadID, &m_ThreadAttr, _Entry, (void*)this);
Assert(err == 0);
if( 0 != err)
{
m_run=FALSE;
}
#endif
if(!m_run)
ERR_LOGGER(LOG_ERR_LEVEL_WARNING,LOG_MODULE_OS,-1,"Create thread failed");
return m_run;
}
Bool OSThread::SetTask(OSTask * v_Task)
{
Assert(v_Task != NULL);
m_Task = v_Task;
return TRUE;
}
void OSThread::Stop()
{
if(!m_run)
return;
m_run = FALSE; //必须放在前面, 保证Entry()能够退出循环
if (m_busy)
{
m_Task->StopTask();
}
else
{
Active();
}
if (!fJoined)
{
#ifdef WIN32
Join(m_EventHandle);
#else
Join(m_ThreadID);
#endif
}
}
Bool OSThread::Active()
{
m_Mutex_Run.Lock();
if(!m_busy)
{
m_busy = TRUE;
m_Cond_Run.Signal();
m_Mutex_Run.Unlock();
if(m_run)
{
m_IsTaskRunMutex.Lock();
while(!m_IsTaskRun)
{
m_IsTaskRunCond.Wait(&m_IsTaskRunMutex,100);
}
m_IsTaskRunMutex.Unlock();
Assert(m_IsTaskRun);
}
return TRUE;
}
m_Mutex_Run.Unlock();
return TRUE;
}
void* OSThread::Entry()
{
while (m_run)
{
m_Mutex_Run.Lock();
while(!m_busy)
{
m_Cond_Run.Wait(&m_Mutex_Run, 500);
}
m_Mutex_Run.Unlock();
m_IsTaskRunMutex.Lock();
m_IsTaskRun = TRUE;
m_IsTaskRunCond.Signal();
m_IsTaskRunMutex.Unlock();
if(m_Task != NULL)
{
m_Task->Run();
m_busy = FALSE;
m_Task->DetachFromThread();
m_Task = NULL;
OSThreadPool::ReclaimThread(this);
}
m_IsTaskRunMutex.Lock();
m_IsTaskRun = FALSE;
m_IsTaskRunMutex.Unlock();
}
return NULL;
}
#ifdef __Win32__
unsigned int WINAPI OSThread::_Entry(LPVOID inThread)
{
OSThread* theThread = (OSThread*)inThread;
// Run the thread
theThread->Entry();
theThread->SendQuitEvent();
return 0;
}
#else
void* OSThread::_Entry(void *inThread) //static
{
OSThread* theThread = (OSThread*)inThread;
// Run the thread
theThread->Entry();
return NULL;
}
#endif
Bool OSThread::Join(THREAD_ID hJoinedThreadID)
{
// What we're trying to do is allow the thread we want to delete to complete
// running. So we wait for it to stop.
Assert(!fJoined);
fJoined = true;
#ifdef __Win32__
DWORD theErr = ::WaitForSingleObject(hJoinedThreadID, INFINITE);
Assert(theErr == WAIT_OBJECT_0);
if(theErr == WAIT_FAILED)
{
return FALSE;
}
#else
void *retVal;
if(0 != pthread_join(hJoinedThreadID, &retVal))
return FALSE;
#endif
return TRUE;
}
#ifdef WIN32
void OSThread::SendQuitEvent()
{
::SetEvent(m_EventHandle);
}
#endif
OSThreadPool* g_ThreadPool;
list<OSThread*> OSThreadPool::m_ThreadList;
list<OSThread*> OSThreadPool::m_UseList;
list<OSThread*> OSThreadPool::m_UnuseList;
OSMutex OSThreadPool::m_ListMutex;
UInt32 OSThreadPool::m_ThreadNum = 0;
UInt32 OSThreadPool::m_StackSize = 512;
OSThreadPool* OSThreadPool::m_OSThreadPool;
OSThreadPool::OSThreadPool()
{
m_ThreadList.clear();
m_UseList.clear();
m_UnuseList.clear();
}
OSThreadPool::~OSThreadPool()
{
}
OSThreadPool* OSThreadPool::GetInstance()
{
if (m_OSThreadPool == NULL)
{
m_OSThreadPool = new OSThreadPool();
}
return m_OSThreadPool;
}
Bool OSThreadPool::DestroyInstance()
{
if (m_OSThreadPool != NULL)
{
while(!m_ThreadList.empty())
{
m_ThreadList.front()->Stop();
delete m_ThreadList.front();
m_ThreadList.pop_front();
}
m_ThreadList.clear();
m_UseList.clear();
m_UnuseList.clear();
delete m_OSThreadPool;
m_OSThreadPool = NULL;
}
return TRUE;
}
Bool OSThreadPool::Initialize(UInt32 v_ThreadNum,UInt32 v_StackSize)
{
m_StackSize = (v_StackSize < 16) ? 16 : v_StackSize;
#ifdef __BC__
return TRUE;
#endif
OSMutexLocker Locker(&m_ListMutex);
m_ThreadNum = v_ThreadNum;
UInt32 iCurrentThreadNum = (m_ThreadNum > 100) ? 100 : m_ThreadNum;
for (UInt32 i = 0; i < iCurrentThreadNum; i++)
{
OSThread* pThread = new OSThread(v_StackSize);
if (pThread == NULL)
{
return FALSE;
}
m_ThreadList.push_back(pThread);
m_UnuseList.push_back(pThread);
pThread->Start();
}
return TRUE;
}
Bool OSThreadPool::RegistTask(OSTask* v_Task)
{
Assert(v_Task != NULL);
OSMutexLocker Locker(&m_ListMutex);
if (v_Task != NULL)
{
//check if need to allocate new thread
if( m_UnuseList.size() <=0)
{
#ifdef __BC__
OSThread* pThread = new OSThread(m_StackSize);
if (pThread == NULL) return FALSE;
m_ThreadList.push_back(pThread);
m_UnuseList.push_back(pThread);
pThread->Start();
#else
if( m_UseList.size() >= m_ThreadNum)
{
char buffer[100];
sprintf(buffer,"ThreadPool Full,Max :%d,Used:%d,Unused:%d",m_ThreadNum,m_UseList.size(),m_UnuseList.size());
ERR_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,-1,buffer);
}
else
{
//allocate 25% threads
UInt32 iNum = (m_ThreadNum - m_UseList.size())/4 + (m_ThreadNum - m_UseList.size())%4;
for (UInt32 i = 0; i < iNum; i++)
{
OSThread *pThread = new OSThread(m_StackSize);
if (pThread == NULL) return FALSE;
m_ThreadList.push_back(pThread);
m_UnuseList.push_back(pThread);
pThread->Start();
}
{
char buffer[100];
sprintf(buffer,"ThreadPool status,Max :%d,Used:%d,Unused:%d",m_ThreadNum,m_UseList.size(),m_UnuseList.size());
RUN_LOGGER2(buffer);
}
}
#endif
}
if(bDebug) PrintUsedThreadInfo();
//Assign thread to task.
if (m_UnuseList.size() > 0)
{
OSThread* pThread = m_UnuseList.front();
m_UnuseList.erase(m_UnuseList.begin());
if (pThread != NULL)
{
pThread->SetTask(v_Task);
m_UseList.push_back(pThread);
if(pThread->Active())
return TRUE;
else
{
//log
char buffer[100];
sprintf(buffer,"ThreadPool [Total:Run:Idle]=[%d:%d:%d]",m_ThreadNum,m_UseList.size(),m_UnuseList.size());
RUN_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,buffer);
return FALSE;
}
}
}
}
return FALSE;
}
Bool OSThreadPool::ReclaimThread(OSThread * v_Thread)
{
Assert(v_Thread);
OSMutexLocker Locker(&m_ListMutex);
for (list<OSThread*>::iterator it = m_UseList.begin(); it != m_UseList.end();++it)
{
if (v_Thread == *it)
{
m_UnuseList.push_front(v_Thread);
m_UseList.erase(it);
return TRUE;
}
}
return FALSE;
}
void OSThreadPool::PrintUsedThreadInfo( )
{
UInt32 i=0;
printf("\n------------------------->[Total:Run:Idle]=[%d:%d:%d]<-------------------------\n",m_ThreadNum,m_UseList.size(),m_UnuseList.size());
for (list<OSThread*>::iterator it = m_UseList.begin(); it != m_UseList.end();++it)
{
printf("[%d]",(*it)->GetThreadID());
printf("\t");
if(++i%5 == 0) printf("\n");
}
printf("\n");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -