⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pls_bproc.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- C -*-
 *
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */
/**
 * @file:
 * Part of the bproc launcher. See pls_bproc.h for an overview of how it works.
 */

#include "orte_config.h"
#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif  /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif  /* HAVE_SYS_STAT_H */
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#include <errno.h>
#include <signal.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif  /* HAVE_FCNTL_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif  /* HAVE_STRING_H */
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include "opal/mca/installdirs/installdirs.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_list.h"
#include "opal/event/event.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/path.h"
#include "opal/util/os_path.h"
#include "opal/util/show_help.h"
#include "opal/util/trace.h"

#include "orte/dss/dss.h"
#include "orte/util/sys_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/sds/base/base.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/ras/ras.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/schema/schema_types.h"
#include "orte/mca/smr/smr.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/params.h"
#include "orte/mca/pls/base/pls_private.h"
#include "pls_bproc.h"

/* Consulted by orte_pls_bproc_waitpid_daemon_cb(): while false, a daemon exit
 * triggers a message to ourself. Set elsewhere in this file. */
static bool daemons_launched;
/* Not used in the portion of the file visible here. */
static bool bynode;

#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
int orte_pls_bproc_launch_threaded(orte_jobid_t);
#endif

/**
 * Initialization of the bproc module with all the needed function pointers
 */
orte_pls_base_module_t orte_pls_bproc_module = {
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
    orte_pls_bproc_launch_threaded,
#else
    orte_pls_bproc_launch,
#endif
    orte_pls_bproc_terminate_job,
    orte_pls_bproc_terminate_orteds,
    orte_pls_bproc_terminate_proc,
    orte_pls_bproc_signal_job,
    orte_pls_bproc_signal_proc,
    orte_pls_bproc_cancel_operation,
    orte_pls_bproc_finalize
};

static int orte_pls_bproc_node_list(orte_job_map_t *map,
                                    int *node_array, int * num_nodes,
                                    int num_procs);
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
                                   int node_rank, int app_context);
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data);
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data);
#ifdef MCA_pls_bproc_scyld
/* compatibility functions for scyld bproc and pre 3.2.0 LANL bproc */
static int bproc_vexecmove_io(int nnodes, int *nodes, int *pids,
                              struct bproc_io_t *io, int iolen, const char *cmd,
                              char * const argv[], char * envp[]);
static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd,
                           char * const argv[], char * envp[]);
#endif
static void orte_pls_bproc_setup_env(char *** env);
static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp);
static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
                                     orte_vpid_t vpid_start, int app_context);

/**
 * Creates a list of nodes from a job map that should participate in the next
 * launch cycle.
 *
 * Every entry of node_array is first reset to -1 (map->num_nodes entries are
 * cleared, so the caller must supply an array at least that large). Each
 * mapped node whose num_procs is at least num_procs then has its node name
 * converted with atoi() and appended to the array.
 *
 * @param map a pointer to the job map
 * @param node_array a pointer to an integer array that will contain the node names
 * @param num_nodes a pointer to the place where we will store the number of
 *                  nodes in the array
 * @param num_procs the number of processes that a node must have to be placed
 *                  on the list
 * @retval ORTE_SUCCESS always
 */
static int orte_pls_bproc_node_list(orte_job_map_t *map, int *node_array,
                                    int *num_nodes, int num_procs)
{
    opal_list_item_t *item;
    orte_mapped_node_t *node;

    OPAL_TRACE(1);

    /* initialize all */
    *num_nodes = 0;
    /* filling every byte with 0xff yields -1 in each int slot */
    memset((void*)node_array, -1, sizeof(int) * map->num_nodes);

    /* build the node list */
    for(item = opal_list_get_first(&map->nodes);
        item != opal_list_get_end(&map->nodes);
        item = opal_list_get_next(item)) {
        node = (orte_mapped_node_t*)item;

        if (node->num_procs >= num_procs) {
            node_array[(*num_nodes)++] = atoi(node->nodename);
        }
    }
    return ORTE_SUCCESS;
}

/**
 * Sets up the bproc io structs for the specified rank on the nodes
 *
 * Fills io[0..2] so that file descriptor i is backed by the file
 * /tmp/openmpi-bproc-<user>/<universe>/<job>-<app_context>/<node_rank>/<i>
 * (separators are OPAL_PATH_SEP). Descriptor 0 is opened read-only, 1 and 2
 * write-only.
 *
 * @param jobid the job whose string form is used in the path
 * @param io A pointer to an array of 3 bproc_io_t structs
 * @param node_rank the rank on the node we are setting up the structs for
 * @param app_context the application context number
 * @retval ORTE_SUCCESS
 * @retval ORTE_ERROR if the user or universe name is unknown, or a generated
 *                    path does not fit into the io struct's name field
 * @retval ORTE_ERR_OUT_OF_RESOURCE if a path string could not be allocated
 * @retval error whatever orte_ns.convert_jobid_to_string() reported
 */
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
                                   int node_rank, int app_context)
{
    /* Bound the original code handed to strncpy() for d.file.name.
     * NOTE(review): presumably the declared size of that field - confirm
     * against the bproc headers. */
    enum { io_name_len = 256 };
    char *frontend = NULL, *path = NULL, *job = NULL;
    int rc, i;

    OPAL_TRACE(1);

    /* ensure that system info is set */
    orte_sys_info();
    if (NULL == orte_system_info.user) { /* error condition */
        return ORTE_ERROR;
    }
    if (NULL == orte_universe_info.name) {  /* error condition */
        return ORTE_ERROR;
    }

    rc = orte_ns.convert_jobid_to_string(&job, jobid);
    if(ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* build the directory tree the io files will be in */
    if (0 > asprintf(&frontend, OPAL_PATH_SEP"tmp"OPAL_PATH_SEP"openmpi-bproc-%s"OPAL_PATH_SEP"%s"OPAL_PATH_SEP"%s-%d"OPAL_PATH_SEP"%d",
                     orte_system_info.user, orte_universe_info.name, job,
                     app_context, node_rank)) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        goto cleanup;
    }

    for(i = 0; i < 3; i++)  {
        if(0 > asprintf(&path, "%s"OPAL_PATH_SEP"%d", frontend, i)) {
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            goto cleanup;
        }
        if (mca_pls_bproc_component.debug) {
            opal_output(0, "mpirun bproc io setup. Path: %s\n", path);
        }

        /* strncpy() does not NUL-terminate when the source fills the whole
         * destination, so refuse paths that would not fit together with
         * their terminator instead of handing bproc an unterminated or
         * silently truncated file name. */
        if (strlen(path) >= (size_t)io_name_len) {
            rc = ORTE_ERROR;
            ORTE_ERROR_LOG(rc);
            free(path);
            goto cleanup;
        }

        io[i].fd = i;
        io[i].type = BPROC_IO_FILE;
#if defined BPROC_API_VERSION && BPROC_API_VERSION >= 4
        io[i].flags = 0;
#else
        io[i].send_info = 0;
#endif
        if(0 == i) {
            io[i].d.file.flags = O_RDONLY;
        } else {
            io[i].d.file.flags = O_WRONLY;
        }
        io[i].d.file.offset = 0;
        io[i].d.file.mode = 0;
        /* the length check above guarantees the copy is NUL-terminated */
        strncpy(io[i].d.file.name, path, io_name_len);
        free(path);
    }

 cleanup:
    if (NULL != frontend) {
       free(frontend);
    }
    if (NULL != job) {
        free(job);
    }
    return rc;
}

/**
 * Callback for orte_wait_cb. This function ONLY gets called for
 * normal termination, or termination caused by a signal. If the
 * process abnormally terminates by other than a signal, we go through
 * another function so it can tell us that it was abnormal.
 * Bproc doesn't really let us do it through here.
 *
 * The process state is set to ORTE_PROC_STATE_TERMINATED when the wait status
 * reports a normal exit and to ORTE_PROC_STATE_ABORTED otherwise. The name
 * passed in data is released with free() before returning.
 *
 * @param wpid the process's pid
 * @param status tells why the process died
 * @param data a pointer to the process's name
 */
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data)
{
    orte_process_name_t * proc = (orte_process_name_t*) data;
    int rc;

    OPAL_TRACE(1);

    /* set the state of this process */
    if(WIFEXITED(status)) {
        rc = orte_smr.set_proc_state(proc, ORTE_PROC_STATE_TERMINATED, status);
    } else {
        rc = orte_smr.set_proc_state(proc, ORTE_PROC_STATE_ABORTED, status);
    }
    if(ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }
    free(proc);
}

/**
 * Callback for orte_wait_cb for the daemons. If a daemon unexpectedly dies
 * before we are done launching, we abort the job.
 *
 * While daemons_launched is still false, four ints are packed and sent to
 * ourself on ORTE_RML_TAG_BPROC: { -1, terminating signal or -1, the daemon's
 * pid, the int that data points to }. In every case the component's daemon
 * counter is decremented (never below zero) and its condition variable is
 * signalled, both under the component lock.
 *
 * @param wpid the daemons's pid
 * @param status tells why the daemon died
 * @param data a pointer to the node the daemon was on
 */
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data)
{
    OPAL_TRACE(1);

    if(!daemons_launched) {
        /* if a daemon exits before we are done launching the user apps we send a
         * message to ourself so we will break out of the receive loop and exit */
        orte_buffer_t ack;
        int rc;
        int src[4] = {-1, -1};
        src[2] = wpid;
        src[3] = *(int*)data;
        if(WIFSIGNALED(status)) {
            src[1] = WTERMSIG(status);
        }
        OBJ_CONSTRUCT(&ack, orte_buffer_t);
        rc = orte_dss.pack(&ack, &src, 4, ORTE_INT);
        if(ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
        }
        rc = mca_oob_send_packed(ORTE_PROC_MY_NAME, &ack, ORTE_RML_TAG_BPROC, 0);
        if(0 > rc) {
            ORTE_ERROR_LOG(rc);
        }
        /* Release whatever the buffer allocated while packing; the original
         * code constructed it and never destructed it.
         * NOTE(review): relies on mca_oob_send_packed() being finished with
         * the buffer once it returns - confirm against the oob API. */
        OBJ_DESTRUCT(&ack);
    }

    OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
    if(0 < mca_pls_bproc_component.num_daemons) {
        mca_pls_bproc_component.num_daemons--;
    }
    opal_condition_signal(&mca_pls_bproc_component.condition);
    OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);

    if(0 < mca_pls_bproc_component.debug) {
        opal_output(0, "in orte_pls_bproc_waitpid_daemon_cb, %d daemons left\n",
                    mca_pls_bproc_component.num_daemons);
    }
}

#ifdef MCA_pls_bproc_scyld
/**
 * compatibility function for scyld bproc and pre 3.2.0 LANL bproc. See the
 * bproc documentation for details
 *
 * Forks once per entry in nodes. Each child sets BPROC_RANK to its index in
 * the node array and calls bproc_execmove_io() for its node; if that call
 * returns, the child reports the failure and exits with -1.
 *
 * @param nnodes number of entries in nodes and pids
 * @param nodes the nodes to start the command on
 * @param pids receives the fork() result for each node
 * @param io io setup handed to bproc_execmove_io()
 * @param iolen number of entries in io
 * @param cmd the command to execute
 * @param argv argument vector for cmd
 * @param envp environment for cmd
 * @retval nnodes if every fork succeeded
 * @retval -1 as soon as one fork fails (children forked earlier are left as
 *            they are)
 */
static int bproc_vexecmove_io(int nnodes, int *nodes, int *pids,
                              struct bproc_io_t *io, int iolen, const char *cmd,
                              char * const argv[], char * envp[])
{
    int i;
    char * rank;

    OPAL_TRACE(1);

    for(i = 0; i < nnodes; i++) {
        pids[i] = fork();
        if(0 == pids[i]) {
            /* set BPROC_RANK so the proc can get its name */
            if (0 > asprintf(&rank, "%d", i)) {
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                exit(-1);
            }
            opal_setenv("BPROC_RANK", rank, true, &envp);
            bproc_execmove_io(nodes[i], io, iolen, cmd, argv, envp);
            /* if we get here, there was an error */
            opal_show_help("help-pls-bproc.txt", "bproc-vexecmove-launch", true,
                           cmd, nodes[i], errno);
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit(-1);
        } else if(-1 == pids[i]) {
            opal_show_help("help-pls-bproc.txt", "bproc-vexecmove-fork", true,
                           errno);
            ORTE_ERROR_LOG(ORTE_ERROR);
            return -1;
        }
    }
    return nnodes;
}

/**
 * compatibility function for scyld bproc and pre 3.2.0 LANL bproc. See the
 * bproc documentation for details
 *
 * Same as bproc_vexecmove_io() with no io setup (NULL, 0).
 */
static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd,
                           char * const argv[], char * envp[])
{
    return bproc_vexecmove_io(nnodes, nodes, pids, NULL, 0, cmd, argv, envp);
}
#endif

/* NOTE(review): the listing is cut off at this point. The comment opener
 * below belongs to a definition that continues beyond this excerpt and is
 * kept exactly as found. */
/**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -