⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpsocketserver.cpp

📁 视频监控vc源代码.对于做视频系统的朋友们很有帮助
💻 CPP
字号:
#include "TCPSocketServer.h"
#include <crtdbg.h>

TCPSocketServer::TCPSocketServer(void)
		  :m_hIOCP ( INVALID_HANDLE_VALUE ),
           m_MaxThreads ( 0 ),
           m_ConcurrentThreads ( 0 ),
           m_RunningThreads ( 0 ),
           m_NumThreads ( 0 ),
		   m_hStopEvent ( INVALID_HANDLE_VALUE ),
		   stop(true)
{
	for(int i = 0;i<1000;i++){
		PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA;
		m_socks.push_back(PerSocketData);
	}
}

TCPSocketServer::~TCPSocketServer(void){
	for(int i = 0;i<1000;i++){
		PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA*)m_socks.front();
		delete PerSocketData;
		m_socks.pop_front();
	}
}

bool TCPSocketServer::init(){

	SYSTEM_INFO sysinfo;
	GetSystemInfo(&sysinfo);

	m_ConcurrentThreads = sysinfo.dwNumberOfProcessors*2 + 2;
	m_MaxThreads = sysinfo.dwNumberOfProcessors*3 + 2;

    m_hStopEvent = CreateEvent ( NULL, TRUE, FALSE, NULL );
    if ( m_hStopEvent == NULL )return false;

	m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,NULL,m_ConcurrentThreads);
	if (m_hIOCP == NULL)return false;

	for ( UINT i = 0; i < m_MaxThreads; ++i ){
		if(!StartThread())return false;
	}

	return true;
}

bool TCPSocketServer::StartThread(){

    HANDLE hThread  = NULL;
    unsigned int ThreadId = 0;

	hThread = (HANDLE)_beginthreadex( NULL, 0, TCPSocketServer::ThreadProc, (LPVOID)this, 0, &ThreadId );

    if ( hThread == NULL )return false;

	m_ThreadMapLock.Acquire();
	m_ThreadMap.insert(hash_map<DWORD,DWORD>::value_type(ThreadId,(DWORD)hThread));
	m_ThreadMapLock.Release();
	::InterlockedIncrement((long*)&m_NumThreads);
	::InterlockedIncrement((long*)&m_RunningThreads);
	return true;
}

int TCPSocketServer::Send(char* pData, size_t nLen, SOCKET s, DWORD dwParam){

	if(dwParam == 0)return -1;

	DWORD NumBytes;
	int nRet;

	PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA *)dwParam;
	++PerSocketData->io_count;

	PER_IO_OPERATION_DATA *PerIOData = new PER_IO_OPERATION_DATA;
	ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
	memcpy(PerIOData->Buffer,pData,nLen);
	PerIOData->DataBuf.len = nLen;
	PerIOData->DataBuf.buf = PerIOData->Buffer;
	PerIOData->OperationType = SEND_POSTED;

	nRet = WSASend(s,&(PerIOData->DataBuf),1,&NumBytes,0,(OVERLAPPED *)PerIOData,NULL);

//	ZeroMemory(PerIOData->Buffer,sizeof(PerIOData->Buffer));//just for test.
	if(SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
		--PerSocketData->io_count;
		closesocket(s);
		delete PerIOData;
		return -1;
	}

	return 0;
}
int TCPSocketServer::Connect(unsigned short usPort,unsigned long ulIP,SOCKET& addr_a,DWORD& addr_b){

	SOCKET sck = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
	if(INVALID_SOCKET == sck)return -1;

	sockaddr_in sAddr;
	sAddr.sin_family=AF_INET;
	sAddr.sin_port=htons(usPort);
	sAddr.sin_addr.S_un.S_addr=htonl(ulIP);
	if ( SOCKET_ERROR == connect(sck,(sockaddr*)&sAddr,sizeof(sockaddr_in)) ) return -1;

	PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA();
	PerSocketData->socket = sck;
	PerSocketData->io_count = 0;

	HANDLE iocp = CreateIoCompletionPort((HANDLE)sck,m_hIOCP,(ULONG_PTR)PerSocketData,m_ConcurrentThreads);
	if(iocp == NULL)return -1;

	OnAccepted(sck,(DWORD)PerSocketData);
//	OnConnect(sck,(DWORD)PerSocketData);
	addr_a = sck;
	addr_b = (DWORD)PerSocketData;

	PER_IO_OPERATION_DATA* PerIOData = new PER_IO_OPERATION_DATA;
	ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
	PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
	PerIOData->DataBuf.buf = PerIOData->Buffer;
	PerIOData->OperationType = RECV_POSTED;

	DWORD Flags = 0;
	DWORD RecvBytes = 0;

	int nRet = WSARecv(sck,&PerIOData->DataBuf,1,&RecvBytes,&Flags,(OVERLAPPED *)PerIOData,NULL);
	if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
		delete PerIOData;
		ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
		return -1;
	}

	return 0;
}
unsigned int TCPSocketServer::ThreadProc(void* lpParam){

	TCPSocketServer &pthis=*((TCPSocketServer*)lpParam);

	DWORD NumBytes = 0;
	LPOVERLAPPED Overlapped = NULL;
	PER_IO_OPERATION_DATA *PerIoData = NULL;
	PER_SOCKET_DATA *PerSocketData = NULL;
	int nRet;
	DWORD RecvBytes,Flags;

	while(true){

		::InterlockedDecrement((long*)&(pthis.m_RunningThreads));
		nRet = GetQueuedCompletionStatus(pthis.m_hIOCP,&NumBytes,(ULONG_PTR*)&PerSocketData,&Overlapped,INFINITE);
		::InterlockedIncrement ((long*)&(pthis.m_RunningThreads));
		
		if (!nRet) {
			if(Overlapped != NULL){
				PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;
				if((RECV_POSTED == PerIoData->OperationType) && (PerSocketData!=NULL)){
					pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
				}
				delete PerIoData;
			}
			continue;
		}else if( NumBytes == 0 ){ //nRet == ERROR_SUCCESS
			PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;
			if((RECV_POSTED == PerIoData->OperationType) && (PerSocketData!=NULL)){
				pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
			}
			delete PerIoData;
			continue;
		}

		PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;

		if(PerIoData->OperationType == STOP_POSTED)break;

		switch(PerIoData->OperationType){
			case RECV_POSTED:
				pthis.OnReceive(PerIoData->Buffer,NumBytes,PerSocketData->socket,(DWORD)PerSocketData);

				ZeroMemory(PerIoData,sizeof(PER_IO_OPERATION_DATA));
				PerIoData->DataBuf.len = sizeof(PerIoData->Buffer);
				PerIoData->DataBuf.buf = PerIoData->Buffer;
				PerIoData->OperationType = RECV_POSTED;

				Flags = 0;

				nRet = WSARecv(PerSocketData->socket,&(PerIoData->DataBuf),1,&RecvBytes,&Flags,(OVERLAPPED *)PerIoData,NULL);
				if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
					delete PerIoData;
					pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
				}
				
				break;
 
			case SEND_POSTED:
				--PerSocketData->io_count;
				pthis.OnSend(PerSocketData->socket,(DWORD)PerSocketData);
				delete PerIoData;
				break;
		}
	}

	pthis.ThreadCleanUp();

//	_endthreadex(0);
	return 0;
}
void TCPSocketServer::ThreadCleanUp(){

    m_ThreadMapLock.Acquire ( );
    hash_map<DWORD,DWORD>::iterator it = m_ThreadMap.find(::GetCurrentThreadId());
    CloseHandle ( (HANDLE)it->second );
    m_ThreadMap.erase ( it );
    m_ThreadMapLock.Release ( );

    ::InterlockedDecrement ( (long*)&m_RunningThreads );
    if ( ::InterlockedDecrement ( (long*)&m_NumThreads ) == 0 )
    {
	    ::SetEvent ( m_hStopEvent );
    }
}
bool TCPSocketServer::Stop(){

    hash_map<DWORD,DWORD>::iterator it;
    m_ThreadMapLock.Acquire ( );
    size_t numthreads = m_ThreadMap.size();
    m_ThreadMapLock.Release ( );

	PER_IO_OPERATION_DATA *PerIOData = new PER_IO_OPERATION_DATA;
	ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
	PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
	PerIOData->DataBuf.buf = PerIOData->Buffer;
	PerIOData->OperationType = STOP_POSTED;

    for ( ; numthreads > 0; numthreads-- ){
	    PostQueuedCompletionStatus(m_hIOCP,100,NULL,PerIOData);
    }

    if ( WaitForSingleObject(m_hStopEvent,WIOCP_ENDWAIT)!=WAIT_OBJECT_0){
		m_ThreadMapLock.Acquire();
		for ( it = m_ThreadMap.begin( ); it != m_ThreadMap.end( ); it++){
			TerminateThread ( (HANDLE)it->second, 0 );
			CloseHandle ( (HANDLE)it->second );
		}
		m_ThreadMapLock.Release();
    }

	delete PerIOData;

    CloseHandle ( m_hIOCP );
    m_hIOCP = INVALID_HANDLE_VALUE;
    CloseHandle ( m_hStopEvent );
    m_hStopEvent = INVALID_HANDLE_VALUE;

    return true;
}
bool TCPSocketServer::start(unsigned short usPort,unsigned long ulIP){

	int nRet;
	DWORD RecvBytes,Flags;
	SOCKET Accept;
	PER_IO_OPERATION_DATA *PerIOData;

	m_FirstSocket = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
	if(INVALID_SOCKET == m_FirstSocket)return false;
	
	sockaddr_in sAddr;
	sAddr.sin_family=AF_INET;
	sAddr.sin_port=htons(usPort);
	sAddr.sin_addr.S_un.S_addr=htonl(ulIP);

	nRet = bind(m_FirstSocket,(PSOCKADDR)&sAddr,sizeof(sAddr));
	if(SOCKET_ERROR == nRet)return false;

    nRet = listen(m_FirstSocket,5);
    if(SOCKET_ERROR == nRet)return false;

	sockaddr_in sCon;
	int addr_len = sizeof(sockaddr_in);

	while(stop){
		Accept = WSAAccept(m_FirstSocket, (sockaddr *)&sCon, &addr_len, NULL, 0);

		_RPT2(_CRT_WARN,"Someone Connected, Ip: %s,  Port: %d\n", inet_ntoa(sCon.sin_addr),ntohs(sCon.sin_port));

		if (SOCKET_ERROR == Accept)break;

		PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA();
		PerSocketData->socket = Accept;
		PerSocketData->io_count = 0;

		HANDLE iocp = CreateIoCompletionPort((HANDLE)Accept,m_hIOCP,(ULONG_PTR)PerSocketData,m_ConcurrentThreads);
		if(iocp == NULL)break;

		OnAccepted(Accept,(DWORD)PerSocketData);

		PerIOData = new PER_IO_OPERATION_DATA;
		ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
		PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
		PerIOData->DataBuf.buf = PerIOData->Buffer;
		PerIOData->OperationType = RECV_POSTED;

		Flags = 0;

		nRet = WSARecv(Accept,&PerIOData->DataBuf,1,&RecvBytes,&Flags,(OVERLAPPED *)PerIOData,NULL);
		if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
			delete PerIOData;
			ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
		}
	}

	Stop();
	
	return true;
}
void TCPSocketServer::ClearSocket(SOCKET s, DWORD dwParam){
	OnClosed(s,dwParam);
	closesocket(s);
	//当前没用的sock放到list中,然后删除第一个,冗余
	PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA*)dwParam;
	m_socks.push_back(PerSocketData);
	PerSocketData = (PER_SOCKET_DATA*)m_socks.front();
	delete PerSocketData;
	m_socks.pop_front();
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -