📄 rpgcmdprocess.cc
字号:
//RPGCmdProcess.cc
/*/////////////////////////////////////////////////////////////////
李亦
liease@163.com 4040719
2006-7-20
/*/////////////////////////////////////////////////////////////////
#include "rpg/cmd/RPGCmdProcess.h"
#include "console/console.h"
#define RPGCMD_DELAY 20
#define RPGCMD_EXPIRE 3000000*1000
#define RPGCMD_TRY 5
namespace RPG
{
RPGCmdProcess* g_pRPGCommand = NULL;
//static RPGCmdProcess gs_RPGCommand;
FreeListChunker<GCMDTASK> RPGCmdProcess::ms_ParamPool;
MutexInstance RPGCmdProcess::ms_mutexParamPool;
//IMPLEMENT_CONOBJECT(RPGCmdProcess);
/////////////////////////////////////////////////////////////////
// class RPGCmdProcess 构造函数 /析构函数
RPGCmdProcess::RPGCmdProcess()
:m_taskQueue(QUEUE_SIZE,QUEUE_GROW)
,m_sendingQueue(0)
,m_resultQueue(QUEUE_SIZE,QUEUE_GROW)
#ifdef RPGCMD_USE_THREAD
,Thread(0,0,FALSE)
#endif
{
m_dwRunDelay = RPGCMD_DELAY;
m_dwExpireTime = RPGCMD_EXPIRE;
m_dwTryTimes = RPGCMD_TRY;
m_pCmdParam = NULL;
m_bRunning = TRUE;
m_bProcessResult = TRUE;
//m_dwTaskIdle = FALSE;
//m_bSendingIdle = FALSE;
//m_bResultIdle = FALSE;
//m_pClassName = "RPGCmdProcess";
}
RPGCmdProcess::~RPGCmdProcess()
{
m_bRunning = FALSE;
}
//#ifdef RPGCMD_USE_THREAD
void RPGCmdProcess::run(S32 arg)
//#else
//void RPGCmdProcess::Process()
//#endif
{
//m_dwTimer = 0;
#ifdef RPGCMD_USE_THREAD
Thread::prioIDLE();
#else
if(!m_bRunning)
return;
#endif
GCMDTASK* pTask;
U32 dwTime;
S32 n;
S32 nOperating;
m_mutexInst.lock(true);
SetWorkingIdleState();
m_mutexInst.unlock();
#ifdef RPGCMD_USE_THREAD
for(;m_bRunning;)
#endif
{
dwTime = Platform::getRealMilliseconds();
//////////////////////////////////////////
//派发指令任务到各DBAccess
if(m_mutexInst.lock(CMDPRO_WAIT))
{
pTask = m_taskQueue.Shift();
m_mutexInst.unlock();
}
else
pTask = NULL;
if(pTask)
{
//发送成功,则加到发送队列中,并开始超时计时
nOperating = OnSendingTask(pTask);
if(nOperating != TO_FAILED)
{
pTask->dwExpire = dwTime;
m_sendingQueue.push_back(pTask);
SetIdleState(TIS_SENDING);
}
//发送失败,则累计重试次数,并放在队列尾部
else if(pTask->dwTrys < m_dwTryTimes)
{
pTask->dwTrys++;
m_mutexInst.lock(true);
m_taskQueue.Push(pTask);
m_mutexInst.unlock();
}
//任务失败
else
{
OnTaskFinished(pTask,TS_FAILED);
FreeCmdTask(pTask);
}
m_mutexInst.lock(true);
SetIdleState(TIS_TASK,!m_taskQueue.IsEmpty());
m_mutexInst.unlock();
}
//////////////////////////////////////////
//检测发送中任务是否超时
if(m_sendingQueue.size())
{
for(n=m_sendingQueue.size()-1; n >= 0; n--)
{
pTask = m_sendingQueue[n];
if(pTask == NULL)
{
m_sendingQueue.erase(n);
continue;
}
if(dwTime - pTask->dwExpire >= m_dwExpireTime)
{
OnTaskFinished(pTask,TS_EXPIRED);
m_sendingQueue.erase(n);
FreeCmdTask(pTask);
}
}
SetIdleState(TIS_SENDING,m_sendingQueue.size());
}
//////////////////////////////////////////
//把指令结果
if(m_bProcessResult)
{
if(m_mutexInst.lock(CMDPRO_WAIT))
{
pTask = m_resultQueue.Shift();
m_mutexInst.unlock();
}
else
pTask = NULL;
if(pTask)
{
nOperating = OnRespondeTask(pTask);
if(nOperating != TO_FAILED)
{
Con::printf(" Result Task %d",pTask->dwCmd);
//RemoveSendingTask(pTask);
OnTaskFinished(pTask,TS_RESULTOK);
if(nOperating == TO_OK)
FreeCmdTask(pTask);
}
//响应失败,则累计重试次数,并放在队列尾部
else if(pTask->dwTrys < m_dwTryTimes)
{
pTask->dwTrys++;
m_mutexInst.lock(true);
m_resultQueue.Push(pTask);
m_mutexInst.unlock();
}
//任务失败
else
{
RemoveSendingTask(pTask);
OnTaskFinished(pTask,TS_RESULTFAILED);
FreeCmdTask(pTask);
}
m_mutexInst.lock(true);
SetIdleState(TIS_RESULT, !m_resultQueue.IsEmpty());
m_mutexInst.unlock();
}//if(pTask)
}//if(m_bProcessResult)
if(m_mutexInst.lock(CMDPRO_WAIT))
{
if( NeedIdle())
{
#ifdef RPGCMD_USE_THREAD
Thread::prioIDLE();
#endif
SetWorkingIdleState();
}
m_mutexInst.unlock();
}//if
#ifdef RPGCMD_USE_THREAD
Platform::sleep(m_dwRunDelay);
#endif
}//for
#ifdef RPGCMD_USE_THREAD
OnStopProcess();
#endif
}
//BOOL RPGCmdProcess::CheckCmd(U32 dwCmd)
//{
// return TRUE;
//}
BOOL RPGCmdProcess::CheckSendingTask(GCMDTASK* pTask)
{
AssertWarn(pTask,"任务不能为NULL");
U32 n;
GCMDTASK* pSending;
for(n=0; n < m_sendingQueue.size(); n++)
{
pSending = m_sendingQueue[n];
if(*pSending == *pTask)
return TRUE;
}
return FALSE;
}
BOOL RPGCmdProcess::CheckSendingTask(U32 dwCmd,U32 dwParam1,U32 dwParam2)
{
U32 n;
GCMDTASK* pSending;
for(n=0; n < m_sendingQueue.size(); n++)
{
pSending = m_sendingQueue[n];
if(pSending->EqualIt(dwCmd,dwParam1,dwParam2))
return TRUE;
}
return FALSE;
}
void RPGCmdProcess::RemoveSendingTask(GCMDTASK* pTask)
{
AssertWarn(pTask,"任务不能为NULL");
S32 n;
GCMDTASK* pSending;
for(n=m_sendingQueue.size()-1; n >= 0 ; n--)
{
pSending = m_sendingQueue[n];
if(*pSending == *pTask)
{
m_sendingQueue.erase(n);
}
}
}
BOOL RPGCmdProcess::GhostTask(GCMDTASK* pTask)
{
AssertWarn(pTask,"pTask不能为空");
if(!CheckCmd(pTask->dwCmd))
return FALSE;
GCMDTASK* pParam = AllocCmdTask();
dMemcpy(pParam,pTask,sizeof(GCMDTASK));
m_mutexInst.lock(true);
m_taskQueue.Push(pTask);
#ifdef RPGCMD_USE_THREAD
Thread::prioHIGHEST();
//Thread::prioNORMAL();
#endif
SetIdleState(TIS_TASK);
m_mutexInst.unlock();
return TRUE;
}
BOOL RPGCmdProcess::AddTask(GCMDTASK* pTask)
{
AssertWarn(pTask,"pTask不能为空");
if(!CheckCmd(pTask->dwCmd))
return FALSE;
if(CheckSendingTask(pTask))
return FALSE;
m_mutexInst.lock(true);
m_taskQueue.Push(pTask);
#ifdef RPGCMD_USE_THREAD
Thread::prioHIGHEST();
//Thread::prioNORMAL();
#endif
SetIdleState(TIS_TASK);
m_mutexInst.unlock();
return TRUE;
}
BOOL RPGCmdProcess::AddTask(U32 dwCmd,U32 dwParam1,U32 dwParam2)
{
if(!CheckCmd(dwCmd))
return FALSE;
if(CheckSendingTask(dwCmd, dwParam1, dwParam2))
return FALSE;
GCMDTASK* pParam = AllocCmdTask();
pParam->dwCmd = dwCmd;
pParam->unParam1.dwData = dwParam1;
pParam->unParam2.dwData = dwParam2;
m_mutexInst.lock(true);
m_taskQueue.Push(pParam);
#ifdef RPGCMD_USE_THREAD
Thread::prioHIGHEST();
//Thread::prioNORMAL();
#endif
SetIdleState(TIS_TASK);
m_mutexInst.unlock();
return TRUE;
}
BOOL RPGCmdProcess::ResultTask(GCMDTASK* pTask)
{
AssertWarn(pTask,"pTask不能为空");
if(!CheckCmd(pTask->dwCmd))
return FALSE;
m_mutexInst.lock(true);
m_resultQueue.Push(pTask);
RemoveSendingTask(pTask);
#ifdef RPGCMD_USE_THREAD
Thread::prioNORMAL();
#endif
SetIdleState(TIS_RESULT);
m_mutexInst.unlock();
return TRUE;
}
BOOL RPGCmdProcess::ResultTask(U32 dwCmd,U32 dwParam1,U32 dwParam2)
{
if(!CheckCmd(dwCmd))
return FALSE;
GCMDTASK* pParam = AllocCmdTask();
pParam->dwCmd = dwCmd;
pParam->unParam1.dwData = dwParam1;
pParam->unParam2.dwData = dwParam2;
m_mutexInst.lock(true);
m_resultQueue.Push(pParam);
#ifdef RPGCMD_USE_THREAD
Thread::prioNORMAL();
#endif
SetIdleState(TIS_RESULT);
m_mutexInst.unlock();
return TRUE;
}
GCMDTASK* RPGCmdProcess::BeginTask(U32 dwCmd)
{
if(!CheckCmd(dwCmd))
return NULL;
AssertWarn(m_pCmdParam == NULL,"最近一次BeginCmd后,没有调用EndCmd");
if(m_pCmdParam)
EndTask();
GCMDTASK* pParam = AllocCmdTask();
pParam->dwCmd = dwCmd;
m_pCmdParam = pParam;
return pParam;
}
BOOL RPGCmdProcess::EndTask()
{
if(CheckSendingTask(m_pCmdParam))
{
m_pCmdParam = NULL;
return FALSE;
}
m_mutexInst.lock(true);
m_taskQueue.Push(m_pCmdParam);
#ifdef RPGCMD_USE_THREAD
Thread::prioNORMAL();
#endif
SetIdleState(TIS_TASK);
m_mutexInst.unlock();
m_pCmdParam = NULL;
return TRUE;
}
};//namespace RPG
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -