⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 osthread.cpp

📁 跨操作系统的微型中间件
💻 CPP
字号:
/*
    File:       OSThread.cpp

    Contains:   Thread abstraction implementation
   
*/
#include "OSThread.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#ifndef __Win32__
#include <pthread.h>
#endif

#include "MyAssert.h"
#include "OS.h"
#include "LogInterface.h"
#include "Globle.h"

// When TRUE, OSThreadPool::RegistTask dumps the in-use thread list via PrintUsedThreadInfo().
static Bool bDebug = FALSE;

// Mutex held by the OSTask constructor while it draws a new task ID.
OSMutex	OSTask::m_mtxIDCalclator;
// Counter that task IDs are taken from; pre-incremented once per constructed OSTask.
UInt32 OSTask::m_IDCalclator = 0;

// Constructs a task that is not running yet and assigns it the next task ID.
OSTask::OSTask()
: CEvent(NULL)
{
	// The ID counter is shared by all tasks, so it is advanced under its mutex.
	m_mtxIDCalclator.Lock();
	m_IDCalclator += 1;
	m_TaskID = m_IDCalclator;
	m_mtxIDCalclator.Unlock();

	m_isRunning = FALSE;
}

// Drops every message that is still queued for this task.
OSTask::~OSTask()
{
	for (;;)
	{
		if (m_MsgQueue.empty())
		{
			break;
		}
		m_MsgQueue.pop();
	}
}


// Posts a message to this task's queue.
//   v_MsgID      - message identifier
//   v_MsgRecvObj - object the message is addressed to (StopTask() passes NULL)
//   v_Value      - integer payload
//   v_PrioID     - priority stored with the message
// Always returns TRUE.
Bool OSTask::PostMessage(UInt32 v_MsgID,CEvent *v_MsgRecvObj,Int32 v_Value,UInt32 v_PrioID)
{
	// The queue is shared with PopMessage()/GetMsgQueueSize(); hold its mutex for the whole call.
	OSMutexLocker queueGuard(&m_Mutex_MsgQueue);

	MsgData entry;
	entry.m_PrioID = v_PrioID;
	entry.m_Value = v_Value;
	entry.m_MsgRecvObj = v_MsgRecvObj;
	entry.m_MsgID = v_MsgID;

	m_MsgQueue.push(entry);
	return TRUE;
}


// Removes the message at the top of the queue and copies it into *v_msgData.
// Returns TRUE when a message was delivered, FALSE when the queue was empty
// (in which case *v_msgData is left untouched).
Bool OSTask::PopMessage(MsgData* v_msgData)
{
	Assert(v_msgData);

	OSMutexLocker queueGuard(&m_Mutex_MsgQueue);
	if (m_MsgQueue.empty())
	{
		return FALSE;
	}

	*v_msgData = m_MsgQueue.top();
	m_MsgQueue.pop();
	return TRUE;
}

// Returns the number of messages currently queued, read under the queue mutex.
UInt32 OSTask::GetMsgQueueSize( )
{
	OSMutexLocker queueGuard(&m_Mutex_MsgQueue);
	UInt32 pending = m_MsgQueue.size();
	return pending;
}

// Hands this task to the thread pool.
// Returns the result of OSThreadPool::RegistTask(); on success the task is
// marked as running, on failure an error is logged and FALSE is returned.
Bool OSTask::RunTask()
{
	// m_isRunningMutex stays held across registration; DetachFromThread()
	// takes the same mutex before it clears m_isRunning.
	m_isRunningMutex.Lock();

	Bool registered = OSThreadPool::RegistTask(this);
	if (!registered)
	{
		ERR_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,-1,"RegistTask failed, maybe threadpool is full");
		m_isRunningMutex.Unlock();
		return FALSE;
	}

	m_isRunning = TRUE;
	m_isRunningMutex.Unlock();
	return registered;
}

// Requests a running task to quit and blocks until it is no longer running.
// Returns immediately when the task is not running.
void OSTask::StopTask()
{
	m_isRunningMutex.Lock();

	if (!m_isRunning)
	{
		m_isRunningMutex.Unlock();
		return;
	}

	// Post the quit request at high priority.
	PostMessage(RFS_MSG_NORNALQUIT,NULL,0,k_prio_high);

	// Wait for completion; DetachFromThread() clears m_isRunning and signals
	// the condition. The wait is re-armed with a timeout argument of 100.
	while (m_isRunning)
	{
		m_isRunningCond.Wait(&m_isRunningMutex, 100);
	}

	m_isRunningMutex.Unlock();
}

// Stops the task (waiting until it is no longer running) and then destroys
// this object. The object must not be used after this call returns.
void OSTask::DeleteTask()
{
	this->StopTask();

	delete this;
}

// Called by OSThread::Entry() after Run() returns: resets the task, marks it
// as no longer running, and signals the condition StopTask() waits on.
void OSTask::DetachFromThread()
{
	Reset();// reset

	// Clear the flag and signal under the same mutex StopTask() holds while waiting.
 	m_isRunningMutex.Lock();
	m_isRunning = FALSE;
	m_isRunningCond.Signal();
	m_isRunningMutex.Unlock();
}

// Creates a thread object that is not running yet.
//   v_StackSize - requested stack size in kilobytes (Start() multiplies it by 1024).
// The per-thread OS resources (Win32 quit event / pthread attribute object)
// are acquired here rather than in Start(), so the destructor can release
// them unconditionally even when Start() is never called.
OSThread::OSThread(UInt32 v_StackSize)
{
	m_run = FALSE;
	m_busy = FALSE;
	m_StackSize = v_StackSize;
	m_Task = NULL;
	fJoined = false;
	m_IsTaskRun = FALSE;

#ifdef __Win32__
	m_ThreadID = NULL;
	// Auto-reset, initially non-signalled; set by SendQuitEvent() when the thread function returns.
	m_EventHandle = CreateEvent(NULL,FALSE,FALSE,NULL);
#else
	pthread_attr_init(&m_ThreadAttr);
#endif
}



// Stops the thread (if it is running) and releases the OS resources acquired
// by the constructor and by Start().
OSThread::~OSThread()
{
	Stop();
#ifdef __Win32__
	if (m_EventHandle != NULL)
	{
		::CloseHandle(m_EventHandle);
	}
	// The handle returned by _beginthreadex is not closed automatically.
	if (m_ThreadID != NULL)
	{
		::CloseHandle(m_ThreadID);
	}
#else
	// Mirrors the pthread_attr_init() in the constructor on every non-Win32 target.
	pthread_attr_destroy(&m_ThreadAttr);
#endif

}


// Launches the OS thread, which then runs Entry().
// Returns TRUE when the thread was created; otherwise logs a warning and
// returns FALSE.
Bool OSThread::Start()
{
	// Must be set before the thread exists: Entry() loops on m_run.
	m_run=TRUE;
#ifdef __Win32__
	if (NULL == m_EventHandle)
	{
		// Without the quit event Stop() could never observe thread exit.
		m_run=FALSE;
	}
	else
	{
		unsigned int theId = 0; // We don't care about the identifier
		m_ThreadID = (HANDLE)_beginthreadex( NULL,   // Inherit security
									m_StackSize*1024,      // Stack size in bytes (m_StackSize is in KB)
									OSThread::_Entry, // Entry function
									(LPVOID)this,    // Entry arg
									0,      // Begin executing immediately
									&theId);

		Assert(m_ThreadID != NULL);
		if( NULL == m_ThreadID)
			m_run=FALSE;
	}
#else
	// If the size is rejected the attribute object keeps its previous stack size.
	pthread_attr_setstacksize(&m_ThreadAttr,m_StackSize*1024);
	int err = pthread_create((pthread_t*)&m_ThreadID, &m_ThreadAttr, _Entry, (void*)this);
	Assert(err == 0);
	if( 0 != err)
	{
		m_run=FALSE;
	}

#endif

	if(!m_run)
		ERR_LOGGER(LOG_ERR_LEVEL_WARNING,LOG_MODULE_OS,-1,"Create thread failed");
	return m_run;
}

// Records the task that this thread will run the next time it is activated.
// v_Task must not be NULL. Always returns TRUE.
Bool OSThread::SetTask(OSTask * v_Task)
{
	Assert(v_Task != NULL);

	this->m_Task = v_Task;
	return TRUE;
}

void OSThread::Stop()
{
	if(!m_run)
		return;

	m_run = FALSE; //必须放在前面, 保证Entry()能够退出循环

	if (m_busy)
	{
		m_Task->StopTask();
	}
	else
	{
		Active();
	}
	
	if (!fJoined)
	{
#ifdef WIN32
		Join(m_EventHandle);
#else
		Join(m_ThreadID);
#endif
	}
}

// Wakes the worker thread if it is idle. While the thread is still meant to
// run (m_run set), this call also waits until Entry() has reported, through
// m_IsTaskRun, that it left its idle wait.
// Returns TRUE on every path, including when the thread was already busy.
Bool OSThread::Active()
{
	m_Mutex_Run.Lock();
	if(!m_busy)
	{
		// Mark the thread busy and signal the condition Entry() waits on.
		m_busy = TRUE;
		m_Cond_Run.Signal();
		m_Mutex_Run.Unlock();

		// Stop() clears m_run before calling Active(); in that case the
		// hand-shake below is skipped.
		if(m_run)
		{
			m_IsTaskRunMutex.Lock();
			// Entry() sets m_IsTaskRun and signals; the wait is re-armed with a timeout argument of 100.
			while(!m_IsTaskRun)
			{
				m_IsTaskRunCond.Wait(&m_IsTaskRunMutex,100);
			}
			m_IsTaskRunMutex.Unlock();

			Assert(m_IsTaskRun);
		}
		return TRUE;
	}
	// Already busy: nothing to wake.
	m_Mutex_Run.Unlock();
	return TRUE;
}


// Body of the worker thread. Repeats until m_run is cleared: wait to be
// activated, run the assigned task (if any), then return itself to the pool.
// Always returns NULL.
void* OSThread::Entry()
{
	while (m_run)
	{
		// Idle wait: Active() sets m_busy and signals m_Cond_Run.
		// NOTE(review): this inner loop does not test m_run; Stop() relies on
		// calling Active() to release an idle thread.
		m_Mutex_Run.Lock();
		while(!m_busy)
		{
			m_Cond_Run.Wait(&m_Mutex_Run, 500);
		}
		m_Mutex_Run.Unlock();


		// Tell Active() that this thread has left the idle wait.
		m_IsTaskRunMutex.Lock();
		m_IsTaskRun = TRUE;
		m_IsTaskRunCond.Signal();
		m_IsTaskRunMutex.Unlock();

		// m_Task is NULL when the wake-up came from Stop() rather than from RegistTask().
		if(m_Task != NULL)
		{
			m_Task->Run();
			// NOTE(review): m_busy is cleared here without holding m_Mutex_Run.
			m_busy = FALSE;
			m_Task->DetachFromThread();
			m_Task = NULL;
			
			// Move this thread from the pool's in-use list back to its idle list.
			OSThreadPool::ReclaimThread(this);
		}

		m_IsTaskRunMutex.Lock();
		m_IsTaskRun = FALSE;
		m_IsTaskRunMutex.Unlock();
	}

	return NULL;
}

#ifdef __Win32__
unsigned int WINAPI OSThread::_Entry(LPVOID inThread)
{
    OSThread* theThread = (OSThread*)inThread;
	
    // Run the thread
    theThread->Entry();
	
	theThread->SendQuitEvent();
    return 0;
}

#else
void* OSThread::_Entry(void *inThread)  //static
{
    OSThread* theThread = (OSThread*)inThread;
	
    // Run the thread
    theThread->Entry();
    return NULL;
}
#endif

// Blocks until the given thread (or, on Win32, the given waitable handle)
// has completed. May be called only once per OSThread; fJoined is set before
// waiting. Returns FALSE when the underlying wait/join reports failure,
// TRUE otherwise.
Bool OSThread::Join(THREAD_ID hJoinedThreadID)
{
    Assert(!fJoined);
    fJoined = true;

#ifdef __Win32__
    DWORD theErr = ::WaitForSingleObject(hJoinedThreadID, INFINITE);
    Assert(theErr == WAIT_OBJECT_0);
    return (theErr == WAIT_FAILED) ? FALSE : TRUE;
#else
    void *exitValue;
    const int joinResult = pthread_join(hJoinedThreadID, &exitValue);
    return (joinResult != 0) ? FALSE : TRUE;
#endif
}

#ifdef WIN32
// Sets the quit event; the Win32 _Entry() calls this after Entry() returns,
// and Stop() waits on the same event.
void OSThread::SendQuitEvent()
{
	::SetEvent(this->m_EventHandle);
}
#endif


// Global pool pointer; nothing in this file assigns it.
OSThreadPool* g_ThreadPool;

// Every thread the pool has created.
list<OSThread*> OSThreadPool::m_ThreadList;
// Threads that have been handed a task by RegistTask().
list<OSThread*> OSThreadPool::m_UseList;
// Threads available for RegistTask() to hand out.
list<OSThread*> OSThreadPool::m_UnuseList;
// Held by Initialize(), RegistTask() and ReclaimThread() while they touch the lists.
OSMutex OSThreadPool::m_ListMutex;
// Maximum number of threads, as passed to Initialize().
UInt32 OSThreadPool::m_ThreadNum = 0;
// Stack size handed to new OSThread objects; Initialize() enforces a floor of 16.
UInt32 OSThreadPool::m_StackSize = 512;


// Singleton instance managed by GetInstance()/DestroyInstance().
OSThreadPool* OSThreadPool::m_OSThreadPool;



// Starts the pool with all three (static) thread lists empty.
OSThreadPool::OSThreadPool()
{
	m_UnuseList.clear();
	m_UseList.clear();
	m_ThreadList.clear();
}

// Intentionally empty: DestroyInstance() stops and deletes the pooled threads
// before it deletes the pool object.
OSThreadPool::~OSThreadPool()
{

}

// Returns the singleton pool, creating it on first use.
// No lock is taken around the creation check.
OSThreadPool* OSThreadPool::GetInstance()
{
	if (m_OSThreadPool != NULL)
	{
		return m_OSThreadPool;
	}

	m_OSThreadPool = new OSThreadPool();
	return m_OSThreadPool;
}

// Stops and deletes every pooled thread, empties the bookkeeping lists and
// destroys the singleton. Does nothing when no instance exists.
// Always returns TRUE.
Bool OSThreadPool::DestroyInstance()
{
	if (m_OSThreadPool == NULL)
	{
		return TRUE;
	}

	// m_ListMutex is not taken here; a worker that is finishing its task
	// calls ReclaimThread(), which takes that mutex itself.
	while (!m_ThreadList.empty())
	{
		OSThread* pVictim = m_ThreadList.front();
		pVictim->Stop();
		delete pVictim;
		m_ThreadList.pop_front();
	}

	m_ThreadList.clear();
	m_UseList.clear();
	m_UnuseList.clear();

	delete m_OSThreadPool;
	m_OSThreadPool = NULL;
	return TRUE;
}

// Configures the pool and pre-creates its initial threads.
//   v_ThreadNum - maximum number of threads the pool may own; at most 100 of
//                 them are created up front, the rest on demand by RegistTask().
//   v_StackSize - per-thread stack size; values below 16 are raised to 16.
// Returns FALSE when a thread could not be allocated or started, TRUE otherwise.
// When __BC__ is defined only the stack size is recorded and TRUE is returned.
Bool OSThreadPool::Initialize(UInt32 v_ThreadNum,UInt32 v_StackSize)
{
	m_StackSize = (v_StackSize < 16) ? 16 : v_StackSize;

#ifdef __BC__
		return TRUE;
#endif

	OSMutexLocker Locker(&m_ListMutex);
	m_ThreadNum = v_ThreadNum;
	UInt32 iCurrentThreadNum = (m_ThreadNum > 100) ? 100 : m_ThreadNum;
	for (UInt32 i = 0; i < iCurrentThreadNum; i++)
	{
		// Use the clamped size, not the raw argument, so the floor above takes effect.
		OSThread* pThread = new OSThread(m_StackSize);
		if (pThread == NULL)
		{
			return FALSE;
		}
		// Only a thread that is really running may be offered as idle;
		// otherwise a task assigned to it would never execute.
		if (!pThread->Start())
		{
			delete pThread;
			return FALSE;
		}
		m_ThreadList.push_back(pThread);
		m_UnuseList.push_back(pThread);
	}
	return TRUE;
}

// Assigns v_Task to an idle pool thread and activates that thread.
// When no idle thread exists the pool first tries to grow: with __BC__ by one
// thread, otherwise by roughly a quarter of the remaining capacity, and not
// at all once m_ThreadNum threads are in use (an error is logged instead).
// Returns TRUE when a thread accepted the task, FALSE otherwise
// (NULL task, pool exhausted, or thread creation failed).
Bool OSThreadPool::RegistTask(OSTask* v_Task)
{
	Assert(v_Task != NULL);
	if (v_Task == NULL)
	{
		return FALSE;
	}

	OSMutexLocker Locker(&m_ListMutex);

	//check if need to allocate new thread
	if (m_UnuseList.empty())
	{
#ifdef __BC__
		OSThread* pThread = new OSThread(m_StackSize);
		if (pThread == NULL) 	return FALSE;
		// Never pool a thread that failed to start: its task would never run.
		if (!pThread->Start())
		{
			delete pThread;
			return FALSE;
		}
		m_ThreadList.push_back(pThread);
		m_UnuseList.push_back(pThread);
#else
		if( m_UseList.size() >= m_ThreadNum)
		{
			// list::size() returns size_t; convert explicitly so the
			// arguments match the %d conversions.
			char buffer[100];
			sprintf(buffer,"ThreadPool Full,Max :%d,Used:%d,Unused:%d",(int)m_ThreadNum,(int)m_UseList.size(),(int)m_UnuseList.size());
			ERR_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,-1,buffer);
		}
		else
		{
			//allocate 25% threads
			UInt32 iNum = (m_ThreadNum - m_UseList.size())/4 + (m_ThreadNum - m_UseList.size())%4;
			for (UInt32 i = 0; i < iNum; i++)
			{
				OSThread *pThread = new OSThread(m_StackSize);
				if (pThread == NULL) 	return FALSE;
				// Stop growing on the first failure; threads already added stay usable.
				if (!pThread->Start())
				{
					delete pThread;
					break;
				}
				m_ThreadList.push_back(pThread);
				m_UnuseList.push_back(pThread);
			}

			{
				char buffer[100];
				sprintf(buffer,"ThreadPool status,Max :%d,Used:%d,Unused:%d",(int)m_ThreadNum,(int)m_UseList.size(),(int)m_UnuseList.size());
				RUN_LOGGER2(buffer);
			}
		}
#endif
	}

	if(bDebug) PrintUsedThreadInfo();

	//Assign thread to task.
	if (m_UnuseList.empty())
	{
		return FALSE;
	}

	OSThread* pThread = m_UnuseList.front();
	m_UnuseList.pop_front();
	if (pThread == NULL)
	{
		return FALSE;
	}

	pThread->SetTask(v_Task);
	m_UseList.push_back(pThread);
	if(pThread->Active())
		return TRUE;

	//log
	char buffer[100];
	sprintf(buffer,"ThreadPool [Total:Run:Idle]=[%d:%d:%d]",(int)m_ThreadNum,(int)m_UseList.size(),(int)m_UnuseList.size());
	RUN_LOGGER(LOG_RUN_LEVEL_LOW,LOG_MODULE_OS,buffer);
	return FALSE;
}

// Moves v_Thread from the in-use list to the front of the idle list.
// Returns TRUE when the thread was found in the in-use list, FALSE otherwise.
Bool OSThreadPool::ReclaimThread(OSThread * v_Thread)
{
	Assert(v_Thread);
	OSMutexLocker Locker(&m_ListMutex);

	// Locate the first in-use entry that refers to this thread.
	list<OSThread*>::iterator pos = m_UseList.begin();
	while (pos != m_UseList.end() && *pos != v_Thread)
	{
		++pos;
	}

	if (pos == m_UseList.end())
	{
		return FALSE;
	}

	m_UnuseList.push_front(v_Thread);
	m_UseList.erase(pos);
	return TRUE;
}

// Prints the pool counters followed by the ID of every in-use thread, five
// IDs per line, to stdout. Takes no lock itself; RegistTask() calls it while
// holding m_ListMutex.
void OSThreadPool::PrintUsedThreadInfo( )
{
	UInt32 i=0;
	// list::size() returns size_t; convert explicitly so the arguments match %d.
	printf("\n------------------------->[Total:Run:Idle]=[%d:%d:%d]<-------------------------\n",(int)m_ThreadNum,(int)m_UseList.size(),(int)m_UnuseList.size());
	for (list<OSThread*>::iterator it = m_UseList.begin(); it != m_UseList.end();++it)
	{
		// NOTE(review): the return type of GetThreadID() is not visible here;
		// confirm it is compatible with %d on every supported platform.
		printf("[%d]",(*it)->GetThreadID());
		printf("\t");
		if(++i%5 == 0) printf("\n");
	}	
	printf("\n");
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -