📄 netcomputer_generic.cpp
字号:
netsession->remove_udp_map(this);
shutdown = true;
wakeup_trigger.set_flag();
thread.wait();
}
void CL_NetComputer_Generic::received_udp_packet(CL_NetPacket &packet)
{
try
{
unsigned char command = packet.input.read_uchar8();
switch (command)
{
case cmd_udp_connect:
{
// Ok got udp connection,
// inform we are go for udp:
CL_OutputSource_Memory cmd;
cmd.write_uchar8(cmd_udp_connect);
CL_MutexSection mutex_section(&mutex);
bool tcp_queue_empty = send_tcp_queue.empty();
send_tcp_queue.push(cmd.get_data());
if (tcp_queue_empty) wakeup_trigger.set_flag();
udp_available = true;
}
break;
case cmd_netpacket_msg:
{
std::map<int, std::string>::iterator iter = remote_atoms.find(packet.input.read_ushort16());
// if it's an unknown channel, ignore message.
if (iter == remote_atoms.end()) break;
std::string channel_name = (*iter).second;
int pos = packet.input.tell();
CL_NetPacket message(packet.get_data()+pos, packet.get_size()-pos);
CL_MutexSection mutex_section(&netsession->mutex);
netsession->received_netpacket(channel_name, message, this);
}
break;
}
}
catch (CL_Error error)
{
// bogus packet. Should this cause clannet to disconnect the netcomputer?
}
}
/////////////////////////////////////////////////////////////////////////////
// CL_NetComputer_Generic implementation:
void CL_NetComputer_Generic::worker_thread()
{
try
{
if (async_boot) // async connect. Do the connect now.
{
async_boot = false;
address = CL_IPAddress(async_hostname, async_port);
tcp_sock.bind(CL_IPAddress());
tcp_sock.connect(address);
server_port = async_port;
}
if (server)
{
CL_MutexSection mutex_section(&mutex);
send_udp_info();
}
while (!disconnected)
{
CL_MutexSection mutex_section(&mutex, false);
// Wait for activity:
CL_EventListener listener;
mutex_section.enter();
listener.add_trigger(&wakeup_trigger);
listener.add_trigger(tcp_sock.get_read_trigger());
if (!send_tcp_queue.empty()) listener.add_trigger(tcp_sock.get_write_trigger());
mutex_section.leave();
listener.wait();
if (shutdown) return;
// Process activity:
wakeup_trigger.reset();
update_socket();
}
}
catch (CL_Error err)
{
CL_MutexSection mutex_section(&mutex, false);
disconnect_reason = err.message;
disconnected = true;
}
// Remove computer from processing queue if disconnected.
if (disconnected)
{
CL_MutexSection mutex_section(&netsession->mutex);
netsession->computers.remove(this);
netsession->disconnections.push(CL_NetComputer(this));
}
// Close socket connection:
CL_MutexSection mutex_section(&mutex);
tcp_sock = CL_Socket();
}
#define cl_min(a, b) (a < b ? a : b)
void CL_NetComputer_Generic::update_socket()
{
if (disconnected) return;
if (tcp_sock.get_read_trigger()->get_flag()) // Data available on socket:
{
char buffer[16*1024];
if (data_left > 0) // currently routing message data.
{
int received = 0;
received = tcp_sock.recv(buffer, cl_min(data_left, 16*1024));
if (received == 0) throw CL_Error("Connection closed by foreign host");
if (netstream) // Send data to netstream:
{
CL_MutexSection mutex_section(&netstream->mutex);
netstream->receive_queue.push(std::string(buffer, received));
netstream->read_trigger.set_flag();
}
else // Send data to netpacket
{
int pos = netpacket.get_size();
netpacket.resize(pos + received);
memcpy(netpacket.get_data()+pos, buffer, received);
}
data_left -= received;
if (data_left == 0) // finished reading message data
{
if (!netstream)
{
CL_MutexSection mutex_section(&netsession->mutex);
netsession->received_netpacket(remote_atoms[dest_netpacket_atom], netpacket, this);
netpacket = CL_NetPacket();
}
netstream = 0;
}
}
else // hmm, new message.
{
unsigned char command = tcp_sock.input.read_uchar8();
switch (command)
{
case cmd_create_atom:
{
int atom_id = tcp_sock.input.read_ushort16();
std::string atom_str = tcp_sock.input.read_string();
remote_atoms[atom_id] = atom_str;
}
break;
case cmd_udp_info:
if (!server)
{
udp_connect_secret = tcp_sock.input.read_ushort16();
udp_address = CL_IPAddress(address.get_address(), server_port);
CL_NetPacket packet;
packet.output.write_uchar8(cmd_udp_connect);
packet.output.write_ushort16(udp_connect_secret);
netsession->udp_sock.send(packet.get_data(), packet.get_size(), udp_address);
// todo: keep sending this packet until a cmd_udp_connect is received
// now add computer to udp_ip_port_map list
netsession->udp_ip_port_map[udp_address] = this;
}
break;
case cmd_udp_connect:
{
udp_available = true;
}
break;
case cmd_netstream_connect:
{
std::map< std::string, CL_Signal_v1<CL_NetStream &> >::iterator it_connect;
std::string channel_name = remote_atoms[tcp_sock.input.read_ushort16()];
int channel_id = tcp_sock.input.read_ushort16();
CL_MutexSection mutex_section(&netsession->mutex);
it_connect = netsession->map_netstream_connect.find(channel_name);
if (it_connect == netsession->map_netstream_connect.end())
{
// Tried to connect to non-existant netstream.
mutex_section.leave();
send_stream_close(channel_id);
}
CL_NetStream_Generic *stream = new CL_NetStream_Generic(channel_id, this, netsession);
streams[channel_id] = stream;
CL_NetSession_Generic::NewStreamPair pair(stream, channel_name);
netsession->new_streams.push(pair);
}
break;
case cmd_netstream_msg:
{
std::map< int, CL_NetStream_Generic *>::iterator it_channel;
CL_MutexSection mutex_section(&mutex);
unsigned int channel_id = tcp_sock.input.read_ushort16();
data_left = tcp_sock.input.read_ushort16();
it_channel = streams.find(channel_id);
if (it_channel == streams.end())
{
// Tried to send data to non-existant netstream connection.
netstream = 0;
send_stream_close(channel_id);
}
else
{
netstream = it_channel->second;
}
}
break;
case cmd_netstream_closed:
{
std::map<int, CL_NetStream_Generic *>::iterator it_channel;
CL_MutexSection mutex_section(&mutex);
unsigned int channel_id = tcp_sock.input.read_ushort16();
it_channel = streams.find(channel_id);
if (it_channel != streams.end())
{
CL_MutexSection mutex_section(&it_channel->second->mutex);
it_channel->second->closed = true;
it_channel->second->read_trigger.set_flag();
streams.erase(it_channel);
}
}
break;
case cmd_netpacket_msg:
{
dest_netpacket_atom = tcp_sock.input.read_ushort16();
data_left = tcp_sock.input.read_ushort16();
}
break;
}
}
}
CL_MutexSection mutex_section(&mutex);
// if (tcp_sock.get_write_trigger()->get_flag()) // Can write data to socket
while (!send_tcp_queue.empty())
{
std::string &msg = send_tcp_queue.front();
int sent = tcp_sock.send(msg.substr(send_pos));
send_pos += sent;
if (send_pos != static_cast<int>(msg.size())) break;
send_tcp_queue.pop();
send_pos = 0;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -