⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 recvmessthread.cpp

📁 基于Linux下Epoll技术的EchoSvr
💻 CPP
📖 第 1 页 / 共 2 页
字号:
		char temp[8];
		makeValidateRep(temp,&validateRep);
		dmcThread->saveBuffer(fd,temp,8);
		modEvent(fd,EPOLLOUT);
	}
}

int G_RecvMessThread::connectTarget(struct sockaddr_in* servaddr)
{
	int sock = socket(PF_INET, SOCK_STREAM, 0);
	if(sock<0)
	{
		printf("ERR:create socket for connecting\n");
		return sock;
	}
	if(!setNonBlock(sock))
	{
		close(sock);
		return -1;
	}
	int err = connect(sock,(const sockaddr*)servaddr, sizeof(struct sockaddr_in));
	if(err<0 && errno != EINPROGRESS)       /* attention here */
  {
  	close(sock);
 		printf("ERR:connect to targetsvr\n");
  	return -1;
  } 
  return sock;
}

void G_RecvMessThread::checkTunnel(struct SESSION* session,int fd,struct epoll_event* ev,bool isReading)
{
	char temp[8];
	struct VALIDATEREP validateRep;
	validateRep.command = COMMAND_VALIDATE_REP;
	validateRep.len = 8;
	validateRep.result = SUCCESS;
	struct SESSION* otherSession = dmcThread->getSession(session->other);
		
	if(isReading)
	{
		int len = recvn(fd,buffer,65536-8);
		if(len<=0)//ERR:CONNECT TO TARGETSVR
		{
			otherSession->flag = SENDERR;
			validateRep.result = ERR_TARGETSVR;
			makeValidateRep(temp,&validateRep);
			dmcThread->saveBuffer(session->other,temp,8);
			modEvent(session->other,EPOLLOUT);
			//TODO::::::::::::auth log
			struct CLIENT* c = dmcThread->getClientByID(session->clientid);
			debug_output("CONN ERR: id:%d,ip:%s,port:%d------\n",session->clientid,c->ip,c->port);
			
			logThread->ReportAuthResult(session->clientid,4);
			closeSocket(fd,ev);
			
		}
		else//CONNECTED
		{		
			session->flag = CONNECTED;
			otherSession->flag = STABLE;
		
			modEvent(fd,EPOLLET);
			makeValidateRep(temp,&validateRep);
			dmcThread->saveBuffer(session->other,temp,8);
			dmcThread->appendBuffer(session->other,buffer,len);
			modEvent(session->other,EPOLLOUT|EPOLLIN);
			//TODO::::::::::::report online
			logThread->ReportOnline(session->clientid,session->ip,session->port);
			dmcThread->setConnected(session->clientid,session->other,fd);
			
		}
	}
	else
	{
		int error;
    socklen_t len = sizeof (error);
    int err = getsockopt(fd,SOL_SOCKET,SO_ERROR,&error,&len);
    if(err < 0 || error !=0)
		{
			otherSession->flag = SENDERR;
			validateRep.result = ERR_TARGETSVR;
		
			makeValidateRep(temp,&validateRep);
			dmcThread->saveBuffer(session->other,temp,8);
			modEvent(session->other,EPOLLOUT);
			logThread->ReportAuthResult(session->clientid,4);
			struct CLIENT* c = dmcThread->getClientByID(session->clientid);
			debug_output("CONN ERR: id:%d,ip:%s,port:%d------\n",session->clientid,c->ip,c->port);
			
			//TODO: ::::::::::auth LOG
			closeSocket(fd,ev);
		}
		else//CONNECTED
		{		

			session->flag = CONNECTED;
			otherSession->flag = STABLE;
		
			modEvent(fd,EPOLLET);
		
			makeValidateRep(temp,&validateRep);
			dmcThread->saveBuffer(session->other,temp,8);
			
			modEvent(session->other,EPOLLOUT|EPOLLIN);
			logThread->ReportOnline(session->clientid,session->ip,session->port);
			//TODO:::::::::report online
			dmcThread->setConnected(session->clientid,session->other,fd);
		}
	}
}

void G_RecvMessThread::swapRead(struct SESSION* session,int fd,struct epoll_event* ev,bool isClient)
{
	int sock = isClient?dmcThread->getFD1(session->clientid):
										dmcThread->getFD2(session->clientid);
	
	if(session->other<0) //invalid
	{ 
		if(sock == 0 || sock == fd)
			logThread->ReportOffline(session->clientid);
			
			//TODO::::::::::report offline
		closeSocket(fd,ev);
		subCounter();
	}
	else
	{
		if(sock != fd)//ERR:other thread log on or client update
		{
			dmcThread->setSessionInvalid(session->other);
			closeSocket(fd,ev);
			modEvent(session->other,EPOLLOUT);
		}
		else
		{
			int len = recvn(fd,buffer,65536);
			if(len<=0)
			{
				dmcThread->setSessionInvalid(session->other);
				closeSocket(fd,ev);
				modEvent(session->other,EPOLLOUT);
			}	
			else
			{
				dmcThread->saveBuffer(session->other,buffer,len);
				struct SESSION* otherSession = dmcThread->getSession(session->other);
				if(session->bufLen == 0)
				{
					modEvent(session->other,EPOLLIN|EPOLLOUT);
					modEvent(fd,EPOLLET);
				}
				else
				{
					modEvent(session->other,EPOLLOUT);
					modEvent(fd,EPOLLOUT);	
				}
			}
		}
	}
}

void G_RecvMessThread::swapWrite(struct SESSION* session,int fd,struct epoll_event* ev,bool isClient)
{
	int sock = isClient?dmcThread->getFD1(session->clientid):
										dmcThread->getFD2(session->clientid);
	
	if(session->other<0) //invalid
	{ 
		if(sock == 0 || sock == fd)
			logThread->ReportOffline(session->clientid);
		closeSocket(fd,ev);
		subCounter();
	}
	else
	{
		if(sock != fd)//ERR:other thread log on or client update
		{
			dmcThread->setSessionInvalid(session->other);
			closeSocket(fd,ev);
			modEvent(session->other,EPOLLOUT);
		}
		else
		{
			int len = sendn(fd,session->buffer,session->bufLen);
			
			if(len<0)
			{
				dmcThread->setSessionInvalid(session->other);
				closeSocket(fd,ev);
				modEvent(session->other,EPOLLOUT);
			}	
			else
			{
				//TODO:::::::::::report transfer
				if(isClient)
				{
					logThread->ReportMT(session->clientid,session->bufLen);
					dmcThread->reportMT(session->clientid,session->bufLen);
				}
				else
				{
					logThread->ReportMO(session->clientid,session->bufLen);
					dmcThread->reportMO(session->clientid,session->bufLen);
				}
				session->bufLen = 0;
				struct SESSION* otherSession = dmcThread->getSession(session->other);
				if(otherSession->bufLen > 0)
				{
						modEvent(session->other,EPOLLIN|EPOLLOUT);
						modEvent(fd,EPOLLET);
				}
				else
				{
					modEvent(session->other,EPOLLIN);
					modEvent(fd,EPOLLIN);
				}	
			}
		}
	}
}

void G_RecvMessThread::read(struct epoll_event* ev)
{
	int len;
	int fd = ev->data.fd;
	struct SESSION* session = dmcThread->getSession(fd);
	//printf("---------------%d,%d\n",fd,session);
		len = recvn(fd,buffer,65536);
		if(len<=0)
		{
			closeSocket(fd,ev);
			subCounter();
		}
		else
		{
			dmcThread->saveBuffer(fd,buffer,len);
			modEvent(fd,EPOLLOUT);
		}
	
}

int G_RecvMessThread::sendn(int nSocket , char *str , unsigned int nLen)
{
	int n = nLen , nRet;
	while(n > 0)
	{
		nRet = send(nSocket , str , n , MSG_NOSIGNAL);
		if(nRet <= 0)
		{
			if(errno == EINTR)
			{
				continue;
			}
			return -1;
		}
		n -= nRet;
		str += nRet;
	}
	return (nLen - n);
}

void G_RecvMessThread::write(struct epoll_event* ev)
{
	int len;
	int fd = ev->data.fd;
	struct SESSION* session = dmcThread->getSession(fd);
	len = sendn(fd,session->buffer,session->bufLen);
	if(len<0)
	{
			closeSocket(fd,ev);
			subCounter();
	}
	else
		{
			logThread->ReportMT(fd,len);
			modEvent(fd,EPOLLIN);
		}
		
}

void G_RecvMessThread::Run()
{
	//pause();    //suspend

	int nfds;
	struct epoll_event* ev;
	while(1)
	{
		nfds = epoll_wait(epfd,events,256,60000);
		for(int i=0; i<nfds; i++)
		{
                        ev = &events[i];
                        if(ev->events&EPOLLONESHOT)
                        {
				debug_output("rd close\n");
                        }
			if(ev->events&(EPOLLERR|EPOLLHUP))
                        {
				debug_output("ERR|HUP\n");
			}	
			if(ev->events&EPOLLIN)//read
			{
				if((ev->data.fd) >= 0)
					read(ev);
			}
			if(ev->events&EPOLLOUT)
			{
				if((ev->data.fd) >= 0)
					write(ev);
			}
		}
		
		usleep(10000);
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -