📄 mpiexec_rsh.c
字号:
p = getenv("MPIEXEC_RSH_NO_ESCAPE"); if (p != NULL) { if (smpd_is_affirmative(p) || strcmp(p, "1") == 0) { escape_escape = SMPD_FALSE; } } result = MPIDU_Sock_create_set(&set); if (result != MPI_SUCCESS) { smpd_err_printf("unable to create a set for the mpiexec_rsh.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } if (smpd_process.use_pmi_server) { result = start_pmi_server(smpd_process.launch_list->nproc, root_host, 100, &root_port); if (result != SMPD_SUCCESS) { smpd_err_printf("mpiexec_rsh is unable to start the local pmi server.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } smpd_dbg_printf("the pmi server is listening on %s:%d\n", root_host, root_port); } else { /* start the root smpd */ result = PMIX_Start_root_smpd(smpd_process.launch_list->nproc, root_host, 100, &root_port); if (result != PMI_SUCCESS) { smpd_err_printf("mpiexec_rsh is unable to start the root smpd.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } smpd_dbg_printf("the root smpd is listening on %s:%d\n", root_host, root_port); /* create a connection to the root smpd used to abort the job */ result = ConnectToHost(root_host, root_port, SMPD_CONNECTING_RPMI, set, &abort_sock, &abort_context); if (result != SMPD_SUCCESS) { smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } } processes = (smpd_process_t**)MPIU_Malloc(sizeof(smpd_process_t*) * smpd_process.launch_list->nproc); if (processes == NULL) { smpd_err_printf("unable to allocate process array.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } smpd_process.nproc = smpd_process.launch_list->nproc; launch_node_ptr = smpd_process.launch_list; for (i=0; i<smpd_process.launch_list->nproc; i++) { if (launch_node_ptr == NULL) { smpd_err_printf("Error: not enough launch nodes.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } /* initialize process structure */ result = smpd_create_process_struct(i, &process); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a process structure.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } /* no need for a pmi context */ if (process->pmi) smpd_free_context(process->pmi); process->pmi = NULL; /* change stdout and stderr to rsh behavior: * write stdout/err directly to stdout/err instead of creating * an smpd stdout/err command */ if (process->out != NULL) { process->out->type = SMPD_CONTEXT_STDOUT_RSH; } if (process->err != NULL) { process->err->type = SMPD_CONTEXT_STDERR_RSH; } MPIU_Strncpy(process->clique, launch_node_ptr->clique, SMPD_MAX_CLIQUE_LENGTH); MPIU_Strncpy(process->dir, launch_node_ptr->dir, SMPD_MAX_DIR_LENGTH); MPIU_Strncpy(process->domain_name, smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN); MPIU_Strncpy(process->env, launch_node_ptr->env, SMPD_MAX_ENV_LENGTH); if (escape_escape == SMPD_TRUE && smpd_process.mpiexec_run_local != SMPD_TRUE) { /* convert \ to \\ to make cygwin ssh happy */ iter1 = launch_node_ptr->exe; iter2 = exe; while (*iter1) { if (*iter1 == '\\') { *iter2 = *iter1; iter2++; *iter2 = *iter1; } else { *iter2 = *iter1; } iter1++; iter2++; } *iter2 = '\0'; /*printf("[%s] -> [%s]\n", launch_node_ptr->exe, exe);*/ } else { MPIU_Strncpy(exe, launch_node_ptr->exe, SMPD_MAX_EXE_LENGTH); } /* Two samples for testing on the local machine */ /* static rPMI initialization */ /*sprintf(process->exe, "env PMI_RANK=%d PMI_SIZE=%d PMI_KVS=%s PMI_ROOT_HOST=%s PMI_ROOT_PORT=8888 PMI_ROOT_LOCAL=1 PMI_APPNUM=%d %s", launch_node_ptr->iproc, launch_node_ptr->nproc, smpd_process.kvs_name, root_host, launch_node_ptr->appnum, exe);*/ /* dynamic rPMI initialization */ /*sprintf(process->exe, "env PMI_RANK=%d PMI_SIZE=%d PMI_KVS=%s PMI_ROOT_HOST=%s PMI_ROOT_PORT=%d PMI_ROOT_LOCAL=0 PMI_APPNUM=%d %s", launch_node_ptr->iproc, launch_node_ptr->nproc, smpd_process.kvs_name, root_host, root_port, launch_node_ptr->appnum, exe);*/ if (smpd_process.mpiexec_run_local == SMPD_TRUE) { /* -localonly option and dynamic rPMI initialization */ env_str = &process->env[strlen(process->env)]; maxlen = (int)(SMPD_MAX_ENV_LENGTH - strlen(process->env)); MPIU_Str_add_int_arg(&env_str, &maxlen, "PMI_RANK", launch_node_ptr->iproc); MPIU_Str_add_int_arg(&env_str, &maxlen, "PMI_SIZE", launch_node_ptr->nproc); MPIU_Str_add_string_arg(&env_str, &maxlen, "PMI_KVS", smpd_process.kvs_name); MPIU_Str_add_string_arg(&env_str, &maxlen, "PMI_ROOT_HOST", root_host); MPIU_Str_add_int_arg(&env_str, &maxlen, "PMI_ROOT_PORT", root_port); MPIU_Str_add_string_arg(&env_str, &maxlen, "PMI_ROOT_LOCAL", "0"); MPIU_Str_add_int_arg(&env_str, &maxlen, "PMI_APPNUM", launch_node_ptr->appnum); MPIU_Strncpy(process->exe, exe, SMPD_MAX_EXE_LENGTH); } else { /* ssh and dynamic rPMI initialization */ char fmtEnv[SMPD_MAX_ENV_LENGTH]; int fmtEnvLen = SMPD_MAX_ENV_LENGTH; char *pExe = process->exe; int curLen = 0; MPIU_Snprintf(pExe, SMPD_MAX_EXE_LENGTH, "%s %s env", ssh_cmd, launch_node_ptr->hostname); curLen = strlen(process->exe); pExe = process->exe + curLen; if(FmtEnvVarsForSSH(launch_node_ptr->env, fmtEnv, fmtEnvLen)){ /* Add user specified env vars */ MPIU_Snprintf(pExe, SMPD_MAX_EXE_LENGTH - curLen, "%s", fmtEnv); curLen = strlen(process->exe); pExe = process->exe + curLen; } MPIU_Snprintf(pExe, SMPD_MAX_EXE_LENGTH - curLen, " \"PMI_RANK=%d\" \"PMI_SIZE=%d\" \"PMI_KVS=%s\" \"PMI_ROOT_HOST=%s\" \"PMI_ROOT_PORT=%d\" \"PMI_ROOT_LOCAL=0\" \"PMI_APPNUM=%d\" \"%s\"", launch_node_ptr->iproc, launch_node_ptr->nproc, smpd_process.kvs_name, root_host, root_port, launch_node_ptr->appnum, exe); } MPIU_Strncpy(process->kvs_name, smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN); process->nproc = launch_node_ptr->nproc; MPIU_Strncpy(process->path, launch_node_ptr->path, SMPD_MAX_PATH_LENGTH); /* call smpd_launch_process */ smpd_dbg_printf("launching: %s\n", process->exe); result = smpd_launch_process(process, SMPD_DEFAULT_PRIORITY_CLASS, SMPD_DEFAULT_PRIORITY, SMPD_FALSE, set); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to launch process %d <%s>.\n", i, process->exe); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } /* save the new process in the list */ process->next = smpd_process.process_list; smpd_process.process_list = process; if (i == 0) { /* start the stdin redirection thread to the first process */ setup_stdin_redirection(process, set); } smpd_process.nproc_launched++; processes[i] = process; launch_node_ptr = launch_node_ptr->next; } if (launch_node_ptr != NULL) { smpd_err_printf("Error: too many launch nodes.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } /* Start the timeout mechanism if specified */ if (smpd_process.timeout > 0) { smpd_context_t *reader_context; MPIDU_Sock_t sock_reader; MPIDU_SOCK_NATIVE_FD reader, writer;#ifdef HAVE_WINDOWS_H /*SOCKET reader, writer;*/ smpd_make_socket_loop((SOCKET*)&reader, (SOCKET*)&writer);#else /*int reader, writer;*/ int pair[2]; socketpair(AF_UNIX, SOCK_STREAM, 0, pair); reader = pair[0]; writer = pair[1];#endif result = MPIDU_Sock_native_to_sock(set, reader, NULL, &sock_reader); result = MPIDU_Sock_native_to_sock(set, writer, NULL, &smpd_process.timeout_sock); result = smpd_create_context(SMPD_CONTEXT_TIMEOUT, set, sock_reader, -1, &reader_context); reader_context->read_state = SMPD_READING_TIMEOUT; result = MPIDU_Sock_post_read(sock_reader, &reader_context->read_cmd.cmd, 1, 1, NULL);#ifdef HAVE_WINDOWS_H /* create a Windows thread to sleep until the timeout expires */ smpd_process.timeout_thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)timeout_thread, NULL, 0, NULL); if (smpd_process.timeout_thread == NULL) { printf("Error: unable to create a timeout thread, errno %d.\n", GetLastError()); smpd_exit_fn("mp_parse_command_args"); return SMPD_FAIL; }#else#ifdef SIGALRM /* create an alarm to signal mpiexec when the timeout expires */ smpd_signal(SIGALRM, timeout_function); alarm(smpd_process.timeout);#else#ifdef HAVE_PTHREAD_H /* create a pthread to sleep until the timeout expires */ result = pthread_create(&smpd_process.timeout_thread, NULL, timeout_thread, NULL); if (result != 0) { printf("Error: unable to create a timeout thread, errno %d.\n", result); smpd_exit_fn("mp_parse_command_args"); return SMPD_FAIL; }#else /* no timeout mechanism available */#endif#endif#endif } result = smpd_enter_at_state(set, SMPD_IDLE); if (result != SMPD_SUCCESS) { smpd_err_printf("mpiexec_rsh state machine failed.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } if (smpd_process.use_pmi_server) { result = stop_pmi_server(); if (result != SMPD_SUCCESS) { smpd_err_printf("mpiexec_rsh unable to stop the pmi server.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } } else { /* Send an abort command to the root_smpd thread/process to insure that it exits. * This only needs to be sent when there is an error or failed process of some sort * but it is safe to send it in all cases. */ result = smpd_create_command("abort", 0, 0, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create an abort command.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } result = smpd_post_write_command(abort_context, cmd_ptr); if (result != SMPD_SUCCESS) { /* Only print this as a debug message instead of an error because the root_smpd thread/process may have already exited. */ smpd_dbg_printf("unable to post a write of the abort command to the %s context.\n", smpd_get_context_str(abort_context)); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } result = PMIX_Stop_root_smpd(); if (result != PMI_SUCCESS) { smpd_err_printf("mpiexec_rsh unable to stop the root smpd.\n"); smpd_exit_fn("mpiexec_rsh"); return SMPD_FAIL; } } smpd_exit_fn("mpiexec_rsh"); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -