📄 smpd_state_machine.c
字号:
return SMPD_FAIL; } } else if (result == SMPD_CLOSE || result == SMPD_EXITING) { smpd_dbg_printf("not posting read for another command because %s returned\n", result == SMPD_CLOSE ? "SMPD_CLOSE" : "SMPD_EXITING"); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if (result == SMPD_EXIT) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* The last process has exited, create a close command to tear down the tree */ smpd_process.closing = SMPD_TRUE; result = smpd_create_command("close", 0, 1, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create the close command to tear down the job tree.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the close command to tear down the job tree.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (result == SMPD_CONNECTED) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* mark the node as connected */ context->connect_to->connected = SMPD_TRUE; left = context->connect_to->left; right = context->connect_to->right; while (left != NULL || right != NULL) { if (left != NULL) { smpd_dbg_printf("creating connect command for left node\n"); host_node = left; left = NULL; } else { smpd_dbg_printf("creating connect command for right node\n"); host_node = right; right = NULL; } smpd_dbg_printf("creating connect command to '%s'\n", host_node->host); /* create a connect command to be sent to the parent */ result = smpd_create_command("connect", 0, host_node->parent, SMPD_TRUE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a connect command.\n"); smpd_exit_fn(FCNAME); return result; } host_node->connect_cmd_tag = cmd_ptr->tag; result = smpd_add_command_arg(cmd_ptr, "host", host_node->host); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the host parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } result = smpd_add_command_int_arg(cmd_ptr, "id", host_node->id); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the id parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } if (smpd_process.plaintext) { /* propagate the plaintext option to the manager doing the connect */ result = smpd_add_command_arg(cmd_ptr, "plaintext", "yes"); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the plaintext parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } } /* post a write of the command */ result = smpd_post_write_command(context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the connect command.\n"); smpd_exit_fn(FCNAME); return result; } } host_node = smpd_process.host_list; while (host_node != NULL) { if (host_node->connected == SMPD_FALSE) { smpd_dbg_printf("not connected yet: %s not connected\n", host_node->host); break; } host_node = host_node->next; } if (host_node == NULL) { context->connect_to = NULL; /* Everyone connected, send the start_dbs command */ /* create the start_dbs command to be sent to the first host */ result = smpd_create_command("start_dbs", 0, 1, SMPD_TRUE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a start_dbs command.\n"); smpd_exit_fn(FCNAME); return result; } if (context->spawn_context) { smpd_dbg_printf("spawn_context found, adding preput values to the start_dbs command.\n"); result = smpd_add_command_int_arg(cmd_ptr, "npreput", context->spawn_context->npreput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the npreput value to the start_dbs command for a spawn command.\n"); smpd_exit_fn(FCNAME); return result; } result = smpd_add_command_arg(cmd_ptr, "preput", context->spawn_context->preput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the npreput value to the start_dbs command for a spawn command.\n"); smpd_exit_fn(FCNAME); return result; } } /* post a write of the command */ result = smpd_post_write_command(context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the start_dbs command.\n"); smpd_exit_fn(FCNAME); return result; } } } else if (result == SMPD_DBS_RETURN) { /* printf("SMPD_DBS_RETURN returned, not posting read for the next command.\n"); fflush(stdout); */ smpd_exit_fn(FCNAME); return SMPD_DBS_RETURN; } else if (result == SMPD_ABORT) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_post_abort_command(""); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post an abort command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else { smpd_err_printf("unable to handle the command: \"%s\"\n", context->read_cmd.cmd); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_writing_cmd"int smpd_state_writing_cmd(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_command_t *cmd_ptr, *cmd_iter; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to write the command, %s.\n", get_sock_error_string(event_ptr->error)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("wrote command\n"); if (context->write_list == NULL) { smpd_err_printf("data written on a context with no write command posted.\n"); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } context->write_state = SMPD_IDLE; cmd_ptr = context->write_list; context->write_list = context->write_list->next; smpd_dbg_printf("command written to %s: \"%s\"\n", smpd_get_context_str(context), cmd_ptr->cmd); if (strcmp(cmd_ptr->cmd_str, "closed") == 0) { smpd_dbg_printf("closed command written, posting close of the sock.\n"); smpd_dbg_printf("MPIDU_Sock_post_close(%d)\n", MPIDU_Sock_get_sock_id(context->sock)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'closed' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (strcmp(cmd_ptr->cmd_str, "down") == 0) { smpd_dbg_printf("down command written, posting a close of the %s context\n", smpd_get_context_str(context)); if (smpd_process.builtin_cmd == SMPD_CMD_RESTART) context->state = SMPD_RESTARTING; else context->state = SMPD_EXITING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'down' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); smpd_exit(0); } smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if (strcmp(cmd_ptr->cmd_str, "done") == 0) { smpd_dbg_printf("done command written, posting a close of the %s context\n", smpd_get_context_str(context)); context->state = SMPD_DONE; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'done' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } if (cmd_ptr->wait) { /* If this command expects a reply, move it to the wait list */ smpd_dbg_printf("moving '%s' command to the wait_list.\n", cmd_ptr->cmd_str); if (context->wait_list) { cmd_iter = context->wait_list; while (cmd_iter->next) cmd_iter = cmd_iter->next; cmd_iter->next = cmd_ptr; } else { context->wait_list = cmd_ptr; } cmd_ptr->next = NULL; } else { /* otherwise free the command immediately. */ result = smpd_free_command(cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to free the written command.\n"); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } } cmd_ptr = context->write_list; if (cmd_ptr) { context->write_state = SMPD_WRITING_CMD; cmd_ptr->iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)cmd_ptr->cmd_hdr_str; cmd_ptr->iov[0].MPID_IOV_LEN = SMPD_CMD_HDR_LENGTH; cmd_ptr->iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)cmd_ptr->cmd; cmd_ptr->iov[1].MPID_IOV_LEN = cmd_ptr->length; smpd_dbg_printf("smpd_handle_written: posting write(%d bytes) for command: \"%s\"\n", cmd_ptr->iov[0].MPID_IOV_LEN + cmd_ptr->iov[1].MPID_IOV_LEN, cmd_ptr->cmd); result = MPIDU_Sock_post_writev(context->sock, cmd_ptr->iov, 2, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a write for the next command,\nsock error: %s\n", get_sock_error_string(result)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_smpd_listening"int smpd_state_smpd_listening(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr, MPIDU_Sock_set_t set){ int result; MPIDU_Sock_t new_sock; smpd_context_t *new_context; char phrase[SMPD_PASSPHRASE_MAX_LENGTH]; smpd_enter_fn(FCNAME); SMPD_UNREFERENCED_ARG(event_ptr); result = MPIDU_Sock_accept(context->sock, set, NULL, &new_sock); if (result != MPI_SUCCESS) { smpd_err_printf("error accepting socket: %s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -