⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 unit_workerthread.cpp

📁 一个完成端口的框架程序
💻 CPP
📖 第 1 页 / 共 2 页
字号:
//---------------------------------------------------------------------------


#pragma hdrstop

#include "Unit_WorkerThread.h"

//---------------------------------------------------------------------------

#pragma package(smart_init)
//worker thread executant,for all I/O C.P. workers
//完成端口例程线程执行体
//参数
//lpParam:用户数据,完成端口类指针
unsigned WINAPI WorkerThread(LPVOID lpParam)
{
   unsigned Result;  //线程返回值
   TtcpIOCP *tcpIOCP;//完成端口管理器类指针
   HANDLE hIOCP;     //完成端口句柄
   DWORD InternalThreadID;//当前线程ID
   LPIODATA OutData; //数据元素链式结构体指针
   DWORD dwIOSize;   //完成例程提交数据长度值
   LPOVERLAPPEDEX lpPerIOData;//重叠I/O信息扩展结构体指针
   LPOVERLAPPED pOverLapped;//重叠I/O信息原始结构体指针
   LPCONN_CTX lpConnCtx;//用户连接上下文结构体指针
   DWORD LastError;  //最后一次线程内OS级错误代码
   DWORD Socket_Error;//最后一次线程内网络级(Socket)错误代码
   bool bSuccess;    //完成例成队列是否弹出
   int nResult;      //网络作业提交返回值
   bool Should_Break;
   #ifdef __DEBUG
   DWORD PreThreadId;
   #endif
   Result=0;//预置线程返回值(Exit Code)
   tcpIOCP=(TtcpIOCP *)lpParam;//将参数还原成原始类型
   hIOCP=tcpIOCP->Handle;//取得完成端口例程句柄
   InternalThreadID=GetCurrentThreadId();//取得当前线程的ID号

   while(1)
   {
	  ////////////////////////////////////////////////////////////
	  //判断当前同步运行之线程数量是否超出用户预期运行之线程数量//
	  //如果超出该期望值,则判断是否有2个以上线程处于用户数据守 //
	  //候状态,如果有足够线程等待新的完成例程事件,则结束线程  //
	  //                                                        //
	  //值得注意的是:                                           //
	  //    1.完成例程通知线程是使用LIFO的顺序,                //
	  //      也就是最近(后)进入待的线程将得到最大的优先运行权  //
	  //      主要是考虑到当一个线程长时间不工作时对于操作系统  //
	  //      则须将分配给该线程的资源转换到磁盘缓冲当中出      //
	  //      从而提高物理内存的利用率,以预期达到最优的运行效果//
	  //    2.对于网络(包括所有关联于该完成例程的I/O事件都采用 //
	  //      FIFO的顺序,用以保证数据的顺序性和有效性          //
	  ////////////////////////////////////////////////////////////
	  if((tcpIOCP->ThreadCount>tcpIOCP->FExpectThreadCount)&&
		 (tcpIOCP->FActiveThreadCount>2))
	  {
		 //将线程置入线程顺收站
		 //以便让时脉处理例程完成对相关资源的回收,以及关闭相关句柄
		 tcpIOCP->DumpThread(InternalThreadID);
		 //结束线程循环,终结线程
		 break;
	  }

	  //Active Queued Thread Signal
	  //增加活动{实际为等待完成例程队列)计数
	  InterlockedIncrement(&(tcpIOCP->FActiveThreadCount));
	  //加入完成端口例程处理线程队列,以等待完成例程事件/讯号
          dwIOSize=0;
          lpConnCtx=NULL;
          pOverLapped=NULL;
          lpPerIOData=NULL;
          OutData=NULL;
	  bSuccess=GetQueuedCompletionStatus(hIOCP,
                                             &dwIOSize,
                                             (LPDWORD)&lpConnCtx,
                                             &pOverLapped,
                                             INFINITE);
	  //Inactive Queued Thread Signal
	  //得到一个完成例程讯号,线程从等待队列中出列
	  //减少活动{实际为等待完成例程队列)计数
	  InterlockedDecrement(&(tcpIOCP->FActiveThreadCount));

	  //No more thread waiting for completion
	  //and in working State
	  //and Thread count is lower than the Max count
	  //Increment One Thread
	  //为保证至少有一个线程处于等待完成例程的处理线程的队列当中
	  //将在最大线程的限制范围内,增加实际工作线程
	  //该值可能会超出用户预期同步运行线程数
	  //if((1>(tcpIOCP->FActiveThreadCount))
	  /*	  if((tcpIOCP->ExpectThreadCount>(tcpIOCP->ThreadCount))
		 &&(tcpIOCP->ThreadCount<(long)MAX_THREAD_NUMBER)
		 &&(tcpIOCP->FRunningState>LS_SHUTDN))
	  {
		 tcpIOCP->AddThread();
	  }*/

	  //Terminate thread by PostQueuedCompletionStatus
	  //线程结束通知,由PostQueuedCompletionStatus发出
	  //并使得参数lpCompletionKey为NULL
	  if(lpConnCtx==NULL)
	  {
		 break;
	  }

	  //取得重叠I/O信息扩展结构体(从原型指针当中还原)
	  lpPerIOData=(LPOVERLAPPEDEX)(pOverLapped);
          pOverLapped=NULL;
	  //当一个得到一个完成通知
	  //并且传输数据量为0表示客户端正常断开
	  if(bSuccess&&(dwIOSize==0))
	  {
		 //减少未决操作引用计数
		 //该计数主要是为了免除时脉例程处理回收站数据时
		 //在所有事件完成/取消前将相关的资源释放掉
		 //而导致MSWINSOCK.DLL内部内存地址访问非法错误
		 InterlockedDecrement(&(lpPerIOData->LockCount));
		 //将该连接上下文信息节点移入回收站
		 //以供资源回收例程进一步处理
                 lpPerIOData=NULL;
		 tcpIOCP->DumpConnection(lpConnCtx);
		 //继续处理下一个完成端口通知事件
		 continue;
	  }

	  //完成端口未通知相关事件
	  //则可能是由部分线程自我数量调控
	  //而自行终结导致在该线程提交的I/O请求被取消
	  //或者网络异常
	  if(!bSuccess)
	  {
		 /*Here should Print the InternalThreadID
		   and Previous InternalThread in OVERLAPPEDEX*/
		 //取得最近错误代码
		 LastError=GetLastError();
		 Socket_Error=WSAGetLastError();
                 if(NULL==lpPerIOData)
                 {
                    //SetLastError(0);
                    continue;
                 }
		 /*if(  LastError!=ERROR_IO_PENDING
						//重叠 I/O 操作在进行中。
						//Overlapped I/O operation is in progress.
			&&LastError!=ERROR_OPERATION_ABORTED
						//由于线程退出或应用程序请求,已放弃 I/O 操作。
						//The I/O operation has been aborted because of either a thread exit or an application request.
			&&LastError!=ERROR_NETNAME_DELETED
						//指定的网络名不存在
						//The specified network name is no longer available.
			&&LastError!=ERROR_INVALID_PARAMETER
						//参数不正确
						//The parameter is incorrect.
		   )*/

		 switch(LastError)
		 {
                        case ERROR_OPERATION_ABORTED:
                             #ifdef __DEBUG
							 /*if(lpPerIOData->WorkingThreadID!=InternalThreadID)
							 {
								WSASetLastError(0);
								SetLastError(0);
								break;
                             }*/
                             #endif
			case ERROR_NETNAME_DELETED://指定的网络名不再可用。

			case ERROR_INVALID_PARAMETER://参数不正确。
				 //完成通知队列未弹出不能减少未决操作引用计数
				 //该计数主要是为了免除时脉例程处理回收站数据时
				 //在所有事件完成/取消前将相关的资源释放掉
				 //而导致MSWINSOCK.DLL内部内存地址访问非法错误
				 //将该连接上下文信息节点移入回收站
				 //以供资源回收例程进一步处理

                        default:
                                 #ifdef __ERRORLOG
                                 {
								 char *Msg;
                                 Msg=(char *) AllocEx(256);
                                 //memset(Msg,0,256);
                                 strcpy(Msg,"ThrdID:");
                                 ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|SocketError:");
                                 ltoa(Socket_Error,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|ThreadError:");
                                 ltoa(LastError,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|I/O Size:");
                                 ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|SOCKET:");
                                 itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|Oper:");
                                 itoa(lpPerIOData->oper,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|LockCount:");
                                 ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|PreThrdID:");
                                 ltoa(lpPerIOData->WorkingThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|PerIOData:");
                                 ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|ConnCtx:");
                                 ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|I_OverLappedEx:");
                                 ltoa((DWORD)lpConnCtx->I_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|O_OverLappedEx:");
                                 ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|Queued:Failed");
                                 //GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
                                 SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
                                 FreeEx(Msg);
                                 Msg=NULL;
                                 }
                                 #endif
                                 InterlockedDecrement(&(lpPerIOData->LockCount));
                                 lpPerIOData=NULL;
				 tcpIOCP->DumpConnection(lpConnCtx);
                                                                  
				 //继续处理下一个完成端口通知事件
								 //SetLastError(0);
								 //WSASetLastError(0);
				 continue;
		 }
	  }

	  //减少未决操作引用计数
	  //该计数主要是为了免除时脉例程处理回收站数据时
	  //在所有事件完成/取消前将相关的资源释放掉
	  //而导致MSWINSOCK.DLL内部内存地址访问非法错误
	  //InterlockedDecrement(&(lpPerIOData->LockCount));
          Should_Break=false;
	  switch(lpPerIOData->oper)
	  {
		 case SVR_IO_READ://send then recv
			  try
			  {
				 if(dwIOSize)//有收到数据
					//交给数据处理过程进行处理
					tcpIOCP->InternalOnReceiveData(lpConnCtx,dwIOSize,lpPerIOData->data);
			  }
			  catch(...)
			  {
			  }
			  //更新流量计数,包括当前用户连接的流量统计值
			  //以及全局流量统计值
			  /*tcpIOCP->Lock();
			  try
			  {*/
				 //更新单连接的总接收流量,超int64时"归零"
				 if(lpConnCtx->TotalRecv>(MAX_INT64-dwIOSize))
					lpConnCtx->TotalRecv=dwIOSize;
				 else
					lpConnCtx->TotalRecv=lpConnCtx->TotalRecv+dwIOSize;

				 //更新全局流量计数
				 InterlockedExchangeAdd(&(tcpIOCP->BytesRecvPerInterval),dwIOSize);

				 //判断该连接是否已经标示为回收
				 //进入回收状态的连接将不再提交作业请求
				 if(true==lpConnCtx->InDumping)
                                 {
                                 #ifdef __ERRORLOG
                                 {
                                 char *Msg;
                                 Msg=(char *) AllocEx(256);
                                 //memset(Msg,0,256);
                                 strcpy(Msg,"ThrdID:");
                                 ltoa(InternalThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|SocketError:");
                                 ltoa(WSAGetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|ThreadError:");
                                 ltoa(GetLastError(),(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|I/O Size:");
                                 ltoa(dwIOSize,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|SOCKET:");
                                 itoa(lpConnCtx->sockAccept,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|Oper:Recv");
                                 strcat(Msg,"|LockCount:");
                                 ltoa(lpPerIOData->LockCount,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|PreThrdID:");
                                 ltoa(lpPerIOData->WorkingThreadID,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|PerIOData:");
                                 ltoa((DWORD)lpPerIOData,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|ConnCtx:");
                                 ltoa((DWORD)lpConnCtx,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|I_OverLappedEx:");
                                 ltoa((DWORD)lpConnCtx->O_OverLappedEx,(char *)(&Msg[0]+strlen(Msg)),10);
                                 strcat(Msg,"|Queued:Successed");
                                 //GetThreadId( tcpIOCP->conn_CriticalSection.OwningThread)
                                 SendMessage(HWND_BROADCAST,RegisterWindowMessage("DumpMessage"),(WPARAM)Msg,(LPARAM)strlen(Msg));
                                 FreeEx(Msg);
                                 Msg=NULL;
                                 }
                                 #endif
                                    InterlockedDecrement(&(lpPerIOData->LockCount));
                                    Should_Break=true;
                                 }
				 //增加未决传输引用计数
				 //InterlockedIncrement(&(lpPerIOData->LockCount));
			  /*}
			  __finally
			  {
				 tcpIOCP->Unlock();
			  } */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -