📄 recvmessthread.cpp
字号:
char temp[8];
makeValidateRep(temp,&validateRep);
dmcThread->saveBuffer(fd,temp,8);
modEvent(fd,EPOLLOUT);
}
}
// Create a PF_INET stream socket, pass it through setNonBlock() and start
// connecting it to *servaddr.
//
// servaddr : address of the target server to connect to.
//
// Returns the socket descriptor when connect() either succeeded at once or
// reported EINPROGRESS (the connection is still being established).
// Returns the negative socket() result when the socket cannot be created,
// and -1 when setNonBlock() reports failure or connect() fails with any
// other errno; in the latter two cases the socket has been closed.
int G_RecvMessThread::connectTarget(struct sockaddr_in* servaddr)
{
    int targetFd = socket(PF_INET, SOCK_STREAM, 0);
    if (targetFd < 0)
    {
        printf("ERR:create socket for connecting\n");
        return targetFd;
    }

    if (setNonBlock(targetFd))
    {
        const int rc = connect(targetFd,
                               reinterpret_cast<const sockaddr*>(servaddr),
                               sizeof(struct sockaddr_in));

        // EINPROGRESS is not a failure: the caller finds out the final
        // outcome later, once the descriptor reports an event.
        const bool started = (rc >= 0) || (errno == EINPROGRESS);
        if (started)
        {
            return targetFd;
        }

        close(targetFd);
        printf("ERR:connect to targetsvr\n");
        return -1;
    }

    // setNonBlock() refused the descriptor.
    close(targetFd);
    return -1;
}
void G_RecvMessThread::checkTunnel(struct SESSION* session,int fd,struct epoll_event* ev,bool isReading)
{
char temp[8];
struct VALIDATEREP validateRep;
validateRep.command = COMMAND_VALIDATE_REP;
validateRep.len = 8;
validateRep.result = SUCCESS;
struct SESSION* otherSession = dmcThread->getSession(session->other);
if(isReading)
{
int len = recvn(fd,buffer,65536-8);
if(len<=0)//ERR:CONNECT TO TARGETSVR
{
otherSession->flag = SENDERR;
validateRep.result = ERR_TARGETSVR;
makeValidateRep(temp,&validateRep);
dmcThread->saveBuffer(session->other,temp,8);
modEvent(session->other,EPOLLOUT);
//TODO::::::::::::auth log
struct CLIENT* c = dmcThread->getClientByID(session->clientid);
debug_output("CONN ERR: id:%d,ip:%s,port:%d------\n",session->clientid,c->ip,c->port);
logThread->ReportAuthResult(session->clientid,4);
closeSocket(fd,ev);
}
else//CONNECTED
{
session->flag = CONNECTED;
otherSession->flag = STABLE;
modEvent(fd,EPOLLET);
makeValidateRep(temp,&validateRep);
dmcThread->saveBuffer(session->other,temp,8);
dmcThread->appendBuffer(session->other,buffer,len);
modEvent(session->other,EPOLLOUT|EPOLLIN);
//TODO::::::::::::report online
logThread->ReportOnline(session->clientid,session->ip,session->port);
dmcThread->setConnected(session->clientid,session->other,fd);
}
}
else
{
int error;
socklen_t len = sizeof (error);
int err = getsockopt(fd,SOL_SOCKET,SO_ERROR,&error,&len);
if(err < 0 || error !=0)
{
otherSession->flag = SENDERR;
validateRep.result = ERR_TARGETSVR;
makeValidateRep(temp,&validateRep);
dmcThread->saveBuffer(session->other,temp,8);
modEvent(session->other,EPOLLOUT);
logThread->ReportAuthResult(session->clientid,4);
struct CLIENT* c = dmcThread->getClientByID(session->clientid);
debug_output("CONN ERR: id:%d,ip:%s,port:%d------\n",session->clientid,c->ip,c->port);
//TODO: ::::::::::auth LOG
closeSocket(fd,ev);
}
else//CONNECTED
{
session->flag = CONNECTED;
otherSession->flag = STABLE;
modEvent(fd,EPOLLET);
makeValidateRep(temp,&validateRep);
dmcThread->saveBuffer(session->other,temp,8);
modEvent(session->other,EPOLLOUT|EPOLLIN);
logThread->ReportOnline(session->clientid,session->ip,session->port);
//TODO:::::::::report online
dmcThread->setConnected(session->clientid,session->other,fd);
}
}
}
// Read data arriving on fd and store it for the paired descriptor
// (session->other).
//
// session  : session record belonging to fd.
// fd       : descriptor that became readable.
// ev       : epoll event for fd, forwarded to closeSocket().
// isClient : selects which registered descriptor of the client fd is
//            compared with (getFD1 when true, getFD2 when false).
//
// - session->other < 0: the pairing is already invalid; fd is closed, the
//   counter is decremented, and the client is reported offline when the
//   registered descriptor is 0 or is fd itself.
// - fd is no longer the registered descriptor, or recvn() returns <= 0:
//   the peer session is invalidated, fd is closed and the peer is switched
//   to EPOLLOUT.
// - otherwise the bytes are saved for the peer and both descriptors are
//   re-armed depending on whether this session has output of its own
//   pending (session->bufLen).
void G_RecvMessThread::swapRead(struct SESSION* session,int fd,struct epoll_event* ev,bool isClient)
{
    int registeredFd;
    if (isClient)
        registeredFd = dmcThread->getFD1(session->clientid);
    else
        registeredFd = dmcThread->getFD2(session->clientid);

    if (session->other < 0)   // invalid pairing
    {
        if (registeredFd == 0 || registeredFd == fd)
        {
            logThread->ReportOffline(session->clientid);
        }
        closeSocket(fd, ev);
        subCounter();
        return;
    }

    // A mismatch means another thread logged on or the client was updated;
    // a non-positive read count is handled the same way.
    bool abandon = (registeredFd != fd);
    int received = 0;
    if (!abandon)
    {
        received = recvn(fd, buffer, 65536);
        abandon = (received <= 0);
    }

    if (abandon)
    {
        dmcThread->setSessionInvalid(session->other);
        closeSocket(fd, ev);
        modEvent(session->other, EPOLLOUT);
        return;
    }

    dmcThread->saveBuffer(session->other, buffer, received);

    // The peer session is looked up here but its result is not used; the
    // call is retained so the sequence of dmcThread calls stays unchanged.
    (void)dmcThread->getSession(session->other);

    const bool nothingQueuedHere = (session->bufLen == 0);
    if (nothingQueuedHere)
    {
        modEvent(session->other, EPOLLIN | EPOLLOUT);
        modEvent(fd, EPOLLET);
    }
    else
    {
        modEvent(session->other, EPOLLOUT);
        modEvent(fd, EPOLLOUT);
    }
}
// Send this session's stored bytes (session->buffer / session->bufLen) out
// on fd.
//
// session  : session record belonging to fd.
// fd       : descriptor that became writable.
// ev       : epoll event for fd, forwarded to closeSocket().
// isClient : selects the registered descriptor to compare with (getFD1 when
//            true, getFD2 when false) and the counter that is reported
//            (MT when true, MO when false).
//
// - session->other < 0: the pairing is already invalid; fd is closed, the
//   counter is decremented, and the client is reported offline when the
//   registered descriptor is 0 or is fd itself.
// - fd is no longer the registered descriptor, or sendn() returns < 0:
//   the peer session is invalidated, fd is closed and the peer is switched
//   to EPOLLOUT.
// - otherwise the transferred size is reported, bufLen is reset to 0 and
//   both descriptors are re-armed depending on whether the peer session
//   still has bytes stored.
void G_RecvMessThread::swapWrite(struct SESSION* session,int fd,struct epoll_event* ev,bool isClient)
{
    int registeredFd;
    if (isClient)
        registeredFd = dmcThread->getFD1(session->clientid);
    else
        registeredFd = dmcThread->getFD2(session->clientid);

    if (session->other < 0)   // invalid pairing
    {
        if (registeredFd == 0 || registeredFd == fd)
        {
            logThread->ReportOffline(session->clientid);
        }
        closeSocket(fd, ev);
        subCounter();
        return;
    }

    // A mismatch means another thread logged on or the client was updated;
    // a failed send is handled the same way.
    bool abandon = (registeredFd != fd);
    if (!abandon)
    {
        abandon = (sendn(fd, session->buffer, session->bufLen) < 0);
    }

    if (abandon)
    {
        dmcThread->setSessionInvalid(session->other);
        closeSocket(fd, ev);
        modEvent(session->other, EPOLLOUT);
        return;
    }

    // Report the transfer before the length is cleared.
    if (isClient)
    {
        logThread->ReportMT(session->clientid, session->bufLen);
        dmcThread->reportMT(session->clientid, session->bufLen);
    }
    else
    {
        logThread->ReportMO(session->clientid, session->bufLen);
        dmcThread->reportMO(session->clientid, session->bufLen);
    }
    session->bufLen = 0;

    struct SESSION* peer = dmcThread->getSession(session->other);
    const bool peerHasPending = (peer->bufLen > 0);
    if (peerHasPending)
    {
        modEvent(session->other, EPOLLIN | EPOLLOUT);
        modEvent(fd, EPOLLET);
    }
    else
    {
        modEvent(session->other, EPOLLIN);
        modEvent(fd, EPOLLIN);
    }
}
// Handle a readable descriptor: receive up to 65536 bytes into the shared
// buffer and store them under the same descriptor.
//
// ev : epoll event; ev->data.fd is the descriptor to read from.
//
// When recvn() returns a positive count the bytes are saved with
// saveBuffer() and the descriptor is switched to EPOLLOUT. Otherwise the
// descriptor is closed and the counter is decremented.
void G_RecvMessThread::read(struct epoll_event* ev)
{
    int fd = ev->data.fd;

    // The session is looked up but not used in this method; the call is
    // retained so the sequence of dmcThread calls stays unchanged.
    (void)dmcThread->getSession(fd);

    int received = recvn(fd, buffer, 65536);
    if (received > 0)
    {
        dmcThread->saveBuffer(fd, buffer, received);
        modEvent(fd, EPOLLOUT);
        return;
    }

    closeSocket(fd, ev);
    subCounter();
}
// Send exactly nLen bytes starting at str on nSocket, looping over short
// writes until everything has been passed to send().
//
// nSocket : descriptor to send on.
// str     : first byte to send.
// nLen    : number of bytes to send.
//
// Returns nLen once every byte has been sent, or -1 when send() fails with
// an errno other than EINTR or makes no progress. MSG_NOSIGNAL is passed so
// a closed peer produces an error return instead of SIGPIPE.
//
// NOTE(review): any errno other than EINTR, including EAGAIN/EWOULDBLOCK,
// is reported as -1 here, and the callers close the connection on a
// negative result. Confirm this is intended for non-blocking descriptors.
int G_RecvMessThread::sendn(int nSocket , char *str , unsigned int nLen)
{
    int remaining = nLen;
    while (remaining > 0)
    {
        const int sent = send(nSocket, str, remaining, MSG_NOSIGNAL);
        if (sent < 0)
        {
            // errno is only meaningful when send() reports failure.
            if (errno == EINTR)
            {
                continue;   // interrupted before any data was sent: retry
            }
            return -1;
        }
        if (sent == 0)
        {
            // No progress and no error indication. errno was not set by
            // this call, so it must not be consulted: a stale EINTR would
            // otherwise keep this loop spinning forever.
            return -1;
        }
        remaining -= sent;
        str += sent;
    }
    return (nLen - remaining);
}
// Handle a writable descriptor: send the bytes stored in its session
// (session->buffer / session->bufLen).
//
// ev : epoll event; ev->data.fd is the descriptor to write to.
//
// When sendn() returns a negative value the descriptor is closed and the
// counter is decremented. Otherwise the sent length is reported and the
// descriptor is switched back to EPOLLIN.
//
// NOTE(review): ReportMT() receives the descriptor as its first argument
// here, whereas swapWrite() passes session->clientid. Confirm which key the
// log thread expects.
void G_RecvMessThread::write(struct epoll_event* ev)
{
    int fd = ev->data.fd;
    struct SESSION* pending = dmcThread->getSession(fd);

    int sent = sendn(fd, pending->buffer, pending->bufLen);
    if (sent >= 0)
    {
        logThread->ReportMT(fd, sent);
        modEvent(fd, EPOLLIN);
        return;
    }

    closeSocket(fd, ev);
    subCounter();
}
// Thread main loop: wait on epfd for up to 256 events (60000 ms timeout),
// dispatch each one to read() and/or write(), sleep 10 ms, repeat forever.
//
// The event flags and the descriptor are re-read before write() on purpose:
// read() receives the same event pointer and hands it to closeSocket() on
// failure, so they are not cached across that call.
void G_RecvMessThread::Run()
{
    for (;;)
    {
        const int readyCount = epoll_wait(epfd, events, 256, 60000);

        // A negative result (epoll_wait error) simply skips the dispatch loop.
        for (int idx = 0; idx < readyCount; ++idx)
        {
            struct epoll_event* current = &events[idx];

            // NOTE(review): EPOLLONESHOT is tested as an output event here.
            // The "rd close" message suggests a peer-close flag such as
            // EPOLLRDHUP may have been intended. Confirm.
            if (current->events & EPOLLONESHOT)
            {
                debug_output("rd close\n");
            }

            // Error and hang-up conditions are only logged at this point.
            if (current->events & (EPOLLERR | EPOLLHUP))
            {
                debug_output("ERR|HUP\n");
            }

            if ((current->events & EPOLLIN) && (current->data.fd) >= 0)
            {
                read(current);
            }

            if ((current->events & EPOLLOUT) && (current->data.fd) >= 0)
            {
                write(current);
            }
        }

        usleep(10000);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -