📄 cmworkthread.cpp
字号:
#include "MatrixCore/Network/CMWorkThread.h"
#include <MatrixCore/Network/CMMemPooler.h>
#include <MatrixCore/System/CMDebug.h>
#include <MatrixCore/Network/CMIocpMudule.h>
#include <MatrixCore/Network/CMPacket.h>
#include <MatrixCore/Pattern/CMSingleton.h>
#include <MatrixCore/Network/CMOverlapped.h>
using namespace MatrixCore::Pattern;
//CMSingleton<CMDebug>::getInstance()
//#include "SessionManager.h"
using namespace MatrixCore::System;
using namespace MatrixCore::Network;
bool CMWorkerThread::m_stThreadLoop = true;
/////////////////////////////////////////////////////////////////////////////////////////////////////////
// 积己磊 & 家戈磊.
/////////////////////////////////////////////////////////////////////////////////////////////////////////
CMWorkerThread::CMWorkerThread(LPTHREAD_START_ROUTINE StartRoutine)
{
CMThread::m_StartRoutine = StartRoutine;
m_iTotalRecvSize = 0;
m_Count = 1;
m_iReadSize = 0;
m_pQueue = new CMPacket;
}
CMWorkerThread::~CMWorkerThread()
{
m_stThreadLoop = false;
delete m_pQueue;
// WaitForSingleObject( handleThread, 1000 );
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////////
//-------------------------------------------------------------------------------------------------------
// Name :: void run()
// Create Date :: 2003/12/30
// Description :: 况目 静饭靛 橇肺技胶.
// param ::
// Return Value :: void 鸥涝
// Bug Report ::
//-------------------------------------------------------------------------------------------------------
void CMWorkerThread::run()
{
DWORD bytesTransfer, keyValue;
CMOVERLAPPED *overlapped;
BOOL retVal;
HANDLE hIOCP = CMIocpMudule::GetInstance()->GetWorkerIOCPHandle();
while( m_stThreadLoop == true )
{
retVal = ::GetQueuedCompletionStatus( hIOCP, &bytesTransfer,
&keyValue, (OVERLAPPED **)&overlapped, INFINITE );
if( retVal == TRUE && bytesTransfer > 0 || overlapped->eState == eACCEPT)
{
overlapped->dwBytesTransfer = bytesTransfer;
//单捞鸥甫 罐疽促.
DoIo( *overlapped );
}
else
{
if ( overlapped == 0 ) // the IOCP op failed
{
// no key, no byte count, no overlapped info available!
// how do I handle this?
// by ignoring it. I don't even have a socket handle to close.
// You might want to abort instead, as something is seriously
// wrong if we get here.
}
else
{
// key, byte count, and overlapped are valid.
// tear down the connection and requeue the socket
// (unless the listener socket is closed, in which case
// this is a failed AcceptEx(), and we stop accepting)
DoClose( *overlapped, TRUE,
CMIocpMudule::GetInstance()->GetListenSocket() == INVALID_SOCKET? false: true );
}
}
}
}
//-------------------------------------------------------------------------------------------------------
// Name :: void DoIo(CMOVERLAPPED &OV, CMPackQueue *pQueue,BOOL bQueueOption)
// Create Date :: 2003/12/30
// Description :: 庆歹客 单捞鸥 菩哦阑 盒籍窍绊 磊弗促.
// param ::
// CMOVERLAPPED &OV : 坷滚乏 函荐甫 罐绰促.
// Return Value :: void 鸥涝
// Bug Report ::
// 2004/03/07 : 府矫宏 磊福绰 何盒 荐沥.(钮 可记..何盒.)
//-------------------------------------------------------------------------------------------------------
BOOL CMWorkerThread::DoIo(CMOVERLAPPED &OV, CMPackQueue *pQueue,BOOL bQueueOption)
{
int locallen, remotelen;
sockaddr_in *pLocal = 0, *pRemote = 0;
unsigned short sDataSize = 0;
unsigned short sReplayID = 0;
int err;
int iCnt;
switch ( OV.eState )
{
case eACCEPT: // Accetp process
{
SOCKET ListenSock;
GetAcceptExSockaddrs( &OV.pRecvContext->AcceptBuf[0], 0,
OV.addrlen, OV.addrlen, (sockaddr **) &pLocal, &locallen,
(sockaddr **) &pRemote, &remotelen );
memcpy( &OV.Local, pLocal, sizeof sockaddr_in );
memcpy( &OV.ClientAddr, pRemote, sizeof sockaddr_in );
ListenSock = CMIocpMudule::GetInstance()->GetListenSocket();
err = setsockopt( OV.s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&ListenSock, sizeof SOCKET );
//getsockopt(OV.s,SOL_SOCKET, SO_SNDBUF,(char*)&zero,&zero1);
// IOCP俊 家南 勤甸阑 楷搬秦霖促.
HANDLE hIOCP = CreateIoCompletionPort( (HANDLE) OV.s, CMIocpMudule::GetInstance()->GetWorkerIOCPHandle(),(ULONG_PTR)&OV.pKeyComplete, 0 );
// 荤侩磊 立加阑 墨款磐 茄促.
InterlockedIncrement(&CMIocpMudule::GetInstance()->m_UserCount);
PostRecv( OV);
}
break;
case eREAD: // Recv process
/*DoCommand();*/
_OnRead(OV, pQueue);
break;
case eSEND: // Send process
{
CMMemPooler<CMOVERLAPPED> *pPool;
if(OV.SendIoContext.iSendCnt == NULL)
{
if(OV.sendBufferDescriptor.buf != NULL)
{
delete OV.sendBufferDescriptor.buf;
OV.sendBufferDescriptor.buf = NULL;
}
}
else
{
iCnt = InterlockedDecrement(OV.SendIoContext.iSendCnt);
if( 0 >= iCnt)
{
if(OV.sendBufferDescriptor.buf != NULL)
{
delete OV.SendIoContext.iSendCnt;
delete OV.sendBufferDescriptor.buf;
OV.SendIoContext.iSendCnt = NULL;
OV.sendBufferDescriptor.buf = NULL;
}
}
}
pPool = (CMMemPooler<CMOVERLAPPED> * )OV.SendIoContext.pThisPoint;
pPool->Free(&OV) ;
/*if(pPool->Free(&OV) == FALSE)
OutPut_Message("甸绢棵荐 绝绰 风凭捞促.季靛 皋葛府 秦力角菩");*/
//m_Count++;
}
break;
default:
//俊矾?
break;
}
return TRUE;
}
//-------------------------------------------------------------------------------------------------------
// Name :: DoClose( CMOVERLAPPED &OV, bool force /* = false */, bool listenAgain /* = true */ )
// Create Date :: 2003/12/29
// Description :: .
// param ::
// CMOVERLAPPED &OV, bool force /* = false */, bool listenAgain /* = true */
// Return Value :: void 鸥涝
// Bug Report ::
//-------------------------------------------------------------------------------------------------------
void CMWorkerThread::DoClose( CMOVERLAPPED &OV, bool force /* = false */, bool listenAgain /* = true */ )
{
// char cTmp[100];
//BOOL bFlag = TRUE;
// 佬篮 滚欺 荤捞令甫 府悸矫挪促.
m_iReadSize = 0;
struct linger li = { 0, 0 }; // default: SO_DONTLINGER
if ( force )
li.l_onoff = 1; // SO_LINGER, timeout = 0
setsockopt( OV.s, SOL_SOCKET, SO_LINGER, (char *) &li, sizeof li );
// IOCP扼辑 SOL_SOCKET栏肺 贸府 窍看促..
////弊饭捞胶 妓促款. (快酒窍变 葛啊 -_-);;
//if(shutdown(OV.s, SD_SEND) == SOCKET_ERROR )
//{
// //家南捞 俊矾啊 车促 快酒窍霸 富绊 公侥窍霸 辆丰 矫虐磊 -_-;;
// int err = WSAGetLastError();
// WokrerDebug.output("%s(%d)DoClose()-> shutdown() FAIL (%d)",__FILE__,__LINE__,err);
//}
//else
// while (bFlag == TRUE) {
// iRet = recv(OV.s, cTmp, sizeof(cTmp), 0);
// if (iRet == SOCKET_ERROR) bFlag = FALSE;
// if (iRet == 0) bFlag = FALSE;
// }
closesocket( OV.s );
// 荤侩磊 立加阑 墨款磐 茄促.
::InterlockedDecrement(&CMIocpMudule::GetInstance()->m_UserCount);
// 辑滚埃 立加茄 捞亥飘 家南篮 力寇..
if ( listenAgain && OV.iAcceptServer != eServerClient)
{
// accept 促矫 吧绢霖促.
::CMIocpMudule::GetInstance()->m_pIOAccepter->AcceptContext(OV);
}
}
// 磊扁 家南捞 酒囱 促弗 坷滚乏 家南阑 辆丰 矫挪促.
void CMWorkerThread::SocketClose(SOCKET s)
{
char cTmp[100];
BOOL bFlag = TRUE;
int iRet = 0;
//弊饭捞胶 妓促款. (快酒窍变 葛啊 -_-);;
if(shutdown(s, SD_SEND) == SOCKET_ERROR )
{
//家南捞 俊矾啊 车促 快酒窍霸 富绊 公侥窍霸 辆丰 矫虐磊 -_-;;
int err = WSAGetLastError();
//WokrerDebug.output("%s(%d)DoClose()-> shutdown() FAIL (%d)",__FILE__,__LINE__,err);
}
else
while (bFlag == TRUE) {
iRet = recv(s, cTmp, sizeof(cTmp), 0);
if (iRet == SOCKET_ERROR) bFlag = FALSE;
if (iRet == 0) bFlag = FALSE;
}
closesocket( s );
//// 荤侩磊 立加阑 墨款磐 茄促.
//::InterlockedDecrement(&CMIocpMudule::GetInstance()->m_UserCount);
}
//-------------------------------------------------------------------------------------------------------
// Name :: _OnRead()
// Create Date :: 2003/03/07
// Description :: 庆歹客 单捞鸥 菩哦阑 盒籍窍绊 磊弗促.
// 扼涝 郴何俊辑 倒酒 哎锭 敬促.
// 霸烙辑滚侩...
// param ::
// Return Value :: int 鸥涝
// Bug Report ::
//-------------------------------------------------------------------------------------------------------
int CMWorkerThread::_OnRead(CMOVERLAPPED &OV, CMPackQueue *pQueue)
{
int iRet = TRUE;
unsigned short sID;
unsigned short sDataSize = 0;
unsigned short sReplayID = 0;
int iReadSize = 0;
if(pQueue == NULL)
{
CMSingleton<CMDebug>::getInstance()->output("%s(%d)pQueue == NULL(橇肺弊伐 辆丰矫 秦力 啊瓷己)",__FILE__,__LINE__);
return DEF_WORKREADERR_PROGRAM_EXIT_ERR;
}
//佬篮 单捞鸥甫 促 贸府 且锭鳖瘤 捣促.
for(int iReadTotalSize = OV.dwBytesTransfer ; iReadTotalSize > iReadSize ; )
{
if(OV.eRecvState == eNormalRecv) // 单捞鸥 畴钢肺 罐绰何盒.
{
//罐篮单捞鸥 - 佬篮 单捞鸥 >= 庆歹 农扁搁 曼...
if(iReadTotalSize - iReadSize >= PACKETHEADERSIZE )
{
// 庆歹 菩哦 盒籍.
if(FALSE == pQueue->PackDecode(OV.pRecvContext->RecvBuf + iReadSize)) //庆歹 菩哦捞 鞠龋拳 登绢 乐栏搁 钱绢霖促.
{
// 菩哦 庆歹啊 力措肺 救登菌促.
// 碍力 立加 辆丰.
DoClose(OV,true);
CMSingleton<CMDebug>::getInstance()->output("%s(%d)秦欧 啊瓷己 丑饶.!!!(%d)",__FILE__,__LINE__,OV.s);
return DEF_WORKREADERR_DECODE_ERR;
}
pQueue->GetHeaderInfo(OV.pRecvContext->RecvBuf + iReadSize,&sID,&sDataSize,&sReplayID);
// 菩哦狼 罚待蔼捞 捞傈 菩哦苞 鞍促搁.
if(OV.iAcceptServer == eServer || OV.iAcceptServer == eServerClient)
{
// 辑滚 菩哦捞搁 八荤 救茄促.
}
else
{
if(OV.sReplayAttack == sReplayID)
{
// 碍力 立加 辆丰.
DoClose(OV,true);
CMSingleton<CMDebug>::getInstance()->output("%s(%d)秦欧 啊瓷己 丑饶.!!!(%d) ",__FILE__,__LINE__,OV.s);
return DEF_WORKREADERR_REPLAY_ERR;
}
}
//府敲饭捞 傍拜 抗规
OV.sReplayAttack = sReplayID;
// 茄菩哦捞 单捞鸥 + 庆歹 何盒捞 肯寒窍霸 佬阑荐 乐霸 贸府 茄促.
if(iReadTotalSize >= (int) sDataSize + PACKETHEADERSIZE +iReadSize )
{
// 钮俊 单捞鸥甫 楷搬秦霖促.
if( pQueue->PackPush(OV.s
, OV.pRecvContext->RecvBuf + iReadSize // 佬篮 单捞鸥 第肺 器牢磐 捞悼.
,sDataSize + PACKETHEADERSIZE,OV.dwRandomID,&OV)
== FALSE)
{
// 钮啊 菜茫促. 例措 咯扁 救客具 茄促.. 荤侩磊啊 腹栏聪.. 立加 辆丰 矫虐磊.
// 碍力 立加 辆丰.
DoClose(OV,true);
CMSingleton<CMDebug>::getInstance()->output("%s(%d)Recv Queue PUSH FAIL(CRITICAL WARNNING!!)",__FILE__,__LINE__);
return DEF_WORKREADERR_MEMORYFULL_ERR;
}
// 单捞鸥 荤捞令 父怒 刘啊.
iReadSize += (sDataSize + PACKETHEADERSIZE);
// 函荐 檬扁拳.
OV.iMoreRecv = 0;
OV.eRecvState = eNormalRecv;
}
else
{
// 单捞鸥 农扁甫 呈公 农霸 焊郴芭唱 秦欧 惑炔?
if(sDataSize > PACKETBUFFERSIZE)
{
// 捞荤侩磊绰 碍力 立加 矫难具茄促. 贸府 鞘荐... //?
}
//单捞鸥 菩哦阑 促 给罐篮 版快 歹罐酒具 茄促.
OV.eRecvState = eNoDataRecv;
//皋葛府甫 墨乔 秦初绰促.
memcpy(OV.pRecvContext->OnRecvBuf,OV.pRecvContext->RecvBuf + iReadSize,iReadTotalSize - iReadSize);
// 给佬篮 单捞鸥 菩哦箭磊甫 历厘茄促.
OV.iMoreRecv = ( PACKETHEADERSIZE + sDataSize ) - (iReadTotalSize - iReadSize);
//佬篮 单捞鸥 荐甫 历厘茄促.
//OV.iMoreRecv = iReadTotalSize - iReadSize;
break;
}
}
else
{
//庆歹 菩哦阑 促 给罐篮 版快 歹罐酒具 茄促.
OV.eRecvState = eNoHeaderRecv;
//皋葛府甫 墨乔 秦初绰促.
memcpy(OV.pRecvContext->OnRecvBuf,OV.pRecvContext->RecvBuf + iReadSize,iReadTotalSize - iReadSize);
// 给佬篮 单捞鸥 庆歹 菩哦箭磊甫 历厘茄促.
OV.iMoreRecv = PACKETHEADERSIZE - (iReadTotalSize - iReadSize);
//佬篮 单捞鸥 荐甫 历厘茄促.
//OV.iMoreRecv = iReadTotalSize - iReadSize;
break;
}
}
else if(OV.eRecvState == eNoDataRecv) //单捞鸥甫 眠啊肺 罐绰版快.
{
// 庆歹 菩哦 盒籍.
pQueue->GetHeaderInfo(OV.pRecvContext->OnRecvBuf,&sID,&sDataSize,&sReplayID);
// 茄菩哦捞 单捞鸥 + 庆歹 何盒捞 肯寒窍霸 佬阑荐 乐霸 贸府 茄促.
if(iReadTotalSize >= OV.iMoreRecv )
{
// 皋葛府甫 墨乔茄促.
memcpy(OV.pRecvContext->OnRecvBuf + PACKETHEADERSIZE + sDataSize - OV.iMoreRecv,OV.pRecvContext->RecvBuf + iReadSize,OV.iMoreRecv);
// 钮俊 单捞鸥甫 楷搬秦霖促.
if( pQueue->PackPush(OV.s
, OV.pRecvContext->OnRecvBuf // 捞绢辑 罐篮 滚欺俊辑 单捞鸥甫 佬绢柯促.
,sDataSize + PACKETHEADERSIZE,OV.dwRandomID,&OV) == FALSE)
{
// 钮啊 菜茫促. 例措 咯扁 救客具 茄促.. 荤侩磊啊 腹栏聪.. 立加 辆丰 矫虐磊.
// 碍力 立加 辆丰.
DoClose(OV,true);
CMSingleton<CMDebug>::getInstance()->output("%s(%d)Recv Queue PUSH FAIL(CRITICAL WARNNING!!)",__FILE__,__LINE__);
return DEF_WORKREADERR_MEMORYFULL_ERR;
}
iReadSize += OV.iMoreRecv;
// 函荐 檬扁拳.
OV.iMoreRecv = 0;
OV.eRecvState = eNormalRecv;
}
else
{
//皋葛府甫 墨乔 秦初绰促.
memcpy(OV.pRecvContext->OnRecvBuf + PACKETHEADERSIZE + sDataSize - OV.iMoreRecv,OV.pRecvContext->RecvBuf + iReadSize,iReadTotalSize - iReadSize);
// 给佬篮 单捞鸥 菩哦箭磊甫 历厘茄促. (捞繁版快绰 喊风 绝绢具 窍绰叼..)
OV.iMoreRecv = OV.iMoreRecv - iReadTotalSize;
break;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -