📄 smpd_state_machine.c
字号:
{ if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "tag", &cmd_ptr->tag) != MPIU_STR_SUCCESS) { smpd_dbg_printf("adding tag %d to connect command.\n", smpd_process.cur_tag); smpd_add_command_int_arg(cmd_ptr, "tag", smpd_process.cur_tag); cmd_ptr->tag = smpd_process.cur_tag; smpd_process.cur_tag++; } cmd_ptr->wait = SMPD_TRUE; } if (strcmp(cmd_ptr->cmd_str, "set") == 0 || strcmp(cmd_ptr->cmd_str, "delete") == 0 || strcmp(cmd_ptr->cmd_str, "stat") == 0 || strcmp(cmd_ptr->cmd_str, "get") == 0) { if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "tag", &cmd_ptr->tag) != MPIU_STR_SUCCESS) { smpd_dbg_printf("adding tag %d to %s command.\n", smpd_process.cur_tag, cmd_ptr->cmd_str); smpd_add_command_int_arg(cmd_ptr, "tag", smpd_process.cur_tag); cmd_ptr->tag = smpd_process.cur_tag; smpd_process.cur_tag++; } cmd_ptr->wait = SMPD_TRUE; } smpd_dbg_printf("command read from stdin, forwarding to left_child smpd\n"); result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the command read from stdin: \"%s\"\n", cmd_ptr->cmd); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("posted write of command: \"%s\"\n", cmd_ptr->cmd); } context->read_cmd.stdin_read_offset = 0; } else { context->read_cmd.stdin_read_offset++; } } result = MPIDU_Sock_post_read(context->sock, &context->read_cmd.cmd[context->read_cmd.stdin_read_offset], 1, 1, NULL); if (result != MPI_SUCCESS) { /* if (result != SOCK_EOF) { smpd_dbg_printf("MPIDU_Sock_post_read failed (%s), assuming %s is closed, calling sock_post_close(%d).\n", get_sock_error_string(result), smpd_get_context_str(context), sock_getid(context->sock)); } */ context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_smpd_writing_data_to_stdin"int smpd_state_smpd_writing_data_to_stdin(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_stdin_write_node_t *node; smpd_enter_fn(FCNAME); SMPD_UNREFERENCED_ARG(event_ptr); node = context->process->stdin_write_list; if (node == NULL) { smpd_err_printf("write completed to process stdin context with no write posted in the list.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("wrote %d bytes to stdin of rank %d\n", node->length, context->process->rank); MPIU_Free(node->buffer); MPIU_Free(node); context->process->stdin_write_list = context->process->stdin_write_list->next; if (context->process->stdin_write_list != NULL) { context->process->in->write_state = SMPD_WRITING_DATA_TO_STDIN; result = MPIDU_Sock_post_write(context->process->in->sock, node->buffer, node->length, node->length, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a write of %d bytes to stdin for rank %d\n", node->length, context->process->rank); smpd_exit_fn(FCNAME); } } else { context->process->in->write_state = SMPD_IDLE; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_stdouterr"int smpd_state_reading_stdouterr(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_command_t *cmd_ptr; MPIU_Size_t num_read; char buffer[SMPD_MAX_CMD_LENGTH]; int num_encoded; SMPD_BOOL ends_in_cr = SMPD_FALSE; smpd_enter_fn(FCNAME); if (context->state == SMPD_CLOSING) { smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } if (event_ptr->error != MPI_SUCCESS) { if (MPIR_ERR_GET_CLASS(event_ptr->error) != MPIDU_SOCK_ERR_CONN_CLOSED) { smpd_dbg_printf("reading failed(%s), assuming %s is closed.\n", get_sock_error_string(event_ptr->error), smpd_get_context_str(context)); } /* Return an error an then handle_sock_op_read will post a close context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } */ smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read from %s\n", smpd_get_context_str(context)); /* one byte read, attempt to read up to the buffer size */ result = MPIDU_Sock_read(context->sock, &context->read_cmd.cmd[1], (SMPD_MAX_STDOUT_LENGTH/2)-2, &num_read); if (result != MPI_SUCCESS) { num_read = 0; smpd_dbg_printf("MPIDU_Sock_read(%d) failed (%s), assuming %s is closed.\n", MPIDU_Sock_get_sock_id(context->sock), get_sock_error_string(result), smpd_get_context_str(context)); } /* Use num_read instead of num_read-1 because one byte was already read before increasing the buffer length by one */ if (context->read_cmd.cmd[num_read] == '\n') { ends_in_cr = SMPD_TRUE; } smpd_dbg_printf("%d bytes read from %s\n", num_read+1, smpd_get_context_str(context)); if (context->type == SMPD_CONTEXT_STDOUT_RSH || context->type == SMPD_CONTEXT_STDERR_RSH) { size_t total; size_t num_written; char *buffer; total = num_read+1; buffer = context->read_cmd.cmd; while (total > 0) { num_written = fwrite(buffer, 1, total, context->type == SMPD_CONTEXT_STDOUT_RSH ? stdout : stderr); if (num_written < 1) { num_read = 0; total = 0; smpd_err_printf("fwrite failed: error %d\n", ferror(context->type == SMPD_CONTEXT_STDOUT_RSH ? stdout : stderr)); } else { total = total - num_written; buffer = buffer + num_written; } } } else { smpd_encode_buffer(buffer, SMPD_MAX_CMD_LENGTH, context->read_cmd.cmd, num_read+1, &num_encoded); buffer[num_encoded*2] = '\0'; /*smpd_dbg_printf("encoded %d characters: %d '%s'\n", num_encoded, strlen(buffer), buffer);*/ /* create an output command */ result = smpd_create_command( smpd_get_context_str(context), smpd_process.id, 0 /* output always goes to node 0? */, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create an output command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_add_command_int_arg(cmd_ptr, "rank", context->rank); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the rank to the %s command.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* Use the context->first_output_stdout/err flag to indicate that this is the first output since * an end of a line or the very first output of the application. mpiexec uses this flag when the * -l option is specified to prefix lines with process information. This flag only handles end of * line situations where the end of line is the last entry in the output. mpiexec handles end of * line characters in the middle of the output. */ switch (context->type) { case SMPD_CONTEXT_STDOUT: if (context->first_output_stdout == SMPD_TRUE) { result = smpd_add_command_int_arg(cmd_ptr, "first", 1); } else { result = smpd_add_command_int_arg(cmd_ptr, "first", 0); } context->first_output_stdout = ends_in_cr; break; case SMPD_CONTEXT_STDERR: if (context->first_output_stderr == SMPD_TRUE) { result = smpd_add_command_int_arg(cmd_ptr, "first", 1); } else { result = smpd_add_command_int_arg(cmd_ptr, "first", 0); } context->first_output_stderr = ends_in_cr; break; default: result = smpd_add_command_int_arg(cmd_ptr, "first", 0); break; } if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the first flag to the %s command.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_add_command_arg(cmd_ptr, "data", buffer); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the data to the %s command.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* send the stdout command */ result = smpd_post_write_command(smpd_process.parent_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the %s command.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } /* post a read for the next byte of data */ result = MPIDU_Sock_post_read(context->sock, &context->read_cmd.cmd, 1, 1, NULL); if (result != MPI_SUCCESS) { /* if (result != SOCK_EOF) { smpd_dbg_printf("MPIDU_Sock_post_read failed (%s), assuming %s is closed, calling sock_post_close(%d).\n", get_sock_error_string(result), smpd_get_context_str(context), MPIDU_Sock_get_sock_id(context->sock)); } */ context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_cmd_header"int smpd_state_reading_cmd_header(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to read the cmd header on the %s context, %s.\n", smpd_get_context_str(context), get_sock_error_string(event_ptr->error)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read command header\n"); context->read_cmd.length = atoi(context->read_cmd.cmd_hdr_str); if ((context->read_cmd.length < 1) || (context->read_cmd.length > SMPD_MAX_CMD_LENGTH)) { smpd_err_printf("unable to read the command, invalid command length: %d\n", context->read_cmd.length); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("command header read, posting read for data: %d bytes\n", context->read_cmd.length); context->read_cmd.state = SMPD_CMD_READING_CMD; context->read_state = SMPD_READING_CMD; result = MPIDU_Sock_post_read(context->sock, context->read_cmd.cmd, context->read_cmd.length, context->read_cmd.length, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a read for the command string,\nsock error: %s\n", get_sock_error_string(result)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_cmd"int smpd_state_reading_cmd(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_command_t *cmd_ptr; smpd_host_node_t *left, *right, *host_node; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to read the command, %s.\n", get_sock_error_string(event_ptr->error)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read command\n"); result = smpd_parse_command(&context->read_cmd); if (result != SMPD_SUCCESS)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -