📄 mpiexec_rsh.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include <stdio.h>#include "mpiexec.h"#include "smpd.h"#define MAX_RSH_CMD_LENGTH 8192#define MAX_OUTPUT_LENGTH 4096#define MAX_INPUT_LENGTH 4096static char root_host[100];static int root_port;#undef FCNAME#define FCNAME "FmtEnvVarsForSSH"/* This function formats the environment strings of the form, * foo1=bar1 foo2=bar2 foo3=bar3 * TO * "foo1=bar1" "foo2=bar2" "foo3=bar3" * and returns the length of the formatted string * Note: The formatted string contains a space in the front */static int FmtEnvVarsForSSH(char *bEnv, char *fmtEnv, int fmtEnvLen){ char name[SMPD_MAX_ENV_LENGTH], equals[3], value[SMPD_MAX_ENV_LENGTH]; int curLen = 0, totLen = 0; smpd_enter_fn(FCNAME); if(fmtEnv && bEnv){ for (;;) { name[0] = '\0'; equals[0] = '\0'; value[0] = '\0'; if(fmtEnvLen <= 0) break; if (MPIU_Str_get_string(&bEnv, name, SMPD_MAX_ENV_LENGTH) != MPIU_STR_SUCCESS) break; if (name[0] == '\0') break; if (MPIU_Str_get_string(&bEnv, equals, 3) != MPIU_STR_SUCCESS) break; if (equals[0] == '\0') break; if (MPIU_Str_get_string(&bEnv, value, SMPD_MAX_ENV_LENGTH) != MPIU_STR_SUCCESS) break; MPIU_Snprintf(fmtEnv, fmtEnvLen, " \"%s=%s\"", name, value); curLen = strlen(fmtEnv); totLen += curLen; fmtEnv += curLen; fmtEnvLen -= curLen; } } smpd_exit_fn(FCNAME); return totLen;}static int setup_stdin_redirection(smpd_process_t *process, MPIDU_Sock_set_t set){ int result; smpd_context_t *context_in; MPIDU_SOCK_NATIVE_FD stdin_fd; MPIDU_Sock_t insock;#ifdef HAVE_WINDOWS_H DWORD dwThreadID; SOCKET hWrite;#endif smpd_enter_fn("setup_stdin_redirection"); /* get a handle to stdin */#ifdef HAVE_WINDOWS_H result = smpd_make_socket_loop((SOCKET*)&stdin_fd, &hWrite); if (result) { smpd_err_printf("Unable to make a local socket loop to forward stdin.\n"); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; }#else stdin_fd = fileno(stdin);#endif /* convert the native handle to a sock */ result = MPIDU_Sock_native_to_sock(set, stdin_fd, NULL, &insock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to create a sock from stdin,\nsock error: %s\n", get_sock_error_string(result)); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; } /* create a context for reading from stdin */ result = smpd_create_context(SMPD_CONTEXT_MPIEXEC_STDIN_RSH, set, insock, -1, &context_in); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a context for stdin.\n"); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; } MPIDU_Sock_set_user_ptr(insock, context_in); context_in->process = process;#ifdef HAVE_WINDOWS_H /* unfortunately, we cannot use stdin directly as a sock. So, use a thread to read and forward stdin to a sock */ smpd_process.hCloseStdinThreadEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (smpd_process.hCloseStdinThreadEvent == NULL) { smpd_err_printf("Unable to create the stdin thread close event, error %d\n", GetLastError()); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; } smpd_process.hStdinThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)smpd_stdin_thread, (void*)hWrite, 0, &dwThreadID); if (smpd_process.hStdinThread == NULL) { smpd_err_printf("Unable to create a thread to read stdin, error %d\n", GetLastError()); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; }#endif /* set this variable first before posting the first read to avoid a race condition? */ smpd_process.stdin_redirecting = SMPD_TRUE; /* post a read for a user command from stdin */ context_in->read_state = SMPD_READING_STDIN; result = MPIDU_Sock_post_read(insock, context_in->read_cmd.cmd, 1, 1, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a read on stdin for an incoming user command, error:\n%s\n", get_sock_error_string(result)); smpd_exit_fn("setup_stdin_redirection"); return SMPD_FAIL; } smpd_exit_fn("setup_stdin_redirection"); return SMPD_SUCCESS;}#ifdef HAVE_WINDOWS_HBOOL WINAPI mpiexec_rsh_handler(DWORD type){ static int first = 1; char ch = -1; MPIU_Size_t num_written; switch (type) { case CTRL_LOGOFF_EVENT: break; case CTRL_C_EVENT: case CTRL_BREAK_EVENT: case CTRL_CLOSE_EVENT: case CTRL_SHUTDOWN_EVENT: if (smpd_process.timeout_sock != MPIDU_SOCK_INVALID_SOCK) { if (first == 1) { first = 0; MPIDU_Sock_write(smpd_process.timeout_sock, &ch, 1, &num_written); return TRUE; } } if (smpd_process.hCloseStdinThreadEvent) { SetEvent(smpd_process.hCloseStdinThreadEvent); } if (smpd_process.hStdinThread != NULL) { /* close stdin so the input thread will exit */ CloseHandle(GetStdHandle(STD_INPUT_HANDLE)); if (WaitForSingleObject(smpd_process.hStdinThread, 3000) != WAIT_OBJECT_0) { TerminateThread(smpd_process.hStdinThread, 321); } CloseHandle(smpd_process.hStdinThread); } /* if (smpd_process.hCloseStdinThreadEvent) { CloseHandle(smpd_process.hCloseStdinThreadEvent); smpd_process.hCloseStdinThreadEvent = NULL; } */ smpd_exit(-1); return TRUE; } return FALSE;}#endifstatic int ConnectToHost(char *host, int port, smpd_state_t state, MPIDU_Sock_set_t set, MPIDU_Sock_t *sockp, smpd_context_t **contextpp){ int result; char error_msg[MPI_MAX_ERROR_STRING]; int len; /*printf("posting a connect to %s:%d\n", host, port);fflush(stdout);*/ result = smpd_create_context(SMPD_CONTEXT_PMI, set, MPIDU_SOCK_INVALID_SOCK/**sockp*/, -1, contextpp); if (result != SMPD_SUCCESS) { smpd_err_printf("ConnectToHost failed: unable to create a context to connect to %s:%d with.\n", host, port); return SMPD_FAIL; } result = MPIDU_Sock_post_connect(set, *contextpp, host, port, sockp); if (result != MPI_SUCCESS) { len = MPI_MAX_ERROR_STRING; PMPI_Error_string(result, error_msg, &len); smpd_err_printf("ConnectToHost failed: unable to post a connect to %s:%d, error: %s\n", host, port, error_msg); return SMPD_FAIL; } (*contextpp)->sock = *sockp; (*contextpp)->state = state; result = smpd_enter_at_state(set, state); if (result != MPI_SUCCESS) { smpd_err_printf("ConnectToHost failed: unable to connect to %s:%d.\n", host, port); return SMPD_FAIL; } return SMPD_SUCCESS;}#ifdef HAVE_WINDOWS_H#define popen _popen#define pclose _pclose#endifstatic FILE *pmi_server = NULL;int start_pmi_server(int nproc, char *host, int len, int *port){ char cmd[100]; char line[1024]; char *argv0 = NULL; if (smpd_process.mpiexec_argv0 != NULL) { argv0 = smpd_process.mpiexec_argv0; } sprintf(cmd, "%s -pmiserver %d", argv0, nproc); pmi_server = popen(cmd, "r"); if (pmi_server == NULL) { smpd_err_printf("popen failed: %d\n", errno); return SMPD_FAIL; } fgets(line, 1024, pmi_server); strtok(line, "\r\n"); strncpy(host, line, len); /*printf("read host: %s\n", host);*/ fgets(line, 1024, pmi_server); *port = atoi(line); /*printf("read port: %d\n", *port);*/ fgets(line, 1024, pmi_server); strtok(line, "\r\n"); strncpy(smpd_process.kvs_name, line, SMPD_MAX_DBS_NAME_LEN); /*printf("read kvs: %s\n", smpd_process.kvs_name);*/ return SMPD_SUCCESS;}int stop_pmi_server(){ if (pmi_server != NULL) { if (pclose(pmi_server) == -1) { smpd_err_printf("pclose failed: %d\n", errno); return SMPD_FAIL; } } return SMPD_SUCCESS;}int mpiexec_rsh(){ int i; smpd_launch_node_t *launch_node_ptr; smpd_process_t *process, **processes; int result; char *iter1, *iter2; char exe[SMPD_MAX_EXE_LENGTH]; char *p; char ssh_cmd[100] = "ssh -x"; MPIDU_Sock_set_t set; SMPD_BOOL escape_escape = SMPD_TRUE; char *env_str; int maxlen; MPIDU_Sock_t abort_sock; smpd_context_t *abort_context = NULL; smpd_command_t *cmd_ptr; smpd_enter_fn("mpiexec_rsh");#ifdef HAVE_WINDOWS_H SetConsoleCtrlHandler(mpiexec_rsh_handler, TRUE);#else /* setup a signall hander? */#endif p = getenv("MPIEXEC_RSH"); if (p != NULL && strlen(p) > 0) { strncpy(ssh_cmd, p, 100); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -