📄 tcp_srv.cc
字号:
ready_descriptors--; socklen_t client_address_length; new_client_port = new CLIENT_TCP_PORT(); client_address_length = sizeof(new_client_port->address); new_client_port->socket_fd = accept(connection_socket, (struct sockaddr *) &new_client_port->address, &client_address_length); current_clients++; if (current_clients > max_clients) { max_clients = current_clients; } if (new_client_port->socket_fd < 0) { rcs_print_error("server: accept error -- %d %s \n", errno, strerror(errno)); continue; } rcs_print_debug(PRINT_SOCKET_CONNECT, "Socket opened by host with IP address %s.\n", inet_ntoa(new_client_port->address.sin_addr)); new_client_port->serial_number = 0; new_client_port->blocking = 0; if (NULL != client_ports) { client_ports->store_at_tail(new_client_port, sizeof(new_client_port), 0); } if (maxfdpl < new_client_port->socket_fd + 1) { maxfdpl = new_client_port->socket_fd + 1; } FD_SET(new_client_port->socket_fd, &read_fd_set); } else { FD_SET(connection_socket, &read_fd_set); } if (0 != ready_descriptors) { rcs_print_error("%d descriptors ready but not serviced.\n", ready_descriptors); } update_subscriptions(); }}static int tcpsvr_handle_blocking_request_sigint_count = 0;static int tcpsvr_last_sig = 0;void tcpsvr_handle_blocking_request_sigint_handler(int sig){ tcpsvr_last_sig = sig; tcpsvr_handle_blocking_request_sigint_count++;}#if defined(POSIX_THREADS) || defined(NO_THREADS)void *tcpsvr_handle_blocking_request(void *_req){ signal(SIGINT, tcpsvr_handle_blocking_request_sigint_handler); TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req = (TCPSVR_BLOCKING_READ_REQUEST *) _req; char temp_buffer[0x2000]; if (_req == NULL) { tcpsvr_threads_returned_early++; return 0; } double dtimeout = ((double) (blocking_read_req->timeout_millis + 10)) / 1000.0; if (dtimeout < 0) { dtimeout = 600.0; } if (dtimeout < 0.5) { dtimeout = 0.5; } if (dtimeout > 600.0) { dtimeout = 600.0; } CLIENT_TCP_PORT *_client_tcp_port = blocking_read_req->_client_tcp_port; CMS_SERVER *server = blocking_read_req->server; if (NULL == server || NULL == _client_tcp_port) { tcpsvr_threads_returned_early++; return 0; } memset(temp_buffer, 0, 0x2000); REMOTE_BLOCKING_READ_REPLY *read_reply; if (NULL != _client_tcp_port->diag_info) { _client_tcp_port->diag_info->buffer_number = blocking_read_req->buffer_number; server->set_diag_info(_client_tcp_port->diag_info); } else if (server->diag_enabled) { server->reset_diag_info(blocking_read_req->buffer_number); } read_reply = (REMOTE_BLOCKING_READ_REPLY *) server->process_request(blocking_read_req); blocking_read_req->read_reply = read_reply; if (NULL == read_reply) { _client_tcp_port->blocking = 0; rcs_print_error("Server could not process request.\n"); *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl((unsigned long) CMS_SERVER_SIDE_ERROR); *((u_long *) temp_buffer + 2) = htonl(0); /* size */ *((u_long *) temp_buffer + 3) = htonl(0); /* write_id */ *((u_long *) temp_buffer + 4) = htonl(0); /* was_read */ sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); _client_tcp_port->errors++; _client_tcp_port->blocking_read_req = NULL; delete blocking_read_req; _client_tcp_port->threadId = 0; tcpsvr_threads_returned_early++; return 0; } *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(read_reply->status); *((u_long *) temp_buffer + 2) = htonl(read_reply->size); *((u_long *) temp_buffer + 3) = htonl(read_reply->write_id); *((u_long *) temp_buffer + 4) = htonl(read_reply->was_read); if (read_reply->size < (0x2000 - 20) && read_reply->size > 0) { memcpy(temp_buffer + 20, read_reply->data, read_reply->size); _client_tcp_port->blocking = 0; if (sendn (_client_tcp_port->socket_fd, temp_buffer, 20 + read_reply->size, 0, dtimeout) < 0) { _client_tcp_port->blocking = 0; _client_tcp_port->errors++; _client_tcp_port->blocking_read_req = NULL; delete blocking_read_req; _client_tcp_port->threadId = 0; tcpsvr_threads_returned_early++; return 0; } } else { _client_tcp_port->blocking = 0; if (sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) < 0) { _client_tcp_port->blocking = 0; _client_tcp_port->errors++; _client_tcp_port->blocking_read_req = NULL; delete blocking_read_req; _client_tcp_port->threadId = 0; tcpsvr_threads_returned_early++; return 0; } if (read_reply->size > 0) { if (sendn (_client_tcp_port->socket_fd, read_reply->data, read_reply->size, 0, dtimeout) < 0) { _client_tcp_port->blocking = 0; _client_tcp_port->errors++; _client_tcp_port->blocking_read_req = NULL; delete blocking_read_req; _client_tcp_port->threadId = 0; tcpsvr_threads_returned_early++; return 0; } } } _client_tcp_port->blocking_read_req = NULL; delete blocking_read_req; _client_tcp_port->threadId = 0; tcpsvr_threads_exited++;#ifdef POSIX_THREADS pthread_exit(0);#endif#ifdef NO_THREADS exit(0);#endif}#endifvoid CMS_SERVER_REMOTE_TCP_PORT::handle_request(CLIENT_TCP_PORT * _client_tcp_port){ CLIENT_TCP_PORT *client_port_to_check = NULL; pid_t pid = getpid(); pid_t tid = 0; CMS_SERVER *server; server = find_server(pid, tid); if (NULL == server) { rcs_print_error ("CMS_SERVER_REMOTE_TCP_PORT::handle_request() Cannot find server object for pid = %d.\n", pid); return; } if (server->using_passwd_file) { current_user_info = get_connected_user(_client_tcp_port->socket_fd); } if (_client_tcp_port->errors >= _client_tcp_port->max_errors) { rcs_print_error("Too many errors - closing connection(%d)\n", _client_tcp_port->socket_fd); client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); while (NULL != client_port_to_check) { if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd) { delete client_port_to_check; client_ports->delete_current_node(); } client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_next(); } close(_client_tcp_port->socket_fd); current_clients--; FD_CLR(_client_tcp_port->socket_fd, &read_fd_set); _client_tcp_port->socket_fd = -1; } if (recvn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, -1, NULL) < 0) { rcs_print_error("Can not read from client port (%d) from %s\n", _client_tcp_port->socket_fd, inet_ntoa(_client_tcp_port->address.sin_addr)); _client_tcp_port->errors++; return; } long request_type, buffer_number, received_serial_number; received_serial_number = ntohl(*((u_long *) temp_buffer)); if (received_serial_number != _client_tcp_port->serial_number) { rcs_print_error ("received_serial_number (%d) does not equal expected serial number.(%d)\n", received_serial_number, _client_tcp_port->serial_number); _client_tcp_port->serial_number = received_serial_number; _client_tcp_port->errors++; } _client_tcp_port->serial_number++; request_type = ntohl(*((u_long *) temp_buffer + 1)); buffer_number = ntohl(*((u_long *) temp_buffer + 2)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPSVR request recieved: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", _client_tcp_port->socket_fd, _client_tcp_port->serial_number, request_type, buffer_number); if (NULL != _client_tcp_port->diag_info) { _client_tcp_port->diag_info->buffer_number = buffer_number; server->set_diag_info(_client_tcp_port->diag_info); } else if (server->diag_enabled) { server->reset_diag_info(buffer_number); } switch_function(_client_tcp_port, server, request_type, buffer_number, received_serial_number); if (NULL != _client_tcp_port->diag_info && NULL != server->last_local_port_used && server->diag_enabled) { if (NULL != server->last_local_port_used->cms) { if (NULL != server->last_local_port_used->cms->handle_to_global_data) { _client_tcp_port->diag_info->bytes_moved = server->last_local_port_used->cms->handle_to_global_data-> total_bytes_moved; } } }}void CMS_SERVER_REMOTE_TCP_PORT::switch_function(CLIENT_TCP_PORT * _client_tcp_port, CMS_SERVER * server, long request_type, long buffer_number, long received_serial_number){ int total_subdivisions = 1; CLIENT_TCP_PORT *client_port_to_check = NULL; switch (request_type) { case REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE: { if (NULL == _client_tcp_port->diag_info) { _client_tcp_port->diag_info = new REMOTE_SET_DIAG_INFO_REQUEST(); } if (recvn (_client_tcp_port->socket_fd, server->set_diag_info_buf, 68, 0, -1, NULL) < 0) { rcs_print_error ("Can not read from client port (%d) from %s\n", _client_tcp_port->socket_fd, inet_ntoa(_client_tcp_port->address.sin_addr)); _client_tcp_port->errors++; return; } _client_tcp_port->diag_info->bytes_moved = 0.0; _client_tcp_port->diag_info->buffer_number = buffer_number; memcpy(_client_tcp_port->diag_info->process_name, server->set_diag_info_buf, 16); memcpy(_client_tcp_port->diag_info->host_sysinfo, server->set_diag_info_buf + 16, 32); _client_tcp_port->diag_info->pid = htonl(*((u_long *) (server->set_diag_info_buf + 48))); _client_tcp_port->diag_info->c_num = htonl(*((u_long *) (server->set_diag_info_buf + 52))); memcpy(&(_client_tcp_port->diag_info->rcslib_ver), server->set_diag_info_buf + 56, 8); _client_tcp_port->diag_info->reverse_flag = *((int *) ((char *) server->set_diag_info_buf + 64)); if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { _client_tcp_port->diag_info->rcslib_ver = (double) tcp_svr_reverse_double((double) _client_tcp_port->diag_info->rcslib_ver); } } break; case REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE: { REMOTE_GET_DIAG_INFO_REQUEST diagreq; diagreq.buffer_number = buffer_number; REMOTE_GET_DIAG_INFO_REPLY *diagreply = NULL; diagreply = (REMOTE_GET_DIAG_INFO_REPLY *) server-> process_request(&diagreq); if (NULL == diagreply) { *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl((unsigned long) CMS_SERVER_SIDE_ERROR); if (sendn (_client_tcp_port->socket_fd, temp_buffer, 24, 0, dtimeout) < 0) { _client_tcp_port->errors++; } return; } if (NULL == diagreply->cdi) { *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl((unsigned long) CMS_SERVER_SIDE_ERROR); if (sendn (_client_tcp_port->socket_fd, temp_buffer, 24, 0, dtimeout) < 0) { _client_tcp_port->errors++; } return; } memset(temp_buffer, 0, 0x2000); unsigned long dpi_offset = 32; *((u_long *) temp_buffer) = htonl(_client_tcp_port->serial_number); *((u_long *) temp_buffer + 1) = htonl(diagreply->status); *((u_long *) temp_buffer + 2) = htonl(diagreply->cdi->last_writer); *((u_long *) temp_buffer + 3) = htonl(diagreply->cdi->last_reader); double curtime = etime(); double reversed_temp = 0.0; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) curtime); memcpy(temp_buffer + 16, &reversed_temp, 8); } else { memcpy(temp_buffer + 16, &(curtime), 8); } int dpi_count = 0; if (NULL != diagreply->cdi->dpis) { CMS_DIAG_PROC_INFO *dpi = (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_head(); while ((dpi_offset < ((int) 0x2000 - sizeof(CMS_DIAG_PROC_INFO))) && dpi != NULL) { dpi_count++; memcpy(temp_buffer + dpi_offset, dpi->name, 16); dpi_offset += 16; memcpy(temp_buffer + dpi_offset, dpi->host_sysinfo, 32); dpi_offset += 32; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->pid); dpi_offset += 4; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->rcslib_ver); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->rcslib_ver), 8); } dpi_offset += 8; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->access_type); dpi_offset += 4; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->msg_id); dpi_offset += 4; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->msg_size); dpi_offset += 4; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->msg_type); dpi_offset += 4; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->number_of_accesses); dpi_offset += 4; *((u_long *) ((char *) temp_buffer + dpi_offset)) = htonl(dpi->number_of_new_messages); dpi_offset += 4; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->bytes_moved); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->bytes_moved), 8); } dpi_offset += 8; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->bytes_moved_across_socket); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->bytes_moved_across_socket), 8); } dpi_offset += 8; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->last_access_time); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->last_access_time), 8); } dpi_offset += 8; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->first_access_time); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->first_access_time), 8); } dpi_offset += 8; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->min_difference); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->min_difference), 8); } dpi_offset += 8; if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { reversed_temp = (double) tcp_svr_reverse_double((double) dpi->max_difference); memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); } else { memcpy(temp_buffer + dpi_offset, &(dpi->max_difference), 8); } dpi_offset += 8; int is_last_writer = (dpi == diagreply->cdi->last_writer_dpi);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -