📄 socketengine.cpp
字号:
//建立读写线程
for (DWORD i=0;i<dwThreadCount;i++)
{
CServerSocketRSThread * pServerSocketRSThread=new CServerSocketRSThread();
if (pServerSocketRSThread==NULL) throw TEXT("网络引擎读写线程服务创建失败");
bSuccess=pServerSocketRSThread->InitThread(m_hCompletionPort);
if (bSuccess==false) throw TEXT("网络引擎读写线程服务配置失败");
m_SocketRSThreadArray.Add(pServerSocketRSThread);
}
//建立应答线程
bSuccess=m_SocketAcceptThread.InitThread(m_hCompletionPort,m_hServerSocket,this);
if (bSuccess==false) throw TEXT("网络引擎网络监听线程服务配置");
//运行读写线程
for (i=0;i<dwThreadCount;i++)
{
CServerSocketRSThread * pServerSocketRSThread=m_SocketRSThreadArray[i];
ASSERT(pServerSocketRSThread!=NULL);
bSuccess=pServerSocketRSThread->StartThead();
if (bSuccess==false) throw TEXT("网络引擎读写线程服务启动失败");
}
//网络检测线程
m_SocketDetectThread.InitThread(this);
bSuccess=m_SocketDetectThread.StartThead();
if (bSuccess==false) throw TEXT("网络引检测线程服务启动失败");
//运行应答线程
bSuccess=m_SocketAcceptThread.StartThead();
if (bSuccess==false) throw TEXT("网络引擎监听线程服务启动失败");
//设置变量
m_bService=true;
}
catch (LPCTSTR pszError)
{
CEventTrace::ShowEventNotify(pszError,Level_Exception);
return false;
}
return true;
}
// Stop the service.
// Tears the engine down in a fixed order: detect thread, listening socket and
// accept thread, send queue, read/write threads, completion port, and finally
// every connection still in the active list. Always returns true.
bool __cdecl CTCPSocketEngine::StopService()
{
	// Flag the engine as stopped and reset the last-detect timestamp.
	m_bService=false;
	m_dwLastDetect=0L;

	// Stop the detect thread.
	m_SocketDetectThread.StopThread();

	// Close the listening socket (when one is open), then stop the accept thread.
	if (m_hServerSocket!=INVALID_SOCKET)
	{
		closesocket(m_hServerSocket);
		m_hServerSocket=INVALID_SOCKET;
	}
	m_SocketAcceptThread.StopThread();

	// Stop the send queue.
	m_SendQueueService.StopService();

	// Post one empty completion packet (0 bytes, NULL key, NULL overlapped) per
	// read/write thread. NOTE(review): presumably this is the exit signal the
	// read/write threads wait for - confirm against CServerSocketRSThread.
	INT_PTR nThreadCount=m_SocketRSThreadArray.GetCount();
	if (m_hCompletionPort!=NULL)
	{
		for (INT_PTR nPost=0;nPost<nThreadCount;nPost++)
		{
			PostQueuedCompletionStatus(m_hCompletionPort,0,NULL,NULL);
		}
	}

	// Stop and destroy every read/write thread object, then empty the array.
	for (INT_PTR nThread=0;nThread<nThreadCount;nThread++)
	{
		CServerSocketRSThread * pRSThread=m_SocketRSThreadArray[nThread];
		ASSERT(pRSThread!=NULL);
		pRSThread->StopThread();
		SafeDelete(pRSThread);
	}
	m_SocketRSThreadArray.RemoveAll();

	// Close the completion port.
	if (m_hCompletionPort!=NULL)
	{
		CloseHandle(m_hCompletionPort);
		m_hCompletionPort=NULL;
	}

	// Close and reset every active connection. The count is re-read on each
	// pass, exactly as the loop condition demands.
	for (INT_PTR nItem=0;nItem<m_ActiveSocketItem.GetCount();nItem++)
	{
		CServerSocketItem * pActiveItem=m_ActiveSocketItem[nItem];
		ASSERT(pActiveItem!=NULL);
		pActiveItem->CloseSocket(pActiveItem->GetRountID());
		pActiveItem->ResetSocketData();
	}

	// Move all active items over to the free list.
	m_FreeSocketItem.Append(m_ActiveSocketItem);
	m_ActiveSocketItem.RemoveAll();

	return true;
}
// Accept notification.
// Posts a socket-accept event (item index, round ID, client address) to the
// attemper event object. Returns false without posting when the service is
// not running, true otherwise.
bool CTCPSocketEngine::OnSocketAcceptEvent(CServerSocketItem * pServerSocketItem)
{
	// Validate arguments and state (debug builds only).
	ASSERT(pServerSocketItem!=NULL);
	ASSERT(m_AttemperEvent.IsValid()==true);

	// Nothing is posted while the service is stopped.
	if (m_bService==false)
	{
		return false;
	}

	// Post the event.
	m_AttemperEvent.PostSocketAcceptEvent(
		pServerSocketItem->GetIndex(),
		pServerSocketItem->GetRountID(),
		pServerSocketItem->GetClientAddr());

	return true;
}
// Read notification.
// Posts a socket-read event (item index, round ID, command, payload buffer
// and size) to the attemper event object. Returns false without posting when
// the service is not running, true otherwise.
bool CTCPSocketEngine::OnSocketReadEvent(CMD_Command Command, void * pBuffer, WORD wDataSize, CServerSocketItem * pServerSocketItem)
{
	// Validate arguments and state (debug builds only).
	ASSERT(pServerSocketItem!=NULL);
	ASSERT(m_AttemperEvent.IsValid()==true);

	// Nothing is posted while the service is stopped.
	if (m_bService==false)
	{
		return false;
	}

	// Post the event.
	m_AttemperEvent.PostSocketReadEvent(
		pServerSocketItem->GetIndex(),
		pServerSocketItem->GetRountID(),
		Command,
		pBuffer,
		wDataSize);

	return true;
}
// Close notification.
// Posts a socket-close event for the item and then returns the item to the
// free list. Returns false when the service is not running. Any exception
// raised while posting or freeing is swallowed and the function still
// returns true.
bool CTCPSocketEngine::OnSocketCloseEvent(CServerSocketItem * pServerSocketItem)
{
	// Validate arguments and state (debug builds only).
	ASSERT(pServerSocketItem!=NULL);
	ASSERT(m_AttemperEvent.IsValid()==true);

	try
	{
		// Nothing to do while the service is stopped.
		if (m_bService==false)
		{
			return false;
		}

		// Snapshot the connection details before the item is released.
		WORD wItemIndex=pServerSocketItem->GetIndex();
		WORD wItemRountID=pServerSocketItem->GetRountID();
		DWORD dwPeerAddr=pServerSocketItem->GetClientAddr();
		DWORD dwConnectedAt=pServerSocketItem->GetConnectTime();

		// Post the close event.
		m_AttemperEvent.PostSocketCloseEvent(wItemIndex,wItemRountID,dwPeerAddr,dwConnectedAt);

		// Return the item to the free list.
		FreeSocketItem(pServerSocketItem);
	}
	catch (...)
	{
	}

	return true;
}
// Queue sink callback (called by the send-queue thread).
//   wIdentifier  - request type: QUEUE_SEND_REQUEST, QUEUE_SAFE_CLOSE,
//                  QUEUE_ALLOW_BATCH or QUEUE_DETECT_SOCKET.
//   pBuffer      - request payload; its layout depends on wIdentifier.
//   wDataSize    - payload size in bytes.
//   dwInsertTime - not used by this function.
// Requests that name a socket index for which EnumSocketItem() returns NULL
// are ignored instead of dereferencing a null pointer.
void __cdecl CTCPSocketEngine::OnQueueServiceSink(WORD wIdentifier, void * pBuffer, WORD wDataSize, DWORD dwInsertTime)
{
	switch (wIdentifier)
	{
	case QUEUE_SEND_REQUEST:	// Send request
		{
			// Validate the payload size (debug builds only).
			tagSendDataRequest * pSendDataRequest=(tagSendDataRequest *)pBuffer;
			ASSERT(wDataSize>=(sizeof(tagSendDataRequest)-sizeof(pSendDataRequest->cbSendBuf)));
			ASSERT(wDataSize==(pSendDataRequest->wDataSize+sizeof(tagSendDataRequest)-sizeof(pSendDataRequest->cbSendBuf)));

			if (pSendDataRequest->wIndex==INDEX_ALL_SOCKET)
			{
				// Snapshot the active list under the item lock, then release the
				// lock so sending does not hold it.
				CThreadLockHandle ItemLockedHandle(&m_ItemLocked);
				m_TempSocketItem.Copy(m_ActiveSocketItem);
				ItemLockedHandle.UnLock();

				// Send to every snapshot item that allows batch sending.
				CServerSocketItem * pServerSocketItem=NULL;
				for (INT_PTR i=0;i<m_TempSocketItem.GetCount();i++)
				{
					pServerSocketItem=m_TempSocketItem[i];
					ASSERT(pServerSocketItem!=NULL);
					if (pServerSocketItem==NULL) continue;
					CThreadLockHandle SocketLockHandle(pServerSocketItem->GetSignedLock());
					if (pServerSocketItem->IsAllowBatch())
					{
						pServerSocketItem->SendData(pSendDataRequest->cbSendBuf,pSendDataRequest->wDataSize,pSendDataRequest->wMainCmdID,
							pSendDataRequest->wSubCmdID,pServerSocketItem->GetRountID());
					}
				}
			}
			else
			{
				// Single-target send. EnumSocketItem() returns NULL for an index
				// outside the storage array; drop the request in that case.
				CServerSocketItem * pServerSocketItem=EnumSocketItem(pSendDataRequest->wIndex);
				if (pServerSocketItem==NULL) break;
				CThreadLockHandle SocketLockHandle(pServerSocketItem->GetSignedLock());
				pServerSocketItem->SendData(pSendDataRequest->cbSendBuf,pSendDataRequest->wDataSize,pSendDataRequest->wMainCmdID,
					pSendDataRequest->wSubCmdID,pSendDataRequest->wRountID);
			}
			break;
		}
	case QUEUE_SAFE_CLOSE:		// Safe close
		{
			// Validate the payload size (debug builds only).
			ASSERT(wDataSize==sizeof(tagSafeCloseSocket));
			tagSafeCloseSocket * pSafeCloseSocket=(tagSafeCloseSocket *)pBuffer;

			// Shut the socket down; ignore an index with no item.
			CServerSocketItem * pServerSocketItem=EnumSocketItem(pSafeCloseSocket->wIndex);
			if (pServerSocketItem==NULL) break;
			CThreadLockHandle SocketLockHandle(pServerSocketItem->GetSignedLock());
			pServerSocketItem->ShutDownSocket(pSafeCloseSocket->wRountID);
			break;
		}
	case QUEUE_ALLOW_BATCH:		// Allow batch send
		{
			// Validate the payload size (debug builds only).
			ASSERT(wDataSize==sizeof(tagAllowBatchSend));
			tagAllowBatchSend * pAllowBatchSend=(tagAllowBatchSend *)pBuffer;

			// Update the batch flag; ignore an index with no item.
			CServerSocketItem * pServerSocketItem=EnumSocketItem(pAllowBatchSend->wIndex);
			if (pServerSocketItem==NULL) break;
			CThreadLockHandle SocketLockHandle(pServerSocketItem->GetSignedLock());
			pServerSocketItem->AllowBatchSend(pAllowBatchSend->wRountID,pAllowBatchSend->cbAllow?true:false);
			break;
		}
	case QUEUE_DETECT_SOCKET:	// Detect connections
		{
			// Snapshot the active list under the item lock.
			CThreadLockHandle ItemLockedHandle(&m_ItemLocked);
			m_TempSocketItem.Copy(m_ActiveSocketItem);
			ItemLockedHandle.UnLock();

			// Build the detect packet and the timeout: the time elapsed since the
			// previous detect pass, but never less than 8000 ticks.
			CMD_KN_DetectSocket DetectSocket;
			memset(&DetectSocket,0,sizeof(DetectSocket));
			DWORD dwNowTickCount=GetTickCount(),dwTimeSpace=0L;
			DWORD dwBreakTickCount=__max(dwNowTickCount-m_dwLastDetect,8000L);

			// Remember when this pass ran.
			m_dwLastDetect=GetTickCount();

			// Check every snapshot item.
			WORD wRountID=0;
			CServerSocketItem * pServerSocketItem=NULL;
			for (INT_PTR i=0;i<m_TempSocketItem.GetCount();i++)
			{
				// Fetch and lock the item; skip a null entry.
				pServerSocketItem=m_TempSocketItem[i];
				ASSERT(pServerSocketItem!=NULL);
				if (pServerSocketItem==NULL) continue;
				CThreadLockHandle SocketLockHandle(pServerSocketItem->GetSignedLock());

				// Close items whose last receive is too old. Items that are ready
				// to send get the computed timeout; the others get a fixed 4000.
				if (pServerSocketItem->IsReadySend()==true)
				{
					dwTimeSpace=dwNowTickCount-pServerSocketItem->GetRecvTickCount();
					if (dwTimeSpace>dwBreakTickCount)
					{
						pServerSocketItem->CloseSocket(pServerSocketItem->GetRountID());
						continue;
					}
				}
				else
				{
					dwTimeSpace=dwNowTickCount-pServerSocketItem->GetRecvTickCount();
					if (dwTimeSpace>4000L)
					{
						pServerSocketItem->CloseSocket(pServerSocketItem->GetRountID());
						continue;
					}
				}

				// Send the detect packet to items that are ready to send.
				if (pServerSocketItem->IsReadySend()==true)
				{
					wRountID=pServerSocketItem->GetRountID();
					DetectSocket.dwSendTickCount=GetTickCount();
					pServerSocketItem->SendData(&DetectSocket,sizeof(DetectSocket),MDM_KN_COMMAND,SUB_KN_DETECT_SOCKET,wRountID);
				}
			}
			break;
		}
	default: { ASSERT(FALSE); }
	}
	return;
}
// Obtain a socket item and place it in the active list.
// Takes the last entry of the free list when one exists; otherwise creates a
// new item as long as the storage count is below m_wMaxSocketItem. Returns
// NULL when no item could be supplied. Runs under the item lock.
CServerSocketItem * CTCPSocketEngine::ActiveSocketItem()
{
	CThreadLockHandle ItemLock(&m_ItemLocked,true);

	// Try the free list first.
	CServerSocketItem * pResultItem=NULL;
	const INT_PTR nFreeCount=m_FreeSocketItem.GetCount();
	if (nFreeCount>0)
	{
		const INT_PTR nLastFree=nFreeCount-1;
		pResultItem=m_FreeSocketItem[nLastFree];
		ASSERT(pResultItem!=NULL);
		m_FreeSocketItem.RemoveAt(nLastFree);
		m_ActiveSocketItem.Add(pResultItem);
	}
	if (pResultItem!=NULL)
	{
		return pResultItem;
	}

	// Otherwise create a new item, unless the storage limit has been reached.
	WORD wStorageCount=(WORD)m_StorageSocketItem.GetCount();
	if (wStorageCount>=m_wMaxSocketItem)
	{
		return pResultItem;
	}
	try
	{
		// The new item receives the current storage count and this engine.
		pResultItem=new CServerSocketItem(wStorageCount,this);
		if (pResultItem==NULL) return NULL;
		m_StorageSocketItem.Add(pResultItem);
		m_ActiveSocketItem.Add(pResultItem);
	}
	catch (...)
	{
		return NULL;
	}

	return pResultItem;
}
// Look up a socket item by its index in the storage array.
// Returns NULL when wIndex is not below the storage count. Runs under the
// item lock.
CServerSocketItem * CTCPSocketEngine::EnumSocketItem(WORD wIndex)
{
	CThreadLockHandle ItemLock(&m_ItemLocked,true);

	// Out-of-range index: no item.
	if (wIndex>=m_StorageSocketItem.GetCount())
	{
		return NULL;
	}

	CServerSocketItem * pFoundItem=m_StorageSocketItem[wIndex];
	ASSERT(pFoundItem!=NULL);
	return pFoundItem;
}
// Release a socket item: move it from the active list to the free list.
// Returns true when the item was found in the active list; otherwise asserts
// (debug builds) and returns false. Runs under the item lock.
bool CTCPSocketEngine::FreeSocketItem(CServerSocketItem * pServerSocketItem)
{
	// Validate the argument (debug builds only).
	ASSERT(pServerSocketItem!=NULL);

	CThreadLockHandle ItemLock(&m_ItemLocked,true);

	// Linear search of the active list for this item.
	const INT_PTR nActiveTotal=m_ActiveSocketItem.GetCount();
	for (INT_PTR nPos=0;nPos<nActiveTotal;nPos++)
	{
		if (m_ActiveSocketItem[nPos]!=pServerSocketItem) continue;

		// Found: transfer it to the free list.
		m_ActiveSocketItem.RemoveAt(nPos);
		m_FreeSocketItem.Add(pServerSocketItem);
		return true;
	}

	// The item was not active.
	ASSERT(FALSE);
	return false;
}
// Request a connection-detect pass.
// Queues a QUEUE_DETECT_SOCKET request with no payload on the send queue and
// returns the result of the queue insertion.
bool __cdecl CTCPSocketEngine::DetectSocket()
{
	const bool bQueued=m_SendQueueService.AddToQueue(QUEUE_DETECT_SOCKET,NULL,0);
	return bQueued;
}
// Send a command with no payload to one socket.
// Builds a send request carrying only the command IDs, the target index and
// the round ID, and queues its header portion on the send queue. Returns
// false when the service is not running; otherwise returns the result of the
// queue insertion.
bool __cdecl CTCPSocketEngine::SendData(WORD wIndex, WORD wRountID, WORD wMainCmdID, WORD wSubCmdID)
{
	// The service must be running (asserted in debug builds).
	ASSERT(m_bService==true);
	if (m_bService==false)
	{
		return false;
	}

	// Fill in the request header; there is no payload.
	tagSendDataRequest Request;
	Request.wIndex=wIndex;
	Request.wRountID=wRountID;
	Request.wMainCmdID=wMainCmdID;
	Request.wSubCmdID=wSubCmdID;
	Request.wDataSize=0;

	// Queue only the header: the structure size minus the payload buffer.
	WORD wHeaderSize=sizeof(Request)-sizeof(Request.cbSendBuf);
	return m_SendQueueService.AddToQueue(QUEUE_SEND_REQUEST,&Request,wHeaderSize);
}
// Send a command with a payload to one socket.
//   wIndex, wRountID      - target socket index and round ID.
//   wMainCmdID, wSubCmdID - command identifiers.
//   pData, wDataSize      - payload buffer and its size in bytes; pData may be
//                           NULL only when wDataSize is 0.
// Returns false when the service is not running, when the payload plus
// CMD_Head exceeds SOCKET_PACKAGE, or when pData is NULL while wDataSize is
// non-zero. Otherwise returns the result of the queue insertion.
bool __cdecl CTCPSocketEngine::SendData(WORD wIndex, WORD wRountID, WORD wMainCmdID, WORD wSubCmdID, void * pData, WORD wDataSize)
{
	// The service must be running (asserted in debug builds).
	ASSERT(m_bService==true);
	if (m_bService==false) return false;

	// Validate the payload size.
	ASSERT((wDataSize+sizeof(CMD_Head))<=SOCKET_PACKAGE);
	if ((wDataSize+sizeof(CMD_Head))>SOCKET_PACKAGE) return false;

	// Validate the payload pointer. Previously this was only an ASSERT, so a
	// release build copied from a NULL pointer.
	ASSERT((wDataSize==0)||(pData!=NULL));
	if ((wDataSize>0)&&(pData==NULL)) return false;

	// Build the request.
	tagSendDataRequest SendRequest;
	SendRequest.wMainCmdID=wMainCmdID;
	SendRequest.wSubCmdID=wSubCmdID;
	SendRequest.wIndex=wIndex;
	SendRequest.wRountID=wRountID;
	SendRequest.wDataSize=wDataSize;
	if (wDataSize>0)
	{
		CopyMemory(SendRequest.cbSendBuf,pData,wDataSize);
	}

	// Queue the header plus the used part of the payload buffer.
	WORD wSendSize=sizeof(SendRequest)-sizeof(SendRequest.cbSendBuf)+wDataSize;
	return m_SendQueueService.AddToQueue(QUEUE_SEND_REQUEST,&SendRequest,wSendSize);
}
// Batch send: queue a command addressed to INDEX_ALL_SOCKET.
//   wMainCmdID, wSubCmdID - command identifiers.
//   pData, wDataSize      - payload buffer and its size in bytes; pData may be
//                           NULL only when wDataSize is 0.
// Returns false (without asserting) when the service is not running, when the
// payload plus CMD_Head exceeds SOCKET_PACKAGE, or when pData is NULL while
// wDataSize is non-zero. Otherwise returns the result of the queue insertion.
bool __cdecl CTCPSocketEngine::SendDataBatch(WORD wMainCmdID, WORD wSubCmdID, void * pData, WORD wDataSize)
{
	// The service must be running.
	if (m_bService==false) return false;

	// Validate the payload size.
	ASSERT((wDataSize+sizeof(CMD_Head))<=SOCKET_PACKAGE);
	if ((wDataSize+sizeof(CMD_Head))>SOCKET_PACKAGE) return false;

	// Validate the payload pointer. Previously this was only an ASSERT, so a
	// release build copied from a NULL pointer.
	ASSERT((wDataSize==0)||(pData!=NULL));
	if ((wDataSize>0)&&(pData==NULL)) return false;

	// Build the request; the round ID is left at 0 for a batch request.
	tagSendDataRequest SendRequest;
	SendRequest.wMainCmdID=wMainCmdID;
	SendRequest.wSubCmdID=wSubCmdID;
	SendRequest.wIndex=INDEX_ALL_SOCKET;
	SendRequest.wRountID=0;
	SendRequest.wDataSize=wDataSize;
	if (wDataSize>0)
	{
		CopyMemory(SendRequest.cbSendBuf,pData,wDataSize);
	}

	// Queue the header plus the used part of the payload buffer.
	WORD wSendSize=sizeof(SendRequest)-sizeof(SendRequest.cbSendBuf)+wDataSize;
	return m_SendQueueService.AddToQueue(QUEUE_SEND_REQUEST,&SendRequest,wSendSize);
}
// Close a connection.
// Looks the item up by index and closes it under the item's own lock.
// Returns false when no item exists for wIndex; otherwise returns the result
// of the item's CloseSocket call.
bool __cdecl CTCPSocketEngine::CloseSocket(WORD wIndex, WORD wRountID)
{
	CServerSocketItem * pTargetItem=EnumSocketItem(wIndex);
	if (pTargetItem!=NULL)
	{
		CThreadLockHandle SocketLock(pTargetItem->GetSignedLock());
		return pTargetItem->CloseSocket(wRountID);
	}
	return false;
}
// Request a safe close of a connection.
// Queues a QUEUE_SAFE_CLOSE request carrying the index and round ID on the
// send queue and returns the result of the queue insertion.
bool __cdecl CTCPSocketEngine::ShutDownSocket(WORD wIndex, WORD wRountID)
{
	// Build the request.
	tagSafeCloseSocket CloseRequest;
	CloseRequest.wIndex=wIndex;
	CloseRequest.wRountID=wRountID;

	// Queue it.
	return m_SendQueueService.AddToQueue(
		QUEUE_SAFE_CLOSE,
		&CloseRequest,
		sizeof(CloseRequest));
}
// Enable or disable batch sending for a connection.
// Queues a QUEUE_ALLOW_BATCH request carrying the index, round ID and flag on
// the send queue and returns the result of the queue insertion.
bool __cdecl CTCPSocketEngine::AllowBatchSend(WORD wIndex, WORD wRountID, bool bAllow)
{
	// Build the request.
	tagAllowBatchSend BatchRequest;
	BatchRequest.wIndex=wIndex;
	BatchRequest.wRountID=wRountID;
	BatchRequest.cbAllow=bAllow;

	// Queue it.
	return m_SendQueueService.AddToQueue(
		QUEUE_ALLOW_BATCH,
		&BatchRequest,
		sizeof(BatchRequest));
}
//////////////////////////////////////////////////////////////////////////
// Exported factory function.
// Creates a CTCPSocketEngine and returns the interface obtained from its
// QueryInterface(Guid, dwInterfaceVer). When creation or the query fails, or
// any exception is raised, the engine is destroyed and NULL is returned.
extern "C" __declspec(dllexport) void * __cdecl CreateTCPSocketEngine(const GUID & Guid, DWORD dwInterfaceVer)
{
	CTCPSocketEngine * pEngine=NULL;
	try
	{
		// Create the engine.
		pEngine=new CTCPSocketEngine();
		if (pEngine==NULL)
		{
			throw TEXT("创建失败");
		}

		// Ask it for the requested interface.
		void * pInterface=pEngine->QueryInterface(Guid,dwInterfaceVer);
		if (pInterface!=NULL)
		{
			return pInterface;
		}
		throw TEXT("接口查询失败");
	}
	catch (...)
	{
	}

	// Failure path: destroy the engine.
	SafeDelete(pEngine);
	return NULL;
}
//////////////////////////////////////////////////////////////////////////
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -