⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpbase.cpp

📁 决战帝王1.5武神降临对喜爱决战的玩家共享研究用
💻 CPP
📖 第 1 页 / 共 2 页
字号:
		}

loop_pass:;
		if ( pSDS )
			delete pSDS;
		pSDS = NULL;

		if ( pBSocket && pBSocket->GetIOPendingEnableFlag() == TRUE )
		{
			if ( !pLastBSocket ) 
			{
				nContinueSendinfCount++;
				pLastBSocket = pBSocket;
			}
			else
			{
				if ( pLastBSocket == pBSocket )
					nContinueSendinfCount++;
				else
				{
					nContinueSendinfCount = 0;
					pLastBSocket = pBSocket;
				}
			}

			if ( nContinueSendinfCount > MAX_CONTINUE_SENDING_COUNT )
			{
				nContinueSendinfCount = 0;
				pLastBSocket = NULL;

				LeaveCriticalSection( pSendCriticalSection );
				Sleep(1);
				continue;
			}
			else
			{
				if ( pIocpBase->m_nDataListLength[nThreadNo] > 0 )
				{
					if ( GetTickCount() - process_tick > MAX_SEND_CONTINUE_TICK )
					{
						nContinueSendinfCount = 0;
						LeaveCriticalSection( pSendCriticalSection );
						Sleep(1);
						continue;
					}

					goto data_loop;
				}

				nContinueSendinfCount = 0;
				pLastBSocket = NULL;
			}
		}
		else
		{
			if ( ++nContinueSendinfCount > 50 )
			{
				nContinueSendinfCount = 0;
				LeaveCriticalSection( pSendCriticalSection );
				Sleep(1);
				continue;
			}
			else
			{
				if ( pIocpBase->m_nDataListLength[nThreadNo] > 0 )
				{
					if ( GetTickCount() - process_tick > MAX_SEND_CONTINUE_TICK )
					{
						nContinueSendinfCount = 0;
						LeaveCriticalSection( pSendCriticalSection );
						Sleep(1);
						continue;
					}

					goto data_loop;
				}

				nContinueSendinfCount = 0;
			}
		}

		LeaveCriticalSection( pSendCriticalSection );

	}

	return 1;
}
*/

UINT WorkerDataProcThread( void *lp )
{
	DATAPROCTHREADPACKET *pDPTP = (DATAPROCTHREADPACKET *)lp;
	CIOCPBASE *pIocpBase = pDPTP->pIocpbase;
	CPoolBaseManager *pSQM = pIocpBase->m_pPBM;

	int nThreadNo = 0;
	nThreadNo = pIocpBase->m_CurDataProcThreadNo;
	pIocpBase->m_CurDataProcThreadNo++;
	CBSocket *pSocket;


start_loop:;
	if ( pIocpBase->m_CurDataProcThreadNo < pIocpBase->m_ThreadCount )
	{
		Sleep(1);
		goto start_loop;
	}

	TRACE("Thread Started [%d][%d]...\n", nThreadNo, pIocpBase->m_CurDataProcThreadNo-1 );

	WAIT_RECV_DATA *wrd;

	// 胶饭靛 皋牢 风凭...
	for (;;)
	{
		// IKING 2002.7.3
		if ( pIocpBase->m_nHeadPtr[nThreadNo] == pIocpBase->m_nTailPtr[nThreadNo] )
		{
			Sleep(1);
			continue;
		}

		wrd = pIocpBase->m_pRecvData[nThreadNo][pIocpBase->m_nTailPtr[nThreadNo]];
		pIocpBase->m_pRecvData[nThreadNo][pIocpBase->m_nTailPtr[nThreadNo]] = NULL;
		pIocpBase->m_nTailPtr[nThreadNo]++;
		pIocpBase->m_nTailPtr[nThreadNo] %= WAIT_RECV_DATA_BUFFER;

		if ( wrd == NULL ) continue;

		//g_ThreadAliveTime[nThreadNo] = GetTickCount();

		if ( wrd->m_Type == SOCKET_FOR_DISCONNECT )
		{
			if ( wrd->usn >= 0 && wrd->usn < 32767 )
			{
				CPoolBaseManager *pSQM = pIocpBase->m_pPBM;
				if(!pSQM)
				{
					delete wrd;
					continue;
				}
				else
				{
					if ( pSQM->m_pResources->IsFree( wrd->usn ) == false ) 
					{
						pSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( wrd->usn );
						if ( !pSocket )
						{
							delete wrd;
							continue;
						}
					}
					else
					{
						delete wrd;
						continue;
					}

				}
			}
			else
			{
				delete wrd;
				continue;
			}

			if ( pSocket == NULL )
			{
				delete wrd;
				continue;
			}

			pSocket->SockCloseProcess();
			
			delete wrd;
			continue;
		}

		// 荤侩磊 沥狼 单捞鸥 贸府 窃荐 龋免...
		pDPTP->fn(wrd);

		delete wrd;
	}


	return 1;
}

CIOCPBASE::CIOCPBASE()
{
	int i,j;
	m_ThreadCount = 1;
	m_CurThreadNo = 0;
	m_pPBM = NULL;
	m_pIopendingData = NULL;
	m_nIopendingDataCount = 0;
	m_bIOPendingStop = FALSE;

	for ( i = 0; i < MAX_WORKER_THREAD; i++ )
	{
		InitializeCriticalSection(&m_CS_ReceiveData[i]);
	}

	m_CurRecvThreadNo = 0;
	m_CurDataProcThreadNo = 0;

	// IKING 2002.7.3
	for ( i = 0; i < MAX_WORKER_THREAD+1; i++ )
	{
		m_nHeadPtr[i] = 0;
		m_nTailPtr[i] = 0;
	};

	for ( i = 0; i < MAX_WORKER_THREAD+1; i++ )
	{
		for ( j = 0; j < WAIT_RECV_DATA_BUFFER+1; j++ )
		{
			m_pRecvData[i][j] = NULL;
		}
	};
}

CIOCPBASE::~CIOCPBASE()
{
	// Listen Socket Array 瘤快扁...
	for ( int i = 0; i < m_ListenSocketArray.GetSize(); i++ )
	{
		if ( m_ListenSocketArray[i] )
		{
			delete m_ListenSocketArray[i];
			m_ListenSocketArray[i] = NULL;
		}
	}
	m_ListenSocketArray.RemoveAll();

	// Event Array 瘤快扁...
	for ( i = 0; i < m_hListenEventArray.GetSize(); i++ )
	{
		if ( m_hListenEventArray[i] )
		{
			delete m_hListenEventArray[i];
			m_hListenEventArray[i] = NULL;
		}
	}
	m_hListenEventArray.RemoveAll();

	// Thread Packet Array 瘤快扁...
	for ( i = 0; i < m_ThreadPacketArray.GetSize(); i++ )
	{
		if ( m_ThreadPacketArray[i] )
		{
			delete m_ThreadPacketArray[i];
			m_ThreadPacketArray[i] = NULL;
		}
	}
	m_ThreadPacketArray.RemoveAll();
	//

	for ( i = 0; i < MAX_WORKER_THREAD; i++ )
	{
		DeleteCriticalSection(&m_CS_ReceiveData[i]);
	}

	if ( m_pIopendingData )
	{
		delete[] m_pIopendingData;
		m_pIopendingData = NULL;
	}

	CloseHandle( m_CreateSignalEvent );
}

void CIOCPBASE::CreateAcceptThread()
{
	//DWORD id;
	//unsigned int id;
	//HANDLE acceptThread;

	m_pThreadPacket = new THREADPACKET;
	m_pThreadPacket->pIocpbase = this;
	m_pThreadPacket->phListenSockEvent = m_phListenSocketEvent;
	m_pThreadPacket->pListenSock = m_pSocketListen;
	m_pThreadPacket->iSocketType = m_Type;
	m_ThreadPacketArray.Add(m_pThreadPacket);

	//acceptThread = ::CreateThread( NULL, 0, AcceptListenThread, (LPVOID)m_pThreadPacket, 0, &id);
	//acceptThread = (HANDLE)_beginthreadex( NULL, 0, &AcceptListenThread, (LPVOID)m_pThreadPacket, 0, &id);
	m_acceptThread = AfxBeginThread( AcceptListenThread, (LPVOID)m_pThreadPacket );

	//::SetThreadPriority(acceptThread,THREAD_PRIORITY_ABOVE_NORMAL); //滚弊 蜡惯...
	//::SetThreadPriority(acceptThread,THREAD_PRIORITY_NORMAL);
}

void CIOCPBASE::CreateWorkerThread()
{
	SYSTEM_INFO		SystemInfo;

	HANDLE			hWorkerThread[MAXWORKERTHREAD];
	//DWORD			WorkerId[MAXWORKERTHREAD];
	unsigned int 	WorkerId[MAXWORKERTHREAD];

	//
	// try to get timing more accurate... Avoid context
	// switch that could occur when threads are released
	//
	//SetPriorityClass(GetCurrentProcess(), ABOVE_NORMAL_PRIORITY_CLASS);
	//SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS);
	//SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
	SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
	//
	// Figure out how many processors we have to size the minimum
	// number of worker threads and concurrency
	//
	GetSystemInfo (&SystemInfo);

	//-------------------------------------------------------------------------
	// 荐脚 胶饭靛 父甸扁...
	m_dwNumberOfWorkers = 2 * SystemInfo.dwNumberOfProcessors + 2;
	m_dwConcurrency = 0;
	
	//for (int i = 0; i < m_ThreadCount; i++ )
	//{
	//	m_hIOCPort[i] = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, (DWORD)-1, m_dwConcurrency );
	//}

	m_hIOCPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE,
		                                 NULL,
										 (DWORD)-1,
										 m_dwConcurrency );
	
	// 胶饭靛 积己窍扁...
	for ( DWORD i = 0; i < m_dwNumberOfWorkers; i++ )
	{
		/*
		hWorkerThread[i] = ::CreateThread(
										NULL,
										0,
										WorkerClientSocketThread,
										(LPVOID)this,
										CREATE_SUSPENDED,
										&WorkerId[i]
										);
		*/

		hWorkerThread[i] = (HANDLE)_beginthreadex(
										NULL,
										0,
										WorkerClientSocketThread,
										(LPVOID)this,
										CREATE_SUSPENDED,
										&WorkerId[i]
										);

		//m_hWorkerThread[i] = AfxBeginThread( WorkerSocketThreadWrapper, (LPVOID)this );

		//SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_TIME_CRITICAL);
		//SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_HIGHEST);
		SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_ABOVE_NORMAL);
	}

	// 胶饭靛 劝己拳...
	for ( DWORD j = 0; j < m_dwNumberOfWorkers; j++ )
	{
		ResumeThread( hWorkerThread[j] );
	}
}

BOOL CIOCPBASE::Associate(CIOCPSocket *pIocpSock)
{
	HANDLE hTemp;

	if (!m_hIOCPort)
	{
		//TRACE("]Error : There is no Completion Port[%d]\n", iModSid );
		return FALSE;
	}
	
	hTemp = CreateIoCompletionPort( (HANDLE)pIocpSock->m_Socket, m_hIOCPort, (DWORD)pIocpSock->m_Sid, m_dwConcurrency);
	if ( hTemp == NULL )
	{
		#ifdef _DEBUG		
		LPVOID lpMsgBuf;
		FormatMessage( 
			FORMAT_MESSAGE_ALLOCATE_BUFFER | 
			FORMAT_MESSAGE_FROM_SYSTEM | 
			FORMAT_MESSAGE_IGNORE_INSERTS,
			NULL,
			GetLastError(),
			MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
			(LPTSTR) &lpMsgBuf,
			0,
			NULL 
		);

		TRACE("CreateIoCompletionPort Error : %s\n", lpMsgBuf );
		LocalFree( lpMsgBuf );
		#endif

		return FALSE;

	}
	
	return (hTemp == m_hIOCPort);
}

BOOL CIOCPBASE::Listen(int port, char *pStrSvrAddr )
{
	int opt;
	struct sockaddr_in addr;
	//struct linger lingerOpt;
	
	// Open a TCP socket (an Internet stream socket).
	//
	m_pSocketListen = new SOCKET;
	if ( m_pSocketListen == NULL ) return FALSE;

	*m_pSocketListen = WSASocket(AF_INET,
		                         SOCK_STREAM,
								 0,
								 NULL,
								 0,
								 WSA_FLAG_OVERLAPPED);

	if ( *m_pSocketListen < 0 ) 
	{
		TRACE("]Error(01) - Can't Open Stream Socket\n");
		return FALSE;
	}

	// Bind our local address so that the client can send to us. 
	//
	memset((void *)&addr, 0, sizeof(addr));
	addr.sin_family	= AF_INET;
	
	if ( pStrSvrAddr == NULL )
		addr.sin_addr.s_addr	= htonl(INADDR_ANY);
	else
		addr.sin_addr.s_addr	= inet_addr( pStrSvrAddr );

	addr.sin_port			= htons(port);

	// added in an attempt to allow rebinding to the port 
	//
	// KEEP_ALIVE甫 enable 矫挪促...
	opt = 1;
	setsockopt( *m_pSocketListen, SOL_SOCKET, SO_KEEPALIVE, (char *)&opt, sizeof(opt));
	
	// added in an attempt to allow rebinding to the port 
	//
	opt = 1;
	setsockopt( *m_pSocketListen, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt));

	// Linger off -> close socket immediately regardless of existance of data 
	//
	//lingerOpt.l_onoff = 0;
	//lingerOpt.l_linger = 0;
	//setsockopt( *m_pSocketListen, SOL_SOCKET, SO_LINGER, (char *)&lingerOpt, sizeof(lingerOpt) );

	// IKING 2002.1.
/*	int ret, optlen;
	BOOL bNagleingFlag = 1;
	optlen = sizeof(bNagleingFlag);
	ret = getsockopt( *m_pSocketListen, IPPROTO_TCP, TCP_NODELAY, (char *)&bNagleingFlag, &optlen );
	if ( ret == SOCKET_ERROR )
	{
		TRACE("GETTING TCP_NODELAY ERROR(%d)...\n", WSAGetLastError() );
	}

	if ( bNagleingFlag == FALSE )
	{
		bNagleingFlag = TRUE;
		ret = setsockopt( *m_pSocketListen, IPPROTO_TCP, TCP_NODELAY, (char *)&bNagleingFlag, sizeof(bNagleingFlag) );
		if ( ret == SOCKET_ERROR )
		{
			TRACE("SETTING TCP_NODELAY ERROR(%d)...\n", WSAGetLastError() );
		}
	}
*/	//
	
	if ( bind( *m_pSocketListen, (struct sockaddr *)&addr, sizeof(addr)) < 0 )
	{
		TRACE("]Error(02)- Can't bind local address\n");
		return FALSE;
	}

	int socklen, len, err;

//	socklen = DEFAULT_RCVBUF * 5;
	socklen = 8192 * 8;
	setsockopt( *m_pSocketListen, SOL_SOCKET, SO_RCVBUF, (char*)&socklen, sizeof(socklen) );
	
	len = sizeof(socklen);
	err = getsockopt( *m_pSocketListen, SOL_SOCKET, SO_RCVBUF, (char*)&socklen, &len );
	if (err == SOCKET_ERROR)
	{
		TRACE("]Set Socket RecvBuf of port(%d) as %d : Fail\n", port, socklen);
		return FALSE;
	}

//	socklen = DEFAULT_RCVBUF*5;
	socklen = 8192 * 8;//8192 * 4;
	setsockopt( *m_pSocketListen, SOL_SOCKET, SO_SNDBUF, (char*)&socklen, sizeof(socklen) );
	len = sizeof(socklen);
	err = getsockopt( *m_pSocketListen, SOL_SOCKET, SO_SNDBUF, (char*)&socklen, &len );

	if (err == SOCKET_ERROR)
	{
		TRACE("]Set Socket SendBuf of port(%d) as %d : Fail\n", port, socklen);
		return FALSE;
	}
	
	listen( *m_pSocketListen, 5 );
	m_ListenSocketArray.Add( m_pSocketListen );

	m_phListenSocketEvent = new HANDLE;
	*m_phListenSocketEvent = WSACreateEvent();
	m_hListenEventArray.Add(m_phListenSocketEvent);

	WSAEventSelect( *m_pSocketListen, *m_phListenSocketEvent, FD_ACCEPT);

	TRACE("]Port[%05d] initialzed\n", port);

	CreateAcceptThread();

	return TRUE;
}

int CIOCPBASE::Init( int type, CPoolBaseManager *pPBM, DWORD (*fn)(LPVOID lp) )
{
	m_Type = type;
	m_pPBM = pPBM;

	m_bAcceptEnableFlag = FALSE;

	// 函荐 檬扁拳...
	for ( int i = 0; i < m_ThreadCount; i++ )
	{
		m_nDataListLength[i] = 0;
	}

	m_CreateSignalEvent = CreateEvent( NULL, TRUE, FALSE, NULL );

	CreateWorkerThread();

	// 单捞鸥 贸府 弊饭靛 父甸扁...
	if ( fn )
	{
		//HANDLE hWorkerDataProcThread[MAXWORKERTHREAD];
		//unsigned int WorkerDataProcId[MAXWORKERTHREAD];

		m_Dptp.pIocpbase = this;
		m_Dptp.fn = fn;

		for ( i = 0; i < m_ThreadCount; i++ )
		{
			/*
			hWorkerDataProcThread[i] = (HANDLE)_beginthreadex(
										NULL,
										0,
										WorkerDataProcThread,
										(LPVOID)&m_Dptp,
										CREATE_SUSPENDED,
										&WorkerDataProcId[i]
										);
			*/

			m_hWorkerThread[i] = AfxBeginThread( WorkerDataProcThread, (LPVOID)&m_Dptp, THREAD_PRIORITY_ABOVE_NORMAL );

		}

		// 单捞鸥 胶饭靛 劝己拳...
		//for ( i = 0; i < m_ThreadCount; i++ )
		//{
		//	ResumeThread( hWorkerDataProcThread[i] );
		//}
		//}
	}

	return 1;
}

void CIOCPBASE::SetAliveTimeUpdate(int uid, DWORD ctick )
{
	if ( m_pPBM )
		m_pPBM->m_pResources->SetTimeStamp( uid, ctick );
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -