📄 smpd_handle_command.c
字号:
#undef FCNAME#define FCNAME "smpd_handle_result"int smpd_handle_result(smpd_context_t *context){ int result, ret_val; char str[SMPD_MAX_CMD_LENGTH]; char err_msg[SMPD_MAX_ERROR_LEN]; smpd_command_t *iter, *trailer, *cmd_ptr; int match_tag; char ctx_key[100]; int process_id; smpd_context_t *pmi_context; smpd_process_t *piter; int rank; MPIDU_Sock_t insock; MPIDU_SOCK_NATIVE_FD stdin_fd; smpd_context_t *context_in;#ifdef HAVE_WINDOWS_H DWORD dwThreadID; SOCKET hWrite;#endif#ifdef USE_PTHREAD_STDIN_REDIRECTION int fd[2];#endif char pg_id[SMPD_MAX_DBS_NAME_LEN+1] = ""; char pg_ctx[100] = ""; int pg_rank = -1; smpd_process_group_t *pg; char host_description[256]; int listener_port; char context_str[20]; smpd_context_t *orig_context; int num_decoded; smpd_enter_fn(FCNAME); if (context->type != SMPD_CONTEXT_PMI && MPIU_Str_get_string_arg(context->read_cmd.cmd, "ctx_key", ctx_key, 100) == MPIU_STR_SUCCESS) { process_id = atoi(ctx_key); smpd_dbg_printf("forwarding the dbs result command to the pmi context %d.\n", process_id); pmi_context = NULL; piter = smpd_process.process_list; while (piter) { if (piter->id == process_id) { pmi_context = piter->pmi; break; } piter = piter->next; } if (pmi_context == NULL) { smpd_err_printf("received dbs result for a pmi context that doesn't exist: unmatched id = %d\n", process_id); smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } result = smpd_forward_command(context, pmi_context); smpd_exit_fn(FCNAME); return result; } if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "cmd_tag", &match_tag) != MPIU_STR_SUCCESS) { smpd_err_printf("result command received without a cmd_tag field: '%s'\n", context->read_cmd.cmd); smpd_exit_fn(FCNAME); return SMPD_FAIL; } trailer = iter = context->wait_list; while (iter) { if (iter->tag == match_tag) { ret_val = SMPD_SUCCESS; if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "result", str, SMPD_MAX_CMD_LENGTH) == MPIU_STR_SUCCESS) { if (strcmp(iter->cmd_str, "connect") == 0) { if (strcmp(str, SMPD_SUCCESS_STR) == 0) { ret_val = SMPD_CONNECTED; switch (context->state) { case SMPD_MPIEXEC_CONNECTING_TREE: smpd_dbg_printf("successful connect, state: connecting tree.\n"); { smpd_host_node_t *host_node_iter; /* find the host node that this connect command was issued for and set the connect_to variable to to point to it */ host_node_iter = smpd_process.host_list; context->connect_to = NULL; while (host_node_iter != NULL) { if (host_node_iter->connect_cmd_tag == match_tag) { context->connect_to = host_node_iter; break; } host_node_iter = host_node_iter->next; } } break; case SMPD_MPIEXEC_CONNECTING_SMPD: smpd_dbg_printf("successful connect, state: connecting smpd.\n"); break; case SMPD_CONNECTING: smpd_dbg_printf("successful connect, state: connecting.\n"); break; default: break; } } else { smpd_err_printf("connect failed: %s\n", str); ret_val = SMPD_ABORT; } } else if (strcmp(iter->cmd_str, "launch") == 0) { smpd_process.nproc_launched++; if (strcmp(str, SMPD_SUCCESS_STR) == 0) { MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_id", pg_id, SMPD_MAX_DBS_NAME_LEN); MPIU_Str_get_int_arg(context->read_cmd.cmd, "pg_rank", &pg_rank); MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_ctx", pg_ctx, 100); pg = smpd_process.pg_list; while (pg) { if (strcmp(pg->kvs, pg_id) == 0) { MPIU_Strncpy(pg->processes[pg_rank].ctx_key, pg_ctx, 100); break; } pg = pg->next; } if (context->spawn_context) { smpd_dbg_printf("successfully spawned: '%s'\n", iter->cmd); /*printf("successfully spawned: '%s'\n", iter->cmd);fflush(stdout);*/ context->spawn_context->num_outstanding_launch_cmds--; smpd_dbg_printf("%d outstanding spawns.\n", context->spawn_context->num_outstanding_launch_cmds); if (context->spawn_context->num_outstanding_launch_cmds == 0) { /* add the result string */ result = smpd_add_command_arg(context->spawn_context->result_cmd, "result", SMPD_SUCCESS_STR); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the result string to the result command.\n"); smpd_exit_fn(FCNAME); return result; } /* send the spawn result command */ result = smpd_post_write_command(context, context->spawn_context->result_cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the spawn result command.\n"); smpd_exit_fn(FCNAME); return result; } smpd_process.spawning = SMPD_FALSE; result = smpd_handle_delayed_spawn_command(); if (result != SMPD_SUCCESS) { smpd_err_printf("An error occurred handling delayed spawn commands.\n"); smpd_exit_fn(FCNAME); return result; } } } else { smpd_dbg_printf("successfully launched: '%s'\n", iter->cmd); if (!smpd_process.stdin_redirecting) { rank = 0; MPIU_Str_get_int_arg(iter->cmd, "i", &rank); if (rank == 0) { smpd_dbg_printf("root process launched, starting stdin redirection.\n"); /* get a handle to stdin */#ifdef HAVE_WINDOWS_H result = smpd_make_socket_loop((SOCKET*)&stdin_fd, &hWrite); if (result) { smpd_err_printf("Unable to make a local socket loop to forward stdin.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; }#elif defined(USE_PTHREAD_STDIN_REDIRECTION) socketpair(AF_UNIX, SOCK_STREAM, 0, fd); smpd_process.stdin_read = fd[0]; smpd_process.stdin_write = fd[1]; stdin_fd = fd[0];#else stdin_fd = fileno(stdin);#endif /* convert the native handle to a sock */ /*printf("stdin native sock %d\n", stdin_fd);fflush(stdout);*/ result = MPIDU_Sock_native_to_sock(smpd_process.set, stdin_fd, NULL, &insock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to create a sock from stdin,\nsock error: %s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* create a context for reading from stdin */ result = smpd_create_context(SMPD_CONTEXT_MPIEXEC_STDIN, smpd_process.set, insock, -1, &context_in); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a context for stdin.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } MPIDU_Sock_set_user_ptr(insock, context_in);#ifdef HAVE_WINDOWS_H /* unfortunately, we cannot use stdin directly as a sock. So, use a thread to read and forward stdin to a sock */ smpd_process.hCloseStdinThreadEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (smpd_process.hCloseStdinThreadEvent == NULL) { smpd_err_printf("Unable to create the stdin thread close event, error %d\n", GetLastError()); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_process.hStdinThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)smpd_stdin_thread, (void*)hWrite, 0, &dwThreadID); if (smpd_process.hStdinThread == NULL) { smpd_err_printf("Unable to create a thread to read stdin, error %d\n", GetLastError()); smpd_exit_fn(FCNAME); return SMPD_FAIL; }#elif defined(USE_PTHREAD_STDIN_REDIRECTION) if (pthread_create(&smpd_process.stdin_thread, NULL, smpd_pthread_stdin_thread, NULL) != 0) { smpd_err_printf("Unable to create a thread to read stdin, error %d\n", errno); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /*pthread_detach(smpd_process.stdin_thread);*/#endif /* set this variable first before posting the first read to avoid a race condition? */ smpd_process.stdin_redirecting = SMPD_TRUE; /* post a read for a user command from stdin */ context_in->read_state = SMPD_READING_STDIN; result = MPIDU_Sock_post_read(insock, context_in->read_cmd.cmd, 1, 1, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a read on stdin for an incoming user command, error:\n%s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } } } if (pg && pg->processes[pg_rank].suspend_cmd != NULL) { /* send the delayed suspend command */ result = smpd_post_write_command(context, pg->processes[pg_rank].suspend_cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the suspend command.\n"); smpd_exit_fn(FCNAME); return result; } } } else { /* FIXME: We shouldn't abort if a process fails to launch as part of a spawn command */ smpd_process.nproc_exited++; if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "error", err_msg, SMPD_MAX_ERROR_LEN) == MPIU_STR_SUCCESS) { smpd_err_printf("launch failed: %s\n", err_msg); } else { smpd_err_printf("launch failed: %s\n", str); } ret_val = SMPD_ABORT; } } else if (strcmp(iter->cmd_str, "start_dbs") == 0) { if (strcmp(str, SMPD_SUCCESS_STR) == 0) { if (context->spawn_context) { if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", context->spawn_context->kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS) { if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", context->spawn_context->domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS) { smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", context->spawn_context->kvs_name, context->spawn_context->domain_name); ret_val = smpd_launch_processes(context->spawn_context->launch_list, context->spawn_context->kvs_name, context->spawn_context->domain_name, context->spawn_context); } else { smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd); ret_val = SMPD_FAIL; } } else { smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd); ret_val = SMPD_FAIL; } } else { if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS) { if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", smpd_process.domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS) { smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", smpd_process.kvs_name, smpd_process.domain_name); if (smpd_process.launch_list != NULL) { ret_val = smpd_launch_processes(smpd_process.launch_list, smpd_process.kvs_name, smpd_process.domain_name, NULL); } else { /* mpiexec connected to an smpd without any processes to launch. This means it is running -pmiserver */ create_process_group(smpd_process.nproc, smpd_process.kvs_name, &pg); result = smpd_create_command("pmi_listen", 0, 1, SMPD_TRUE, &cmd_ptr); if (result == SMPD_SUCCESS) { result = smpd_add_command_int_arg(cmd_ptr, "nproc", smpd_process.nproc); if (result == SMPD_SUCCESS) { result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write for the pmi_listen command\n"); ret_val = SMPD_FAIL; } } else { smpd_err_printf("unable to add the nproc to the pmi_listen command\n"); ret_val = SMPD_FAIL; } } else { smpd_err_printf("unable to create a pmi_listen command.\n"); ret_val = SMPD_FAIL; } } } else { smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd); ret_val = SMPD_FAIL; } } else { smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd); ret_val = SMPD_FAIL; } } } else { smpd_err_printf("start_dbs failed: %s\n", str); ret_val = SMPD_ABORT; } } else if (strcmp(iter->cmd_str, "pmi_listen") == 0) { MPIU_Str_get_string_arg(context->read_cmd.cmd, "host_description", host_description, 256); MPIU_Str_get_int_arg(context->read_cmd.cmd, "listener_port", &listener_port); /* SINGLETON: If no port to connect back print this information else connect to the port * and execute the SINGLETON INIT protocol */ if(smpd_process.singleton_client_port > 0){ smpd_context_t *p_singleton_mpiexec_context; char connect_to_host[SMPD_SINGLETON_MAX_HOST_NAME_LEN]; /* Create a context */ result = smpd_create_context(SMPD_CONTEXT_SINGLETON_INIT_MPIEXEC, context->set, MPIDU_SOCK_INVALID_SOCK, -1, &p_singleton_mpiexec_context); if(result != SMPD_SUCCESS){ context->state = SMPD_DONE; smpd_err_printf("smpd_create_context failed, error = %d\n", result); return SMPD_FAIL; } p_singleton_mpiexec_context->state = SMPD_SINGLETON_MPIEXEC_CONNECTING; p_singleton_mpiexec_context->singleton_init_pm_port = listener_port; strncpy(p_singleton_mpiexec_context->singleton_init_hostname, host_description, SMPD_SINGLETON_MAX_HOST_NAME_LEN); strncpy(p_singleton_mpiexec_context->singleton_init_kvsname, smpd_process.kvs_name, SMPD_SINGLETON_MAX_KVS_NAME_LEN); /* Post a connect */ /* FIXME : mismatch btw size of connect_to->host and max host name len */ result = get_hostname(connect_to_host, SMPD_SINGLETON_MAX_HOST_NAME_LEN); if(result != SMPD_SUCCESS){ context->state = SMPD_DONE; smpd_err_printf("gethostname failed, error = %d\n", result); result = smpd_free_context(p_singleton_mpiexec_context); if(result != SMPD_SUCCESS){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -