📄 smpd_state_machine.c
字号:
{ smpd_err_printf("unable to parse the read command: \"%s\"\n", context->read_cmd.cmd); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read command: \"%s\"\n", context->read_cmd.cmd); context->read_cmd.state = SMPD_CMD_READY; result = smpd_handle_command(context); if (result == SMPD_SUCCESS) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (result == SMPD_CLOSE || result == SMPD_EXITING) { smpd_dbg_printf("not posting read for another command because %s returned\n", result == SMPD_CLOSE ? "SMPD_CLOSE" : "SMPD_EXITING"); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if (result == SMPD_EXIT) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* The last process has exited, create a close command to tear down the tree */ smpd_process.closing = SMPD_TRUE; result = smpd_create_command("close", 0, 1, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create the close command to tear down the job tree.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the close command to tear down the job tree.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (result == SMPD_CONNECTED) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* mark the node as connected */ context->connect_to->connected = SMPD_TRUE; left = context->connect_to->left; right = context->connect_to->right; while (left != NULL || right != NULL) { if (left != NULL) { smpd_dbg_printf("creating connect command for left node\n"); host_node = left; left = NULL; } else { smpd_dbg_printf("creating connect command for right node\n"); host_node = right; right = NULL; } smpd_dbg_printf("creating connect command to '%s'\n", host_node->host); /* create a connect command to be sent to the parent */ result = smpd_create_command("connect", 0, host_node->parent, SMPD_TRUE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a connect command.\n"); smpd_exit_fn(FCNAME); return result; } host_node->connect_cmd_tag = cmd_ptr->tag; result = smpd_add_command_arg(cmd_ptr, "host", host_node->host); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the host parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } result = smpd_add_command_int_arg(cmd_ptr, "id", host_node->id); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the id parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } if (smpd_process.plaintext) { /* propagate the plaintext option to the manager doing the connect */ result = smpd_add_command_arg(cmd_ptr, "plaintext", "yes"); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the plaintext parameter to the connect command for host %s\n", host_node->host); smpd_exit_fn(FCNAME); return result; } } /* post a write of the command */ result = smpd_post_write_command(context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the connect command.\n"); smpd_exit_fn(FCNAME); return result; } } host_node = smpd_process.host_list; while (host_node != NULL) { if (host_node->connected == SMPD_FALSE) { smpd_dbg_printf("not connected yet: %s not connected\n", host_node->host); break; } host_node = host_node->next; } if (host_node == NULL) { context->connect_to = NULL; /* Everyone connected, send the start_dbs command */ /* create the start_dbs command to be sent to the first host */ result = smpd_create_command("start_dbs", 0, 1, SMPD_TRUE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a start_dbs command.\n"); smpd_exit_fn(FCNAME); return result; } if (context->spawn_context) { smpd_dbg_printf("spawn_context found, adding preput values to the start_dbs command.\n"); result = smpd_add_command_int_arg(cmd_ptr, "npreput", context->spawn_context->npreput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the npreput value to the start_dbs command for a spawn command.\n"); smpd_exit_fn(FCNAME); return result; } result = smpd_add_command_arg(cmd_ptr, "preput", context->spawn_context->preput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the npreput value to the start_dbs command for a spawn command.\n"); smpd_exit_fn(FCNAME); return result; } } /* post a write of the command */ result = smpd_post_write_command(context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the start_dbs command.\n"); smpd_exit_fn(FCNAME); return result; } } } else if (result == SMPD_DBS_RETURN) { /* printf("SMPD_DBS_RETURN returned, not posting read for the next command.\n"); fflush(stdout); */ smpd_exit_fn(FCNAME); return SMPD_DBS_RETURN; } else if (result == SMPD_ABORT) { result = smpd_post_read_command(context); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a read for the next command on %s context.\n", smpd_get_context_str(context)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_post_abort_command(""); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post an abort command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else { smpd_err_printf("unable to handle the command: \"%s\"\n", context->read_cmd.cmd); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_writing_cmd"int smpd_state_writing_cmd(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_command_t *cmd_ptr, *cmd_iter; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to write the command, %s.\n", get_sock_error_string(event_ptr->error)); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("wrote command\n"); if (context->write_list == NULL) { smpd_err_printf("data written on a context with no write command posted.\n"); if (smpd_process.root_smpd) { context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_FAIL; } context->write_state = SMPD_IDLE; cmd_ptr = context->write_list; context->write_list = context->write_list->next; smpd_dbg_printf("command written to %s: \"%s\"\n", smpd_get_context_str(context), cmd_ptr->cmd); if (strcmp(cmd_ptr->cmd_str, "singinit_info") == 0){ /* Written singinit_info ... nothing more to be done... close the socket */ context->state = SMPD_SINGLETON_DONE; smpd_free_command(cmd_ptr); result = MPIDU_Sock_post_close(context->sock); if(result != MPI_SUCCESS){ smpd_err_printf("MPIDU_Sock_post_close failed, error = %s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if(strcmp(cmd_ptr->cmd_str, "die") == 0){ /* Sent 'die' command to singleton client. Now close pmi */ smpd_free_command(cmd_ptr); if(context->process){ if(context->process->pmi){ smpd_dbg_printf("Closing pmi ...\n"); result = MPIDU_Sock_post_close(context->process->pmi->sock); if(result != MPI_SUCCESS){ smpd_err_printf("Unable to post close on pmi sock\n"); } } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if(strcmp(cmd_ptr->cmd_str, "abort_job") == 0){ if(smpd_process.is_singleton_client){ /* In the case of a singleton client PM will not be able to "kill" it * - so post a read for 'die' cmd */ result = smpd_post_read_command(context); if(result != SMPD_SUCCESS){ smpd_err_printf("Unable to post a read for 'die' command\n"); if(context->process){ if(context->process->pmi){ smpd_dbg_printf("Closing pmi ...\n"); result = MPIDU_Sock_post_close(context->process->pmi->sock); if(result != MPI_SUCCESS){ smpd_err_printf("Unable to post close on pmi sock\n"); } } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } } } else if (strcmp(cmd_ptr->cmd_str, "closed") == 0) { smpd_dbg_printf("closed command written, posting close of the sock.\n"); smpd_dbg_printf("MPIDU_Sock_post_close(%d)\n", MPIDU_Sock_get_sock_id(context->sock)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'closed' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (strcmp(cmd_ptr->cmd_str, "down") == 0) { smpd_dbg_printf("down command written, posting a close of the %s context\n", smpd_get_context_str(context)); if (smpd_process.builtin_cmd == SMPD_CMD_RESTART) context->state = SMPD_RESTARTING; else context->state = SMPD_EXITING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'down' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); smpd_exit(0); } smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } else if (strcmp(cmd_ptr->cmd_str, "done") == 0) { smpd_dbg_printf("done command written, posting a close of the %s context\n", smpd_get_context_str(context)); context->state = SMPD_DONE; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a close of the sock after writing a 'done' command,\nsock error: %s\n", get_sock_error_string(result)); smpd_free_command(cmd_ptr); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_free_command(cmd
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -