📄 unit_workerthread.cpp
字号:
if(Should_Break==true)
{
lpPerIOData=NULL;
lpConnCtx=NULL;
break;
}
/*发起新的接收请求,以等待接收用户数据*/
//ZeroMemory(lpPerIOData,sizeof(OVERLAPPEDEX));
lpPerIOData->OverLapped.hEvent=NULL;
lpPerIOData->OverLapped.Internal=0;
lpPerIOData->OverLapped.InternalHigh=0;
lpPerIOData->OverLapped.Offset=0;
lpPerIOData->OverLapped.OffsetHigh=0;
lpPerIOData->wbuf.buf=(char*)&(lpPerIOData->data);
lpPerIOData->wbuf.len=MAX_BUF_LEN;
lpPerIOData->oper=SVR_IO_READ;
lpPerIOData->flags=0;
#ifdef __DEBUG
PreThreadId=InterlockedExchange(&((long)lpPerIOData->WorkingThreadID),InternalThreadID);
#endif
//更新最后作业时间,以方便检测超时无数据传输的连接或非法连接
InterlockedExchange(&(lpConnCtx->LastDataTime),time(0));
nResult=WSARecv(lpConnCtx->sockAccept,
&(lpPerIOData->wbuf),
1,
NULL,
&(lpPerIOData->flags),
&(lpPerIOData->OverLapped),
NULL);
if(nResult==SOCKET_ERROR)
{
Socket_Error=WSAGetLastError();
if( Socket_Error!=ERROR_IO_PENDING)
{
#ifdef __ERRORLOG
{
char *Msg;
Msg=(char *) AllocEx(256);
//memset(Msg,0,256);
strcpy(Msg,"ThrdID:");
ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SocketError:");
ltoa(Socket_Error,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ThreadError:");
ltoa(GetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I/O Size:");
ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SOCKET:");
itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Oper:Recv");
strcat(Msg,"|LockCount:");
ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PreThrdID:");
ltoa(PreThreadId,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PerIOData:");
ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ConnCtx:");
ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I_OverLappedEx:");
ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|InDumping:Successed");
//GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
FreeEx(Msg);
Msg=NULL;
}
#endif
//网络错误清理连接
InterlockedDecrement(&(lpPerIOData->LockCount));
lpPerIOData=NULL;
tcpIOCP->DumpConnection(lpConnCtx);
//SetLastError(0);
//WSASetLastError(0);
}
}
break;
case SVR_IO_WRITE://recv then echo
/*tcpIOCP->Lock();
try
{*/
EnterCriticalSection(&(lpConnCtx->ctx_CriticalSection));
try
{
/*更新发送缓冲区的已发送计数
特别用来删除部分已经完成发送的缓冲区*/
LPIODATANode OutDataNode=lpConnCtx->O_DATA;
DWORD SentInByte=dwIOSize;
if(lpConnCtx->TotalSent>(MAX_INT64-SentInByte))
lpConnCtx->TotalSent=SentInByte;
else
lpConnCtx->TotalSent=lpConnCtx->TotalSent+SentInByte;
InterlockedExchangeAdd(&(tcpIOCP->BytesSentPerInterval),dwIOSize);
lpConnCtx->TotalSent=lpConnCtx->TotalSent+dwIOSize;
if(true==lpPerIOData->DataInExtend)
{
lpPerIOData->SentBytes=lpPerIOData->SentBytes+SentInByte;
SentInByte=0;
if(lpPerIOData->SentBytes>=lpPerIOData->Length)
{
lpPerIOData->DataInExtend=false;
lpPerIOData->Length=0;
lpPerIOData->SentBytes=0;
}
}
if(false==lpPerIOData->DataInExtend)
{
while(NULL!=OutDataNode)
{
OutData=OutDataNode->Data;
if(NULL!=OutData)
{
OutData->SentBytes=OutData->SentBytes+SentInByte;
SentInByte=0;
if(OutData->SentBytes>=OutData->Length)
{
OutDataNode->Data=OutData->Next;
FreeEx(OutData);
OutData=NULL;
OutData=OutDataNode->Data;
}
if(NULL!=OutData)
break;
}
if(NULL==OutData)
{
lpConnCtx->O_DATA=OutDataNode->Next;
FreeEx(OutDataNode);
OutDataNode=NULL;
OutDataNode=lpConnCtx->O_DATA;
if(NULL==OutDataNode)
break;
}
}/*End of while*/
}/*End of Not ExtendData*/
if(true==lpConnCtx->InDumping)
{
#ifdef __ERRORLOG
{
char *Msg;
Msg=(char *) AllocEx(256);
//memset(Msg,0,256);
strcpy(Msg,"ThrdID:");
ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SocketError:");
ltoa(WSAGetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ThreadError:");
ltoa(GetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I/O Size:");
ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SOCKET:");
itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Oper:Recv");
strcat(Msg,"|LockCount:");
ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PreThrdID:");
ltoa(lpPerIOData->WorkingThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PerIOData:");
ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ConnCtx:");
ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I_OverLappedEx:");
ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|InDumping:Successed");
//GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
FreeEx(Msg);
Msg=NULL;
}
#endif
//该连接已经置回收状态,不再提交作业
InterlockedDecrement(&(lpPerIOData->LockCount));
Should_Break=true;
}
else
if(false==lpPerIOData->DataInExtend&&NULL==OutDataNode)
{
//无数据待发送则不作为
InterlockedDecrement(&(lpPerIOData->LockCount));
Should_Break=true;
}
//增加未决请求计数
//InterlockedIncrement(&(lpPerIOData->LockCount));
}
__finally
{
LeaveCriticalSection(&(lpConnCtx->ctx_CriticalSection));
}
/*}
__finally
{
tcpIOCP->Unlock();
}*/
//No sending data
if(Should_Break==true)
{
OutData=NULL;
lpPerIOData=NULL;
lpConnCtx=NULL;
break;
}
lpPerIOData->oper=SVR_IO_WRITE;
lpPerIOData->flags=0;
lpPerIOData->OverLapped.hEvent=NULL;
lpPerIOData->OverLapped.Internal=0;
lpPerIOData->OverLapped.InternalHigh=0;
lpPerIOData->OverLapped.Offset=0;
lpPerIOData->OverLapped.OffsetHigh=0;
#ifdef __DEBUG
PreThreadId=InterlockedExchange(&((long)lpPerIOData->WorkingThreadID),InternalThreadID);
#endif
u_long Length;
//Begin=========================O_OverLappedEx=====================//
InterlockedExchange(&(lpConnCtx->LastDataTime),time(0));
if(true==lpPerIOData->DataInExtend)
{
//发送重叠I/O信息扩展结构体内部数据
Length=lpPerIOData->Length-lpPerIOData->SentBytes;
lpPerIOData->wbuf.len=Length;
lpPerIOData->wbuf.buf=(char *)(lpPerIOData->data+lpPerIOData->SentBytes);
nResult=WSASend(lpConnCtx->sockAccept,
&(lpPerIOData->wbuf),
1,
NULL,
lpPerIOData->flags,
&(lpPerIOData->OverLapped),
NULL);
}
//End===========================O_OverLappedEx=====================//
//Begin=============================O_DATA=========================//
else
{
//发送重叠I/O信息扩展结构体外的待发送数据
Length=OutData->Length-OutData->SentBytes;
lpPerIOData->wbuf.len=Length;
lpPerIOData->wbuf.buf=(char *)(OutData->Buffer+OutData->SentBytes);
nResult=WSASend(lpConnCtx->sockAccept,
&(lpPerIOData->wbuf),
1,
NULL,
lpPerIOData->flags,
&(lpPerIOData->OverLapped),
NULL);
}
OutData=NULL;
//Begin=============================O_DATA=========================//
if(nResult==SOCKET_ERROR)
{
Socket_Error=WSAGetLastError();
if(Socket_Error!=ERROR_IO_PENDING)
{
#ifdef __ERRORLOG
{
char *Msg;
Msg=(char *) AllocEx(256);
//memset(Msg,0,256);
strcpy(Msg,"ThrdID:");
ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SocketError:");
ltoa(Socket_Error,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ThreadError:");
ltoa(GetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I/O Size:");
ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SOCKET:");
itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Oper:Send");
strcat(Msg,"|LockCount:");
ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PreThrdID:");
ltoa(PreThreadId,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PerIOData:");
ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ConnCtx:");
ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|O_OverLappedEx:");
ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Queued:Successed");
SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
FreeEx(Msg);
Msg=NULL;
}
#endif
//网络错误,清理连接
InterlockedDecrement(&(lpPerIOData->LockCount));
lpPerIOData=NULL;
tcpIOCP->DumpConnection(lpConnCtx);
//SetLastError(0);
//WSASetLastError(0);
}
}
break;
}/*End of switch*/
}
//调用_endthreadex结束线程
_endthreadex(Result);
//返回一个Exit Code,实际上这只是一个形式
//Exit Code已由_endthreadex给出,同时早就已经结束线程了
return Result;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -