📄 framework.cpp
字号:
/*-----------------------------------------------------------------------------
* FILE: framework.cpp
* AUTH: xuwannian@gmail.com
* TIME: 2008-04-29
*
*
* Windows下多线程网关(SOCKET)框架
*
* class CFramework是基类,如果需要实现具体连接,需要重载该类,其中有几个必需要
* 重载的方法:
* 1、OnCreateClient 是如何创建SOCKET的客户端线程
* 2、OnCreateServer 是如何创建SOCKET的服务端线程
* 3、OnExecuteMessage 是收到消息后如何处理这个消息
* 4、OnRecvData 是处理如何接收消息,因为系统分为长连接和短连接,每一个连
* 接不一定都是定长,所以需要重载该方法,实现接收到数据
*---------------------------------------------------------------------------*/
#include "include/socket.h"
#include "include/framework.h"
// 判断值是否是一个UNSIGNED值,
// 并且在0-y之间,如果0-y之间,则使用y作为默认参数
#define DEFAULT_UNSINGED_INT(x, y) (((x > 0) && x < y) ? x : y)
///////////////////////////////////////////////////////////////////////////////
CFramework::CFramework() : __bexit(false)
{
}
CFramework::~CFramework()
{
}
BOOL CFramework::Run(int argc, char** argv)
{
// 初始化应用程序
if (!InitApplication(argc, argv))
{
return FALSE;
}
// 创建客户端线程
HOSTSTRUCTS clients;
if (!OnCreateClient(clients))
{
return FALSE;
}
// 创建服务器端线程
HOSTSTRUCTS servers;
if (!OnCreateServer(servers))
{
return FALSE;
}
// 创建接收消息线程
int nRecvThreadMax = 1;
if (!OnCreateRecvMessage(nRecvThreadMax))
{
return FALSE;
}
int nThreadCount = 64;
// 创建处理消息队列的线程
if (!OnCreateDoMessage(nThreadCount))
{
return FALSE;
}
// 等待销毁程序
DestroyApplication();
return TRUE;
}
BOOL CFramework::InitApplication(int argc, char** argv)
{
// 反序列化
UnSerialize();
return TRUE;
}
void CFramework::DestroyApplication()
{
// 序列化
Serialize();
std::list<xuwn::thread*>::iterator it;
for (it = __threads.begin(); it != __threads.end(); it++)
{
xuwn::thread* pthread = (xuwn::thread*)*it;
pthread->join();
delete pthread;
::printf("thread return!\n");
}
::printf("All Threads returned!\n");
}
BOOL CFramework::OnConnected(const HOSTSTRUCT& host__)
{ return TRUE; }
void CFramework::OnDisconnected(const HOSTSTRUCT& host__)
{
::printf("主机:%s 断开连接!\n", host__.addr);
xuwn::CSocket::Close(host__.fd);
}
BOOL CFramework::OnRecvData(const HOSTSTRUCT& host__, xuwn::MESSAGE& msg)
{
// 表示该包为心跳包
if (0 == msg.size.bodys)
{
__queue.put(msg);
}
else // 如果不是心跳包则有可能仍需要接收消息
{
// 判断该SOCKET在10秒内是否有消息
switch (xuwn::CSocket::FdCanRead(host__.fd, 10))
{
case xuwn::EXCEPTION: // exception
case xuwn::TIMEOUT: // timeout or exception
{
xuwn::CSocket::Close(host__.fd);
throw CException("超时", __FILE__, __LINE__, ::WSAGetLastError());
}
return FALSE;
default: //
{
// has message to recv
int rc = ::recv(host__.fd, msg.buffer + msg.size.heads, msg.size.bodys, 0);
if (0 < rc)
{
__queue.put(msg);
}
else if (0 == rc)
{
OnDisconnected(host__);
return FALSE; // 目标主机断开连接
}
else
{
throw CException("异常", __FILE__, __LINE__, ::WSAGetLastError());
return FALSE;
}
}
return TRUE;
}
}
return TRUE;
}
void CFramework::OnExecuteMessage(const xuwn::MESSAGE& message)
{
::printf("MESSAGE: %s\n", message.buffer);
}
BOOL CFramework::OnCreateDoMessage(int& nThreadCount)
{
for (int i = 0; i < DEFAULT_UNSINGED_INT(nThreadCount, 128); i++)
{
xuwn::thread* pthrd = new xuwn::thread(&create_do_message_threads, this);
AddThreadToList(pthrd);
::Sleep(10);
}
return TRUE;
}
BOOL CFramework::OnCreateRecvMessage(int& nRecvThreadsCount)
{
for (int i = 0; i < DEFAULT_UNSINGED_INT(nRecvThreadsCount, 128); i++)
{
xuwn::thread* pthrd = new xuwn::thread(&create_recv_threads, this);
AddThreadToList(pthrd);
::Sleep(10);
}
return TRUE;
}
void CFramework::Serialize()
{ }
void CFramework::UnSerialize()
{ }
BOOL CFramework::OnCreateServer(HOSTSTRUCTS& hosts__)
{
xuwn::ARGS<HOSTSTRUCT > args(this);
while (0 == hosts__.get(args.__t))
{
xuwn::thread* pthrd = new xuwn::thread(&create_server_thread, &args);
AddThreadToList(pthrd);
::Sleep(100);
}
return TRUE;
}
BOOL CFramework::OnCreateClient(HOSTSTRUCTS& hosts__)
{
xuwn::ARGS<HOSTSTRUCT > args(this);
while (0 == hosts__.get(args.__t))
{
xuwn::thread* pthrd = new xuwn::thread(&create_client_thread, &args);
AddThreadToList(pthrd);
::Sleep(100);
}
return TRUE;
}
void CFramework::__server_thread(const HOSTSTRUCT& host__)
{
HOSTSTRUCT host;
::memcpy(&host, &host__, sizeof(host__));
xuwn::server s;
while (!s.Init(host.addr, host.port) && !__bexit)
{
::printf("Init socket: %s %d code: %d\n", host.addr, host.port, s.code);
::Sleep(1000);
}
::printf("Init socket: %s:%d successed\n", host.addr, host.port);
// 循环检测是否有连接请求
while (!__bexit)
{
// TRUE为有连接请求
if (s.HasConnect(2))
{
HOSTSTRUCT ask;
::memset(&ask, 0, sizeof(ask));
s.GetRemoteAddr(ask.addr);
ask.fd = s.GetSocket();
ask.port = host.port;
// 判断是否为允许访问的主机
// 如果不是,则关闭与之连接
if (IsApplyHost(ask))
{
// 请求登录,如果不需要登录直接返回TRUE
if (OnConnected(ask))
{
// 登录成功后,则添加到系统路由表中,
// 并添加待接收的队列,由接收线程负责接收消息
AddRoute(ask);
__socket_queue.put(ask);
}
else
{
xuwn::CSocket::Close(ask.fd);
}
}
else
{
::printf("Refuse connection: %s %d\n", ask.addr, ask.port);
xuwn::CSocket::Close(ask.fd);
}
}
::Sleep(200);
}
}
void CFramework::__client_thread(const HOSTSTRUCT& host__)
{
HOSTSTRUCT host;
::memcpy(&host, &host__, sizeof(host__));
::printf("addr: %s\n", host.addr);
xuwn::client c;
while (!__bexit)
{
if (TRUE != c.Init(host.addr, host.port))
{
::printf("Init Socket client: %s:%d failed code: %d\n", host.addr, host.port, c.code);
return ;
}
::printf("Init Socket client: %s:%d successed\n", host.addr, host.port);
// connect remote host server
while (TRUE != c.Connect() && !__bexit)
{
::printf("Connect host: %s %d code: %d\n", host.addr, host.port, c.code);
Sleep(1000);
continue;
}
host.fd = c.GetSocket();
// logon remote host
if (TRUE != OnConnected(host))
{
Sleep(1000);
continue;
}
// 将连接后的信息添加到路系统路由表中
AddRoute(host);
// recv message from socket
xuwn::MESSAGE msg;
while (TRUE == OnRecvData(host, msg) && !__bexit)
{
::memset(&msg, 0, sizeof(msg));
}
OnDisconnected(host);
}
}
void CFramework::__recv_thread()
{
HOSTSTRUCT host;
::memset(&host, 0, sizeof(host));
while (!__bexit)
{
// 从队列中取到主机信息,判断该主机信息是否有信息需要接收
// rc = 0 表示取到主机信息
int rc = __socket_queue.get(host);
if (0 == rc)
{
// 判断该SOCKET在2秒钟内是否有消息可读,如果有消息可
// 读,则调用OnRecvData()函数去接收消息
int check = xuwn::CSocket::FdCanRead(host.fd, 1);
switch (check)
{
case xuwn::EXCEPTION:
// 出现异常,需要关闭该SOCKET
// 并从__socket_queue中移除该主机的相应信息
::Sleep(1000);
// __socket_queue.put(host);
break;
case xuwn::TIMEOUT:
// 等待超时,表示在指定的时间内,该SOCKET无信息
// 可读,则需要重新放入到__socket_queue队列中,
// 等待下次再接收
//等待一段时间后再重新接收
::Sleep(500);
__socket_queue.put(host);
break;
default:
// 如果接收消息成功,则将该主机信息重新写入队列,下次再接收
// 如果接收消息失败,则关闭该SOCKET,并触发断开连接事件
xuwn::MESSAGE msg;
if (TRUE == OnRecvData(host, msg))
{
__socket_queue.put(host);
}
else
{
xuwn::CSocket::Close(host.fd);
OnDisconnected(host);
}
}
}
else
{
Sleep(100);
}
::memset(&host, 0, sizeof(host));
}
}
void CFramework::__do_message_thread()
{
xuwn::MESSAGE msg;
while (!__bexit)
{
// 从消息队列中取消息
int rc = __queue.get(msg);
switch (rc)
{
case 0:
// 从消息队列中取到了消息
OnExecuteMessage(msg);
break;
default:
Sleep(1000);
break;
}
}
}
void CFramework::Close()
{
__bexit = true;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -