📄 smpd_handle_command.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "smpd.h"
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef HAVE_WINDOWS_H
#include "smpd_service.h"
#include <Ntdsapi.h>
#endif

/* prototype local functions */
static int write_to_stdout(const char *buffer, size_t num_bytes);
static int write_to_stderr(const char *buffer, size_t num_bytes);
static int smpd_do_abort_job(char *name, int rank, char *error_str, int exit_code);
static int create_process_group(int nproc, char *kvs_name, smpd_process_group_t **pg_pptr);
static int get_name_key_value(char *str, char *name, char *key, char *value);

/* FIXME: smpd_get_hostname() does the same thing */
#undef FCNAME
#define FCNAME "get_hostname"
/*
 * Store the name of the local host in 'host', a caller-supplied buffer of
 * 'hostlen' bytes.  On Windows the fully qualified DNS name is requested via
 * GetComputerNameEx(ComputerNameDnsFullyQualified); elsewhere gethostname()
 * is used.
 *
 * Returns SMPD_SUCCESS, or SMPD_FAIL if 'host' is NULL, 'hostlen' is not
 * positive, or the underlying system call fails.
 */
static int get_hostname(char *host, int hostlen)
{
#ifdef HAVE_WINDOWS_H
    DWORD len = (DWORD)hostlen;
#endif
    smpd_enter_fn(FCNAME);
    if (host == NULL)
    {
        smpd_err_printf("host == NULL\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    if (hostlen <= 0)
    {
        smpd_err_printf("invalid host name buffer length: %d\n", hostlen);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
#ifdef HAVE_WINDOWS_H
    if (!GetComputerNameEx(ComputerNameDnsFullyQualified, host, &len))
    {
        smpd_err_printf("GetComputerNameEx failed, error = %d\n", GetLastError());
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
#else
    if (gethostname(host, hostlen))
    {
        smpd_err_printf("gethostname failed, error = %d\n", errno);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    /* POSIX leaves it unspecified whether a truncated host name is
       NUL-terminated, so always terminate the caller's buffer. */
    host[hostlen - 1] = '\0';
#endif
    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_do_abort_job"
/*
 * Abort, from this node, the process group whose kvs name equals 'name'.
 *
 * The message 'error_str' is duplicated into processes[rank].errmsg.  If the
 * group was already aborted nothing else happens.  Otherwise the abort exit
 * code is recorded in smpd_process, the group is marked aborted, and a
 * "suspend" command (arguments: name, rank, exit_code and, when known,
 * ctx_key) addressed to processes[i].node_id is built for every process that
 * has not exited.  Commands for processes with a known ctx_key are posted to
 * smpd_process.left_context immediately; the others are stored in
 * processes[i].suspend_cmd.  pg->num_pending_suspends counts both kinds.
 *
 * Returns SMPD_SUCCESS (including the already-aborted case), or SMPD_FAIL if
 * no group matches, 'rank' is out of range, or a command cannot be built or
 * posted.
 */
static int smpd_do_abort_job(char *name, int rank, char *error_str, int exit_code)
{
    int result;
    smpd_command_t *cmd_ptr;
    smpd_process_group_t *pg;
    int i;

    smpd_enter_fn(FCNAME);

    pg = smpd_process.pg_list;
    while (pg)
    {
        if (strcmp(pg->kvs, name) == 0)
        {
            /* rank indexes pg->processes below, so reject negatives too */
            if (rank < 0 || rank >= pg->num_procs)
            {
                smpd_err_printf("invalid abort_job command - rank %d out of range, number of processes = %d\n", rank, pg->num_procs);
                smpd_exit_fn(FCNAME);
                return SMPD_FAIL;
            }
            break;
        }
        pg = pg->next;
    }
    if (pg == NULL)
    {
        smpd_err_printf("no process group structure found to match the abort_job command: pg <%s>\n", name);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }

    /* save the abort message */
    /* NOTE(review): a previous errmsg for this rank, if any, is overwritten
       without being freed - confirm whether errmsg can already be set here. */
    pg->processes[rank].errmsg = MPIU_Strdup(error_str);

    if (pg->aborted == SMPD_TRUE)
    {
        smpd_dbg_printf("job already aborted.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_SUCCESS;
    }

    smpd_process.use_abort_exit_code = SMPD_TRUE;
    smpd_process.abort_exit_code = exit_code;
    pg->aborted = SMPD_TRUE;
    pg->num_pending_suspends = 0;
    for (i = 0; i < pg->num_procs; i++)
    {
        if (pg->processes[i].exited)
            continue;

        /* create the suspend command */
        /* NOTE(review): on the failure paths below cmd_ptr is not released. */
        result = smpd_create_command("suspend", 0, pg->processes[i].node_id, SMPD_TRUE, &cmd_ptr);
        if (result != SMPD_SUCCESS)
        {
            smpd_err_printf("unable to create a suspend command.\n");
            goto do_abort_failure;
        }
        result = smpd_add_command_arg(cmd_ptr, "name", name);
        if (result != SMPD_SUCCESS)
        {
            smpd_err_printf("unable to add the kvs name to the suspend command: '%s'\n", name);
            goto do_abort_failure;
        }
        result = smpd_add_command_int_arg(cmd_ptr, "rank", i);
        if (result != SMPD_SUCCESS)
        {
            smpd_err_printf("unable to add the rank %d to the suspend command\n", i);
            goto do_abort_failure;
        }
        result = smpd_add_command_int_arg(cmd_ptr, "exit_code", exit_code);
        if (result != SMPD_SUCCESS)
        {
            smpd_err_printf("unable to add the exit_code %d to the suspend command\n", exit_code);
            goto do_abort_failure;
        }
        if (pg->processes[i].ctx_key[0] != '\0')
        {
            result = smpd_add_command_arg(cmd_ptr, "ctx_key", pg->processes[i].ctx_key);
            if (result != SMPD_SUCCESS)
            {
                smpd_err_printf("unable to add the ctx_key to the suspend command: '%s'\n", pg->processes[i].ctx_key);
                goto do_abort_failure;
            }
            /* send the suspend command */
            result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);
            if (result != SMPD_SUCCESS)
            {
                /* report the process being suspended, not the aborting rank */
                smpd_err_printf("unable to post a write for the suspend command: rank %d\n", i);
                goto do_abort_failure;
            }
        }
        else
        {
            /* no ctx_key yet: keep the command on the process entry instead
               of sending it now */
            pg->processes[i].suspend_cmd = cmd_ptr;
        }
        pg->num_pending_suspends++;
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;

do_abort_failure:
    smpd_exit_fn(FCNAME);
    return SMPD_FAIL;
}

#undef FCNAME
#define FCNAME "smpd_abort_job"
/*
 * Abort job 'name' on behalf of 'rank' with a printf-style error message.
 *
 * The message is formatted into a fixed 2048-byte buffer (longer messages are
 * truncated).  If smpd_command_destination(0, ...) yields no context, the
 * abort is carried out locally through smpd_do_abort_job; otherwise an
 * "abort_job" command carrying name, rank and error is posted to that
 * context.
 *
 * Returns SMPD_SUCCESS, SMPD_FAIL, or smpd_do_abort_job's result.
 */
int smpd_abort_job(char *name, int rank, char *fmt, ...)
{
    int result;
    char error_str[2048] = "";
    smpd_command_t *cmd_ptr;
    /* initialized so a destination lookup that leaves it untouched is not
       read uninitialized */
    smpd_context_t *context = NULL;
    va_list list;

    smpd_enter_fn(FCNAME);

    va_start(list, fmt);
    vsnprintf(error_str, sizeof(error_str), fmt, list);
    va_end(list);

    smpd_command_destination(0, &context);
    if (context == NULL)
    {
        /* NOTE(review): 123 is a hard-coded exit code for the local abort
           path; its significance is not documented in this file. */
        result = smpd_do_abort_job(name, rank, error_str, 123);
        smpd_exit_fn(FCNAME);
        return result;
    }

    result = smpd_create_command("abort_job", smpd_process.id, 0, SMPD_FALSE, &cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("unable to create an abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_arg(cmd_ptr, "name", name);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the job name to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_int_arg(cmd_ptr, "rank", rank);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the rank to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_arg(cmd_ptr, "error", error_str);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the error string to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    smpd_dbg_printf("sending abort_job command to %s context: \"%s\"\n", smpd_get_context_str(context), cmd_ptr->cmd);
    result = smpd_post_write_command(context, cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("unable to post a write of the abort_job command to the %s context.\n", smpd_get_context_str(context));
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_stdin_command"
/*
 * Handle a "stdin" command: decode its "data" argument in place and forward
 * the bytes to the stdin context of local rank 0, or of every local process
 * when smpd_process.stdin_toall is set.
 *
 * If a process already has writes queued on stdin_write_list, the data is
 * appended to the end of that list.  Otherwise an immediate MPIDU_Sock_write
 * is attempted and any unwritten remainder becomes the new list head with an
 * MPIDU_Sock_post_write for it.  If a queue node cannot be allocated the data
 * is handed to smpd_write() instead.  Processes whose stdin context has
 * already been released (piter->in == NULL) are skipped.
 *
 * Always returns SMPD_SUCCESS; per-process failures are only logged.
 */
int smpd_handle_stdin_command(smpd_context_t *context)
{
    char data[SMPD_MAX_STDOUT_LENGTH];
    smpd_command_t *cmd;
    smpd_process_t *piter;
    smpd_stdin_write_node_t *node, *iter;
    int result;
    MPIU_Size_t num_written, num_decoded;
    int nd = 0; /* stays 0 if decoding does not report a length */

    smpd_enter_fn(FCNAME);

    cmd = &context->read_cmd;

    if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) == MPIU_STR_SUCCESS)
    {
        /* the encoded text is decoded into the same buffer */
        smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &nd);
        num_decoded = nd;
        piter = smpd_process.process_list;
        while (piter)
        {
            if (piter->rank == 0 || smpd_process.stdin_toall)
            {
                if (piter->in == NULL)
                {
                    /* smpd_handle_close_stdin_command clears 'in' once the
                       close has been posted; writing would dereference NULL. */
                    smpd_dbg_printf("discarding stdin data for process %d: no stdin context\n", piter->rank);
                }
                else if (piter->stdin_write_list != NULL)
                {
                    /* writes are already pending: queue behind them to keep
                       the data in order */
                    node = (smpd_stdin_write_node_t*)MPIU_Malloc(sizeof(smpd_stdin_write_node_t));
                    if (node == NULL)
                    {
                        smpd_write(piter->in->sock, data, num_decoded);
                    }
                    else
                    {
                        node->buffer = (char*)MPIU_Malloc(num_decoded);
                        if (node->buffer == NULL)
                        {
                            MPIU_Free(node);
                            smpd_write(piter->in->sock, data, num_decoded);
                        }
                        else
                        {
                            /* add the node to the end of the write list */
                            node->length = num_decoded;
                            memcpy(node->buffer, data, num_decoded);
                            node->next = NULL;
                            iter = piter->stdin_write_list;
                            while (iter->next != NULL)
                                iter = iter->next;
                            iter->next = node;
                        }
                    }
                }
                else
                {
                    /* attempt to write the data immediately */
                    num_written = 0;
                    result = MPIDU_Sock_write(piter->in->sock, data, num_decoded, &num_written);
                    if (result != MPI_SUCCESS)
                    {
                        smpd_err_printf("unable to write data to the stdin context of process %d\n", piter->rank);
                    }
                    else if (num_written != num_decoded)
                    {
                        /* If not all the data is written, copy it to a temp buffer and post a write for the remaining data */
                        node = (smpd_stdin_write_node_t*)MPIU_Malloc(sizeof(smpd_stdin_write_node_t));
                        if (node == NULL)
                        {
                            smpd_write(piter->in->sock, &data[num_written], num_decoded-num_written);
                        }
                        else
                        {
                            node->buffer = (char*)MPIU_Malloc(num_decoded-num_written);
                            if (node->buffer == NULL)
                            {
                                MPIU_Free(node);
                                smpd_write(piter->in->sock, &data[num_written], num_decoded-num_written);
                            }
                            else
                            {
                                /* add the node to write list */
                                node->length = num_decoded - num_written;
                                memcpy(node->buffer, &data[num_written], num_decoded-num_written);
                                node->next = NULL;
                                piter->stdin_write_list = node;
                                piter->in->write_state = SMPD_WRITING_DATA_TO_STDIN;
                                result = MPIDU_Sock_post_write(piter->in->sock, node->buffer, node->length, node->length, NULL);
                                if (result != MPI_SUCCESS)
                                {
                                    smpd_err_printf("unable to post a write of %d bytes to stdin for rank %d\n", node->length, piter->rank);
                                }
                            }
                        }
                    }
                }
            }
            piter = piter->next;
        }
    }
    else
    {
        smpd_err_printf("unable to get the data from the stdin command: '%s'\n", cmd->cmd);
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_close_stdin_command"
/*
 * Handle a "close_stdin" command: post a close on the stdin context of local
 * rank 0, or of every local process when smpd_process.stdin_toall is set.
 * The context is marked SMPD_CLOSING and detached from its process first.
 * piter->in is cleared only when the close was posted successfully.
 *
 * 'context' is unused.  Always returns SMPD_SUCCESS; failures are logged.
 */
int smpd_handle_close_stdin_command(smpd_context_t *context)
{
    smpd_process_t *piter;
    int result;

    smpd_enter_fn(FCNAME);
    SMPD_UNREFERENCED_ARG(context);

    piter = smpd_process.process_list;
    while (piter)
    {
        if ((piter->rank == 0 || smpd_process.stdin_toall) && piter->in != NULL)
        {
            piter->in->state = SMPD_CLOSING;
            piter->in->process = NULL; /* NULL this out so the closing of the stdin context won't access it after it has been freed */
            result = MPIDU_Sock_post_close(piter->in->sock);
            if (result == MPI_SUCCESS)
            {
                piter->in = NULL;
            }
            else
            {
                smpd_err_printf("unable to post a close of the stdin context of process %d\n", piter->rank);
            }
        }
        piter = piter->next;
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "write_to_stdout"
/*
 * Write 'num_bytes' bytes of 'buffer' to this process's standard output:
 * WriteFile on the STD_OUTPUT_HANDLE on Windows, fwrite + fflush on stdout
 * elsewhere.  Always returns SMPD_SUCCESS; write errors are not reported.
 */
static int write_to_stdout(const char *buffer, size_t num_bytes)
{
#ifdef HAVE_WINDOWS_H
    HANDLE hStdout;
    DWORD num_written;

    smpd_enter_fn(FCNAME);

    /* MT - This should acquire the hLaunchProcessMutex so that it doesn't acquire a redirected handle. */
    /* But since the code is not multi-threaded yet this doesn't matter */
    hStdout = GetStdHandle(STD_OUTPUT_HANDLE);
    WriteFile(hStdout, buffer, num_bytes, &num_written, NULL);
#else
    smpd_enter_fn(FCNAME);

    fwrite(buffer, 1, num_bytes, stdout);
    fflush(stdout);
#endif

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_stdout_command"
int smpd_handle_stdout_command(smpd_context_t *context)
{
    int rank;
    char data[SMPD_MAX_STDOUT_LENGTH];
    smpd_command_t *cmd;
    int num_decoded = 0;
    int first;
    char prefix[20];
    size_t prefix_length;
    char *token;
    SMPD_BOOL ends_in_cr = SMPD_FALSE;

    smpd_enter_fn(FCNAME);

    cmd = &context->read_cmd;

    if (MPIU_Str_get_int_arg(cmd->cmd, "rank", &rank) != MPIU_STR_SUCCESS)
    {
        rank = -1;
        smpd_err_printf("no rank in the stdout command: '%s'\n", cmd->cmd);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -