📄 smpd_ipmi.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "ipmi.h"
#ifdef HAVE_CTYPE_H
#include <ctype.h>
#endif

/* pmiimpl.h */

static int root_smpd(void *p);

/* Define to prevent an smpd root thread or process from being created when there is only one process. */
/* Currently, defining this prevents the use of the spawn command. */
/*#define SINGLE_PROCESS_OPTIMIZATION*/

#define PMI_MAX_KEY_LEN          256
#define PMI_MAX_VALUE_LEN        8192
#define PMI_MAX_KVS_NAME_LENGTH  100

/* values stored in pmi_process.init_finalized */
#define PMI_INITIALIZED 0
#define PMI_FINALIZED   1

#define PMI_TRUE        1
#define PMI_FALSE       0

/* State kept by this PMI client for the current process. */
typedef struct pmi_process_t
{
    int rpmi;
#ifdef HAVE_WINDOWS_H
    HANDLE hRootThread;
    HANDLE hRootThreadReadyEvent;
#else
    int root_pid;
#endif
    char root_host[100];
    int root_port;
    int local_kvs;
    char kvs_name[PMI_MAX_KVS_NAME_LENGTH];
    char domain_name[PMI_MAX_KVS_NAME_LENGTH];
    MPIDU_Sock_t sock;
    MPIDU_Sock_set_t set;
    int iproc;
    int nproc;
    int init_finalized;
    int smpd_id;
    MPIDU_SOCK_NATIVE_FD smpd_fd;
    int smpd_key;
    smpd_context_t *context;
    int clique_size;     /* number of entries in clique_ranks */
    int *clique_ranks;   /* malloc'ed by parse_clique() */
    char host[100];
    int port;
    int appnum;
} pmi_process_t;

/* global variables */
static pmi_process_t pmi_process =
{
    PMI_FALSE,               /* rpmi           */
#ifdef HAVE_WINDOWS_H
    NULL,                    /* root thread    */
    NULL,                    /* hRootThreadReadyEvent */
#else
    0,                       /* root pid       */
#endif
    "",                      /* root host      */
    0,                       /* root port      */
    PMI_FALSE,               /* local_kvs      */
    "",                      /* kvs_name       */
    "",                      /* domain_name    */
    MPIDU_SOCK_INVALID_SOCK, /* sock           */
    MPIDU_SOCK_INVALID_SET,  /* set            */
    -1,                      /* iproc          */
    -1,                      /* nproc          */
    PMI_FINALIZED,           /* init_finalized */
    -1,                      /* smpd_id        */
    0,                       /* smpd_fd        */
    0,                       /* smpd_key       */
    NULL,                    /* context        */
    0,                       /* clique_size    */
    NULL,                    /* clique_ranks   */
    "",                      /* host           */
    -1,                      /* port           */
    0                        /* appnum         */
};

/* when non-zero, pmi_err_printf() prints nothing */
static int silence = 0;

/*
 * Print a printf-style message to stdout, prefixed with "[<iproc>] ",
 * and flush stdout.  Nothing is printed while 'silence' is non-zero.
 *
 * Returns the value returned by vprintf(), or 0 when silenced.
 */
static int pmi_err_printf(char *str, ...)
{
    int n=0;
    va_list list;

    if (!silence)
    {
        printf("[%d] ", pmi_process.iproc);
        va_start(list, str);
        n = vprintf(str, list);
        va_end(list);

        fflush(stdout);
    }

    return n;
}

/*
 * Print the numeric MPI error code followed by a printf-style message
 * prefixed with "[<iproc>] ", flush stdout, and then hand the error code
 * to MPIR_Err_return_comm().  Unlike pmi_err_printf() this ignores
 * 'silence'.
 *
 * Returns the value returned by vprintf().
 */
static int pmi_mpi_err_printf(int mpi_errno, char *fmt, ... )
{
    int n;
    va_list list;

    /* convert the error code to a string */
    printf("mpi_errno: %d\n", mpi_errno);

    printf("[%d] ", pmi_process.iproc);
    va_start(list, fmt);
    n = vprintf(fmt, list);
    va_end(list);

    fflush(stdout);

    MPIR_Err_return_comm(NULL, "", mpi_errno);

    return n;
}

/*
 * Build an smpd command named 'command', attach the optional "name",
 * "key" and "value" arguments (each skipped when NULL), post it on the
 * pmi context and run the smpd state machine until it returns.
 *
 * Destination selection:
 *   - default destination is 1
 *   - "done" goes to pmi_process.smpd_id when not running as rpmi
 *   - "init" and "finalize" go to 0 and also carry a "node_id" argument
 *
 * For every command except "done" a read for the reply is posted too.
 *
 * Returns PMI_SUCCESS, or PMI_FAIL after printing a diagnostic.
 *
 * NOTE(review): cmd_ptr is not released on the error paths below; whether
 * that leaks depends on smpd ownership rules not visible here - confirm.
 */
static int pmi_create_post_command(const char *command, const char *name, const char *key, const char *value)
{
    int result;
    smpd_command_t *cmd_ptr;
    int dest = 1;
    int add_id = 0;

    if (!pmi_process.rpmi)
    {
        if (strcmp(command, "done") == 0)
        {
            /* done commands go to the immediate smpd, not the root */
            dest = pmi_process.smpd_id;
        }
    }
    if ((strcmp(command, "init") == 0) || (strcmp(command, "finalize") == 0))
    {
        add_id = 1;
        dest = 0;
    }

    result = smpd_create_command((char*)command, pmi_process.smpd_id, dest, SMPD_TRUE, &cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("unable to create a %s command.\n", command);
        return PMI_FAIL;
    }
    result = smpd_add_command_int_arg(cmd_ptr, "ctx_key", pmi_process.smpd_key);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("unable to add the key to the %s command.\n", command);
        return PMI_FAIL;
    }

    if (name != NULL)
    {
        result = smpd_add_command_arg(cmd_ptr, "name", (char*)name);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("unable to add the kvs name('%s') to the %s command.\n", name, command);
            return PMI_FAIL;
        }
    }

    if (key != NULL)
    {
        result = smpd_add_command_arg(cmd_ptr, "key", (char*)key);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("unable to add the key('%s') to the %s command.\n", key, command);
            return PMI_FAIL;
        }
    }

    if (value != NULL)
    {
        result = smpd_add_command_arg(cmd_ptr, "value", (char*)value);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("unable to add the value('%s') to the %s command.\n", value, command);
            return PMI_FAIL;
        }
    }

    if (add_id)
    {
        result = smpd_add_command_int_arg(cmd_ptr, "node_id", pmi_process.smpd_id);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("unable to add the node_id(%d) to the %s command.\n", pmi_process.smpd_id, command);
            return PMI_FAIL;
        }
    }

    /* post the write of the command */
    /*
    printf("posting write of dbs command to %s context, sock %d: '%s'\n",
        smpd_get_context_str(pmi_process.context), MPIDU_Sock_getid(pmi_process.context->sock), cmd_ptr->cmd);
    fflush(stdout);
    */
    result = smpd_post_write_command(pmi_process.context, cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("unable to post a write of the %s command.\n", command);
        return PMI_FAIL;
    }
    if (strcmp(command, "done"))
    {
        /* and post a read for the result if it is not a done command */
        result = smpd_post_read_command(pmi_process.context);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("unable to post a read of the next command on the pmi context.\n");
            return PMI_FAIL;
        }
    }

    /* let the state machine send the command and receive the result */
    result = smpd_enter_at_state(pmi_process.set, SMPD_WRITING_CMD);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("the state machine logic failed to get the result of the %s command.\n", command);
        return PMI_FAIL;
    }

    return PMI_SUCCESS;
}

/*
 * Report whether the PMI library is currently in the initialized state.
 *
 * initialized - out: PMI_TRUE when init_finalized == PMI_INITIALIZED,
 *               PMI_FALSE otherwise.
 *
 * Returns PMI_ERR_INVALID_ARG when 'initialized' is NULL, else PMI_SUCCESS.
 */
int iPMI_Initialized(PMI_BOOL *initialized)
{
    if (initialized == NULL)
        return PMI_ERR_INVALID_ARG;
    if (pmi_process.init_finalized == PMI_INITIALIZED)
    {
        *initialized = PMI_TRUE;
    }
    else
    {
        *initialized = PMI_FALSE;
    }
    return PMI_SUCCESS;
}

/*
 * Walk a clique description and count (and optionally store) the ranks it
 * names.  The description is a comma separated list whose items are either
 * a single rank ("7") or an inclusive ascending range ("3..6").
 *
 * str_orig  - description to parse; not modified (a private copy is
 *             tokenized because strtok() writes into its input).
 * ranks     - NULL for a counting-only pass; otherwise an array large
 *             enough for every rank, filled in list order.
 * count_out - out: number of ranks named by the description.
 *
 * Returns PMI_SUCCESS, or PMI_FAIL when the copy cannot be allocated or an
 * item is malformed (a diagnostic is printed for malformed items).
 */
static int parse_clique_tokens(const char *str_orig, int *ranks, int *count_out)
{
    int count = 0;
    int first, last, i;
    char *str, *token;

    str = strdup(str_orig);
    if (str == NULL)
        return PMI_FAIL;

    token = strtok(str, ",");
    while (token)
    {
        first = atoi(token);
        /* <ctype.h> classifiers are only defined for unsigned char values */
        while (isdigit((unsigned char)*token))
            token++;

        if (*token == '\0')
        {
            /* single rank */
            if (ranks != NULL)
                ranks[count] = first;
            count++;
        }
        else if (token[0] == '.' && token[1] == '.')
        {
            /* range: both separator characters are verified before they
               are skipped so a truncated item such as "3." cannot push the
               scan past the terminating NUL */
            last = atoi(token + 2);
            if (last < first)
            {
                /* a descending range would make the counted size smaller
                   than the number of ranks named by the other items */
                pmi_err_printf("unexpected clique token: '%s'\n", token);
                free(str);
                return PMI_FAIL;
            }
            if (ranks != NULL)
            {
                for (i=first; i<=last; i++)
                {
                    ranks[count] = i;
                    count++;
                }
            }
            else
            {
                count += last - first + 1;
            }
        }
        else
        {
            pmi_err_printf("unexpected clique token: '%s'\n", token);
            free(str);
            return PMI_FAIL;
        }
        token = strtok(NULL, ",");
    }
    free(str);

    *count_out = count;
    return PMI_SUCCESS;
}

/*
 * Parse a clique description (see parse_clique_tokens() for the format)
 * and publish the result in pmi_process.clique_ranks / clique_size.
 *
 * pmi_process is only modified when the whole description parsed
 * successfully; on failure no memory allocated here is left behind.
 *
 * Returns PMI_SUCCESS or PMI_FAIL.
 */
static int parse_clique(const char *str_orig)
{
    int count = 0;
    int *ranks;

    if (str_orig == NULL)
        return PMI_FAIL;

    /* count clique (this pass also validates every item) */
    if (parse_clique_tokens(str_orig, NULL, &count) != PMI_SUCCESS)
        return PMI_FAIL;

    /* allocate array; never ask for zero bytes because malloc(0) may
       legitimately return NULL, which would look like an allocation failure */
    ranks = (int*)malloc((size_t)(count > 0 ? count : 1) * sizeof(int));
    if (ranks == NULL)
        return PMI_FAIL;

    /* populate array */
    if (parse_clique_tokens(str_orig, ranks, &count) != PMI_SUCCESS)
    {
        free(ranks);
        return PMI_FAIL;
    }

    pmi_process.clique_ranks = ranks;
    pmi_process.clique_size = count;

    /*
    printf("clique: %d [", pmi_process.iproc);
    for (i=0; i<pmi_process.clique_size; i++)
    {
        printf("%d,", pmi_process.clique_ranks[i]);
    }
    printf("]\n");
    fflush(stdout);
    */
    return PMI_SUCCESS;
}

/*
 * Create a PMI context, post a socket connect to host:port and run the
 * smpd state machine starting at 'state'.  When 'state' is
 * SMPD_CONNECTING_RPMI the session string of the context is converted
 * with atoi() and stored as pmi_process.smpd_key.
 *
 * Returns SMPD_SUCCESS on success and PMI_FAIL on failure.
 *
 * NOTE(review): this function mixes the SMPD_, MPI_ and PMI_ status
 * families (the smpd_enter_at_state() result is compared with MPI_SUCCESS
 * and success is reported as SMPD_SUCCESS); left as found - confirm the
 * constants agree before normalizing.
 */
static int uPMI_ConnectToHost(char *host, int port, smpd_state_t state)
{
    int result;
    char error_msg[MPI_MAX_ERROR_STRING];
    int len;

    /*printf("posting a connect to %s:%d\n", host, port);fflush(stdout);*/
    result = smpd_create_context(SMPD_CONTEXT_PMI, pmi_process.set, MPIDU_SOCK_INVALID_SOCK/*pmi_process.sock*/, smpd_process.id, &pmi_process.context);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("PMI_ConnectToHost failed: unable to create a context to connect to %s:%d with.\n", host, port);
        return PMI_FAIL;
    }

    result = MPIDU_Sock_post_connect(pmi_process.set, pmi_process.context, host, port, &pmi_process.sock);
    if (result != MPI_SUCCESS)
    {
        printf("MPIDU_Sock_post_connect failed.\n");fflush(stdout);
        len = MPI_MAX_ERROR_STRING;
        PMPI_Error_string(result, error_msg, &len);
        pmi_err_printf("PMI_ConnectToHost failed: unable to post a connect to %s:%d, error: %s\n", host, port, error_msg);
        printf("uPMI_ConnectToHost returning PMI_FAIL\n");fflush(stdout);
        return PMI_FAIL;
    }

    pmi_process.context->sock = pmi_process.sock;
    pmi_process.context->state = state;

    result = smpd_enter_at_state(pmi_process.set, state);
    if (result != MPI_SUCCESS)
    {
        pmi_mpi_err_printf(result, "PMI_ConnectToHost failed: unable to connect to %s:%d.\n", host, port);
        return PMI_FAIL;
    }

    if (state == SMPD_CONNECTING_RPMI)
    {
        /* remote pmi processes receive their smpd_key when they connect to the smpd pmi server */
        pmi_process.smpd_key = atoi(pmi_process.context->session);
    }

    return SMPD_SUCCESS;
}

/*
 * NOTE(review): the strncpy() calls below pass the full destination size,
 * so a source string of that length or longer leaves the destination
 * without a terminating NUL - confirm the environment values are bounded.
 */
static int rPMI_Init(int *spawned)
{
    char *p;
    int result;
    char rank_str[100], size_str[100];
    char str[1024];

    if (spawned == NULL)
        return PMI_ERR_INVALID_ARG;

    /* initialize to defaults */
    smpd_process.id = 1;
    pmi_process.smpd_id = 1;
    pmi_process.rpmi = PMI_TRUE;
    pmi_process.iproc = 0;
    pmi_process.nproc = 1;

    p = getenv("PMI_ROOT_HOST");
    if (p == NULL)
    {
        pmi_err_printf("unable to initialize the rPMI library: no PMI_ROOT_HOST specified.\n");
        return PMI_FAIL;
    }
    strncpy(pmi_process.root_host, p, 100);

    p = getenv("PMI_ROOT_PORT");
    if (p == NULL)
    {
        /* set to default port? */
        pmi_err_printf("unable to initialize the rPMI library: no PMI_ROOT_PORT specified.\n");
        return PMI_FAIL;
    }
    pmi_process.root_port = atoi(p);
    if (pmi_process.root_port < 1)
    {
        pmi_err_printf("invalid root port specified: %s\n", p);
        return PMI_FAIL;
    }
    smpd_process.port = pmi_process.root_port;
    strcpy(smpd_process.host, pmi_process.root_host);

    p = getenv("PMI_SPAWN");
    if (p)
    {
        *spawned = atoi(p);
    }
    else
    {
        *spawned = 0;
    }

    p = getenv("PMI_KVS");
    if (p != NULL)
    {
        /* use specified kvs name */
        strncpy(pmi_process.kvs_name, p, PMI_MAX_KVS_NAME_LENGTH);
        strncpy(smpd_process.kvs_name, p, PMI_MAX_KVS_NAME_LENGTH);
    }
    else
    {
        /* use default kvs name */
        strncpy(pmi_process.kvs_name, "default_mpich_kvs_name", PMI_MAX_KVS_NAME_LENGTH);
        strncpy(smpd_process.kvs_name, "default_mpich_kvs_name", PMI_MAX_KVS_NAME_LENGTH);
    }

    p = getenv("PMI_DOMAIN");
    if (p != NULL)
    {
        strncpy(pmi_process.domain_name, p, PMI_MAX_KVS_NAME_LENGTH);
        strncpy(smpd_process.domain_name, p, PMI_MAX_KVS_NAME_LENGTH);
    }
    else
    {
        strncpy(pmi_process.domain_name, "mpich2", PMI_MAX_KVS_NAME_LENGTH);
        strncpy(smpd_process.domain_name, "mpich2", PMI_MAX_KVS_NAME_LENGTH);
    }

    p = getenv("PMI_RANK");
    if (p != NULL)
    {
        pmi_process.iproc = atoi(p);
        if (pmi_process.iproc < 0)
        {
            pmi_err_printf("invalid rank %d\n", pmi_process.iproc);
            return PMI_FAIL;
        }
    }

    p = getenv("PMI_SIZE");
    if (p != NULL)
    {
        pmi_process.nproc = atoi(p);
        if (pmi_process.nproc < 1)
        {
            pmi_err_printf("invalid size %d\n", pmi_process.nproc);
            return PMI_FAIL;
        }
    }
    smpd_process.nproc = pmi_process.nproc;

#ifdef SINGLE_PROCESS_OPTIMIZATION
/* leave this code #ifdef'd out so we can test rPMI stuff with one process */
    if (pmi_process.nproc == 1)
    {
        pmi_process.local_kvs = PMI_TRUE;
        result = smpd_dbs_init();
        if (result != SMPD_SUCCESS)
        {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -