📄 tcpmem.cc
字号:
struct timeval tm; int socket_ret; double start_time, current_time; fd_set fds; sockaddr_in cli_addr; cli_addr.sin_family = AF_INET; cli_addr.sin_addr.s_addr = htonl(INADDR_ANY); cli_addr.sin_port = htons(0); rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n"); if (bind(socket_fd, (struct sockaddr *) &cli_addr, sizeof(cli_addr)) < 0) { rcs_print_error("TCPMEM: bind error %d = %s\n", errno, strerror(errno)); status = CMS_CREATE_ERROR; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n"); if (connect(socket_fd, (struct sockaddr *) &server_socket_address, sizeof(server_socket_address)) < 0) { if (EINPROGRESS == errno) { tm.tv_sec = (long) timeout; tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6); FD_ZERO(&fds); FD_SET(socket_fd, &fds); start_time = etime(); while (!(socket_ret = select(socket_fd + 1, (fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) { FD_SET(socket_fd, &fds); esleep(0.001); current_time = etime(); double timeleft = start_time + timeout - current_time; if (timeleft <= 0.0 && timeout >= 0.0) { if (!reconnect_needed) { rcs_print_error ("TCPMEM: Timed out waiting for connection.\n"); } status = CMS_NO_SERVER_ERROR; return; } tm.tv_sec = (long) timeleft; tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6); } if (-1 == socket_ret) { rcs_print_error("select error: %d -- %s\n", errno, strerror(errno)); rcs_print_error("TCPMEM: Couldn't connect.\n"); status = CMS_NO_SERVER_ERROR; return; } } else { rcs_print_error("connect error: %d -- %s\n", errno, strerror(errno)); rcs_print_error ("TCPMEM: Error trying to connect to TCP port %d of host %s(%s). sin_family=%d\n", ntohs(server_socket_address.sin_port), BufferHost, inet_ntoa(server_socket_address.sin_addr), server_socket_address.sin_family); status = CMS_NO_SERVER_ERROR; return; } } read_socket_fd = socket_fd; memset(temp_buffer, 0, 32); if (total_subdivisions > 1) { subscription_type = CMS_NO_SUBSCRIPTION; } if (subscription_type != CMS_NO_SUBSCRIPTION) { verify_bufname(); if (status < 0) { rcs_print_error("TCPMEM: verify_bufname() failed\n"); return; } *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); *((u_long *) temp_buffer + 3) = htonl((u_long) subscription_type); *((u_long *) temp_buffer + 4) = htonl((u_long) poll_interval_millis); if (sendn(socket_fd, temp_buffer, 20, 0, 30) < 0) { rcs_print_error("Can`t setup subscription.\n"); subscription_type = CMS_NO_SUBSCRIPTION; } else { serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) temp_buffer + 1)), buffer_number); memset(temp_buffer, 0, 20); recvd_bytes = 0; if (recvn(socket_fd, temp_buffer, 8, 0, 30, &recvd_bytes) < 0) { rcs_print_error("Can`t setup subscription.\n"); subscription_type = CMS_NO_SUBSCRIPTION; } if (!ntohl(*((u_long *) temp_buffer) + 1)) { rcs_print_error("Can`t setup subscription.\n"); subscription_type = CMS_NO_SUBSCRIPTION; } bytes_to_throw_away = 8 - recvd_bytes; if (bytes_to_throw_away < 0 || bytes_to_throw_away > 8) { bytes_to_throw_away = 0; } recvd_bytes = 0; } memset(temp_buffer, 0, 20); } if (subscription_type != CMS_NO_SUBSCRIPTION) { polling = 1; } if (polling) { make_tcp_socket_nonblocking(socket_fd); write_socket_fd = socket(AF_INET, SOCK_STREAM, 0); if (write_socket_fd < 0) { rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n", errno, strerror(errno)); status = CMS_CREATE_ERROR; return; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Setting socket options . . . \n"); if (set_tcp_socket_options(write_socket_fd) < 0) { return; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n"); if (bind (write_socket_fd, (struct sockaddr *) &cli_addr, sizeof(cli_addr)) < 0) { rcs_print_error("TCPMEM: bind error %d = %s\n", errno, strerror(errno)); status = CMS_CREATE_ERROR; } rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n"); if (connect (write_socket_fd, (struct sockaddr *) &server_socket_address, sizeof(server_socket_address)) < 0) { if (EINPROGRESS == errno) { FD_ZERO(&fds); FD_SET(write_socket_fd, &fds); start_time = etime(); tm.tv_sec = (long) timeout; tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6); while (!(socket_ret = select(write_socket_fd + 1, (fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) { FD_SET(write_socket_fd, &fds); esleep(0.001); current_time = etime(); double timeleft = start_time + timeout - current_time; if (timeleft <= 0.0 && timeout >= 0.0) { rcs_print_error ("TCPMEM: Timed out waiting for connection.\n"); status = CMS_NO_SERVER_ERROR; return; } tm.tv_sec = (long) timeleft; tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6); } if (-1 == socket_ret) { rcs_print_error("select error: %d -- %s\n", errno, strerror(errno)); rcs_print_error("TCPMEM: Couldn't connect.\n"); status = CMS_NO_SERVER_ERROR; return; } } else { rcs_print_error("connect error: %d -- %s\n", errno, strerror(errno)); rcs_print_error ("TCPMEM: Error trying to connect to TCP port %d of host %s.\n", ntohs(server_socket_address.sin_port), BufferHost); } } timeout = 0; } else { write_socket_fd = read_socket_fd; } reconnect_needed = 0; fatal_error_occurred = 0;}TCPMEM::~TCPMEM(){ disconnect();}void TCPMEM::disconnect(){ if (write_socket_fd > 0 && write_socket_fd != socket_fd) { if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) { if (delete_totally) { *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_CLEAN_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); sendn(write_socket_fd, temp_buffer, 20, 0, -1); } } close(write_socket_fd); write_socket_fd = 0; } if (socket_fd > 0) { if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) { if (delete_totally) { *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_CLEAN_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); sendn(socket_fd, temp_buffer, 20, 0, -1); } } close(socket_fd); socket_fd = 0; }}CMS_STATUS TCPMEM::handle_old_replies(){ long message_size; timedout_request_writeid = 0; status = CMS_STATUS_NOT_SET; switch (timedout_request) { case REMOTE_CMS_READ_REQUEST_TYPE: if (!waiting_for_message) { if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { if (polling) { return status; } else { consecutive_timeouts++; if (consecutive_timeouts > max_consecutive_timeouts && max_consecutive_timeouts > 0) { rcs_print_error ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", consecutive_timeouts); fatal_error_occurred = 1; reconnect_needed = 1; } return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; fatal_error_occurred = 1; return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); if (subscription_type == CMS_NO_SUBSCRIPTION) { fatal_error_occurred = 1; reconnect_needed = 1; return (status = CMS_MISC_ERROR); } else { serial_number = returned_serial_number; } } message_size = ntohl(*((u_long *) temp_buffer + 2)); timedout_request_status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1)); timedout_request_writeid = ntohl(*((u_long *) temp_buffer + 3)); header.was_read = ntohl(*((u_long *) temp_buffer + 4)); if (message_size > max_encoded_message_size) { rcs_print_error("Recieved message is too big. (%ld > %ld)\n", message_size, max_encoded_message_size); fatal_error_occurred = 1; reconnect_needed = 1; return (status = CMS_INSUFFICIENT_SPACE_ERROR); } } else { message_size = waiting_message_size; } if (message_size > 0) { if (recvn (socket_fd, encoded_data, message_size, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { if (!waiting_for_message) { waiting_message_id = timedout_request_writeid; waiting_message_size = message_size; } waiting_for_message = 1; timedout_request_writeid = 0; if (polling) { return status; } else { consecutive_timeouts++; if (consecutive_timeouts > max_consecutive_timeouts && max_consecutive_timeouts > 0) { rcs_print_error ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", consecutive_timeouts); fatal_error_occurred = 1; reconnect_needed = 1; } return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; fatal_error_occurred = 1; reconnect_needed = 1; return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; if (waiting_for_message) { timedout_request_writeid = waiting_message_id; } } break; case REMOTE_CMS_WRITE_REQUEST_TYPE: case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE: case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE: case REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE: case REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE: if (timedout_request == REMOTE_CMS_WRITE_REQUEST_TYPE && (min_compatible_version > 2.58 || min_compatible_version < 1e-6 || confirm_write)) { break; } if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { consecutive_timeouts++; if (consecutive_timeouts > max_consecutive_timeouts && max_consecutive_timeouts > 0) { rcs_print_error ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", consecutive_timeouts); reconnect_needed = 1; fatal_error_occurred = 1; } reconnect_needed = 1; return (status = CMS_TIMED_OUT); } else { fatal_error_occurred = 1; reconnect_needed = 1; return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; if (subscription_type == CMS_NO_SUBSCRIPTION) { return (status = CMS_MISC_ERROR); } } break; case REMOTE_CMS_CLEAR_REQUEST_TYPE: if (recvn(socket_fd, temp_buffer, 4, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { consecutive_timeouts++; reconnect_needed = 1; if (consecutive_timeouts > max_consecutive_timeouts && max_consecutive_timeouts > 0) { rcs_print_error ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", consecutive_timeouts); fatal_error_occurred = 1; } return (status = CMS_TIMED_OUT); } else { reconnect_needed = 1; fatal_error_occurred = 1; return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; if (subscription_type == CMS_NO_SUBSCRIPTION) { return (status = CMS_MISC_ERROR); } } break; case NO_REMOTE_CMS_REQUEST: default: break; } if (bytes_to_throw_away > 0) { if (recvn (socket_fd, encoded_data, bytes_to_throw_away, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { consecutive_timeouts++; if (consecutive_timeouts > max_consecutive_timeouts && max_consecutive_timeouts > 0) { rcs_print_error ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", consecutive_timeouts); fatal_error_occurred = 1; reconnect_needed = 1; } return (status = CMS_TIMED_OUT); } else { recvd_bytes = 0; fatal_error_occurred = 1; reconnect_needed = 1; return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -