📄 tcpmem.cc
字号:
bytes_to_throw_away = 0; timedout_request = NO_REMOTE_CMS_REQUEST; consecutive_timeouts = 0; waiting_for_message = 0; waiting_message_size = 0; waiting_message_id = 0; recvd_bytes = 0; return status;}CMS_STATUS TCPMEM::read(){ long message_size, id; REMOTE_CMS_REQUEST_TYPE last_timedout_request; /* Produce error message if process does not have permission to read. */ if (!read_permission_flag) { rcs_print_error("CMS: %s was not configured to read %s\n", ProcessName, BufferName); return (status = CMS_PERMISSIONS_ERROR); } if (reconnect_needed && autoreconnect) { reconnect(); } if (reconnect_needed) { return (status = CMS_MISC_ERROR); } disable_sigpipe(); if (subscription_type != CMS_NO_SUBSCRIPTION) { set_socket_fds(read_socket_fd); timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (subscription_count < 1) { serial_number++; } handle_old_replies(); check_id(timedout_request_writeid); if (status == CMS_READ_OK) { serial_number++; } subscription_count++; reenable_sigpipe(); return status; } if (timedout_request == NO_REMOTE_CMS_REQUEST) { set_socket_fds(read_socket_fd); } if (fatal_error_occurred) { if (status >= 0) { status = CMS_MISC_ERROR; } reenable_sigpipe(); return (status); } if (socket_fd <= 0) { rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n", socket_fd); fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } last_timedout_request = timedout_request; if (((int) handle_old_replies()) < 0) { reenable_sigpipe(); return status; } if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) { check_id(timedout_request_writeid); reenable_sigpipe(); return status; } set_socket_fds(read_socket_fd); *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_READ_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); *((u_long *) temp_buffer + 3) = htonl((u_long) CMS_READ_ACCESS); *((u_long *) temp_buffer + 4) = htonl((u_long) in_buffer_id); int send_header_size = 20; if (total_subdivisions > 1) { *((u_long *) temp_buffer + 5) = htonl((u_long) current_subdivision); send_header_size = 24; } if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { rcs_print_error("TCPMEM: Can't send READ request to server.\n"); reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) temp_buffer + 1)), buffer_number); if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 20) { if (recvn_timedout) { timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (polling) { return (status = CMS_READ_OLD); } else { consecutive_timeouts = 1; reenable_sigpipe(); return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; if (subscription_type == CMS_NO_SUBSCRIPTION) { fatal_error_occurred = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1)); message_size = ntohl(*((u_long *) temp_buffer + 2)); id = ntohl(*((u_long *) temp_buffer + 3)); header.was_read = ntohl(*((u_long *) temp_buffer + 4)); if (message_size > max_encoded_message_size) { rcs_print_error("Recieved message is too big. (%ld > %ld)\n", message_size, max_encoded_message_size); fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } if (message_size > 0) { if (recvn (socket_fd, encoded_data, message_size, 0, timeout, &recvd_bytes) < 0) { if (recvn_timedout) { if (!waiting_for_message) { waiting_message_id = id; waiting_message_size = message_size; } waiting_for_message = 1; timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (polling) { reenable_sigpipe(); return (status = CMS_READ_OLD); } else { reenable_sigpipe(); return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } } recvd_bytes = 0; check_id(id); reenable_sigpipe(); return (status);}CMS_STATUS TCPMEM::blocking_read(double _blocking_timeout){ blocking_timeout = _blocking_timeout; long message_size, id; REMOTE_CMS_REQUEST_TYPE last_timedout_request; long timeout_millis; int orig_print_recvn_timeout_errors = print_recvn_timeout_errors; print_recvn_timeout_errors = 0;/* Produce error message if process does not have permission to read. */ if (!read_permission_flag) { rcs_print_error("CMS: %s was not configured to read %s\n", ProcessName, BufferName); return (status = CMS_PERMISSIONS_ERROR); } if (blocking_timeout < 0) { timeout_millis = -1; } else { timeout_millis = (u_long) (blocking_timeout * 1000.0); } if (reconnect_needed && autoreconnect) { reconnect(); } if (reconnect_needed) { print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return (status = CMS_MISC_ERROR); } disable_sigpipe(); double orig_timeout = timeout; if (subscription_type != CMS_NO_SUBSCRIPTION) { if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) { make_tcp_socket_blocking(read_socket_fd); timeout = blocking_timeout; } set_socket_fds(read_socket_fd); if (subscription_count < 1) { serial_number++; } timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; handle_old_replies(); check_id(timedout_request_writeid); if (status == CMS_READ_OK) { serial_number++; } subscription_count++; reenable_sigpipe(); if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) { make_tcp_socket_nonblocking(read_socket_fd); timeout = orig_timeout; } print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return status; } if (timedout_request == NO_REMOTE_CMS_REQUEST) { set_socket_fds(read_socket_fd); } if (fatal_error_occurred) { if (status >= 0) { status = CMS_MISC_ERROR; } reenable_sigpipe(); print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return (status); } if (socket_fd <= 0) { rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n", socket_fd); fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return (status = CMS_MISC_ERROR); } last_timedout_request = timedout_request; if (((int) handle_old_replies()) < 0) { reenable_sigpipe(); print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return status; } if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) { check_id(timedout_request_writeid); reenable_sigpipe(); print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return status; } set_socket_fds(read_socket_fd); *((u_long *) temp_buffer) = htonl((u_long) serial_number); *((u_long *) temp_buffer + 1) = htonl((u_long) REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE); *((u_long *) temp_buffer + 2) = htonl((u_long) buffer_number); *((u_long *) temp_buffer + 3) = htonl((u_long) CMS_READ_ACCESS); *((u_long *) temp_buffer + 4) = htonl((u_long) in_buffer_id); *((u_long *) temp_buffer + 5) = htonl((u_long) timeout_millis); int send_header_size = 24; if (total_subdivisions > 1) { *((u_long *) temp_buffer + 6) = htonl((u_long) current_subdivision); send_header_size = 28; } if (sendn(socket_fd, temp_buffer, send_header_size, 0, blocking_timeout) < 0) { rcs_print_error ("TCPMEM: Can't send BLOCKING_READ request to server.\n"); reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); print_recvn_timeout_errors = orig_print_recvn_timeout_errors; return (status = CMS_MISC_ERROR); } serial_number++; rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM sending request: fd = %d, serial_number=%d, " "request_type=%d, buffer_number=%d\n", socket_fd, serial_number, ntohl(*((u_long *) temp_buffer + 1)), buffer_number); if (recvn(socket_fd, temp_buffer, 20, 0, blocking_timeout, &recvd_bytes) < 0) { print_recvn_timeout_errors = orig_print_recvn_timeout_errors; if (recvn_timedout) { timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (polling) { return (status = CMS_READ_OLD); } else { consecutive_timeouts = 1; reenable_sigpipe(); return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; reconnect_needed = 1; fatal_error_occurred = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } print_recvn_timeout_errors = orig_print_recvn_timeout_errors; recvd_bytes = 0; returned_serial_number = (CMS_STATUS) ntohl(*((u_long *) temp_buffer)); rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, "TCPMEM recieved_reply: fd = %d, serial_number=%d, buffer_number=%d\n", socket_fd, returned_serial_number, buffer_number); if (returned_serial_number != serial_number) { rcs_print_error ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", returned_serial_number, serial_number); reconnect_needed = 1; if (subscription_type == CMS_NO_SUBSCRIPTION) { fatal_error_occurred = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } status = (CMS_STATUS) ntohl(*((u_long *) temp_buffer + 1)); message_size = ntohl(*((u_long *) temp_buffer + 2)); id = ntohl(*((u_long *) temp_buffer + 3)); header.was_read = ntohl(*((u_long *) temp_buffer + 4)); if (message_size > max_encoded_message_size) { rcs_print_error("Recieved message is too big. (%ld > %ld)\n", message_size, max_encoded_message_size); fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } if (message_size > 0) { if (recvn (socket_fd, encoded_data, message_size, 0, blocking_timeout, &recvd_bytes) < 0) { if (recvn_timedout) { if (!waiting_for_message) { waiting_message_id = id; waiting_message_size = message_size; } waiting_for_message = 1; timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (polling) { reenable_sigpipe(); return (status = CMS_READ_OLD); } else { reenable_sigpipe(); return (status = CMS_TIMED_OUT); } } else { recvd_bytes = 0; fatal_error_occurred = 1; reconnect_needed = 1; reenable_sigpipe(); return (status = CMS_MISC_ERROR); } } } recvd_bytes = 0; check_id(id); reenable_sigpipe(); return (status);}void TCPMEM::reenable_sigpipe(){ if (old_handler != ((void (*)(int)) SIG_ERR)) { signal(SIGPIPE, old_handler); } old_handler = (void (*)(int)) SIG_ERR; if (tcpmem_sigpipe_count > sigpipe_count) { sigpipe_count = tcpmem_sigpipe_count; reconnect_needed = 1; }}void TCPMEM::disable_sigpipe(){ if (!autoreconnect) { return; } old_handler = signal(SIGPIPE, tcpmem_sigpipe_handler); if (tcpmem_sigpipe_count > sigpipe_count) { sigpipe_count = tcpmem_sigpipe_count; }}CMS_STATUS TCPMEM::peek(){ /* Produce error message if process does not have permission to read. */ if (!read_permission_flag) { rcs_print_error("CMS: %s was not configured to read %s\n", ProcessName, BufferName); return (status = CMS_PERMISSIONS_ERROR); } if (reconnect_needed && autoreconnect) { reconnect(); } if (reconnect_needed) { return (status = CMS_MISC_ERROR); } disable_sigpipe(); long message_size, id; REMOTE_CMS_REQUEST_TYPE last_timedout_request; if (subscription_type != CMS_NO_SUBSCRIPTION) { set_socket_fds(read_socket_fd); timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; if (subscription_count < 1) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -