⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_srv.cc

📁 Source code for a Numeric Computer
💻 CC
📖 第 1 页 / 共 4 页
字号:
/********************************************************************* Description: tcp_srv.cc**   Derived from a work by Fred Proctor & Will Shackleford** Author:* License: LGPL Version 2* System: Linux*    * Copyright (c) 2004 All rights reserved.** Last change: * $Revision: 1.4 $* $Author: paul_c $* $Date: 2005/05/23 16:34:12 $********************************************************************//***************************************************************************** File: tcp_srv.cc* Purpose: Provides the functions for the class CMS_SERVER_REMOTE_TCP_PORT*  which provides TCP specific overrides of the CMS_SERVER_REMOTE_PORT class.****************************************************************************/#ifdef __cplusplusextern "C" {#endif#include <string.h>		/* memset(), strerror() */#include <stdlib.h>		// malloc(), free()#include <unistd.h>#include <sys/socket.h>#include <sys/ioctl.h>#include <errno.h>		/* errno */#include <signal.h>		// SIGPIPE, signal()#ifdef __cplusplus}#endif#include <sys/types.h>#include <sys/wait.h>		// waitpid#include <arpa/inet.h>		/* inet_ntoa */#include "cms.hh"		/* class CMS */#include "nml.hh"		// class NML#include "tcp_srv.hh"		/* class CMS_SERVER_REMOTE_TCP_PORT */#include "rcs_print.hh"		/* rcs_print_error() */#include "linklist.hh"		/* class LinkedList */#include "tcp_opts.hh"		/* SET_TCP_NODELAY */#include "timer.hh"		// esleep()#include "_timer.h"#include "cmsdiag.hh"		// class CMS_DIAGNOSTICS_INFOextern "C" {#include "recvn.h"		/* recvn() */#include "sendn.h"		/* sendn() */}int tcpsvr_threads_created = 0;int tcpsvr_threads_killed = 0;int tcpsvr_threads_exited = 0;int tcpsvr_threads_returned_early = 0;TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST(){    access_type = CMS_READ_ACCESS;	/* read or just peek */    last_id_read = 0;		/* The server can compare with id from buffer 				 */    /* to determine if the buffer is new */    /* to this client */    timeout_millis = -1;	/* Milliseconds for 
blocking_timeout or -1 to 				   wait forever */    _client_tcp_port = NULL;    remport = NULL;    server = NULL;    _nml = NULL;    _reply = NULL;    _data = NULL;    read_reply = NULL;}static inline double tcp_svr_reverse_double(double in){    double out;    char *c1, *c2;    c1 = ((char *) &in) + 7;    c2 = (char *) &out;    for (int i = 0; i < 8; i++) {	*c2 = *c1;	c1--;	c2++;    }    return out;}TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST(){    if (NULL != _nml) {	NML *nmlcopy = (NML *) _nml;	_nml = NULL;	delete nmlcopy;    }    if (NULL != _data) {	void *_datacopy = _data;	if (NULL != read_reply) {	    if (_data == read_reply->data) {		read_reply->data = NULL;	    }	}	_data = NULL;	free(_datacopy);    }    if (NULL != _reply) {	free(_reply);	_reply = NULL;	read_reply = NULL;    }    if (NULL != read_reply) {	if (NULL != read_reply->data) {	    free(read_reply->data);	    read_reply->data = NULL;	}	delete read_reply;	read_reply = NULL;    }}CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT(CMS_SERVER * _cms_server):CMS_SERVER_REMOTE_PORT(_cms_server){    client_ports = (LinkedList *) NULL;    connection_socket = 0;    connection_port = 0;    maxfdpl = 0;    dtimeout = 20.0;    memset(&server_socket_address, 0, sizeof(server_socket_address));    server_socket_address.sin_family = AF_INET;    server_socket_address.sin_addr.s_addr = htonl(INADDR_ANY);    server_socket_address.sin_port = 0;    client_ports = new LinkedList;    if (NULL == client_ports) {	rcs_print_error("Can not create linked list for client ports.\n");	return;    }    polling_enabled = 0;    memset(&select_timeout, 0, sizeof(select_timeout));    select_timeout.tv_sec = 30;    select_timeout.tv_usec = 30;    subscription_buffers = NULL;    current_poll_interval_millis = 30000;    memset(&read_fd_set, 0, sizeof(read_fd_set));    memset(&write_fd_set, 0, sizeof(write_fd_set));}CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT(){    unregister_port();    if (NULL != 
client_ports) {	delete client_ports;	client_ports = (LinkedList *) NULL;    }}void blocking_thread_kill(long int id){    if (id <= 0) {	return;    }#ifdef POSIX_THREADS    pthread_kill(id, SIGINT);    pthread_join(id, NULL);#endif#ifdef NO_THREADS    kill(id, SIGINT);    waitpid(id, NULL, 0);#endif    tcpsvr_threads_killed++;}void CMS_SERVER_REMOTE_TCP_PORT::unregister_port(){    CLIENT_TCP_PORT *client;    int number_of_connected_clients = 0;    client = (CLIENT_TCP_PORT *) client_ports->get_head();    while (NULL != client) {	rcs_print("Exiting even though client on %s is still connected.\n",	    inet_ntoa(client->address.sin_addr));	client = (CLIENT_TCP_PORT *) client_ports->get_next();	number_of_connected_clients++;    }    client = (CLIENT_TCP_PORT *) client_ports->get_head();    while (NULL != client) {	delete client;	client_ports->delete_current_node();	client = (CLIENT_TCP_PORT *) client_ports->get_next();    }    if (NULL != subscription_buffers) {	TCP_BUFFER_SUBSCRIPTION_INFO *sub_info =	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();	while (NULL != sub_info) {	    delete sub_info;	    sub_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)		subscription_buffers->get_next();	}	delete subscription_buffers;	subscription_buffers = NULL;    }    if (number_of_connected_clients > 0) {	esleep(2.0);    }    if (connection_socket > 0) {	close(connection_socket);	connection_socket = 0;    }}int CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms(CMS * _cms){    if (NULL == _cms) {	return 0;    }    if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE) {	return 0;    }    if (NULL != _cms) {	if (min_compatible_version < 1e-6 ||	    (min_compatible_version > _cms->min_compatible_version &&		_cms->min_compatible_version > 1e-6)) {	    min_compatible_version = _cms->min_compatible_version;	}	if (_cms->confirm_write) {	    confirm_write = _cms->confirm_write;	}    }    if (_cms->total_subdivisions > max_total_subdivisions) {	max_total_subdivisions = 
_cms->total_subdivisions;    }    if (server_socket_address.sin_port == 0) {	server_socket_address.sin_port =	    htons(((u_short) _cms->tcp_port_number));	port_num = _cms->tcp_port_number;	return 1;    }    if (server_socket_address.sin_port ==	htons(((u_short) _cms->tcp_port_number))) {	port_num = _cms->tcp_port_number;	return 1;    }    return 0;}void CMS_SERVER_REMOTE_TCP_PORT::register_port(){    port_registered = 0;    rcs_print_debug(PRINT_CMS_CONFIG_INFO,	"Registering server on TCP port %d.\n",	ntohs(server_socket_address.sin_port));    if (server_socket_address.sin_port == 0) {	rcs_print_error("server can not register on port number 0.\n");	return;    }    if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {	rcs_print_error("socket error: %d -- %s\n", errno, strerror(errno));	rcs_print_error("Server can not open stream socket.\n");	return;    }    if (set_tcp_socket_options(connection_socket) < 0) {	return;    }    if (bind(connection_socket, (struct sockaddr *) &server_socket_address,	    sizeof(server_socket_address)) < 0) {	rcs_print_error("bind error: %d -- %s\n", errno, strerror(errno));	rcs_print_error	    ("Server can not bind the connection socket on port %d.\n",	    ntohs(server_socket_address.sin_port));	return;    }    if (listen(connection_socket, 5) < 0) {	rcs_print_error("listen error: %d -- %s\n", errno, strerror(errno));	rcs_print_error("TCP Server: error on call to listen for port %d.\n",	    ntohs(server_socket_address.sin_port));	return;    }    port_registered = 1;}static int last_pipe_signum = 0;static void handle_pipe_error(int signum){    last_pipe_signum = signum;    rcs_print_error("SIGPIPE intercepted.\n");}void CMS_SERVER_REMOTE_TCP_PORT::run(){    unsigned long bytes_ready;    int ready_descriptors;    if (NULL == client_ports) {	rcs_print_error("CMS_SERVER: List of client ports is NULL.\n");	return;    }    CLIENT_TCP_PORT *new_client_port, *client_port_to_check;    FD_ZERO(&read_fd_set);    FD_ZERO(&write_fd_set);  
  FD_SET(connection_socket, &read_fd_set);    maxfdpl = connection_socket + 1;    signal(SIGPIPE, handle_pipe_error);    rcs_print_debug(PRINT_CMS_CONFIG_INFO,	"running server for TCP port %d (connection_socket = %d).\n",	ntohs(server_socket_address.sin_port), connection_socket);    cms_server_count++;    fd_set read_fd_set_copy, write_fd_set_copy;    FD_ZERO(&read_fd_set_copy);    FD_ZERO(&write_fd_set_copy);    FD_SET(connection_socket, &read_fd_set_copy);    while (1) {	if (polling_enabled) {	    memcpy(&read_fd_set_copy, &read_fd_set, sizeof(fd_set));	    memcpy(&write_fd_set_copy, &write_fd_set, sizeof(fd_set));	    select_timeout.tv_sec = current_poll_interval_millis / 1000;	    select_timeout.tv_usec =		(current_poll_interval_millis % 1000) * 1000;	    ready_descriptors =		select(maxfdpl, &read_fd_set, &write_fd_set,		(fd_set *) NULL, (timeval *) & select_timeout);	    if (ready_descriptors == 0) {		update_subscriptions();		memcpy(&read_fd_set, &read_fd_set_copy, sizeof(fd_set));		memcpy(&write_fd_set, &write_fd_set_copy, sizeof(fd_set));		continue;	    }	} else {	    ready_descriptors =		select(maxfdpl, &read_fd_set, &write_fd_set,		(fd_set *) NULL, (timeval *) NULL);	}	if (ready_descriptors < 0) {	    rcs_print_error("server: select error.(errno = %d | %s)\n",		errno, strerror(errno));	}	if (NULL == client_ports) {	    rcs_print_error("CMS_SERVER: List of client ports is NULL.\n");	    return;	}	client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head();	while (NULL != client_port_to_check) {	    if (FD_ISSET(client_port_to_check->socket_fd, &read_fd_set)) {		ioctl(client_port_to_check->socket_fd, FIONREAD,		    (caddr_t) & bytes_ready);		if (bytes_ready <= 0) {		    rcs_print_debug(PRINT_SOCKET_CONNECT,			"Socket closed by host with IP address %s.\n",			inet_ntoa(client_port_to_check->address.sin_addr));		    if (NULL != client_port_to_check->subscriptions) {			TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info =			    (TCP_CLIENT_SUBSCRIPTION_INFO *)			
    client_port_to_check->subscriptions->get_head();			while (NULL != clnt_sub_info) {			    if (NULL != clnt_sub_info->sub_buf_info &&				clnt_sub_info->subscription_list_id >= 0) {				if (NULL !=				    clnt_sub_info->sub_buf_info->				    sub_clnt_info) {				    clnt_sub_info->sub_buf_info->					sub_clnt_info->					delete_node(clnt_sub_info->					subscription_list_id);				    if (clnt_sub_info->sub_buf_info->					sub_clnt_info->list_size < 1) {					delete clnt_sub_info->sub_buf_info->					    sub_clnt_info;					clnt_sub_info->sub_buf_info->					    sub_clnt_info = NULL;					if (NULL != subscription_buffers					    && clnt_sub_info->sub_buf_info->					    list_id >= 0) {					    subscription_buffers->						delete_node(clnt_sub_info->						sub_buf_info->list_id);					    delete clnt_sub_info->						sub_buf_info;					    clnt_sub_info->sub_buf_info =						NULL;					}				    }				    clnt_sub_info->sub_buf_info = NULL;				}				delete clnt_sub_info;				clnt_sub_info =				    (TCP_CLIENT_SUBSCRIPTION_INFO *)				    client_port_to_check->subscriptions->				    get_next();			    }			    delete client_port_to_check->subscriptions;			    client_port_to_check->subscriptions = NULL;			    recalculate_polling_interval();			}		    }		    if (client_port_to_check->threadId > 0			&& client_port_to_check->blocking) {			blocking_thread_kill(client_port_to_check->threadId);		    }		    close(client_port_to_check->socket_fd);		    FD_CLR(client_port_to_check->socket_fd, &read_fd_set);		    client_port_to_check->socket_fd = -1;		    delete client_port_to_check;		    client_ports->delete_current_node();		} else {		    if (client_port_to_check->blocking) {			if (client_port_to_check->threadId > 0) {			    rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY,				"Data recieved from %s:%d when it should be blocking (bytes_ready=%d).\n",				inet_ntoa				(client_port_to_check->address.				    
sin_addr),				client_port_to_check->socket_fd, bytes_ready);			    rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY,				"Killing handler %d.\n",				client_port_to_check->threadId);			    blocking_thread_kill				(client_port_to_check->threadId);#if 0			    *((u_long *) temp_buffer) =				htonl(client_port_to_check->serial_number);			    *((u_long *) temp_buffer + 1) =				htonl((unsigned long)				CMS_SERVER_SIDE_ERROR);			    *((u_long *) temp_buffer + 2) = htonl(0);	/* size 									 */			    *((u_long *) temp_buffer + 3) = htonl(0);	/* write_id 									 */			    *((u_long *) temp_buffer + 4) = htonl(0);	/* was_read 									 */			    sendn(client_port_to_check->socket_fd,				temp_buffer, 20, 0, dtimeout);#endif			    client_port_to_check->threadId = 0;			    client_port_to_check->blocking = 0;			}		    }		    handle_request(client_port_to_check);		}		ready_descriptors--;	    } else {		FD_SET(client_port_to_check->socket_fd, &read_fd_set);	    }	    client_port_to_check =		(CLIENT_TCP_PORT *) client_ports->get_next();	}	if (FD_ISSET(connection_socket, &read_fd_set)	    && ready_descriptors > 0) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -