📄 tcp_srv.cc
字号:
/********************************************************************* Description: tcp_srv.cc** Derived from a work by Fred Proctor & Will Shackleford** Author:* License: LGPL Version 2* System: Linux* * Copyright (c) 2004 All rights reserved.** Last change: * $Revision: 1.4 $* $Author: paul_c $* $Date: 2005/05/23 16:34:12 $********************************************************************//***************************************************************************** File: tcp_srv.cc* Purpose: Provides the functions for the class CMS_SERVER_REMOTE_TCP_PORT* which provides TCP specific overrides of the CMS_SERVER_REMOTE_PORT class.****************************************************************************/#ifdef __cplusplusextern "C" {#endif#include <string.h> /* memset(), strerror() */#include <stdlib.h> // malloc(), free()#include <unistd.h>#include <sys/socket.h>#include <sys/ioctl.h>#include <errno.h> /* errno */#include <signal.h> // SIGPIPE, signal()#ifdef __cplusplus}#endif#include <sys/types.h>#include <sys/wait.h> // waitpid#include <arpa/inet.h> /* inet_ntoa */#include "cms.hh" /* class CMS */#include "nml.hh" // class NML#include "tcp_srv.hh" /* class CMS_SERVER_REMOTE_TCP_PORT */#include "rcs_print.hh" /* rcs_print_error() */#include "linklist.hh" /* class LinkedList */#include "tcp_opts.hh" /* SET_TCP_NODELAY */#include "timer.hh" // esleep()#include "_timer.h"#include "cmsdiag.hh" // class CMS_DIAGNOSTICS_INFOextern "C" {#include "recvn.h" /* recvn() */#include "sendn.h" /* sendn() */}int tcpsvr_threads_created = 0;int tcpsvr_threads_killed = 0;int tcpsvr_threads_exited = 0;int tcpsvr_threads_returned_early = 0;TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST(){ access_type = CMS_READ_ACCESS; /* read or just peek */ last_id_read = 0; /* The server can compare with id from buffer */ /* to determine if the buffer is new */ /* to this client */ timeout_millis = -1; /* Milliseconds for blocking_timeout or -1 to wait forever */ _client_tcp_port = NULL; remport = NULL; server = NULL; _nml = NULL; _reply = NULL; _data = NULL; read_reply = NULL;}static inline double tcp_svr_reverse_double(double in){ double out; char *c1, *c2; c1 = ((char *) &in) + 7; c2 = (char *) &out; for (int i = 0; i < 8; i++) { *c2 = *c1; c1--; c2++; } return out;}TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST(){ if (NULL != _nml) { NML *nmlcopy = (NML *) _nml; _nml = NULL; delete nmlcopy; } if (NULL != _data) { void *_datacopy = _data; if (NULL != read_reply) { if (_data == read_reply->data) { read_reply->data = NULL; } } _data = NULL; free(_datacopy); } if (NULL != _reply) { free(_reply); _reply = NULL; read_reply = NULL; } if (NULL != read_reply) { if (NULL != read_reply->data) { free(read_reply->data); read_reply->data = NULL; } delete read_reply; read_reply = NULL; }}CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT(CMS_SERVER * _cms_server):CMS_SERVER_REMOTE_PORT(_cms_server){ client_ports = (LinkedList *) NULL; connection_socket = 0; connection_port = 0; maxfdpl = 0; dtimeout = 20.0; memset(&server_socket_address, 0, sizeof(server_socket_address)); server_socket_address.sin_family = AF_INET; server_socket_address.sin_addr.s_addr = htonl(INADDR_ANY); server_socket_address.sin_port = 0; client_ports = new LinkedList; if (NULL == client_ports) { rcs_print_error("Can not create linked list for client ports.\n"); return; } polling_enabled = 0; memset(&select_timeout, 0, sizeof(select_timeout)); select_timeout.tv_sec = 30; select_timeout.tv_usec = 30; subscription_buffers = NULL; current_poll_interval_millis = 30000; memset(&read_fd_set, 0, sizeof(read_fd_set)); memset(&write_fd_set, 0, sizeof(write_fd_set));}CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT(){ unregister_port(); if (NULL != client_ports) { delete client_ports; client_ports = (LinkedList *) NULL; }}void blocking_thread_kill(long int id){ if (id <= 0) { return; }#ifdef POSIX_THREADS pthread_kill(id, SIGINT); pthread_join(id, NULL);#endif#ifdef NO_THREADS kill(id, SIGINT); waitpid(id, NULL, 0);#endif tcpsvr_threads_killed++;}void CMS_SERVER_REMOTE_TCP_PORT::unregister_port(){ CLIENT_TCP_PORT *client; int number_of_connected_clients = 0; client = (CLIENT_TCP_PORT *) client_ports->get_head(); while (NULL != client) { rcs_print("Exiting even though client on %s is still connected.\n", inet_ntoa(client->address.sin_addr)); client = (CLIENT_TCP_PORT *) client_ports->get_next(); number_of_connected_clients++; } client = (CLIENT_TCP_PORT *) client_ports->get_head(); while (NULL != client) { delete client; client_ports->delete_current_node(); client = (CLIENT_TCP_PORT *) client_ports->get_next(); } if (NULL != subscription_buffers) { TCP_BUFFER_SUBSCRIPTION_INFO *sub_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); while (NULL != sub_info) { delete sub_info; sub_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); } delete subscription_buffers; subscription_buffers = NULL; } if (number_of_connected_clients > 0) { esleep(2.0); } if (connection_socket > 0) { close(connection_socket); connection_socket = 0; }}int CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms(CMS * _cms){ if (NULL == _cms) { return 0; } if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE) { return 0; } if (NULL != _cms) { if (min_compatible_version < 1e-6 || (min_compatible_version > _cms->min_compatible_version && _cms->min_compatible_version > 1e-6)) { min_compatible_version = _cms->min_compatible_version; } if (_cms->confirm_write) { confirm_write = _cms->confirm_write; } } if (_cms->total_subdivisions > max_total_subdivisions) { max_total_subdivisions = _cms->total_subdivisions; } if (server_socket_address.sin_port == 0) { server_socket_address.sin_port = htons(((u_short) _cms->tcp_port_number)); port_num = _cms->tcp_port_number; return 1; } if (server_socket_address.sin_port == htons(((u_short) _cms->tcp_port_number))) { port_num = _cms->tcp_port_number; return 1; } return 0;}void CMS_SERVER_REMOTE_TCP_PORT::register_port(){ port_registered = 0; rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Registering server on TCP port %d.\n", ntohs(server_socket_address.sin_port)); if (server_socket_address.sin_port == 0) { rcs_print_error("server can not register on port number 0.\n"); return; } if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { rcs_print_error("socket error: %d -- %s\n", errno, strerror(errno)); rcs_print_error("Server can not open stream socket.\n"); return; } if (set_tcp_socket_options(connection_socket) < 0) { return; } if (bind(connection_socket, (struct sockaddr *) &server_socket_address, sizeof(server_socket_address)) < 0) { rcs_print_error("bind error: %d -- %s\n", errno, strerror(errno)); rcs_print_error ("Server can not bind the connection socket on port %d.\n", ntohs(server_socket_address.sin_port)); return; } if (listen(connection_socket, 5) < 0) { rcs_print_error("listen error: %d -- %s\n", errno, strerror(errno)); rcs_print_error("TCP Server: error on call to listen for port %d.\n", ntohs(server_socket_address.sin_port)); return; } port_registered = 1;}static int last_pipe_signum = 0;static void handle_pipe_error(int signum){ last_pipe_signum = signum; rcs_print_error("SIGPIPE intercepted.\n");}void CMS_SERVER_REMOTE_TCP_PORT::run(){ unsigned long bytes_ready; int ready_descriptors; if (NULL == client_ports) { rcs_print_error("CMS_SERVER: List of client ports is NULL.\n"); return; } CLIENT_TCP_PORT *new_client_port, *client_port_to_check; FD_ZERO(&read_fd_set); FD_ZERO(&write_fd_set); FD_SET(connection_socket, &read_fd_set); maxfdpl = connection_socket + 1; signal(SIGPIPE, handle_pipe_error); rcs_print_debug(PRINT_CMS_CONFIG_INFO, "running server for TCP port %d (connection_socket = %d).\n", ntohs(server_socket_address.sin_port), connection_socket); cms_server_count++; fd_set read_fd_set_copy, write_fd_set_copy; FD_ZERO(&read_fd_set_copy); FD_ZERO(&write_fd_set_copy); FD_SET(connection_socket, &read_fd_set_copy); while (1) { if (polling_enabled) { memcpy(&read_fd_set_copy, &read_fd_set, sizeof(fd_set)); memcpy(&write_fd_set_copy, &write_fd_set, sizeof(fd_set)); select_timeout.tv_sec = current_poll_interval_millis / 1000; select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000; ready_descriptors = select(maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL, (timeval *) & select_timeout); if (ready_descriptors == 0) { update_subscriptions(); memcpy(&read_fd_set, &read_fd_set_copy, sizeof(fd_set)); memcpy(&write_fd_set, &write_fd_set_copy, sizeof(fd_set)); continue; } } else { ready_descriptors = select(maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL, (timeval *) NULL); } if (ready_descriptors < 0) { rcs_print_error("server: select error.(errno = %d | %s)\n", errno, strerror(errno)); } if (NULL == client_ports) { rcs_print_error("CMS_SERVER: List of client ports is NULL.\n"); return; } client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); while (NULL != client_port_to_check) { if (FD_ISSET(client_port_to_check->socket_fd, &read_fd_set)) { ioctl(client_port_to_check->socket_fd, FIONREAD, (caddr_t) & bytes_ready); if (bytes_ready <= 0) { rcs_print_debug(PRINT_SOCKET_CONNECT, "Socket closed by host with IP address %s.\n", inet_ntoa(client_port_to_check->address.sin_addr)); if (NULL != client_port_to_check->subscriptions) { TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) client_port_to_check->subscriptions->get_head(); while (NULL != clnt_sub_info) { if (NULL != clnt_sub_info->sub_buf_info && clnt_sub_info->subscription_list_id >= 0) { if (NULL != clnt_sub_info->sub_buf_info-> sub_clnt_info) { clnt_sub_info->sub_buf_info-> sub_clnt_info-> delete_node(clnt_sub_info-> subscription_list_id); if (clnt_sub_info->sub_buf_info-> sub_clnt_info->list_size < 1) { delete clnt_sub_info->sub_buf_info-> sub_clnt_info; clnt_sub_info->sub_buf_info-> sub_clnt_info = NULL; if (NULL != subscription_buffers && clnt_sub_info->sub_buf_info-> list_id >= 0) { subscription_buffers-> delete_node(clnt_sub_info-> sub_buf_info->list_id); delete clnt_sub_info-> sub_buf_info; clnt_sub_info->sub_buf_info = NULL; } } clnt_sub_info->sub_buf_info = NULL; } delete clnt_sub_info; clnt_sub_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) client_port_to_check->subscriptions-> get_next(); } delete client_port_to_check->subscriptions; client_port_to_check->subscriptions = NULL; recalculate_polling_interval(); } } if (client_port_to_check->threadId > 0 && client_port_to_check->blocking) { blocking_thread_kill(client_port_to_check->threadId); } close(client_port_to_check->socket_fd); FD_CLR(client_port_to_check->socket_fd, &read_fd_set); client_port_to_check->socket_fd = -1; delete client_port_to_check; client_ports->delete_current_node(); } else { if (client_port_to_check->blocking) { if (client_port_to_check->threadId > 0) { rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY, "Data recieved from %s:%d when it should be blocking (bytes_ready=%d).\n", inet_ntoa (client_port_to_check->address. sin_addr), client_port_to_check->socket_fd, bytes_ready); rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY, "Killing handler %d.\n", client_port_to_check->threadId); blocking_thread_kill (client_port_to_check->threadId);#if 0 *((u_long *) temp_buffer) = htonl(client_port_to_check->serial_number); *((u_long *) temp_buffer + 1) = htonl((unsigned long) CMS_SERVER_SIDE_ERROR); *((u_long *) temp_buffer + 2) = htonl(0); /* size */ *((u_long *) temp_buffer + 3) = htonl(0); /* write_id */ *((u_long *) temp_buffer + 4) = htonl(0); /* was_read */ sendn(client_port_to_check->socket_fd, temp_buffer, 20, 0, dtimeout);#endif client_port_to_check->threadId = 0; client_port_to_check->blocking = 0; } } handle_request(client_port_to_check); } ready_descriptors--; } else { FD_SET(client_port_to_check->socket_fd, &read_fd_set); } client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_next(); } if (FD_ISSET(connection_socket, &read_fd_set) && ready_descriptors > 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -