📄 csocket.cpp
字号:
/*******************************************************************
*
* DESCRIPTION:
*
* AUTHOR:yyc
*
* HISTORY:
*
* DATE:2002-8-19
*
*******************************************************************/
#include "cSocket.h"
#include <memory>
#include <cstdio>
#include <cerrno>
#include <ctime>
#include <cstdarg>
#include "logout.h"
RW_DEFINE_LOG("cSocket")
using namespace std;
using namespace yyc;
//*****************************************************************************
//**************************cSocket********************************************
cSocket::cSocket()
{
m_fd=-1;
m_iLocalport=0;
m_iHostport=0;
memset((void *)m_strHostip,0,20);
memset((void *)m_strLocalip,0,20);
m_iStatus=SOCK_CLOSED;
m_bAutoclose=false;
m_recvThread.set_func(recv_thread);
m_sendThread.set_func(send_thread);
m_iType=0;
m_iComm=3;
m_pParent=NULL;
m_iRef=1;
m_pArgs=NULL;
m_pfDodata=NULL;
}
cSocket::~cSocket()
{
int ret;
close();
if (m_pParent!=NULL) {
ret=m_pParent->m_client_set.remove(this);
RW_LOG_DEBUG("server(port:%d) have %d clients in client sets!\n",m_pParent->get_localhost_info(),ret);
}
m_pParent=NULL;
}
/**
* 关闭此cSocket对象
*/
int cSocket::close()
{
int fd;
fd=m_fd;m_fd=-1;//设置此数据,当线程发生错误退出时,不会删除此对象
m_iStatus=SOCK_CLOSED;
//停止线程
m_sendThread.join();
m_recvThread.join();
//关闭socket连接,释放socket资源
if(fd>=0) ::closesocket(fd);
//清空输入输出缓冲区
m_in_buf.clear();
m_out_buf.clear();
//清空客户端连接对象缓冲区
m_client_set.clear();
return OK_SOCK;
}
//启动收发线程
int cSocket::start_thread()
{
int retv=0;
if ((m_iComm & 2)!=0)
{
retv=m_sendThread.start((void *)this);
}
if (retv==0&&(m_iComm & 1)!=0)
{
retv=m_recvThread.start((void *)this);
}
return retv;
}
/**
* 检查socket是否有数据到达f=0
* 检查socket是否可写 f=1
*
* @param wait_usec 最大超时等待时间(us)
*
* @return =0:没有数据
* >0:有数据可读,或socket可写
* <0:socket发生错误
*/
int cSocket::checkdata(long wait_usec,int f)
{
int fd,retv=0;
struct timeval to;
fd_set rf;
fd=m_fd;
if (fd>=0&&fd<FD_SETSIZE)
{
FD_ZERO(&rf);
FD_SET(fd, &rf);
if ( wait_usec>0)
{
to.tv_sec =wait_usec/1000000L;
to.tv_usec =wait_usec%1000000L;
}
else
{
to.tv_sec = 0;
to.tv_usec = 0;
}
if(1==f)
retv = select(FD_SETSIZE, NULL, &rf, NULL, &to);
else
retv = select(FD_SETSIZE, &rf, NULL, NULL, &to);
if (retv >= 0)
{
if(retv>0&&FD_ISSET(fd, &rf))
retv=1;
else
retv=0;
}
else
{
int iErr=SOCK_M_GETERROR;
if (EBADF==iErr)
retv=-1;
else
retv=0;
}//?if (retv >= 0)
}
else
retv=-1;
return retv;
}
int cSocket::send_thread(void *args)
{
cSocket * psock;
int fd,retv,len;
struct sockaddr_in addr;
int iErr=0;
char *ptr=NULL;
if (args!=NULL)
{
psock=(cSocket *)args;
RW_LOG_INFO("cSocket send_thread run...\n");
char dataBuf[DEFAULT_BUF_SIZE];
while(psock->m_iStatus!=SOCK_CLOSED)
{
fd=psock->m_fd;
if(fd<0) break;
len=0;ptr=NULL;
//检查此csocket是否可写
if((retv=psock->checkdata(0,1))>0)
{
auto_ptr<cBuffer> pbuf((cBuffer *)NULL);
len=psock->getdata(dataBuf,cBuffer::MAX_BUF_SIZE,1);
if(len>0)
{
ptr=dataBuf;
}//?if(len>0)
else
{
//没有取到数据,则判断是否输出缓冲区没有数据,还是缓冲区太小,无法接收数据
if(psock->m_out_buf.size()<=0)
{
//没有数据
if(psock->m_bAutoclose)
break;
my_usleep(1000);
}//?if(psock->m_out_buf.size()<=0)
else
{
auto_ptr<cBuffer> pauto(psock->m_out_buf.pop());
pbuf=pauto;
if(pbuf.get()!=NULL)
{
ptr=pbuf->get();len=pbuf->len();
}
}//?if(psock->m_out_buf.size()<=0) else
}//?if(len>0) else
//发送数据----------------------------
if(len>0&&ptr!=NULL)
{
RW_LOG_DEBUG("send %d data from %s:%d to %s:%d\n",len,psock->m_strLocalip,psock->m_iLocalport,psock->m_strHostip,psock->m_iHostport) ;
if (psock->m_iType==0) //tcp
{
retv=::send(fd,ptr,len,MSG_NOSIGNAL);
}
else //udp
{
memset(&addr,0,sizeof(struct sockaddr_in));
addr.sin_family=AF_INET;
addr.sin_port=htons(psock->m_iHostport);
addr.sin_addr.s_addr=inet_addr((const char *)psock->m_strHostip);
retv=::sendto(fd,ptr,len,MSG_NOSIGNAL,(struct sockaddr *)&addr,sizeof(struct sockaddr_in));
}//?if (psock->sock_type==0) else
//ECONNRESET=104, Connection reset by peer
//#define EPIPE 32 Broken pipe
//EPIPE The local end has been shut down on a connection oriented socket. In this case the process will also
//receive a SIGPIPE unless MSG_NOSIGNAL is set.
//如不保护,将导致程序发生broken pipe错误
//#define ENOTSOCK 88 Socket operation on non-socket
iErr=SOCK_M_GETERROR;
if (retv<0&&(iErr==EBADF||iErr==ECONNRESET||iErr==EPIPE||iErr==ENOTSOCK))
{
RW_LOG_DEBUG("cSocket send_thread:socket sending data raised error,errno=%d!\n",iErr);
break;
}
}//?if(len>0)
//------------------------------------------------------
}//?if((retv=psock->checkdata(0,1))>0)
else
{
if(retv<0)
{
break;
}
RW_LOG_DEBUG("cSocket send_thread:socket can not write!!!\n");
}//?if((retv=psock->checkdata(0,1))>0) else
}//?while(psock->m_iStatus!=SOCK_CLOSED)
//为了防止接收和发送线程同时结束,释放csocket对象时产生冲突
//因此在退出时随机休眠若干us
if (psock->m_fd>=0)
{
retv=rand();
my_usleep(abs(retv-(retv/2000)*2000));
}
//线程循环结束,退出线程,退出前要关闭和摧毁SOCKET
//psock->m_fd<0,psock已经被从外部销毁,摧毁函数正在等待线程结束
if (psock->m_fd>=0)
{
//分离线程
//防止线程和psock->close()中的pthread_join死锁
//所以将线程标识设为0,表明线程已结束
psock->m_sendThread.detach();
psock->close();
//当线程由于异常而导致退出时(即不是调用close函数关闭的)
//判断此cSocket对象的引用计数是否<=0,如是则自动删除释放此对象
if(psock->m_iRef<=0)
delete psock;
else
psock->m_iRef--;
}//?if (psock->m_fd>=0)
}//?if (args!=NULL)
RW_LOG_INFO("cSocket send_thread end!\n");
return 0;
}
/**
* 接收数据线程
* 将接收的数据放入接收缓冲区
* 如果是服务csocket对象,则接收远端的连接请求,并自动创建普通csocket
* 对象连接,并将csocket对象保存到客户缓冲区中
*
* @param args 启动此线程的csocket对象指针
*
* @return
*/
int cSocket::recv_thread(void *args)
{
cSocket * psock;
int s,fd,retv,len,addrlen;
int recv_zero_len_count=0;//计数接收0长度数据次数
struct sockaddr_in addr;
int iErr=0;
if (args!=NULL)
{
psock=(cSocket *)args;
RW_LOG_INFO("cSocket recv_thread run...\n");
while(psock->m_iStatus!=SOCK_CLOSED)
{
fd=psock->m_fd;
if(fd<0) break;
if(psock->m_bAutoclose)
{
//已经设置等待发送缓冲为空则自动关闭此cSocket对象
//接收线程将不处理任何数据,等待结束
my_usleep(100);
continue;
}
retv=psock->checkdata(1000);//是否有数据
if (retv<0) break; //接收发生错误,sock_fd有问题,或者对方已断开连接,因此线程退出
if (0==retv) continue;//没有数据
//有数据接收,判断此SOCKET是否为侦听服务还是普通SOCKET
if (psock->m_iStatus==SOCK_LISTEN)//cSocket服务对象
{
addrlen = sizeof(addr);
//接收连接请求
s = ::accept(fd, (struct sockaddr *)&addr, (socklen_t *)&addrlen);
iErr=SOCK_M_GETERROR;
if (s<0 && (iErr==EBADF||iErr==ENOTSOCK)) break;//发生错误,socket fd有问题
RW_LOG_DEBUG("have client connecting,ip=%s,port=%d\n",inet_ntoa(addr.sin_addr),addr.sin_port);
psock->createClient(s,(char *)inet_ntoa(addr.sin_addr),addr.sin_port);
}
else //cSocket客户对象
{
auto_ptr<cBuffer> buf(new(nothrow) cBuffer(cBuffer::MAX_BUF_SIZE));
if(buf.get()!=NULL&&buf->len()>0)
{
addrlen = sizeof(addr);
//#define ENOTCONN 107 /* Transport endpoint is not connected */
//#define EPIPE 32 /* Broken pipe */
//EPIPE The local end has been shut down on a connection oriented socket. In this case the process will also
//receive a SIGPIPE unless MSG_NOSIGNAL is set.
//如不保护,将导致程序发生broken pipe错误
//#define ENOTSOCK 88 /* Socket operation on non-socket */
len=recvfrom(fd,buf->get(),buf->len(),MSG_NOSIGNAL,(struct sockaddr *) & addr, (socklen_t *)&addrlen);
iErr=SOCK_M_GETERROR;
if (len<0&&(iErr==EBADF||iErr==ENOTCONN||iErr==ENOTSOCK||iErr==EPIPE||iErr==ECONNRESET))
{
RW_LOG_WARN("cSocket recv_thread:socket receiving data raised error,errno=%d!\n",iErr);
break; ////发生错误,sock_fd有问题
}
if (len<=0)
{
//如果socket_checkdata检查有数据,但recvfrom连着5次成功接收,但数据个数为0
//则可以认为对方的socket连接出现异常,因此关闭此连接
recv_zero_len_count++;
if (recv_zero_len_count<5)
continue;
else
{
RW_LOG_WARN("recv %d zero_length data,socket link exception!\n",recv_zero_len_count) ;
break;
}
}
else
recv_zero_len_count=0;
if (0==psock->m_iType)
{
RW_LOG_DEBUG("receive %d data from ip=%s and port=%d\n",len,psock->m_strHostip,psock->m_iHostport);
}
else
{
RW_LOG_DEBUG("receive %d data from ip=%s and port=%d\n",len,inet_ntoa(addr.sin_addr),addr.sin_port);
}
if (NULL==psock->m_pfDodata)//如果没有设置数据处理过程则直接将数据放入输入缓冲
{
//重置数据缓冲区大小,避免浪费空间
//但牺牲效率
//buf->reset(len);
psock->m_in_buf.push(buf.release());
continue;//不作任何数据处理,继续接收数据
}
retv=(*psock->m_pfDodata)(buf->get(),len,psock);
if ((retv&1)!=0) //关闭此SOCKET连接
{
if ((retv&2)!=0) //等待发送缓冲区数据发送完
{
psock->m_bAutoclose=true;
}
else
break; //关闭SOCKET
}
}//?if(buf.get()!=NULL)
}//?else
}//?while(psock->m_status!=SOCK_CLOSED)
//为了防止接收和发送线程同时结束,释放csocket对象时产生冲突
//因此在退出时随机休眠若干us
if (psock->m_fd>=0)
{
retv=rand();
my_usleep(abs(retv-(retv/2000)*2000));
}
//线程循环结束,退出线程,退出前要关闭和摧毁SOCKET
//psock->sock_fd<0,psock已经被从外部销毁,摧毁函数正在等待线程结束
if (psock->m_fd>=0)
{
//分离线程
//防止线程和psock->close()中的pthread_join死锁
//所以将线程标识设为0,表明线程已结束
psock->m_recvThread.detach();
psock->close();
//当线程由于异常而导致退出时(即不是调用close函数关闭的)
//判断此cSocket对象的引用计数是否<=0,如是则自动删除释放此对象
if(psock->m_iRef<=0)
delete psock;
else
psock->m_iRef--;
}//?if (psock->sock_fd>=0)
}//?if (args!=NULL)
RW_LOG_INFO("cSocket recv_thread end!\n");
return 0;
}
/**
* 从输入/输出缓冲区中的取数据
*
* @param buf 接收缓冲区数据地址空间
* @param size 接收缓冲区的最大长度
* @inORout 0--输入缓冲区 1--输出缓冲区
* @return 返回接收的数据大小
*/
int cSocket::getdata(char *buf,int size,int inORout)
{
int retv=0;
if(buf==NULL||size<0) return ENULL_SOCK;
cFifo<cBuffer> *pfifo=&m_in_buf;
if(inORout==1) pfifo=&m_out_buf;
cBuffer *pbuf=pfifo->peek();
while(pbuf!=NULL)
{
if (pbuf->len()<=(size-retv)) {
if(pbuf->get()!=NULL){
memcpy(buf+retv,pbuf->get(),pbuf->len());
retv+=pbuf->len();
}
delete pbuf;
pfifo->pop();
}
else
break;
pbuf=pfifo->peek();
}
return retv;
}
/**
* 用指定的socket连接句柄初始化或创建cSocket客户对象
*
* @param fd 指定的socket连接句柄
* @param remoteip socket连接的主机IP
* @param remoteport socket连接的主机侦听端口
*
* @return <0:发生错误
* >=0:连接成功
* 如果此csocket对象为服务csocket,则自动创建一个普通csocket客户
* 连接指定的主机,并将此csocket对象保存到csocket服务对象的客户端缓冲区中,返回
* 新创建的csocket对象的句柄(从1开始)
* 如果为普通csocket客户对象,则返回0
*/
int cSocket::createClient(int fd,const char *remoteip,int remoteport)
{
int sckStatus,retv=0;
char * ptr;
int addr_len;
cSocket *psock=NULL;
struct sockaddr_in addr;
if (fd<0) return EHANDLE_SOCK;
sckStatus=m_iStatus;
if (sckStatus==SOCK_LISTEN) {
psock=new(nothrow) cSocket;
if(psock!=NULL)
{
psock->m_pfDodata=m_pfDodata;
psock->m_pArgs=m_pArgs;
psock->m_iRef=0;//引用计数为0
}
else
{
::closesocket(fd);
return EMEM_SOCK;
}
}
else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -