📄 smpd_ipmi.c
字号:
strcat(buffer, key); } result = smpd_add_command_arg(cmd_ptr, "nkeyvals", buffer); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to add nkeyvals(%s) to the spawn command.\n", buffer); return PMI_FAIL; } } free(info_keyval_sizes); /* add the pre-put keyvals */ result = smpd_add_command_int_arg(cmd_ptr, "npreput", preput_keyval_size); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to add npreput=%d to the spawn command.\n", preput_keyval_size); return PMI_FAIL; } if (preput_keyval_size > 0 && preput_keyval_vector) { buffer[0] = '\0'; iter = buffer; maxlen = SMPD_MAX_CMD_LENGTH; for (i=0; i<preput_keyval_size; i++) { keyval_buf[0] = '\0'; iter2 = keyval_buf; maxlen2 = SMPD_MAX_CMD_LENGTH; result = MPIU_Str_add_string_arg(&iter2, &maxlen2, preput_keyval_vector[i].key, preput_keyval_vector[i].val); if (result != MPIU_STR_SUCCESS) { pmi_err_printf("unable to add %s=%s to the spawn command.\n", preput_keyval_vector[i].key, preput_keyval_vector[i].val); return PMI_FAIL; } if (iter2 > keyval_buf) { iter2--; *iter2 = '\0'; /* remove the trailing space */ } sprintf(key, "%d", i); result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf); if (result != MPIU_STR_SUCCESS) { pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf); return PMI_FAIL; } } result = smpd_add_command_arg(cmd_ptr, "preput", buffer); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to add preput(%s) to the spawn command.\n", buffer); return PMI_FAIL; } } /*printf("spawn command: <%s>\n", cmd_ptr->cmd);*/ /* post the write of the command */ /* printf("posting write of spawn command to %s context, sock %d: '%s'\n", smpd_get_context_str(pmi_process.context), MPIDU_Sock_get_sock_id(pmi_process.context->sock), cmd_ptr->cmd); fflush(stdout); */ result = smpd_post_write_command(pmi_process.context, cmd_ptr); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to post a write of the spawn command.\n"); return PMI_FAIL; } /* post a read for the result*/ result = smpd_post_read_command(pmi_process.context); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to post a read of the next command on the pmi context.\n"); return PMI_FAIL; } /* let the state machine send the command and receive the result */ result = smpd_enter_at_state(pmi_process.set, SMPD_WRITING_CMD); if (result != SMPD_SUCCESS) { /*printf("PMI_Spawn_multiple returning failure.\n");fflush(stdout);*/ pmi_err_printf("the state machine logic failed to get the result of the spawn command.\n"); return PMI_FAIL; } for (i=0; i<total_num_processes; i++) { errors[i] = PMI_SUCCESS; } /*printf("PMI_Spawn_multiple returning success.\n");fflush(stdout);*/ return PMI_SUCCESS;}int iPMI_Parse_option(int num_args, char *args[], int *num_parsed, PMI_keyval_t **keyvalp, int *size){ if (num_args < 1) return PMI_ERR_INVALID_NUM_ARGS; if (args == NULL) return PMI_ERR_INVALID_ARGS; if (num_parsed == NULL) return PMI_ERR_INVALID_NUM_PARSED; if (keyvalp == NULL) return PMI_ERR_INVALID_KEYVALP; if (size == NULL) return PMI_ERR_INVALID_SIZE; *num_parsed = 0; *keyvalp = NULL; *size = 0; return PMI_SUCCESS;}int iPMI_Args_to_keyval(int *argcp, char *((*argvp)[]), PMI_keyval_t **keyvalp, int *size){ if (argcp == NULL || argvp == NULL || keyvalp == NULL || size == NULL) return PMI_ERR_INVALID_ARG; return PMI_SUCCESS;}int iPMI_Free_keyvals(PMI_keyval_t keyvalp[], int size){ if (keyvalp == NULL || size < 0) return PMI_ERR_INVALID_ARG; if (size == 0) return PMI_SUCCESS; /* free stuff */ return PMI_SUCCESS;}static char * namepub_kvs = NULL;static int setup_name_service(){ int result; char *pmi_namepub_kvs; if (namepub_kvs != NULL) { /* FIXME: Should it be an error to call setup_name_service twice? */ MPIU_Free(namepub_kvs); } namepub_kvs = (char*)MPIU_Malloc(PMI_MAX_KVS_NAME_LENGTH); if (!namepub_kvs) { pmi_err_printf("unable to allocate memory for the name publisher kvs.\n"); return PMI_FAIL; } pmi_namepub_kvs = getenv("PMI_NAMEPUB_KVS"); if (pmi_namepub_kvs) { strncpy(namepub_kvs, pmi_namepub_kvs, PMI_MAX_KVS_NAME_LENGTH); } else { /*result = PMI_KVS_Create(namepub_kvs, PMI_MAX_KVS_NAME_LENGTH);*/ result = iPMI_Get_kvs_domain_id(namepub_kvs, PMI_MAX_KVS_NAME_LENGTH); if (result != PMI_SUCCESS) { pmi_err_printf("unable to get the name publisher kvs name.\n"); return result; } } /*printf("namepub kvs: <%s>\n", namepub_kvs);fflush(stdout);*/ return PMI_SUCCESS;}int iPMI_Publish_name( const char service_name[], const char port[] ){ int result; if (service_name == NULL || port == NULL) return PMI_ERR_INVALID_ARG; if (namepub_kvs == NULL) { result = setup_name_service(); if (result != PMI_SUCCESS) return result; } /*printf("publish kvs: <%s>\n", namepub_kvs);fflush(stdout);*/ result = iPMI_KVS_Put(namepub_kvs, service_name, port); if (result != PMI_SUCCESS) { pmi_err_printf("unable to put the service name and port into the name publisher kvs.\n"); return result; } result = iPMI_KVS_Commit(namepub_kvs); if (result != PMI_SUCCESS) { pmi_err_printf("unable to commit the name publisher kvs.\n"); return result; } return PMI_SUCCESS;}int iPMI_Unpublish_name( const char service_name[] ){ int result; if (service_name == NULL) return PMI_ERR_INVALID_ARG; if (namepub_kvs == NULL) { result = setup_name_service(); if (result != PMI_SUCCESS) return result; } /*printf("unpublish kvs: <%s>\n", namepub_kvs);fflush(stdout);*/ /* This assumes you can put the same key more than once which breaks the PMI specification */ result = iPMI_KVS_Put(namepub_kvs, service_name, ""); if (result != PMI_SUCCESS) { pmi_err_printf("unable to put the blank service name and port into the name publisher kvs.\n"); return result; } result = iPMI_KVS_Commit(namepub_kvs); if (result != PMI_SUCCESS) { pmi_err_printf("unable to commit the name publisher kvs.\n"); return result; } return PMI_SUCCESS;}int iPMI_Lookup_name( const char service_name[], char port[] ){ int result; if (service_name == NULL || port == NULL) return PMI_ERR_INVALID_ARG; if (namepub_kvs == NULL) { result = setup_name_service(); if (result != PMI_SUCCESS) return result; } /*printf("lookup kvs: <%s>\n", namepub_kvs);fflush(stdout);*/ silence = 1; result = iPMI_KVS_Get(namepub_kvs, service_name, port, MPI_MAX_PORT_NAME); silence = 0; if (result != PMI_SUCCESS) { /* fail silently */ /*pmi_err_printf("unable to get the service name and port from the name publisher kvs.\n");*/ return result; } if (port[0] == '\0') { return MPI_ERR_NAME; } return PMI_SUCCESS;}#ifndef HAVE_WINDOWS_Hstatic int writebuf(int fd, void *buffer, int length){ unsigned char *buf; int num_written; buf = (unsigned char *)buffer; while (length) { num_written = write(fd, buf, length); if (num_written < 0) { if (errno != EINTR) { return errno; } num_written = 0; } buf = buf + num_written; length = length - num_written; } return 0;}static int readbuf(int fd, void *buffer, int length){ unsigned char *buf; int num_read; buf = (unsigned char *)buffer; while (length) { num_read = read(fd, buf, length); if (num_read < 0) { if (errno != EINTR) { return errno; } num_read = 0; } else if (num_read == 0) { return -1; } buf = buf + num_read; length = length - num_read; } return 0;}#endifint PMIX_Start_root_smpd(int nproc, char *host, int len, int *port){#ifdef HAVE_WINDOWS_H DWORD dwLength = len;#else int pipe_fd[2]; int result;#endif pmi_process.nproc = nproc;#ifdef HAVE_WINDOWS_H pmi_process.hRootThreadReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (pmi_process.hRootThreadReadyEvent == NULL) { pmi_err_printf("unable to create the root listener synchronization event, error: %d\n", GetLastError()); return PMI_FAIL; } pmi_process.hRootThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)root_smpd, NULL, 0, NULL); if (pmi_process.hRootThread == NULL) { pmi_err_printf("unable to create the root listener thread: error %d\n", GetLastError()); return PMI_FAIL; } if (WaitForSingleObject(pmi_process.hRootThreadReadyEvent, 60000) != WAIT_OBJECT_0) { pmi_err_printf("the root process thread failed to initialize.\n"); return PMI_FAIL; } /*GetComputerName(host, &dwLength);*/ GetComputerNameEx(ComputerNameDnsFullyQualified, host, &dwLength);#else pipe(pipe_fd); result = fork(); if (result == -1) { pmi_err_printf("unable to fork the root listener, errno %d\n", errno); return PMI_FAIL; } if (result == 0) { close(pipe_fd[0]); /* close the read end of the pipe */ result = root_smpd(&pipe_fd[1]); exit(result); } /* close the write end of the pipe */ close(pipe_fd[1]); /* read the port from the root_smpd process */ readbuf(pipe_fd[0], &pmi_process.root_port, sizeof(int)); /* read the kvs name */ readbuf(pipe_fd[0], smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN); /* close the read end of the pipe */ close(pipe_fd[0]); pmi_process.root_pid = result; gethostname(host, len);#endif *port = pmi_process.root_port; return PMI_SUCCESS;}int PMIX_Stop_root_smpd(){#ifdef HAVE_WINDOWS_H DWORD result;#else int status;#endif#ifdef HAVE_WINDOWS_H result = WaitForSingleObject(pmi_process.hRootThread, INFINITE); if (result != WAIT_OBJECT_0) { return PMI_FAIL; }#else kill(pmi_process.root_pid, SIGKILL); /* if (waitpid(pmi_process.root_pid, &status, WUNTRACED) == -1) { return PMI_FAIL; } */#endif return PMI_SUCCESS;}static int root_smpd(void *p){ int result; MPIDU_Sock_set_t set; MPIDU_Sock_t listener; smpd_process_group_t *pg; int i;#ifndef HAVE_WINDOWS_H int send_kvs = 0; int pipe_fd;#endif /* unreferenced parameter */ SMPD_UNREFERENCED_ARG(p); smpd_process.id = 1; smpd_process.root_smpd = SMPD_FALSE; smpd_process.map0to1 = SMPD_TRUE; result = MPIDU_Sock_create_set(&set); if (result != MPI_SUCCESS) { pmi_mpi_err_printf(result, "MPIDU_Sock_create_set failed.\n"); return PMI_FAIL; } smpd_process.set = set; smpd_dbg_printf("created a set for the listener: %d\n", MPIDU_Sock_get_sock_set_id(set)); result = MPIDU_Sock_listen(set, NULL, &pmi_process.root_port, &listener); if (result != MPI_SUCCESS) { pmi_mpi_err_printf(result, "MPIDU_Sock_listen failed.\n"); return PMI_FAIL; } smpd_dbg_printf("smpd listening on port %d\n", pmi_process.root_port); result = smpd_create_context(SMPD_CONTEXT_LISTENER, set, listener, -1, &smpd_process.listener_context); if (result != SMPD_SUCCESS) { pmi_err_printf("unable to create a context for the smpd listener.\n"); return PMI_FAIL; } result = MPIDU_Sock_set_user_ptr(listener, smpd_process.listener_context); if (result != MPI_SUCCESS) { pmi_mpi_err_printf(result, "MPIDU_Sock_set_user_ptr failed.\n"); return PMI_FAIL; } smpd_process.listener_context->state = SMPD_SMPD_LISTENING; smpd_dbs_init(); smpd_process.have_dbs = SMPD_TRUE; if (smpd_process.kvs_name[0] != '\0') { result = smpd_dbs_create_name_in(smpd_process.kvs_name); } else { result = smpd_dbs_create(smpd_process.kvs_name);#ifndef HAVE_WINDOWS_H send_kvs = 1;#endif } if (result != SMPD_DBS_SUCCESS) { pmi_err_printf("unable to create a kvs database: name = <%s>.\n", smpd_process.kvs_name); return PMI_FAIL; } /* Set up the process group */ /* initialize a new process group structure */ pg = (smpd_process_group_t*)malloc(sizeof(smpd_process_group_t)); if (pg == NULL) { pmi_err_printf("unable to allocate memory for a process group structure.\n"); return PMI_FAIL; } pg->aborted = SMPD_FALSE; pg->any_init_received = SMPD_FALSE; pg->any_noinit_process_exited = SMPD_FALSE; strncpy(pg->kvs, smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN); pg->num_procs = pmi_process.nproc; pg->processes = (smpd_exit_process_t*)malloc(pmi_process.nproc * sizeof(smpd_exit_process_t)); if (pg->processes == NULL) { pmi_err_printf("unable to allocate an array of %d process exit structures.\n", pmi_process.nproc); return PMI_FAIL; } for (i=0; i<pmi_process.nproc; i++) { pg->processes[i].ctx_key[0] = '\0'; pg->processes[i].errmsg = NULL; pg->processes[i].exitcode = -1; pg->processes[i].exited = SMPD_FALSE; pg->processes[i].finalize_called = SMPD_FALSE; pg->processes[i].init_called = SMPD_FALSE; pg->processes[i].node_id = i+1; pg->processes[i].host[0] = '\0'; pg->processes[i].suspended = SMPD_FALSE; pg->process
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -