⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpmem.cc

📁 Source code for an Numeric Cmputer
💻 CC
📖 第 1 页 / 共 5 页
字号:
/********************************************************************* Description: tcpmem.cc**   Derived from a work by Fred Proctor & Will Shackleford** Author:* License: GPL Version 2* System: Linux*    * Copyright (c) 2004 All rights reserved.** Last change: * $Revision: 1.6 $* $Author: paul_c $* $Date: 2005/06/13 14:38:48 $********************************************************************/#ifdef __cplusplusextern "C" {#endif#include <stdlib.h>		// strtol()#include <unistd.h>#include <string.h>		// strstr()#include <errno.h>		// errno, strerror()#include <signal.h>		// signal, SIG_ERR, SIGPIPE#include <ctype.h>		// isdigit()#include <arpa/inet.h>		/* inet_ntoa */#include <sys/socket.h>#include <sys/time.h>           /* struct timeval */#include <netdb.h>#include <math.h>		/* fmod() */#ifdef __cplusplus}#endif#include "rem_msg.hh"		/* REMOTE_CMS_READ_REQUEST_TYPE, etc. */#include "rcs_print.hh"		/* rcs_print_error() */#include "cmsdiag.hh"#define DEFAULT_MAX_CONSECUTIVE_TIMEOUTS (-1)#include "timer.hh"		/* esleep() */#include "tcpmem.hh"#include "recvn.h"		/* recvn() */#include "sendn.h"		/* sendn() */#include "tcp_opts.hh"		/* SET_TCP_NODELAY */int tcpmem_sigpipe_count = 0;int last_sig = 0;void tcpmem_sigpipe_handler(int sig){    last_sig = sig;    tcpmem_sigpipe_count++;}TCPMEM::TCPMEM(char *_bufline, char *_procline):CMS(_bufline, _procline){    max_consecutive_timeouts = DEFAULT_MAX_CONSECUTIVE_TIMEOUTS;    char *max_consecutive_timeouts_string;    max_consecutive_timeouts_string = strstr(ProcessLine, "max_timeouts=");    polling = (NULL != strstr(proclineupper, "POLL"));    socket_fd = 0;    reconnect_needed = 0;    autoreconnect = 1;    old_handler = (void (*)(int)) SIG_ERR;    sigpipe_count = 0;    subscription_count = 0;    read_serial_number = 0;    write_serial_number = 0;    read_socket_fd = 0;    write_socket_fd = 0;    if (NULL != max_consecutive_timeouts_string) {	max_consecutive_timeouts_string += strlen("max_timeouts=");	if (!strncmp(max_consecutive_timeouts_string, "INF", 3)) {	    max_consecutive_timeouts = -1;	} else {	    max_consecutive_timeouts =		strtol(max_consecutive_timeouts_string, (char **) NULL, 0);	}    }    char *sub_info_string = NULL;    poll_interval_millis = 30000;    subscription_type = CMS_NO_SUBSCRIPTION;    sub_info_string = strstr(ProcessLine, "sub=");    if (NULL != sub_info_string) {	if (!strncmp(sub_info_string + 4, "none", 4)) {	    subscription_type = CMS_NO_SUBSCRIPTION;	} else if (!strncmp(sub_info_string + 4, "var", 3)) {	    subscription_type = CMS_VARIABLE_SUBSCRIPTION;	} else {	    poll_interval_millis =		((int) (atof(sub_info_string + 4) * 1000.0));	    subscription_type = CMS_POLLED_SUBSCRIPTION;	}    }    if (NULL != strstr(ProcessLine, "noreconnect")) {	autoreconnect = 0;    }    server_host_entry = NULL;    /* Set up the socket address stucture. */    memset(&server_socket_address, 0, sizeof(server_socket_address));    server_socket_address.sin_family = AF_INET;    server_socket_address.sin_port = htons(((u_short) tcp_port_number));    int hostname_was_address = 0;    char bufferhost_first_char = BufferHost[0];    if (bufferhost_first_char >= '0' && bufferhost_first_char <= '9') {	server_socket_address.sin_addr.s_addr = inet_addr(BufferHost);	if (server_socket_address.sin_addr.s_addr != 0 &&	    ((long) server_socket_address.sin_addr.s_addr) != -1) {	    hostname_was_address = 1;	}    }    if (!hostname_was_address) {	/* Get the IP address of the server using it's BufferHost. */	server_host_entry = gethostbyname(BufferHost);	if (NULL == server_host_entry) {	    status = CMS_CONFIG_ERROR;	    autoreconnect = 0;	    rcs_print_error("TCPMEM: Couldn't get host address for (%s).\n",		BufferHost);	    return;	}	server_socket_address.sin_addr.s_addr =	    *((int *) server_host_entry->h_addr_list[0]);	server_socket_address.sin_family = server_host_entry->h_addrtype;    }    rcs_print_debug(PRINT_CMS_CONFIG_INFO,	"Using server on %s with IP address %s and port %d.\n",	BufferHost,	inet_ntoa(server_socket_address.sin_addr), tcp_port_number);    reconnect();    if (status >= 0 &&	(min_compatible_version > 2.58 || min_compatible_version < 1e-6)) {	verify_bufname();	if (status < 0) {	    rcs_print_error("TCPMEM: verify_bufname() failed.\n");	}    }    if (status >= 0 && enable_diagnostics &&	(min_compatible_version > 3.71 || min_compatible_version < 1e-6)) {	send_diag_info();    }}void  TCPMEM::send_diag_info(){    if (polling) {	return;    }    if (NULL == dpi) {	return;    }    disable_sigpipe();    set_socket_fds(read_socket_fd);    memset(diag_info_buf, 0, 88);    *((u_long *) diag_info_buf) = htonl((u_long) serial_number);    *((u_long *) diag_info_buf + 1) =	htonl((u_long) REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE);    *((u_long *) diag_info_buf + 2) = htonl((u_long) buffer_number);    strncpy(diag_info_buf + 20, dpi->name, 16);    strncpy(diag_info_buf + 36, dpi->host_sysinfo, 32);    *((u_long *) (diag_info_buf + 68)) = htonl((u_long) dpi->pid);    *((u_long *) (diag_info_buf + 72)) = htonl((u_long) connection_number);    memcpy(diag_info_buf + 76, &(dpi->rcslib_ver), 8);    *((u_long *) (diag_info_buf + 84)) = 0x11223344;    if (sendn(socket_fd, diag_info_buf, 88, 0, timeout) < 0) {	reconnect_needed = 1;	fatal_error_occurred = 1;	reenable_sigpipe();	status = CMS_MISC_ERROR;	return;    }    serial_number++;    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,	"TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n",	socket_fd, serial_number,	ntohl(*((u_long *) diag_info_buf + 1)), buffer_number);    reenable_sigpipe();}void TCPMEM::verify_bufname(){    if (polling) {	return;    }    disable_sigpipe();    set_socket_fds(read_socket_fd);    *((u_long *) temp_buffer) = htonl((u_long) serial_number);    *((u_long *) temp_buffer + 1) =	htonl((u_long) REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE);    *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number);    if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) {	reconnect_needed = 1;	fatal_error_occurred = 1;	reenable_sigpipe();	status = CMS_MISC_ERROR;	return;    }    serial_number++;    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,	"TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n",	socket_fd, serial_number,	ntohl(*((u_long *) temp_buffer + 1)), buffer_number);    if (recvn(socket_fd, temp_buffer, 40, 0, timeout, &recvd_bytes) < 0) {	if (recvn_timedout) {	    bytes_to_throw_away = 40;	    return;	}    }    returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer));    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,	"TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n",	socket_fd, returned_serial_number, buffer_number);    if (returned_serial_number != serial_number) {	rcs_print_error	    ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n",	    returned_serial_number, serial_number);	reconnect_needed = 1;	fatal_error_occurred = 1;	reenable_sigpipe();	status = CMS_MISC_ERROR;	return;    }    status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1));    if (status < 0) {	return;    }    if (strncmp(temp_buffer + 8, BufferName, 31)) {	rcs_print_error	    ("TCPMEM: The buffer (%s) is registered on TCP port %d of host %s with buffer number %d.\n",	    ((char *) temp_buffer + 8), tcp_port_number, BufferHost,	    buffer_number);	rcs_print_error	    ("TCPMEM: However, this process (%s) is attempting to connect to the buffer %s at the same location.\n",	    ProcessName, BufferName);	status = CMS_RESOURCE_CONFLICT_ERROR;	return;    }    reenable_sigpipe();}CMS_DIAGNOSTICS_INFO *TCPMEM::get_diagnostics_info(){    if (polling) {	return (NULL);    }    disable_sigpipe();    if (((int) handle_old_replies()) < 0) {	reenable_sigpipe();	return (NULL);    }    set_socket_fds(read_socket_fd);    *((u_long *) temp_buffer) = htonl((u_long) serial_number);    *((u_long *) temp_buffer + 1) =	htonl((u_long) REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE);    *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number);    if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) {	reconnect_needed = 1;	fatal_error_occurred = 1;	reenable_sigpipe();	status = CMS_MISC_ERROR;	return (NULL);    }    memset(temp_buffer, 0, 0x2000);    serial_number++;    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,	"TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n",	socket_fd, serial_number,	ntohl(*((u_long *) temp_buffer + 1)), buffer_number);    if (recvn(socket_fd, temp_buffer, 32, 0, -1.0, &recvd_bytes) < 0) {	if (recvn_timedout) {	    bytes_to_throw_away = 32;	}	return (NULL);    }    recvd_bytes = 0;    returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer));    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,	"TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n",	socket_fd, returned_serial_number, buffer_number);    if (returned_serial_number != serial_number) {	rcs_print_error	    ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n",	    returned_serial_number, serial_number);	reconnect_needed = 1;	fatal_error_occurred = 1;	reenable_sigpipe();	status = CMS_MISC_ERROR;	return (NULL);    }    status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1));    if (status < 0) {	return (NULL);    }    if (NULL == di) {	di = new CMS_DIAGNOSTICS_INFO();	di->dpis = new LinkedList();    } else {	di->dpis->delete_members();    }    di->last_writer_dpi = NULL;    di->last_reader_dpi = NULL;    di->last_writer = ntohl(*((u_long *) temp_buffer + 2));    di->last_reader = ntohl(*((u_long *) temp_buffer + 3));    double server_time;    memcpy(&server_time, temp_buffer + 16, 8);    double local_time = etime();    double diff_time = local_time - server_time;    int dpi_count = ntohl(*((u_long *) temp_buffer + 6));    int dpi_max_size = ntohl(*((u_long *) temp_buffer + 7));    if (dpi_max_size > 32 && dpi_max_size < 0x2000) {	if (recvn	    (socket_fd, temp_buffer + 32, dpi_max_size - 32, 0, -1.0,		&recvd_bytes) < 0) {	    if (recvn_timedout) {		bytes_to_throw_away = dpi_max_size - 32;		return (NULL);	    }	}	recvd_bytes = 0;	int dpi_offset = 32;	CMS_DIAG_PROC_INFO cms_dpi;	for (int i = 0; i < dpi_count && dpi_offset < dpi_max_size; i++) {	    memset(&cms_dpi, 0, sizeof(CMS_DIAG_PROC_INFO));	    memcpy(cms_dpi.name, temp_buffer + dpi_offset, 16);	    dpi_offset += 16;	    memcpy(cms_dpi.host_sysinfo, temp_buffer + dpi_offset, 32);	    dpi_offset += 32;	    cms_dpi.pid =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    memcpy(&(cms_dpi.rcslib_ver), temp_buffer + dpi_offset, 8);	    dpi_offset += 8;	    cms_dpi.access_type = (CMS_INTERNAL_ACCESS_TYPE)		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    cms_dpi.msg_id =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    cms_dpi.msg_size =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    cms_dpi.msg_type =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    cms_dpi.number_of_accesses =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    cms_dpi.number_of_new_messages =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    memcpy(&(cms_dpi.bytes_moved), temp_buffer + dpi_offset, 8);	    dpi_offset += 8;	    memcpy(&(cms_dpi.bytes_moved_across_socket),		temp_buffer + dpi_offset, 8);	    dpi_offset += 8;	    memcpy(&(cms_dpi.last_access_time), temp_buffer + dpi_offset, 8);	    if (cmsdiag_timebias_set) {		cms_dpi.last_access_time += diff_time - cmsdiag_timebias;	    }	    dpi_offset += 8;	    memcpy(&(cms_dpi.first_access_time), temp_buffer + dpi_offset, 8);	    if (cmsdiag_timebias_set) {		cms_dpi.first_access_time += diff_time - cmsdiag_timebias;	    }	    dpi_offset += 8;	    memcpy(&(cms_dpi.min_difference), temp_buffer + dpi_offset, 8);	    dpi_offset += 8;	    memcpy(&(cms_dpi.max_difference), temp_buffer + dpi_offset, 8);	    dpi_offset += 8;	    di->dpis->store_at_tail(&cms_dpi, sizeof(CMS_DIAG_PROC_INFO), 1);	    int is_last_writer =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    if (is_last_writer) {		di->last_writer_dpi =		    (CMS_DIAG_PROC_INFO *) di->dpis->get_tail();	    }	    int is_last_reader =		ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset)));	    dpi_offset += 4;	    if (is_last_reader) {		di->last_reader_dpi =		    (CMS_DIAG_PROC_INFO *) di->dpis->get_tail();	    }	}    }    reenable_sigpipe();    return di;}void TCPMEM::reconnect(){    if (socket_fd > 0) {	disconnect();    }    subscription_count = 0;    timedout_request = NO_REMOTE_CMS_REQUEST;    bytes_to_throw_away = 0;    recvd_bytes = 0;    socket_fd = 0;    waiting_for_message = 0;    waiting_message_size = 0;    waiting_message_id = 0;    serial_number = 0;    rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Creating socket . . .\n");    socket_fd = socket(AF_INET, SOCK_STREAM, 0);    if (socket_fd < 0) {	rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n",	    errno, strerror(errno));	status = CMS_CREATE_ERROR;	return;    }    rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Setting socket options . . . \n");    if (set_tcp_socket_options(socket_fd) < 0) {	return;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -