📄 unit_workerthread.cpp
字号:
//---------------------------------------------------------------------------
#pragma hdrstop
#include "Unit_WorkerThread.h"
//---------------------------------------------------------------------------
#pragma package(smart_init)
//worker thread executant,for all I/O C.P. workers
//完成端口例程线程执行体
//参数
//lpParam:用户数据,完成端口类指针
unsigned WINAPI WorkerThread(LPVOID lpParam)
{
unsigned Result; //线程返回值
TtcpIOCP *tcpIOCP;//完成端口管理器类指针
HANDLE hIOCP; //完成端口句柄
DWORD InternalThreadID;//当前线程ID
LPIODATA OutData; //数据元素链式结构体指针
DWORD dwIOSize; //完成例程提交数据长度值
LPOVERLAPPEDEX lpPerIOData;//重叠I/O信息扩展结构体指针
LPOVERLAPPED pOverLapped;//重叠I/O信息原始结构体指针
LPCONN_CTX lpConnCtx;//用户连接上下文结构体指针
DWORD LastError; //最后一次线程内OS级错误代码
DWORD Socket_Error;//最后一次线程内网络级(Socket)错误代码
bool bSuccess; //完成例成队列是否弹出
int nResult; //网络作业提交返回值
bool Should_Break;
#ifdef __DEBUG
DWORD PreThreadId;
#endif
Result=0;//预置线程返回值(Exit Code)
tcpIOCP=(TtcpIOCP *)lpParam;//将参数还原成原始类型
hIOCP=tcpIOCP->Handle;//取得完成端口例程句柄
InternalThreadID=GetCurrentThreadId();//取得当前线程的ID号
while(1)
{
////////////////////////////////////////////////////////////
//判断当前同步运行之线程数量是否超出用户预期运行之线程数量//
//如果超出该期望值,则判断是否有2个以上线程处于用户数据守 //
//候状态,如果有足够线程等待新的完成例程事件,则结束线程 //
// //
//值得注意的是: //
// 1.完成例程通知线程是使用LIFO的顺序, //
// 也就是最近(后)进入待的线程将得到最大的优先运行权 //
// 主要是考虑到当一个线程长时间不工作时对于操作系统 //
// 则须将分配给该线程的资源转换到磁盘缓冲当中出 //
// 从而提高物理内存的利用率,以预期达到最优的运行效果//
// 2.对于网络(包括所有关联于该完成例程的I/O事件都采用 //
// FIFO的顺序,用以保证数据的顺序性和有效性 //
////////////////////////////////////////////////////////////
if((tcpIOCP->ThreadCount>tcpIOCP->FExpectThreadCount)&&
(tcpIOCP->FActiveThreadCount>2))
{
//将线程置入线程顺收站
//以便让时脉处理例程完成对相关资源的回收,以及关闭相关句柄
tcpIOCP->DumpThread(InternalThreadID);
//结束线程循环,终结线程
break;
}
//Active Queued Thread Signal
//增加活动{实际为等待完成例程队列)计数
InterlockedIncrement(&(tcpIOCP->FActiveThreadCount));
//加入完成端口例程处理线程队列,以等待完成例程事件/讯号
dwIOSize=0;
lpConnCtx=NULL;
pOverLapped=NULL;
lpPerIOData=NULL;
OutData=NULL;
bSuccess=GetQueuedCompletionStatus(hIOCP,
&dwIOSize,
(LPDWORD)&lpConnCtx,
&pOverLapped,
INFINITE);
//Inactive Queued Thread Signal
//得到一个完成例程讯号,线程从等待队列中出列
//减少活动{实际为等待完成例程队列)计数
InterlockedDecrement(&(tcpIOCP->FActiveThreadCount));
//No more thread waiting for completion
//and in working State
//and Thread count is lower than the Max count
//Increment One Thread
//为保证至少有一个线程处于等待完成例程的处理线程的队列当中
//将在最大线程的限制范围内,增加实际工作线程
//该值可能会超出用户预期同步运行线程数
//if((1>(tcpIOCP->FActiveThreadCount))
/* if((tcpIOCP->ExpectThreadCount>(tcpIOCP->ThreadCount))
&&(tcpIOCP->ThreadCount<(long)MAX_THREAD_NUMBER)
&&(tcpIOCP->FRunningState>LS_SHUTDN))
{
tcpIOCP->AddThread();
}*/
//Terminate thread by PostQueuedCompletionStatus
//线程结束通知,由PostQueuedCompletionStatus发出
//并使得参数lpCompletionKey为NULL
if(lpConnCtx==NULL)
{
break;
}
//取得重叠I/O信息扩展结构体(从原型指针当中还原)
lpPerIOData=(LPOVERLAPPEDEX)(pOverLapped);
pOverLapped=NULL;
//当一个得到一个完成通知
//并且传输数据量为0表示客户端正常断开
if(bSuccess&&(dwIOSize==0))
{
//减少未决操作引用计数
//该计数主要是为了免除时脉例程处理回收站数据时
//在所有事件完成/取消前将相关的资源释放掉
//而导致MSWINSOCK.DLL内部内存地址访问非法错误
InterlockedDecrement(&(lpPerIOData->LockCount));
//将该连接上下文信息节点移入回收站
//以供资源回收例程进一步处理
lpPerIOData=NULL;
tcpIOCP->DumpConnection(lpConnCtx);
//继续处理下一个完成端口通知事件
continue;
}
//完成端口未通知相关事件
//则可能是由部分线程自我数量调控
//而自行终结导致在该线程提交的I/O请求被取消
//或者网络异常
if(!bSuccess)
{
/*Here should Print the InternalThreadID
and Previous InternalThread in OVERLAPPEDEX*/
//取得最近错误代码
LastError=GetLastError();
Socket_Error=WSAGetLastError();
if(NULL==lpPerIOData)
{
//SetLastError(0);
continue;
}
/*if( LastError!=ERROR_IO_PENDING
//重叠 I/O 操作在进行中。
//Overlapped I/O operation is in progress.
&&LastError!=ERROR_OPERATION_ABORTED
//由于线程退出或应用程序请求,已放弃 I/O 操作。
//The I/O operation has been aborted because of either a thread exit or an application request.
&&LastError!=ERROR_NETNAME_DELETED
//指定的网络名不存在
//The specified network name is no longer available.
&&LastError!=ERROR_INVALID_PARAMETER
//参数不正确
//The parameter is incorrect.
)*/
switch(LastError)
{
case ERROR_OPERATION_ABORTED:
#ifdef __DEBUG
/*if(lpPerIOData->WorkingThreadID!=InternalThreadID)
{
WSASetLastError(0);
SetLastError(0);
break;
}*/
#endif
case ERROR_NETNAME_DELETED://指定的网络名不再可用。
case ERROR_INVALID_PARAMETER://参数不正确。
//完成通知队列未弹出不能减少未决操作引用计数
//该计数主要是为了免除时脉例程处理回收站数据时
//在所有事件完成/取消前将相关的资源释放掉
//而导致MSWINSOCK.DLL内部内存地址访问非法错误
//将该连接上下文信息节点移入回收站
//以供资源回收例程进一步处理
default:
#ifdef __ERRORLOG
{
char *Msg;
Msg=(char *) AllocEx(256);
//memset(Msg,0,256);
strcpy(Msg,"ThrdID:");
ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SocketError:");
ltoa(Socket_Error,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ThreadError:");
ltoa(LastError,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I/O Size:");
ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SOCKET:");
itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Oper:");
itoa(lpPerIOData->oper,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|LockCount:");
ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PreThrdID:");
ltoa(lpPerIOData->WorkingThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PerIOData:");
ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ConnCtx:");
ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I_OverLappedEx:");
ltoa((DWORD)lpConnCtx->I_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|O_OverLappedEx:");
ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Queued:Failed");
//GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
FreeEx(Msg);
Msg=NULL;
}
#endif
InterlockedDecrement(&(lpPerIOData->LockCount));
lpPerIOData=NULL;
tcpIOCP->DumpConnection(lpConnCtx);
//继续处理下一个完成端口通知事件
//SetLastError(0);
//WSASetLastError(0);
continue;
}
}
//减少未决操作引用计数
//该计数主要是为了免除时脉例程处理回收站数据时
//在所有事件完成/取消前将相关的资源释放掉
//而导致MSWINSOCK.DLL内部内存地址访问非法错误
//InterlockedDecrement(&(lpPerIOData->LockCount));
Should_Break=false;
switch(lpPerIOData->oper)
{
case SVR_IO_READ://send then recv
try
{
if(dwIOSize)//有收到数据
//交给数据处理过程进行处理
tcpIOCP->InternalOnReceiveData(lpConnCtx,dwIOSize,lpPerIOData->data);
}
catch(...)
{
}
//更新流量计数,包括当前用户连接的流量统计值
//以及全局流量统计值
/*tcpIOCP->Lock();
try
{*/
//更新单连接的总接收流量,超int64时"归零"
if(lpConnCtx->TotalRecv>(MAX_INT64-dwIOSize))
lpConnCtx->TotalRecv=dwIOSize;
else
lpConnCtx->TotalRecv=lpConnCtx->TotalRecv+dwIOSize;
//更新全局流量计数
InterlockedExchangeAdd(&(tcpIOCP->BytesRecvPerInterval),dwIOSize);
//判断该连接是否已经标示为回收
//进入回收状态的连接将不再提交作业请求
if(true==lpConnCtx->InDumping)
{
#ifdef __ERRORLOG
{
char *Msg;
Msg=(char *) AllocEx(256);
//memset(Msg,0,256);
strcpy(Msg,"ThrdID:");
ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SocketError:");
ltoa(WSAGetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ThreadError:");
ltoa(GetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I/O Size:");
ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|SOCKET:");
itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Oper:Recv");
strcat(Msg,"|LockCount:");
ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PreThrdID:");
ltoa(lpPerIOData->WorkingThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|PerIOData:");
ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|ConnCtx:");
ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|I_OverLappedEx:");
ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
strcat(Msg,"|Queued:Successed");
//GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
FreeEx(Msg);
Msg=NULL;
}
#endif
InterlockedDecrement(&(lpPerIOData->LockCount));
Should_Break=true;
}
//增加未决传输引用计数
//InterlockedIncrement(&(lpPerIOData->LockCount));
/*}
__finally
{
tcpIOCP->Unlock();
} */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -