⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netcomputer_generic.cpp

📁 这是一款2d游戏引擎
💻 CPP
📖 第 1 页 / 共 2 页
字号:

	netsession->remove_udp_map(this);

	shutdown = true;
	wakeup_trigger.set_flag();
	thread.wait();
}

void CL_NetComputer_Generic::received_udp_packet(CL_NetPacket &packet)
{
	try
	{
		unsigned char command = packet.input.read_uchar8();
		switch (command)
		{
		case cmd_udp_connect:
			{
				// Ok got udp connection,
				// inform we are go for udp:

				CL_OutputSource_Memory cmd;
				cmd.write_uchar8(cmd_udp_connect);

				CL_MutexSection mutex_section(&mutex);
				bool tcp_queue_empty = send_tcp_queue.empty();
				send_tcp_queue.push(cmd.get_data());
				if (tcp_queue_empty) wakeup_trigger.set_flag();

				udp_available = true;
			}
			break;

		case cmd_netpacket_msg:
			{
				std::map<int, std::string>::iterator iter = remote_atoms.find(packet.input.read_ushort16());

				// if it's an unknown channel, ignore message.
				if (iter == remote_atoms.end()) break;

				std::string channel_name = (*iter).second;

				int pos = packet.input.tell();
				CL_NetPacket message(packet.get_data()+pos, packet.get_size()-pos);
				CL_MutexSection mutex_section(&netsession->mutex);
				netsession->received_netpacket(channel_name, message, this);
			}
			break;
		}
	}
	catch (CL_Error error)
	{
		// bogus packet. Should this cause clannet to disconnect the netcomputer?
	}
}

/////////////////////////////////////////////////////////////////////////////
// CL_NetComputer_Generic implementation:

void CL_NetComputer_Generic::worker_thread()
{
	try
	{
		if (async_boot) // async connect. Do the connect now.
		{
			async_boot = false;
			address = CL_IPAddress(async_hostname, async_port);
			tcp_sock.bind(CL_IPAddress());
			tcp_sock.connect(address);
			server_port = async_port;
		}

		if (server)
		{
			CL_MutexSection mutex_section(&mutex);
			send_udp_info();
		}

		while (!disconnected)
		{
			CL_MutexSection mutex_section(&mutex, false);

			// Wait for activity:
			CL_EventListener listener;
			mutex_section.enter();
			listener.add_trigger(&wakeup_trigger);
			listener.add_trigger(tcp_sock.get_read_trigger());
			if (!send_tcp_queue.empty()) listener.add_trigger(tcp_sock.get_write_trigger());
			mutex_section.leave();

			listener.wait();
			if (shutdown) return;

			// Process activity:
			wakeup_trigger.reset();
			update_socket();
		}
	}
	catch (CL_Error err)
	{
		CL_MutexSection mutex_section(&mutex, false);

		disconnect_reason = err.message;
		disconnected = true;
	}

	// Remove computer from processing queue if disconnected.
	if (disconnected)
	{
		CL_MutexSection mutex_section(&netsession->mutex);
		netsession->computers.remove(this);
		netsession->disconnections.push(CL_NetComputer(this));
	}

	// Close socket connection:
	CL_MutexSection mutex_section(&mutex);
	tcp_sock = CL_Socket();
}

#define cl_min(a, b) (a < b ? a : b)

void CL_NetComputer_Generic::update_socket()
{
	if (disconnected) return;

	if (tcp_sock.get_read_trigger()->get_flag()) // Data available on socket:
	{
		char buffer[16*1024];

		if (data_left > 0) // currently routing message data.
		{
			int received = 0;
			
			received = tcp_sock.recv(buffer, cl_min(data_left, 16*1024));
			if (received == 0) throw CL_Error("Connection closed by foreign host");
			
			if (netstream) // Send data to netstream:
			{
				CL_MutexSection mutex_section(&netstream->mutex);
				netstream->receive_queue.push(std::string(buffer, received));
				netstream->read_trigger.set_flag();
			}
			else // Send data to netpacket
			{
				int pos = netpacket.get_size();
				netpacket.resize(pos + received);
				memcpy(netpacket.get_data()+pos, buffer, received);
			}

			data_left -= received;

			if (data_left == 0) // finished reading message data
			{
				if (!netstream)
				{
					CL_MutexSection mutex_section(&netsession->mutex);
					netsession->received_netpacket(remote_atoms[dest_netpacket_atom], netpacket, this);
					netpacket = CL_NetPacket();
				}

				netstream = 0;
			}
		}
		else // hmm, new message.
		{
			unsigned char command = tcp_sock.input.read_uchar8();
			switch (command)
			{
			case cmd_create_atom:
				{
					int atom_id = tcp_sock.input.read_ushort16();
					std::string atom_str = tcp_sock.input.read_string();

					remote_atoms[atom_id] = atom_str;
				}
				break;

			case cmd_udp_info:
				if (!server)
				{
					udp_connect_secret = tcp_sock.input.read_ushort16();
					udp_address = CL_IPAddress(address.get_address(), server_port);

					CL_NetPacket packet;
					packet.output.write_uchar8(cmd_udp_connect);
					packet.output.write_ushort16(udp_connect_secret);
					netsession->udp_sock.send(packet.get_data(), packet.get_size(), udp_address);
					// todo: keep sending this packet until a cmd_udp_connect is received

					// now add computer to udp_ip_port_map list
					netsession->udp_ip_port_map[udp_address] = this;
				}
				break;

			case cmd_udp_connect:
				{
					udp_available = true;
				}
				break;

			case cmd_netstream_connect:
				{
					std::map< std::string, CL_Signal_v1<CL_NetStream &> >::iterator it_connect;

					std::string channel_name = remote_atoms[tcp_sock.input.read_ushort16()];
					int channel_id = tcp_sock.input.read_ushort16();
					CL_MutexSection mutex_section(&netsession->mutex);
					it_connect = netsession->map_netstream_connect.find(channel_name);
					if (it_connect == netsession->map_netstream_connect.end())
					{
						// Tried to connect to non-existant netstream.
						mutex_section.leave();
						send_stream_close(channel_id);
					}
					CL_NetStream_Generic *stream = new CL_NetStream_Generic(channel_id, this, netsession);
					streams[channel_id] = stream;
					CL_NetSession_Generic::NewStreamPair pair(stream, channel_name);
					netsession->new_streams.push(pair);
				}
				break;

			case cmd_netstream_msg:
				{
					std::map< int, CL_NetStream_Generic *>::iterator it_channel;

					CL_MutexSection mutex_section(&mutex);
					unsigned int channel_id = tcp_sock.input.read_ushort16();
					data_left = tcp_sock.input.read_ushort16();
					it_channel = streams.find(channel_id);
					if (it_channel == streams.end())
					{
						// Tried to send data to non-existant netstream connection.
						netstream = 0;
						send_stream_close(channel_id);
					}
					else
					{
						netstream = it_channel->second;
					}
				}
				break;

			case cmd_netstream_closed:
				{
					std::map<int, CL_NetStream_Generic *>::iterator it_channel;

					CL_MutexSection mutex_section(&mutex);
					unsigned int channel_id = tcp_sock.input.read_ushort16();
					it_channel = streams.find(channel_id);
					if (it_channel != streams.end())
					{
						CL_MutexSection mutex_section(&it_channel->second->mutex);
						it_channel->second->closed = true;
						it_channel->second->read_trigger.set_flag();
						streams.erase(it_channel);
					}
				}
				break;

			case cmd_netpacket_msg:
				{
					dest_netpacket_atom = tcp_sock.input.read_ushort16();
					data_left = tcp_sock.input.read_ushort16();
				}
				break;
			}
		}
	}

	CL_MutexSection mutex_section(&mutex);
//		if (tcp_sock.get_write_trigger()->get_flag()) // Can write data to socket
	while (!send_tcp_queue.empty())
	{
		std::string &msg = send_tcp_queue.front();

		int sent = tcp_sock.send(msg.substr(send_pos));
		send_pos += sent;
		if (send_pos != static_cast<int>(msg.size())) break;

		send_tcp_queue.pop();
		send_pos = 0;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -