📄 internetport.cpp
字号:
{
BufferToList(0);
}
else
{
Close();
Open(); // 尽快重新打开
}
}
} // select
return true;
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::RecvToBuf(CRecvBuffer* pRcvBuf, SOCKET sock) // return false: error
{
char* pBuf = pRcvBuf->GetTail();
int nLen = message_port::RECV_BUFFER_SIZE - pRcvBuf->GetLen();
int ret = recv(sock, pBuf, nLen, 0); // 接收数据
if(ret > 0)
{
pRcvBuf->AddLen(ret);
return true;
}
if(ret == 0)
{
LOGERROR("message: socket close.");
}
else if(ret < 0)
{
int err = WSAGetLastError();
if(err == WSAEWOULDBLOCK)
return true;
LOGERROR("message: socket error[%d], recv() failed.", err);
}
return false;
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::BufferToList(int idx) // return false: 无消息
{
if(m_id == 0)
{
while(true)
{
// 处理消息
if(m_setRecvBuf[idx].GetLen() < MSG_HEAD_SIZE)
break;
CMessageHead* pHead = m_setRecvBuf[idx].GetHead();
int nLen = SIZE_OF_TYPE(pHead->m_nVarType);
if(m_setRecvBuf[idx].GetLen() < MSG_HEAD_SIZE + nLen)
break;
if(pHead->m_nPacket >= SYS_PACKET_BASE)
{
if(pHead->m_nPacket == SYS_PACKET_LOGINKEY) // 处理节点验证
{
LPCTSTR pszKey = (LPCTSTR)pHead->m_bufData;
IF_NOT(strlen(m_szKey) && strcmp(pszKey, m_szKey) == 0)
{
closesocket(m_setSocket[idx]);
//??? 未强制检查KEY
}
}
else if(pHead->m_nPacket == SYS_PACKET_PORTID) // 处理节点登录
{
IF_OK(pHead->m_nVarType == VARTYPE_INT)
{
int nLoginPort = *((int*)pHead->m_bufData);
IF_OK(nLoginPort > MASTER_PORT_ID && nLoginPort < m_nPortSize // >MASTER_PORT_ID : 不能为主节点号
&& idx >= m_nPortSize)
{
if(m_setSocket[nLoginPort] != INVALID_SOCKET)
{
// 双重登录,关闭旧SOCKET
closesocket(m_setSocket[nLoginPort]);
}
m_setSocket[nLoginPort] = m_setSocket[idx];
m_setRecvBuf[nLoginPort] = m_setRecvBuf[idx];
m_setSocket.erase(m_setSocket.begin() + idx); // 要删除,倒排
m_setRecvBuf.erase(m_setRecvBuf.begin() + idx);
// 移动idx
idx = nLoginPort;
}
}
SendFrom(m_id, idx, SYS_PACKET_PORTSIZE, VARTYPE_INT, &m_nPortSize);
}
else
{
ASSERT(!"Unknown system packet!");
}
}
else // not system packet
{
if(pHead->m_nPort != MASTER_PORT_ID) //? m_nPort: 子节点上传到主节点时,表示TARGET_PORT
{
// 处理转发消息
IF_OK(idx < m_nPortSize)
SendFrom(idx, pHead->m_nPort, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData); // idx == sub_port_id
}
else
{
// 存消息
IF_OK(idx < m_nPortSize)
PushMsg(idx, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData); // idx == sub_port_id
}
}
// 清除BUFFER中的消息
m_setRecvBuf[idx].DelMsg(MSG_HEAD_SIZE + nLen);
} // while
}
else // 子节点
{
while(true)
{
// 处理消息
if(m_setRecvBuf[0].GetLen() < MSG_HEAD_SIZE)
break;
CMessageHead* pHead = m_setRecvBuf[0].GetHead();
int nLen = SIZE_OF_TYPE(pHead->m_nVarType);
if(m_setRecvBuf[0].GetLen() < MSG_HEAD_SIZE + nLen)
break;
if(pHead->m_nPacket == SYS_PACKET_PORTSIZE) // 处理节点尺寸
{
IF_OK(pHead->m_nVarType == VARTYPE_INT)
{
int nPortSize = *((int*)pHead->m_bufData);
IF_OK(idx == MASTER_PORT_ID)
{
if(nPortSize != m_nPortSize)
{
m_nPortSize = nPortSize; //? reset size!!!
m_setSocket.resize(m_nPortSize, m_sockMain);
}
}
}
}
else
{
// 存消息
PushMsg(pHead->m_nPort, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData);
}
// 清除BUFFER中的消息
m_setRecvBuf[0].DelMsg(MSG_HEAD_SIZE + nLen);
}
}
return (m_setMsg.size() != 0);
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::WaitMsg(int nMilliSec)
{
if(!IsOpen())
return false;
if(m_setMsg.size())
return true;
RecvMsg(nMilliSec);
return (m_setMsg.size() != 0);
}
///////////////////////////////////////////////////////////////////////////////////////
// 关闭接口,不再接收消息。可重复调用。
bool CInternetPort::Close()
{
if(m_nState == STATE_CLOSED)
return true;
m_nState = STATE_CLOSED;
if(m_id == 0) // 第一号接口为服务端,其它接口为客户端
{
// 清空消息串
for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
delete(*iter);
m_setMsg.clear();
// 清空BUFFER
m_setRecvBuf.resize(m_nPortSize);
for(int i = 0; i < m_setRecvBuf.size(); i++)
m_setRecvBuf[i].Reset();
// 关闭主SOCKET
if(m_sockMain != INVALID_SOCKET)
{
closesocket(m_sockMain);
m_sockMain = INVALID_SOCKET;
}
// 关闭子SOCKET
m_setSocket.resize(m_nPortSize);
for( i = 0; i < m_setSocket.size(); i++)
{
if(m_setSocket[i] != INVALID_SOCKET)
{
closesocket(m_setSocket[i]);
m_setSocket[i] = INVALID_SOCKET;
}
}
}
else
{
// 清空消息串
for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
delete(*iter);
m_setMsg.clear();
// 清空BUFFER
m_setRecvBuf[0].Reset();
// 关闭主SOCKET
if(m_sockMain != INVALID_SOCKET)
{
closesocket(m_sockMain);
m_sockMain = INVALID_SOCKET;
}
// 清空SOCKET表
for(int i = 0; i < m_setSocket.size(); i++)
m_setSocket[i] = INVALID_SOCKET;
}
return true;
}
///////////////////////////////////////////////////////////////////////////////////////
// CInternetPort
///////////////////////////////////////////////////////////////////////////////////////
int CInternetPort::SIZE_OF_TYPE (int type)
{
const int fixlen_type_size = 10;
const int size_of[fixlen_type_size] = {1,1,2,2, 4,4,4,4, 4,8};
if(type < VARTYPE_NONE)
return type;
else if(type == VARTYPE_NONE)
{
ASSERT(!"SIZE_OF_TYPE");
return 0;
}
else if(type - (VARTYPE_NONE+1) < fixlen_type_size)
return size_of[type - (VARTYPE_NONE+1)];
ASSERT(!"SIZE_OF_TYPE.");
return 0;
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::PushMsg(int nPort, int nPacket, VAR_TYPE nVarType, const void* buf) // nData中的串和结构都会被COPY
{
CHECKF(buf);
if(m_nState != STATE_OK) // Close()后,不接收消息
return false;
if(nPort < 0 || nPort >= m_nPortSize)
{
ASSERT(!"PushMsg(nPort)");
return false;
}
int nSize = SIZE_OF_TYPE(nVarType);
if(nSize == 0 || nSize > MAX_MSGPACKSIZE)
{
ASSERT(!"PushMsg(nSize)");
return false;
}
CMessagePacket* pMsg = new CMessagePacket; // VVVVVVVVVVVVVVVVVVV
if(!pMsg)
{
ASSERT(!"new");
return false;
}
pMsg->m_nPortFrom = nPort;
pMsg->m_nPacket = nPacket;
pMsg->m_nVarType = nVarType;
memcpy(pMsg->m_bufData, buf, nSize);
m_setMsg.push_back(pMsg); // AAAAAAAAAAAAAAAAAAAAAAAA
// 检查溢出
if((m_setMsg.size()%100) == 0) //? 100报警
LOGERROR("CInternetPacket[%d]接口的数量达到[%u]", m_id, m_setMsg.size());
if(m_setMsg.size() > 10000)
{
LOGERROR("ERROR: 消息堆溢出,InternetPort[%d]关闭!", m_id);
Close();
}
return m_setMsg.size() <= 1000; //? 1000出错
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::PopMsg(int *pPort, int *pPacket, VAR_TYPE *pVarType, void* buf)
{
CHECKF(pPort && pPacket && pVarType && buf);
if(m_nState != STATE_OK) // Close()后,不处理消息
return false;
if(m_setMsg.size() == 0)
return false;
for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
{
CMessagePacket* pMsg = *iter;
if((*pPort == PORT_ANY || *pPort == pMsg->m_nPortFrom) && (*pPacket == PACKET_ANY || *pPacket == pMsg->m_nPacket))
{
*pPort = pMsg->m_nPortFrom;
*pPacket = pMsg->m_nPacket;
*pVarType = (VAR_TYPE)pMsg->m_nVarType;
int nSize = SIZE_OF_TYPE(*pVarType);
if(nSize)
{
memcpy(buf, pMsg->m_bufData, nSize);
}
delete(pMsg);
m_setMsg.erase(iter);
return true;
}
}
return false;
}
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::Init()
{
m_setSocket.resize(m_nPortSize, INVALID_SOCKET);
if(m_id == MASTER_PORT_ID)
{
for(int i = 0; i < m_nPortSize; i++)
m_setRecvBuf.push_back(CRecvBuffer());
}
else
m_setRecvBuf.push_back(CRecvBuffer());
return true;
}
///////////////////////////////////////////////////////////////////////////////////////
void CInternetPort::Clear()
{
Close();
for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
delete(*iter);
m_setMsg.clear();
}
///////////////////////////////////////////////////////////////////////////////////////
// static
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::InitPortSet(int nPortSize, LPCTSTR pszMasterIP, int nMasterPort)
{
if(!pszMasterIP || pszMasterIP[0] == 0 || strlen(pszMasterIP) >= IPSTR_SIZE)
return false;
if(nPortSize <= m_id)
nPortSize = m_id + 1; // 容错
if(nPortSize < 1 || nPortSize > MAX_PORT_SIZE)
{
ASSERT(!"网络节点数错误! 请修改FD_SETSIZE值并重新编译程序。");
return false;
}
m_nPortSize = nPortSize;
strcpy(m_szMasterIP, pszMasterIP);
m_nMasterPort = nMasterPort;
// 初始化网络
WSADATA wsaData;
int err;
if((err = WSAStartup(SOCKET_VERSION, &wsaData)) != 0)
{
LOGERROR("WSAStartup failed[%d].", err);
return false;
}
m_bWinStartup = true;
// 检查版本
if(wsaData.wVersion != 0x0002)
{
WSACleanup();
LOGERROR("wsaData version unmatch[0x%04X].", wsaData.wVersion);
return false;
}
Init();
return true;
}
///////////////////////////////////////////////////////////////////////////////////////
void CInternetPort::ClearPortSet()
{
Clear();
if(m_bWinStartup)
WSACleanup();
}
///////////////////////////////////////////////////////////////////////////////////////
IMessagePort* CInternetPort::CreateNew(int nPortID, int nPortSize, LPCTSTR szMasterIP, int nMasterPort, LPCTSTR szKey)
{
CHECKF(szMasterIP && szKey);
CHECKF(nPortID >= 0 && nPortSize >=0 && nMasterPort >= 0);
if(nPortID == 0 && nPortSize == 0 || nPortID != 0 && strlen(szMasterIP) == 0)
return NULL;
CInternetPort* pPort = new CInternetPort(nPortID);
if(pPort)
{
if(pPort->InitPortSet(nPortSize, szMasterIP, nMasterPort))
{
SafeCopy(pPort->m_szKey, szKey, MAX_KEYSIZE);
return (IMessagePort*)pPort;
}
delete pPort;
}
return NULL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -