📄 tcp_srv.cc
字号:
server->kill_server(); break; case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE: client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); while (NULL != client_port_to_check) { if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd) { break; } client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_next(); } FD_CLR(_client_tcp_port->socket_fd, &read_fd_set); close(_client_tcp_port->socket_fd); current_clients--; if (NULL != _client_tcp_port->subscriptions) { remove_subscription_client(_client_tcp_port, buffer_number); } _client_tcp_port->socket_fd = -1; delete _client_tcp_port; client_ports->delete_current_node(); break; case REMOTE_CMS_GET_KEYS_REQUEST_TYPE: server->get_keys_req.buffer_number = buffer_number; if (recvn(_client_tcp_port->socket_fd, server->get_keys_req.name, 16, 0, -1, NULL) < 0) { _client_tcp_port->errors++; return; } server->get_keys_reply = (REMOTE_GET_KEYS_REPLY *) server->process_request(&server-> get_keys_req); if (NULL == server->get_keys_reply) { rcs_print_error("Server could not process request.\n"); memset(temp_buffer, 0, 20); *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); server->gen_random_key(((char *) temp_buffer) + 4, 2); server->gen_random_key(((char *) temp_buffer) + 12, 2); sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); return; } else { *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); memcpy(((char *) temp_buffer) + 4, server->get_keys_reply->key1, 8); memcpy(((char *) temp_buffer) + 12, server->get_keys_reply->key2, 8); /* successful ? */ sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); return; } break; case REMOTE_CMS_LOGIN_REQUEST_TYPE: server->login_req.buffer_number = buffer_number; if (recvn(_client_tcp_port->socket_fd, server->login_req.name, 16, 0, -1, NULL) < 0) { _client_tcp_port->errors++; return; } if (recvn(_client_tcp_port->socket_fd, server->login_req.passwd, 16, 0, -1, NULL) < 0) { _client_tcp_port->errors++; return; } server->login_reply = (REMOTE_LOGIN_REPLY *) server->process_request(&server-> login_req); if (NULL == server->login_reply) { rcs_print_error("Server could not process request.\n"); *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(0); /* not successful */ sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); return; } else { *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(server->login_reply->success); /* successful ? */ sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); return; } break; case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE: server->set_subscription_req.buffer_number = buffer_number; server->set_subscription_req.subscription_type = ntohl(*((u_long *) temp_buffer + 3)); server->set_subscription_req.poll_interval_millis = ntohl(*((u_long *) temp_buffer + 4)); server->set_subscription_reply = (REMOTE_SET_SUBSCRIPTION_REPLY *) server-> process_request(&server->set_subscription_req); if (NULL == server->set_subscription_reply) { rcs_print_error("Server could not process request.\n"); *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(0); /* not successful */ sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); return; } else { if (server->set_subscription_reply->success) { if (server->set_subscription_req.subscription_type == CMS_POLLED_SUBSCRIPTION || server->set_subscription_req.subscription_type == CMS_VARIABLE_SUBSCRIPTION) { add_subscription_client(buffer_number, server->set_subscription_req. subscription_type, server->set_subscription_req. poll_interval_millis, _client_tcp_port); } if (server->set_subscription_req.subscription_type == CMS_NO_SUBSCRIPTION) { remove_subscription_client(_client_tcp_port, buffer_number); } } *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(server->set_subscription_reply->success); /* successful ? */ sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); return; } break; default: _client_tcp_port->errors++; rcs_print_error("Unrecognized request type received.(%ld)\n", request_type); break; }}void CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client(int buffer_number, int subscription_type, int poll_interval_millis, CLIENT_TCP_PORT * clnt){ if (NULL == subscription_buffers) { subscription_buffers = new LinkedList(); } if (NULL == subscription_buffers) { rcs_print_error("Can`t create subscription_buffers list.\n"); } TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); while (NULL != buf_info) { if (buf_info->buffer_number == buffer_number) { break; } buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); } if (NULL == buf_info) { buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO(); buf_info->buffer_number = buffer_number; buf_info->sub_clnt_info = new LinkedList(); buf_info->list_id = subscription_buffers->store_at_tail(buf_info, sizeof(*buf_info), 0); } buf_info->min_last_id = 0; if (NULL == clnt->subscriptions) { clnt->subscriptions = new LinkedList(); } TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head(); while (temp_clnt_info != NULL) { if (temp_clnt_info->buffer_number == buffer_number) { break; } temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next(); } if (NULL == temp_clnt_info) { temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO(); temp_clnt_info->last_sub_sent_time = 0.0; temp_clnt_info->buffer_number = buffer_number; temp_clnt_info->subscription_paused = 0; temp_clnt_info->last_id_read = 0; temp_clnt_info->sub_buf_info = buf_info; temp_clnt_info->clnt_port = clnt; temp_clnt_info->last_sub_sent_time = etime(); temp_clnt_info->subscription_list_id = clnt->subscriptions->store_at_tail(temp_clnt_info, sizeof(*temp_clnt_info), 0); buf_info->sub_clnt_info->store_at_tail(temp_clnt_info, sizeof(*temp_clnt_info), 0); } temp_clnt_info->subscription_type = subscription_type; temp_clnt_info->poll_interval_millis = poll_interval_millis; recalculate_polling_interval();}void CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client(CLIENT_TCP_PORT * clnt, int buffer_number){ TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head(); while (temp_clnt_info != NULL) { if (temp_clnt_info->buffer_number == buffer_number) { if (NULL != temp_clnt_info->sub_buf_info) { if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info) { temp_clnt_info->sub_buf_info->sub_clnt_info-> delete_node(temp_clnt_info->subscription_list_id); if (temp_clnt_info->sub_buf_info->sub_clnt_info-> list_size == 0) { subscription_buffers->delete_node(temp_clnt_info-> sub_buf_info->list_id); delete temp_clnt_info->sub_buf_info->sub_clnt_info; temp_clnt_info->sub_buf_info->sub_clnt_info = NULL; delete temp_clnt_info->sub_buf_info; temp_clnt_info->sub_buf_info = NULL; } } } delete temp_clnt_info; temp_clnt_info = NULL; break; } temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next(); } recalculate_polling_interval();}void CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval(){ int min_poll_interval_millis = 30000; polling_enabled = 0; TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); while (NULL != buf_info) { TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info-> get_head(); while (temp_clnt_info != NULL) { if (temp_clnt_info->poll_interval_millis < min_poll_interval_millis && temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION) { min_poll_interval_millis = temp_clnt_info->poll_interval_millis; polling_enabled = 1; } temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_next(); } buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); } if (min_poll_interval_millis >= ((int) (clk_tck() * 1000.0))) { current_poll_interval_millis = min_poll_interval_millis; } else { current_poll_interval_millis = ((int) (clk_tck() * 1000.0)); } select_timeout.tv_sec = current_poll_interval_millis / 1000; select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000; dtimeout = (current_poll_interval_millis + 10) * 1000.0; if (dtimeout < 0.5) { dtimeout = 0.5; }}void CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions(){ pid_t pid = getpid(); pid_t tid = 0; CMS_SERVER *server; server = find_server(pid, tid); if (NULL == server) { rcs_print_error ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n", pid); return; } if (NULL == subscription_buffers) { return; } double cur_time = etime(); TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); while (NULL != buf_info) { server->read_req.buffer_number = buf_info->buffer_number; server->read_req.access_type = CMS_READ_ACCESS; server->read_req.last_id_read = buf_info->min_last_id; server->read_reply = (REMOTE_READ_REPLY *) server->process_request(&server->read_req); if (NULL == server->read_reply) { rcs_print_error("Server could not process request.\n"); buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); continue; } if (server->read_reply->write_id == buf_info->min_last_id || server->read_reply->size < 1) { buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); continue; } *((u_long *) temp_buffer) = 0; *((u_long *) temp_buffer + 1) = htonl(server->read_reply->status); *((u_long *) temp_buffer + 2) = htonl(server->read_reply->size); *((u_long *) temp_buffer + 3) = htonl(server->read_reply->write_id); *((u_long *) temp_buffer + 4) = htonl(server->read_reply->was_read); TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info-> get_head(); buf_info->min_last_id = server->read_reply->write_id; while (temp_clnt_info != NULL) { double time_diff = cur_time - temp_clnt_info->last_sub_sent_time; int time_diff_millis = (int) ((double) time_diff * 1000.0); rcs_print_debug(PRINT_SERVER_SUBSCRIPTION_ACTIVITY, "Subscription time_diff_millis=%d\n", time_diff_millis); if (((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION && time_diff_millis + 10 >= temp_clnt_info->poll_interval_millis) || temp_clnt_info->subscription_type == CMS_VARIABLE_SUBSCRIPTION) && temp_clnt_info->last_id_read != server->read_reply->write_id) { temp_clnt_info->last_id_read = server->read_reply->write_id; temp_clnt_info->last_sub_sent_time = cur_time; temp_clnt_info->clnt_port->serial_number++; *((u_long *) temp_buffer) = htonl(temp_clnt_info->clnt_port->serial_number); if (server->read_reply->size < 0x2000 - 20 && server->read_reply->size > 0) { memcpy(temp_buffer + 20, server->read_reply->data, server->read_reply->size); if (sendn (temp_clnt_info->clnt_port->socket_fd, temp_buffer, 20 + server->read_reply->size, 0, dtimeout) < 0) { temp_clnt_info->clnt_port->errors++; return; } } else { if (sendn(temp_clnt_info->clnt_port->socket_fd, temp_buffer, 20, 0, dtimeout) < 0) { temp_clnt_info->clnt_port->errors++; return; } if (server->read_reply->size > 0) { if (sendn(temp_clnt_info->clnt_port->socket_fd, server->read_reply->data, server->read_reply->size, 0, dtimeout) < 0) { temp_clnt_info->clnt_port->errors++; return; } } } } if (temp_clnt_info->last_id_read < buf_info->min_last_id) { buf_info->min_last_id = temp_clnt_info->last_id_read; } temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_next(); } buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); }}TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO(){ buffer_number = -1; min_last_id = 0; list_id = -1; sub_clnt_info = NULL;}TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO(){ buffer_number = -1; min_last_id = 0; list_id = -1; if (NULL != sub_clnt_info) { delete sub_clnt_info; sub_clnt_info = NULL; }}TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO(){ subscription_type = CMS_NO_SUBSCRIPTION; poll_interval_millis = 30000; last_sub_sent_time = 0.0; subscription_list_id = -1; buffer_number = -1; subscription_paused = 0; last_id_read = 0; sub_buf_info = NULL; clnt_port = NULL;}TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO(){ subscription_type = CMS_NO_SUBSCRIPTION; poll_interval_millis = 30000; last_sub_sent_time = 0.0; subscription_list_id = -1; buffer_number = -1; subscription_paused = 0; last_id_read = 0; sub_buf_info = NULL; clnt_port = NULL;}CLIENT_TCP_PORT::CLIENT_TCP_PORT(){ serial_number = 0; errors = 0; max_errors = 50; address.sin_port = 0; address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); socket_fd = -1; subscriptions = NULL; tid = -1; pid = -1; blocking_read_req = NULL; threadId = 0; diag_info = NULL;}CLIENT_TCP_PORT::~CLIENT_TCP_PORT(){ if (socket_fd > 0) { close(socket_fd); socket_fd = -1; } if (NULL != subscriptions) { TCP_CLIENT_SUBSCRIPTION_INFO *sub_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head(); while (NULL != sub_info) { delete sub_info; sub_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next(); } delete subscriptions; subscriptions = NULL; }#ifdef NO_THREADS if (NULL != blocking_read_req) { delete blocking_read_req; blocking_read_req = NULL; }#endif if (NULL != diag_info) { delete diag_info; diag_info = NULL; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -