📄 tcpmem.cc
字号:
/********************************************************************* Description: tcpmem.cc** Derived from a work by Fred Proctor & Will Shackleford** Author:* License: GPL Version 2* System: Linux* * Copyright (c) 2004 All rights reserved.** Last change: * $Revision: 1.6 $* $Author: paul_c $* $Date: 2005/06/13 14:38:48 $********************************************************************/#ifdef __cplusplusextern "C" {#endif#include <stdlib.h> // strtol()#include <unistd.h>#include <string.h> // strstr()#include <errno.h> // errno, strerror()#include <signal.h> // signal, SIG_ERR, SIGPIPE#include <ctype.h> // isdigit()#include <arpa/inet.h> /* inet_ntoa */#include <sys/socket.h>#include <sys/time.h> /* struct timeval */#include <netdb.h>#include <math.h> /* fmod() */#ifdef __cplusplus}#endif#include "rem_msg.hh" /* REMOTE_CMS_READ_REQUEST_TYPE, etc. */#include "rcs_print.hh" /* rcs_print_error() */#include "cmsdiag.hh"#define DEFAULT_MAX_CONSECUTIVE_TIMEOUTS (-1)#include "timer.hh" /* esleep() */#include "tcpmem.hh"#include "recvn.h" /* recvn() */#include "sendn.h" /* sendn() */#include "tcp_opts.hh" /* SET_TCP_NODELAY */int tcpmem_sigpipe_count = 0;int last_sig = 0;void tcpmem_sigpipe_handler(int sig){ last_sig = sig; tcpmem_sigpipe_count++;}TCPMEM::TCPMEM(char *_bufline, char *_procline):CMS(_bufline, _procline){ max_consecutive_timeouts = DEFAULT_MAX_CONSECUTIVE_TIMEOUTS; char *max_consecutive_timeouts_string; max_consecutive_timeouts_string = strstr(ProcessLine, "max_timeouts="); polling = (NULL != strstr(proclineupper, "POLL")); socket_fd = 0; reconnect_needed = 0; autoreconnect = 1; old_handler = (void (*)(int)) SIG_ERR; sigpipe_count = 0; subscription_count = 0; read_serial_number = 0; write_serial_number = 0; read_socket_fd = 0; write_socket_fd = 0; if (NULL != max_consecutive_timeouts_string) { max_consecutive_timeouts_string += strlen("max_timeouts="); if (!strncmp(max_consecutive_timeouts_string, "INF", 3)) { max_consecutive_timeouts = -1; } else { max_consecutive_timeouts = strtol(max_consecutive_timeouts_string, (char **) NULL, 0); } } char *sub_info_string = NULL; poll_interval_millis = 30000; subscription_type = CMS_NO_SUBSCRIPTION; sub_info_string = strstr(ProcessLine, "sub="); if (NULL != sub_info_string) { if (!strncmp(sub_info_string + 4, "none", 4)) { subscription_type = CMS_NO_SUBSCRIPTION; } else if (!strncmp(sub_info_string + 4, "var", 3)) { subscription_type = CMS_VARIABLE_SUBSCRIPTION; } else { poll_interval_millis = ((int) (atof(sub_info_string + 4) * 1000.0)); subscription_type = CMS_POLLED_SUBSCRIPTION; } } if (NULL != strstr(ProcessLine, "noreconnect")) { autoreconnect = 0; } server_host_entry = NULL; /* Set up the socket address stucture. */ memset(&server_socket_address, 0, sizeof(server_socket_address)); server_socket_address.sin_family = AF_INET; server_socket_address.sin_port = htons(((u_short) tcp_port_number)); int hostname_was_address = 0; char bufferhost_first_char = BufferHost[0]; if (bufferhost_first_char >= '0' && bufferhost_first_char <= '9') { server_socket_address.sin_addr.s_addr = inet_addr(BufferHost); if (server_socket_address.sin_addr.s_addr != 0 && ((long) server_socket_address.sin_addr.s_addr) != -1) { hostname_was_address = 1; } } if (!hostname_was_address) { /* Get the IP address of the server using it's BufferHost. */ server_host_entry = gethostbyname(BufferHost); if (NULL == server_host_entry) { status = CMS_CONFIG_ERROR; autoreconnect = 0; rcs_print_error("TCPMEM: Couldn't get host address for (%s).\n", BufferHost); return; } server_socket_address.sin_addr.s_addr = *((int *) server_host_entry->h_addr_list[0]); server_socket_address.sin_family = server_host_entry->h_addrtype; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Using server on %s with IP address %s and port %d.\n", BufferHost, inet_ntoa(server_socket_address.sin_addr), tcp_port_number); reconnect(); if (status >= 0 && (min_compatible_version > 2.58 || min_compatible_version < 1e-6)) { verify_bufname(); if (status < 0) { rcs_print_error("TCPMEM: verify_bufname() failed.\n"); } } if (status >= 0 && enable_diagnostics && (min_compatible_version > 3.71 || min_compatible_version < 1e-6)) { send_diag_info(); }}void TCPMEM::send_diag_info(){ if (polling) { return; } if (NULL == dpi) { return; } disable_sigpipe(); set_socket_fds(read_socket_fd); memset(diag_info_buf, 0, 88); *((u_long *) diag_info_buf) = htonl((u_long) serial_number); *((u_long *) diag_info_buf + 1) = htonl((u_long) REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE); *((u_long *) diag_info_buf + 2) = htonl((u_long) buffer_number); strncpy(diag_info_buf + 20, dpi->name, 16); strncpy(diag_info_buf + 36, dpi->host_sysinfo, 32); *((u_long *) (diag_info_buf + 68)) = htonl((u_long) dpi->pid); *((u_long *) (diag_info_buf + 72)) = htonl((u_long) connection_number); memcpy(diag_info_buf + 76, &(dpi->rcslib_ver), 8); *((u_long *) (diag_info_buf + 84)) = 0x11223344; if (sendn(socket_fd, diag_info_buf, 88, 0, timeout) < 0) { reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); status = CMS_MISC_ERROR; return; } serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) diag_info_buf + 1)), buffer_number); reenable_sigpipe();}void TCPMEM::verify_bufname(){ if (polling) { return; } disable_sigpipe(); set_socket_fds(read_socket_fd); *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) { reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); status = CMS_MISC_ERROR; return; } serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) temp_buffer + 1)), buffer_number); if (recvn(socket_fd, temp_buffer, 40, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { bytes_to_throw_away = 40; return; } } returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); status = CMS_MISC_ERROR; return; } status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1)); if (status < 0) { return; } if (strncmp(temp_buffer + 8, BufferName, 31)) { rcs_print_error ("TCPMEM: The buffer (%s) is registered on TCP port %d of host %s with buffer number %d.\n", ((char *) temp_buffer + 8), tcp_port_number, BufferHost, buffer_number); rcs_print_error ("TCPMEM: However, this process (%s) is attempting to connect to the buffer %s at the same location.\n", ProcessName, BufferName); status = CMS_RESOURCE_CONFLICT_ERROR; return; } reenable_sigpipe();}CMS_DIAGNOSTICS_INFO *TCPMEM::get_diagnostics_info(){ if (polling) { return (NULL); } disable_sigpipe(); if (((int) handle_old_replies()) < 0) { reenable_sigpipe(); return (NULL); } set_socket_fds(read_socket_fd); *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) { reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); status = CMS_MISC_ERROR; return (NULL); } memset(temp_buffer, 0, 0x2000); serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) temp_buffer + 1)), buffer_number); if (recvn(socket_fd, temp_buffer, 32, 0, -1.0, &recvd_bytes) < 0) { if (recvn_timedout) { bytes_to_throw_away = 32; } return (NULL); } recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); status = CMS_MISC_ERROR; return (NULL); } status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1)); if (status < 0) { return (NULL); } if (NULL == di) { di = new CMS_DIAGNOSTICS_INFO(); di->dpis = new LinkedList(); } else { di->dpis->delete_members(); } di->last_writer_dpi = NULL; di->last_reader_dpi = NULL; di->last_writer = ntohl(*((u_long *) temp_buffer + 2)); di->last_reader = ntohl(*((u_long *) temp_buffer + 3)); double server_time; memcpy(&server_time, temp_buffer + 16, 8); double local_time = etime(); double diff_time = local_time - server_time; int dpi_count = ntohl(*((u_long *) temp_buffer + 6)); int dpi_max_size = ntohl(*((u_long *) temp_buffer + 7)); if (dpi_max_size > 32 && dpi_max_size < 0x2000) { if (recvn (socket_fd, temp_buffer + 32, dpi_max_size - 32, 0, -1.0, &recvd_bytes) < 0) { if (recvn_timedout) { bytes_to_throw_away = dpi_max_size - 32; return (NULL); } } recvd_bytes = 0; int dpi_offset = 32; CMS_DIAG_PROC_INFO cms_dpi; for (int i = 0; i < dpi_count && dpi_offset < dpi_max_size; i++) { memset(&cms_dpi, 0, sizeof(CMS_DIAG_PROC_INFO)); memcpy(cms_dpi.name, temp_buffer + dpi_offset, 16); dpi_offset += 16; memcpy(cms_dpi.host_sysinfo, temp_buffer + dpi_offset, 32); dpi_offset += 32; cms_dpi.pid = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; memcpy(&(cms_dpi.rcslib_ver), temp_buffer + dpi_offset, 8); dpi_offset += 8; cms_dpi.access_type = (CMS_INTERNAL_ACCESS_TYPE) ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; cms_dpi.msg_id = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; cms_dpi.msg_size = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; cms_dpi.msg_type = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; cms_dpi.number_of_accesses = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; cms_dpi.number_of_new_messages = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; memcpy(&(cms_dpi.bytes_moved), temp_buffer + dpi_offset, 8); dpi_offset += 8; memcpy(&(cms_dpi.bytes_moved_across_socket), temp_buffer + dpi_offset, 8); dpi_offset += 8; memcpy(&(cms_dpi.last_access_time), temp_buffer + dpi_offset, 8); if (cmsdiag_timebias_set) { cms_dpi.last_access_time += diff_time - cmsdiag_timebias; } dpi_offset += 8; memcpy(&(cms_dpi.first_access_time), temp_buffer + dpi_offset, 8); if (cmsdiag_timebias_set) { cms_dpi.first_access_time += diff_time - cmsdiag_timebias; } dpi_offset += 8; memcpy(&(cms_dpi.min_difference), temp_buffer + dpi_offset, 8); dpi_offset += 8; memcpy(&(cms_dpi.max_difference), temp_buffer + dpi_offset, 8); dpi_offset += 8; di->dpis->store_at_tail(&cms_dpi, sizeof(CMS_DIAG_PROC_INFO), 1); int is_last_writer = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; if (is_last_writer) { di->last_writer_dpi = (CMS_DIAG_PROC_INFO *) di->dpis->get_tail(); } int is_last_reader = ntohl(*((u_long *) ((char *) temp_buffer + dpi_offset))); dpi_offset += 4; if (is_last_reader) { di->last_reader_dpi = (CMS_DIAG_PROC_INFO *) di->dpis->get_tail(); } } } reenable_sigpipe(); return di;}void TCPMEM::reconnect(){ if (socket_fd > 0) { disconnect(); } subscription_count = 0; timedout_request = NO_REMOTE_CMS_REQUEST; bytes_to_throw_away = 0; recvd_bytes = 0; socket_fd = 0; waiting_for_message = 0; waiting_message_size = 0; waiting_message_id = 0; serial_number = 0; rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Creating socket . . .\n"); socket_fd = socket(AF_INET, SOCK_STREAM, 0); if (socket_fd < 0) { rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n", errno, strerror(errno)); status = CMS_CREATE_ERROR; return; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Setting socket options . . . \n"); if (set_tcp_socket_options(socket_fd) < 0) { return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -