📄 internetport.cpp
字号:
// 接口类。接口:基于消息通讯机制的网络间通讯接口。
// 仙剑修,2002.9.6
#include "InternetPort.h"
using namespace message_port;
///////////////////////////////////////////////////////////////////////////////////////
// construct
///////////////////////////////////////////////////////////////////////////////////////
// Builds a port object in the closed state; no socket is created until Open() is called.
// nPort: interface id stored in m_id. Open() treats id 0 as the server side and any
//        other id as a client.
CInternetPort::CInternetPort(int nPort)
{
	// identity of this port
	m_id			= nPort;
	m_nPortSize		= 0;

	// not connected yet
	m_sockMain		= INVALID_SOCKET;
	m_nState		= STATE_CLOSED;
	m_bWinStartup	= false;

	// login key starts out as the empty string
	m_szKey[0]		= '\0';
}
///////////////////////////////////////////////////////////////////////////////////////
// interface
///////////////////////////////////////////////////////////////////////////////////////
// Opens the port and starts receiving messages. May be called repeatedly (the port id must
// not change): it returns true at once when the port is already open, and a client whose
// connect is still pending resumes waiting on the existing socket instead of creating a new one.
//   - m_id == 0 : server side; creates the listening socket.
//   - otherwise : client side; starts a non-blocking connect, waits for it to complete, then
//                 logs in to the master by sending the login key and this port's id.
// Returns true when the port ends up open. Returns false when the socket cannot be created,
// the connect fails, the login packets cannot be sent and the port is no longer open, or the
// connect has not completed within the wait (state stays STATE_OPENING so the next call
// keeps waiting).
// NOTE(review): CONNECT_OVERTIMESECS is split below as if it were milliseconds, although its
// name (and the original comment) say seconds — confirm the unit where the constant is defined.
bool CInternetPort::Open ()
{
	if(IsOpen())
		return true;

	if(m_id == 0)		// port 0 is the server, every other port is a client
	{
		m_nState	= STATE_CLOSED;
		m_sockMain	= CreateServerSocket();
		if(m_sockMain == INVALID_SOCKET)
			return false;
	}
	else
	{
		// Start a new connect unless one from an earlier call is still in flight.
		if(m_nState != STATE_OPENING)
		{
			m_nState	= STATE_CLOSED;
			m_sockMain	= CreateClientSocket();
			if(m_sockMain == INVALID_SOCKET)
				return false;
			m_nState	= STATE_OPENING;
		}

		timeval timeout;
		timeout.tv_sec	= CONNECT_OVERTIMESECS/1000;
		timeout.tv_usec	= (CONNECT_OVERTIMESECS%1000) * 1000;

		// Wait for the connect to finish: only the write and exception sets are needed.
		fd_set writeset, exceptset;
		FD_ZERO(&writeset);
		FD_ZERO(&exceptset);
		FD_SET(m_sockMain, &writeset);
		FD_SET(m_sockMain, &exceptset);

		int ret = select(FD_SETSIZE, NULL, &writeset, &exceptset, &timeout);
		if(ret == 0)
			return false;				// not connected yet; stay in STATE_OPENING
		if(ret < 0 || FD_ISSET(m_sockMain, &exceptset))
		{
			Close();
			return false;
		}

		// A client reaches every port through its single connection to the master.
		for(int i = 0; i < m_nPortSize; i++)
			m_setSocket[i] = m_sockMain;

		// Node login: announce ourselves to the master.
		m_nState = STATE_OK;			// connected; must be set so that Send() below is allowed
		const bool bLoginSent =
				Send(MASTER_PORT_ID, SYS_PACKET_LOGINKEY, STRING_TYPE(m_szKey), m_szKey)
			&&	Send(MASTER_PORT_ID, SYS_PACKET_PORTID, VARTYPE_INT, &m_id);

		// On a client a failed send makes SendFrom() call Close() and then Open() again.
		// If that left the port not open, do not force it back to STATE_OK below.
		if(!bLoginSent && !IsOpen())
			return false;
	}

	m_nState = STATE_OK;
	const int nBufCount = static_cast<int>(m_setRecvBuf.size());
	for(int i = 0; i < nBufCount; i++)
		m_setRecvBuf[i].Reset();
	return true;
}
///////////////////////////////////////////////////////////////////////////////////////
// Creates the server's listening TCP socket: sets SO_REUSEADDR, SO_KEEPALIVE and SO_SNDBUF,
// binds to address 0 on m_nMasterPort, switches the socket to non-blocking mode and starts
// listening.
// Returns the listening socket, or INVALID_SOCKET after logging the Winsock error code.
// On every failure path the partially configured socket is closed before returning.
SOCKET CInternetPort::CreateServerSocket()
{
	// Create the main socket.
	SOCKET sockListen = socket(AF_INET, SOCK_STREAM, 0);
	if(sockListen == INVALID_SOCKET)
	{
		const int nErr = WSAGetLastError();
		LOGERROR("socket() failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	// SO_REUSEADDR
	int nReuseAddr = 1;
	if(setsockopt(sockListen, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&nReuseAddr), sizeof(nReuseAddr)) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("setsockopt(SO_REUSEADDR) failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	// SO_KEEPALIVE
	int nKeepAlive = 1;
	if(setsockopt(sockListen, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<char *>(&nKeepAlive), sizeof(nKeepAlive)) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("setsockopt(SO_KEEPALIVE) failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	// SO_SNDBUF
	int nSendBufSize = SNDBUF_SIZE;
	if(setsockopt(sockListen, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&nSendBufSize), sizeof(nSendBufSize)) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("setsockopt(SO_SNDBUF) failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	/*/ Disabled: read SO_SNDBUF back and log the size the system actually granted.
	int ret = sizeof(optval);
	if(getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &optval, &ret) == 0)
	{
		LOGERROR("message: 服务SOCKET内部发送缓冲区尺寸为[%d]K.", optval/1024);
	}//*/

	// Bind to address 0 on the master port.
	SOCKADDR_IN addrListen;
	memset(&addrListen, 0, sizeof(addrListen));
	addrListen.sin_family		= AF_INET;
	addrListen.sin_addr.s_addr	= 0;
	addrListen.sin_port			= htons(m_nMasterPort);
	if(bind(sockListen, reinterpret_cast<LPSOCKADDR>(&addrListen), sizeof(addrListen)) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("bind() failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	// Switch to non-blocking mode.
	unsigned long ulNonBlocking = 1;
	if(ioctlsocket(sockListen, FIONBIO, &ulNonBlocking) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("ioctlsocket() failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	// Start listening; SOMAXCONN is the Windows macro for the backlog length.
	if(listen(sockListen, SOMAXCONN) != 0)
	{
		const int nErr = WSAGetLastError();
		closesocket(sockListen);
		LOGERROR("listen() failed[%d].", nErr);
		return INVALID_SOCKET;
	}

	return sockListen;
}
///////////////////////////////////////////////////////////////////////////////////////
// Creates the client's TCP socket and starts a non-blocking connect to the master at
// m_szMasterIP : m_nMasterPort. m_szMasterIP may be a dotted IPv4 address or a host name.
// Returns the socket when the connect either completed or is still in progress
// (WSAEWOULDBLOCK); the caller must then wait for the socket to become writable.
// Returns INVALID_SOCKET, after logging the Winsock error code and closing the socket,
// on any other failure.
SOCKET CInternetPort::CreateClientSocket()
{
	// Create the socket.
	SOCKET sock = socket(PF_INET, SOCK_STREAM, 0);
	if(sock == INVALID_SOCKET)
	{
		int err = WSAGetLastError();
		LOGERROR("socket() failed[%d].", err);
		return INVALID_SOCKET;
	}

	// SO_KEEPALIVE
	int optval = 1;
	if(setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &optval, sizeof(optval)))
	{
		int err = WSAGetLastError();
		LOGERROR("setsockopt(SO_KEEPALIVE) failed[%d].", err);
		closesocket(sock);
		return INVALID_SOCKET;
	}

	// SO_SNDBUF
	optval = SNDBUF_SIZE;
	if(setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &optval, sizeof(optval)))
	{
		int err = WSAGetLastError();
		LOGERROR("setsockopt(SO_SNDBUF) failed[%d].", err);
		closesocket(sock);
		return INVALID_SOCKET;
	}

	/*/ Disabled: read SO_SNDBUF back and log the size the system actually granted.
	int ret = sizeof(optval);
	if(getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &optval, &ret) == 0)
	{
		if(!m_tOpen)			// 首次
			LOGMSG("帐号SOCKET内部发送缓冲区尺寸为[%d]K.", optval/1024);
	}//*/

	// Switch to non-blocking mode.
	unsigned long a = 1;
	if(ioctlsocket(sock, FIONBIO, &a))
	{
		int err = WSAGetLastError();
		LOGERROR("ioctlsocket() failed[%d].", err);
		closesocket(sock);
		return INVALID_SOCKET;
	}

	// Host name -> IP address. Try the dotted form first, then fall back to a name lookup.
	UINT nAddr = inet_addr(m_szMasterIP);		// must be UINT to stay compatible with in_addr
	if(nAddr == INADDR_NONE)
	{
		hostent * hp = gethostbyname(m_szMasterIP);
		if(hp == 0 || hp->h_addr_list[0] == 0)
		{
			int err = WSAGetLastError();
			LOGERROR("Invalid ip addr[%s], err[%d].", m_szMasterIP, err);
			closesocket(sock);
			return INVALID_SOCKET;
		}
		// h_addr_list entries are binary addresses already in network byte order, not
		// dotted strings, so the first one is copied directly instead of being parsed.
		memcpy(&nAddr, hp->h_addr_list[0], sizeof(nAddr));
	}

	sockaddr_in addr_in;
	memset(&addr_in, 0, sizeof(addr_in));		// memset(dest, value, count)
	addr_in.sin_family		= AF_INET;
	addr_in.sin_port		= htons(m_nMasterPort);
	addr_in.sin_addr.s_addr	= nAddr;
	if(connect(sock, (sockaddr *)&addr_in, sizeof(addr_in)) == SOCKET_ERROR)
	{
		int err = WSAGetLastError();
		if(err != WSAEWOULDBLOCK)
		{
			LOGERROR("Connect() failed[%d].", err);
			closesocket(sock);
			return INVALID_SOCKET;
		}
		// WSAEWOULDBLOCK: the non-blocking connect is in progress; hand the socket back
		// so the caller can wait for completion.
	}
	return sock;
}
///////////////////////////////////////////////////////////////////////////////////////
// Sends a message from this port (m_id) to port nPort. A message consists of the packet id,
// the data type and the data itself.
// Returns false when the port is not open, true without sending anything when nPort is
// INVALID_PORT, and otherwise whatever SendFrom() returns.
bool CInternetPort::Send (int nPort, int nPacket, VAR_TYPE nVarType, const void* buf)
{
	if(IsOpen())
	{
		if(nPort != INVALID_PORT)
			return SendFrom(m_id, nPort, nPacket, nVarType, buf);
		return true;		// nothing to deliver to an invalid port; treated as success
	}
	return false;
}
///////////////////////////////////////////////////////////////////////////////////////
// Sends a message to port nPort on behalf of port nPortFrom. A message consists of a
// CMessageHead (port, packet id, data type) followed by SIZE_OF_TYPE(nVarType) bytes of data.
//   nPortFrom : source port written into the header when this node is the master.
//   nPort     : target port; must lie in [0, m_nPortSize).
//   buf       : payload; must not be null.
// A message addressed to this port itself is queued locally with PushMsg() and never
// touches the network.
// Returns true when the message was queued locally or both header and data were sent
// completely. Returns false on a bad port, a missing connection, or a send failure.
// After a send failure the master closes only the socket of nPort, while a client
// closes the whole port and immediately tries to reopen it.
bool CInternetPort::SendFrom(int nPortFrom, int nPort, int nPacket, VAR_TYPE nVarType, const void* buf)
{
	CHECKF(buf);
	if(nPort < 0 || nPort >= m_nPortSize)
	{
		ASSERT(!"nPort");
		return false;
	}

	if(nPort == m_id)
	{
		PushMsg(m_id, nPacket, nVarType, buf);
		return true;
	}

	if(m_setSocket[nPort] == INVALID_SOCKET)
	{
		// buf is an opaque payload (it may hold an int or other binary data), so it is
		// deliberately not printed: formatting it with %s would read past its end.
		LOGERROR("port[%d] send(%d, %d, %d) failed.", m_id, nPort, nPacket, nVarType);
		return false;
	}

	int				nLen	= MSG_HEAD_SIZE;
	CMessageHead	cMsgHead;
	char*			pBuf	= (char *)&cMsgHead;
	if(m_id == MASTER_PORT_ID)
		cMsgHead.m_nPort	= nPortFrom;		// master -> child: m_nPort carries the SOURCE port
	else
		cMsgHead.m_nPort	= nPort;			// child -> master: m_nPort carries the TARGET port
	cMsgHead.m_nPacket		= nPacket;
	cMsgHead.m_nVarType		= nVarType;

	int ret = send(m_setSocket[nPort], pBuf, nLen, 0);			// send the header
	if(ret == nLen)
	{
		nLen	= SIZE_OF_TYPE(nVarType);
		pBuf	= (char *)buf;
		ret = send(m_setSocket[nPort], pBuf, nLen, 0);			// send the data
		if(ret == nLen)
			return true;
	}

	// A partial send or WSAEWOULDBLOCK is reported as overflow; anything else is
	// reported with its error code.
	int err = WSAGetLastError();
	if(ret > 0 || err == WSAEWOULDBLOCK)
		LOGERROR("port[%d] overflow.", m_id);
	else
		LOGERROR("port[%d] error[%d].", m_id, err);

	if(m_id == 0)
	{
		// Master: drop only the failed connection.
		closesocket(m_setSocket[nPort]);
		m_setSocket[nPort] = INVALID_SOCKET;
	}
	else
	{
		// Client: reopen as soon as possible.
		Close();
		Open();
	}
	return false;
}
///////////////////////////////////////////////////////////////////////////////////////
// 接收指定接口(或所有接口)发来的消息。可指定消息ID,也可不指定。
bool CInternetPort::Recv (int nPort, int nPacket, VAR_TYPE nVarType, void* buf, CMessageStatus* pStatus /*= NULL*/) // return false: 没数据
{
CHECKF(buf);
if(!IsOpen())
{
if(pStatus)
pStatus->m_nError = STATUS_FLAG_CLOSE;
return false;
}
RecvMsg(0);
// 处理内部消息
int nRcvPort = nPort, nRcvPacket = nPacket;
VAR_TYPE nRcvVarType;
if(PopMsg(&nRcvPort, &nRcvPacket, &nRcvVarType, buf)) // 内部函数,不用打开互斥锁
{
// 检查类型
if(SIZE_OF_TYPE(nRcvVarType) > SIZE_OF_TYPE(nVarType))
{
ASSERT(!"VarType");
if(pStatus)
pStatus->m_nError = STATUS_FLAG_ERROR; //? 以后可支持自动转换类型
return false;
}
if(pStatus)
{
pStatus->m_nPortFrom = nRcvPort;
pStatus->m_nPacket = nRcvPacket;
pStatus->m_nVarType = nRcvVarType;
pStatus->m_nError = STATUS_FLAG_OK;
}
return true;
}
if(pStatus)
{
if(IsOpen())
pStatus->m_nError = STATUS_FLAG_OK;
else
pStatus->m_nError = STATUS_FLAG_ERROR;
}
return false;
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::RecvMsg(int nMilliSec)
{
if(m_nState == STATE_CLOSED)
return false;
// 接收SOCKET消息
timeval timeout;
timeout.tv_sec = nMilliSec / 1000;
timeout.tv_usec = (nMilliSec%1000) * 1000;
fd_set readset;
FD_ZERO(&readset);
if(m_sockMain != INVALID_SOCKET)
FD_SET(m_sockMain, &readset);
if(m_id == 0)
{
for(int i = 0; i < m_setSocket.size(); i++)
{
if(m_setSocket[i] != INVALID_SOCKET)
FD_SET(m_setSocket[i], &readset);
}
}
int ret = select(FD_SETSIZE, &readset, NULL, NULL, &timeout);
if (ret > 0)
{
if(m_id == 0)
{
// 服务端。处理ACCEPT
if(FD_ISSET(m_sockMain, &readset))
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
int len = sizeof(addr);
SOCKET newsock = accept(m_sockMain, (sockaddr *) &addr, (int *) &len);
if(newsock != INVALID_SOCKET)
{
m_setSocket.push_back(newsock);
m_setRecvBuf.push_back(CRecvBuffer());
}
}
for(int i = m_setSocket.size() - 1; i >= 0; i--) // 要删除,倒排
{
if(m_setSocket[i] == INVALID_SOCKET || !FD_ISSET(m_setSocket[i], &readset))
continue;
if(RecvToBuf(&m_setRecvBuf[i], m_setSocket[i]))
{
BufferToList(i); //? 对于登录消息,SOCKET可能前移
}
else // error
{
closesocket(m_setSocket[i]);
m_setSocket[i] = INVALID_SOCKET;
if(i >= m_nPortSize)
{
m_setSocket.erase(m_setSocket.begin() + i); // 要删除,倒排
m_setRecvBuf.erase(m_setRecvBuf.begin() + i); // 要删除,倒排
}
}
} // for all socket
} // 服务端
else
{
// 客户端
if(RecvToBuf(&m_setRecvBuf[0], m_sockMain))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -