📄 smpd_handle_command.c
字号:
} if (MPIU_Str_get_int_arg(cmd->cmd, "first", &first) != MPIU_STR_SUCCESS) { first = 0; smpd_err_printf("no first flag in the stdout command: '%s'\n", cmd->cmd); } if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) == MPIU_STR_SUCCESS) { smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &num_decoded); data[num_decoded] = '\0'; if (data[num_decoded-1] == '\n') { ends_in_cr = SMPD_TRUE; } /*printf("[%d]", rank);*/ if (smpd_process.prefix_output == SMPD_TRUE) { if (first) { MPIU_Snprintf(prefix, 20, "[%d]", rank); prefix_length = strlen(prefix); write_to_stdout(prefix, prefix_length); } MPIU_Snprintf(prefix, 20, "\n[%d]", rank); prefix_length = strlen(prefix); token = strtok(data, "\r\n"); while (token != NULL) { write_to_stdout(token, strlen(token)); token = strtok(NULL, "\r\n"); if (token != NULL) { write_to_stdout(prefix, prefix_length); } else { if (ends_in_cr == SMPD_TRUE) { write_to_stdout("\n", 1); } } } } else { write_to_stdout(data, num_decoded); } } else { smpd_err_printf("unable to get the data from the stdout command: '%s'\n", cmd->cmd); } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "write_to_stderr"static int write_to_stderr(const char *buffer, size_t num_bytes){#ifdef HAVE_WINDOWS_H HANDLE hStderr; DWORD num_written; smpd_enter_fn(FCNAME); /* MT - This should acquire the hLaunchProcessMutex so that it doesn't acquire a redirected handle. */ /* But since the code is not multi-threaded yet this doesn't matter */ hStderr = GetStdHandle(STD_ERROR_HANDLE); WriteFile(hStderr, buffer, num_bytes, &num_written, NULL);#else smpd_enter_fn(FCNAME); fwrite(buffer, 1, num_bytes, stderr); fflush(stdout);#endif smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_handle_stderr_command"int smpd_handle_stderr_command(smpd_context_t *context){ int rank; char data[SMPD_MAX_STDOUT_LENGTH]; smpd_command_t *cmd; int num_decoded = 0; int first; char prefix[20]; size_t prefix_length; char *token; SMPD_BOOL ends_in_cr = SMPD_FALSE; smpd_enter_fn(FCNAME); cmd = &context->read_cmd; if (MPIU_Str_get_int_arg(cmd->cmd, "rank", &rank) != MPIU_STR_SUCCESS) { rank = -1; smpd_err_printf("no rank in the stderr command: '%s'\n", cmd->cmd); } if (MPIU_Str_get_int_arg(cmd->cmd, "first", &first) != MPIU_STR_SUCCESS) { first = 0; smpd_err_printf("no first flag in the stderr command: '%s'\n", cmd->cmd); } if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) == MPIU_STR_SUCCESS) { smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &num_decoded); data[num_decoded] = '\0'; if (data[num_decoded-1] == '\n') { ends_in_cr = SMPD_TRUE; } /*fprintf(stderr, "[%d]", rank);*/ if (smpd_process.prefix_output == SMPD_TRUE) { if (first) { MPIU_Snprintf(prefix, 20, "[%d]", rank); prefix_length = strlen(prefix); write_to_stderr(prefix, prefix_length); } MPIU_Snprintf(prefix, 20, "\n[%d]", rank); prefix_length = strlen(prefix); token = strtok(data, "\r\n"); while (token != NULL) { write_to_stderr(token, strlen(token)); token = strtok(NULL, "\r\n"); if (token != NULL) { write_to_stderr(prefix, prefix_length); } else { write_to_stderr("\n", 1); } } } else { write_to_stderr(data, num_decoded); } } else { smpd_err_printf("unable to get the data from the stderr command: '%s'\n", cmd->cmd); } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "create_process_group"static int create_process_group(int nproc, char *kvs_name, smpd_process_group_t **pg_pptr){ int i; smpd_process_group_t *pg; smpd_enter_fn(FCNAME); /* initialize a new process group structure */ pg = (smpd_process_group_t*)MPIU_Malloc(sizeof(smpd_process_group_t)); if (pg == NULL) { smpd_err_printf("unable to allocate memory for a process group structure.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } pg->aborted = SMPD_FALSE; pg->any_init_received = SMPD_FALSE; pg->any_noinit_process_exited = SMPD_FALSE; strncpy(pg->kvs, kvs_name, SMPD_MAX_DBS_NAME_LEN); pg->num_procs = nproc; pg->processes = (smpd_exit_process_t*)MPIU_Malloc(nproc * sizeof(smpd_exit_process_t)); if (pg->processes == NULL) { smpd_err_printf("unable to allocate an array of %d process exit structures.\n", nproc); smpd_exit_fn(FCNAME); return SMPD_FAIL; } for (i=0; i<nproc; i++) { pg->processes[i].ctx_key[0] = '\0'; pg->processes[i].errmsg = NULL; pg->processes[i].exitcode = -1; pg->processes[i].exited = SMPD_FALSE; pg->processes[i].finalize_called = SMPD_FALSE; pg->processes[i].init_called = SMPD_FALSE; pg->processes[i].suspended = SMPD_FALSE; pg->processes[i].suspend_cmd = NULL; } /* add the process group to the global list */ pg->next = smpd_process.pg_list; smpd_process.pg_list = pg; *pg_pptr = pg; smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_launch_processes"int smpd_launch_processes(smpd_launch_node_t *launch_list, char *kvs_name, char *domain_name, smpd_spawn_context_t *spawn_context){ int result; smpd_command_t *cmd_ptr; smpd_launch_node_t *launch_node_ptr, *launch_iter; smpd_map_drive_node_t *map_iter; char drive_arg_str[20]; char drive_map_str[SMPD_MAX_EXE_LENGTH]; smpd_process_group_t *pg; int i; smpd_enter_fn(FCNAME); launch_node_ptr = launch_list; smpd_dbg_printf("creating a process group of size %d on node %d called %s\n", launch_node_ptr->nproc, smpd_process.id, kvs_name); result = create_process_group(launch_list->nproc, kvs_name, &pg); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a process group.\n"); goto launch_failure; } launch_iter = launch_node_ptr; for (i=0; i<launch_node_ptr->nproc; i++) { if (launch_iter == NULL) { smpd_err_printf("number of launch nodes does not match number of processes: %d < %d\n", i, launch_node_ptr->nproc); goto launch_failure; } pg->processes[launch_iter->iproc].node_id = launch_iter->host_id; MPIU_Strncpy(pg->processes[launch_iter->iproc].host, launch_iter->hostname, SMPD_MAX_HOST_LENGTH); launch_iter = launch_iter->next; } /* launch the processes */ smpd_dbg_printf("launching the processes.\n"); launch_node_ptr = launch_list; while (launch_node_ptr) { /* create the launch command */ result = smpd_create_command("launch", 0, launch_node_ptr->host_id, SMPD_TRUE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a launch command.\n"); goto launch_failure; } result = smpd_add_command_arg(cmd_ptr, "c", launch_node_ptr->exe); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the command line to the launch command: '%s'\n", launch_node_ptr->exe); goto launch_failure; } result = smpd_add_command_int_arg(cmd_ptr, "s", spawn_context ? 1 : 0); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the spawn flag to the launch command: '%s'\n", launch_node_ptr->exe); goto launch_failure; } result = smpd_add_command_int_arg(cmd_ptr, "a", launch_node_ptr->appnum); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the application number to the launch command: '%s'\n", launch_node_ptr->exe); goto launch_failure; } if (launch_node_ptr->env[0] != '\0') { result = smpd_add_command_arg(cmd_ptr, "e", launch_node_ptr->env); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the environment variables to the launch command: '%s'\n", launch_node_ptr->env); goto launch_failure; } } if (launch_node_ptr->dir[0] != '\0') { result = smpd_add_command_arg(cmd_ptr, "d", launch_node_ptr->dir); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the working directory to the launch command: '%s'\n", launch_node_ptr->dir); goto launch_failure; } } if (launch_node_ptr->path[0] != '\0') { result = smpd_add_command_arg(cmd_ptr, "p", launch_node_ptr->path); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the search path to the launch command: '%s'\n", launch_node_ptr->path); goto launch_failure; } } if (launch_node_ptr->clique[0] != '\0') { result = smpd_add_command_arg(cmd_ptr, "q", launch_node_ptr->clique); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the clique string to the launch command: '%s'\n", launch_node_ptr->clique); goto launch_failure; } } /*printf("creating launch command for rank %d\n", launch_node_ptr->iproc);*/ result = smpd_add_command_int_arg(cmd_ptr, "i", launch_node_ptr->iproc); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the rank to the launch command: %d\n", launch_node_ptr->iproc); goto launch_failure; } result = smpd_add_command_int_arg(cmd_ptr, "n", launch_node_ptr->nproc); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the nproc field to the launch command: %d\n", launch_node_ptr->nproc); goto launch_failure; } result = smpd_add_command_arg(cmd_ptr, "k", kvs_name); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the kvs name('%s') to the launch command\n", kvs_name); goto launch_failure; } result = smpd_add_command_arg(cmd_ptr, "kd", domain_name); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the domain name('%s') to the launch command\n", domain_name); goto launch_failure; } if (launch_node_ptr->priority_class != SMPD_DEFAULT_PRIORITY_CLASS) { result = smpd_add_command_int_arg(cmd_ptr, "pc", launch_node_ptr->priority_class); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the priority class field to the launch command: %d\n", launch_node_ptr->priority_class); goto launch_failure; } } if (launch_node_ptr->priority_thread != SMPD_DEFAULT_PRIORITY) { result = smpd_add_command_int_arg(cmd_ptr, "pt", launch_node_ptr->priority_thread); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the priority field to the launch command: %d\n", launch_node_ptr->priority_thread); goto launch_failure; } } map_iter = launch_node_ptr->map_list; i = 0; while (map_iter) { i++; map_iter = map_iter->next; } if (i > 0) { result = smpd_add_command_int_arg(cmd_ptr, "mn", i); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the drive mapping arg to the launch command: 'mn=%d'\n", i); goto launch_failure; } } map_iter = launch_node_ptr->map_list; i = 0; while (map_iter) { sprintf(drive_arg_str, "m%d", i); i++; sprintf(drive_map_str, "%c:%s", map_iter->drive, map_iter->share); result = smpd_add_command_arg(cmd_ptr, drive_arg_str, drive_map_str); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the drive mapping to the launch command: '%s'\n", drive_map_str); goto launch_failure; } map_iter = map_iter->next; } /* send the launch command */ result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write for the launch command:\n id: %d\n rank: %d\n cmd: '%s'\n", launch_node_ptr->host_id, launch_node_ptr->iproc, launch_node_ptr->exe); goto launch_failure; } /* increment the number of launched processes */ smpd_process.nproc++; if (spawn_context) spawn_context->num_outstanding_launch_cmds++; /* move to the next node */ launch_node_ptr = launch_node_ptr->next; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;launch_failure: smpd_exit_fn(FCNAME); return SMPD_FAIL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -