⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 framework.cpp

📁 用c++编写的网络通信框架
💻 CPP
字号:
/*-----------------------------------------------------------------------------
 * FILE: framework.cpp
 * AUTH: xuwannian@gmail.com
 * TIME: 2008-04-29
 *
 *
 * Windows下多线程网关(SOCKET)框架
 *
 * class CFramework是基类,如果需要实现具体连接,需要重载该类,其中有几个必需要
 * 重载的方法:
 *	1、OnCreateClient	是如何创建SOCKET的客户端线程
 *	2、OnCreateServer	是如何创建SOCKET的服务端线程
 *	3、OnExecuteMessage 是收到消息后如何处理这个消息
 *	4、OnRecvData		是处理如何接收消息,因为系统分为长连接和短连接,每一个连
 *						接不一定都是定长,所以需要重载该方法,实现接收到数据
 *---------------------------------------------------------------------------*/
#include "include/socket.h"
#include "include/framework.h"


// 判断值是否是一个UNSIGNED值,
// 并且在0-y之间,如果0-y之间,则使用y作为默认参数
#define DEFAULT_UNSINGED_INT(x, y) (((x > 0) && x < y) ? x : y)
///////////////////////////////////////////////////////////////////////////////

CFramework::CFramework() : __bexit(false)
{
}

CFramework::~CFramework()
{
}

BOOL CFramework::Run(int argc, char** argv)
{
	// 初始化应用程序
	if (!InitApplication(argc, argv))
	{
		return FALSE;
	}

	// 创建客户端线程
	HOSTSTRUCTS clients;
	if (!OnCreateClient(clients))
	{
		return FALSE;
	}

	// 创建服务器端线程
	HOSTSTRUCTS servers;
	if (!OnCreateServer(servers))
	{
		return FALSE;
	}

	// 创建接收消息线程
	int nRecvThreadMax = 1;
	if (!OnCreateRecvMessage(nRecvThreadMax))
	{
		return FALSE;
	}

	int nThreadCount = 64;
	// 创建处理消息队列的线程
	if (!OnCreateDoMessage(nThreadCount))
	{
		return FALSE;
	}

	// 等待销毁程序
	DestroyApplication();
	return TRUE;
}

BOOL CFramework::InitApplication(int argc, char** argv)
{
	// 反序列化
	UnSerialize();
	return TRUE;
}

void CFramework::DestroyApplication()
{
	// 序列化
	Serialize();
	std::list<xuwn::thread*>::iterator it;

	for (it = __threads.begin(); it != __threads.end(); it++)
	{
		xuwn::thread* pthread = (xuwn::thread*)*it;
		pthread->join();
		delete pthread;
		::printf("thread return!\n");
	}

	::printf("All Threads returned!\n");
}

BOOL CFramework::OnConnected(const HOSTSTRUCT& host__)
{ return TRUE; }

void CFramework::OnDisconnected(const HOSTSTRUCT& host__)
{
	::printf("主机:%s 断开连接!\n", host__.addr);
	xuwn::CSocket::Close(host__.fd);
}

BOOL CFramework::OnRecvData(const HOSTSTRUCT& host__, xuwn::MESSAGE& msg)
{
	// 表示该包为心跳包
	if (0 == msg.size.bodys)
	{
		__queue.put(msg);
	}
	else	// 如果不是心跳包则有可能仍需要接收消息
	{
		// 判断该SOCKET在10秒内是否有消息
		switch (xuwn::CSocket::FdCanRead(host__.fd, 10))
		{
		case  xuwn::EXCEPTION:	// exception
		case  xuwn::TIMEOUT:	// timeout or exception
			{
				xuwn::CSocket::Close(host__.fd);
				throw CException("超时", __FILE__, __LINE__, ::WSAGetLastError());
			}
			return FALSE;
		default:	//
			{
				// has message to recv
				int rc = ::recv(host__.fd, msg.buffer + msg.size.heads, msg.size.bodys, 0);
				
				if (0 < rc)
				{
					__queue.put(msg);
				}
				else if (0 == rc)
				{
					OnDisconnected(host__);
					return FALSE;	// 目标主机断开连接
				}
				else
				{
					throw CException("异常", __FILE__, __LINE__, ::WSAGetLastError());
					return FALSE;
				}
			}
			return TRUE;
		}
	}

	return TRUE;
}

void CFramework::OnExecuteMessage(const xuwn::MESSAGE& message)
{
	::printf("MESSAGE: %s\n", message.buffer);
}

BOOL CFramework::OnCreateDoMessage(int& nThreadCount)
{
	for (int i = 0; i < DEFAULT_UNSINGED_INT(nThreadCount, 128); i++)
	{
		xuwn::thread* pthrd = new xuwn::thread(&create_do_message_threads, this);
		AddThreadToList(pthrd);
		::Sleep(10);
	}
	return TRUE;
}

BOOL CFramework::OnCreateRecvMessage(int& nRecvThreadsCount)
{
	for (int i = 0; i < DEFAULT_UNSINGED_INT(nRecvThreadsCount, 128); i++)
	{
		xuwn::thread* pthrd = new xuwn::thread(&create_recv_threads, this);
		AddThreadToList(pthrd);
		::Sleep(10);
	}
	return TRUE;
}

void CFramework::Serialize()
{ }

void CFramework::UnSerialize()
{ }

BOOL CFramework::OnCreateServer(HOSTSTRUCTS& hosts__)
{
	xuwn::ARGS<HOSTSTRUCT > args(this);
	while (0 == hosts__.get(args.__t))
	{
		xuwn::thread* pthrd = new xuwn::thread(&create_server_thread, &args);
		AddThreadToList(pthrd);
		::Sleep(100);
	}
	return TRUE;
}

BOOL CFramework::OnCreateClient(HOSTSTRUCTS& hosts__)
{
	xuwn::ARGS<HOSTSTRUCT > args(this);
	while (0 == hosts__.get(args.__t))
	{
		xuwn::thread* pthrd = new xuwn::thread(&create_client_thread, &args);
		AddThreadToList(pthrd);
		::Sleep(100);
	}
	return TRUE;
}

void CFramework::__server_thread(const HOSTSTRUCT& host__)
{
	HOSTSTRUCT host;
	::memcpy(&host, &host__, sizeof(host__));

	xuwn::server s;
	
	while (!s.Init(host.addr, host.port) && !__bexit)
	{
		::printf("Init socket: %s %d code: %d\n", host.addr, host.port, s.code);
		::Sleep(1000);
	}

	::printf("Init socket: %s:%d successed\n", host.addr, host.port);

	// 循环检测是否有连接请求
	while (!__bexit)
	{
		// TRUE为有连接请求
		if (s.HasConnect(2))
		{
			HOSTSTRUCT ask;
			::memset(&ask, 0, sizeof(ask));

			s.GetRemoteAddr(ask.addr);
			ask.fd		= s.GetSocket();
			ask.port	= host.port;
			
			// 判断是否为允许访问的主机
			// 如果不是,则关闭与之连接
			if (IsApplyHost(ask))
			{
				// 请求登录,如果不需要登录直接返回TRUE
				if (OnConnected(ask)) 
				{
					// 登录成功后,则添加到系统路由表中,
					// 并添加待接收的队列,由接收线程负责接收消息
					AddRoute(ask);
					__socket_queue.put(ask);
				}
				else	
				{
					xuwn::CSocket::Close(ask.fd);
				}
			}
			else
			{
				::printf("Refuse connection: %s %d\n", ask.addr, ask.port);
				xuwn::CSocket::Close(ask.fd);
			}
		}
		::Sleep(200);
	}
}

void CFramework::__client_thread(const HOSTSTRUCT& host__)
{
	HOSTSTRUCT host;
	::memcpy(&host, &host__, sizeof(host__));
	::printf("addr: %s\n", host.addr);
	
	xuwn::client c;

	while (!__bexit)
	{
		if (TRUE != c.Init(host.addr, host.port))
		{
			::printf("Init Socket client: %s:%d failed code: %d\n", host.addr, host.port, c.code);
			return ;
		}
		
		::printf("Init Socket client: %s:%d successed\n", host.addr, host.port);
		// connect remote host server
		while (TRUE != c.Connect() && !__bexit)
		{
			::printf("Connect host: %s %d code: %d\n", host.addr, host.port, c.code);
			Sleep(1000);
			continue;
		}
		
		host.fd = c.GetSocket();

		// logon remote host
		if (TRUE != OnConnected(host))
		{
			Sleep(1000);
			continue;
		}

		// 将连接后的信息添加到路系统路由表中
		AddRoute(host);

		// recv message from socket
		xuwn::MESSAGE msg;
		while (TRUE == OnRecvData(host, msg) && !__bexit)
		{
			::memset(&msg, 0, sizeof(msg));
		}
		
		OnDisconnected(host);
	}
}

void CFramework::__recv_thread()
{
	HOSTSTRUCT host;
	::memset(&host, 0, sizeof(host));

	while (!__bexit)
	{
		// 从队列中取到主机信息,判断该主机信息是否有信息需要接收
		// rc = 0 表示取到主机信息
		int rc = __socket_queue.get(host);
		if (0 == rc)
		{
			// 判断该SOCKET在2秒钟内是否有消息可读,如果有消息可
			// 读,则调用OnRecvData()函数去接收消息
			int check = xuwn::CSocket::FdCanRead(host.fd, 1);
			switch (check)
			{
			case xuwn::EXCEPTION:
				// 出现异常,需要关闭该SOCKET
				// 并从__socket_queue中移除该主机的相应信息
				::Sleep(1000);
//				__socket_queue.put(host);
				break;
			case xuwn::TIMEOUT:
				// 等待超时,表示在指定的时间内,该SOCKET无信息
				// 可读,则需要重新放入到__socket_queue队列中,
				// 等待下次再接收
				//等待一段时间后再重新接收
				::Sleep(500);
				__socket_queue.put(host);
				break;
			default:
				// 如果接收消息成功,则将该主机信息重新写入队列,下次再接收
				// 如果接收消息失败,则关闭该SOCKET,并触发断开连接事件
				xuwn::MESSAGE msg;
				if (TRUE == OnRecvData(host, msg))
				{
					__socket_queue.put(host);
				}
				else 
				{
					xuwn::CSocket::Close(host.fd);
					OnDisconnected(host);
				}
			}
		}
		else
		{
			Sleep(100);
		}
		::memset(&host, 0, sizeof(host));
	}
}

void CFramework::__do_message_thread()
{
	xuwn::MESSAGE msg;
	while (!__bexit)
	{
		// 从消息队列中取消息
		int rc = __queue.get(msg);
		
		switch (rc)
		{
		case 0:
			// 从消息队列中取到了消息
			OnExecuteMessage(msg);
			break;
		default:
			Sleep(1000);
			break;
		}
	}
}

void CFramework::Close()
{
	__bexit = true;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -