
pls_tm_module.c

MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising …
Language: C
Page 1 of 2
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * These symbols are in a file by themselves to provide nice linker
 * semantics.  Since linkers generally pull in symbols by object
 * files, keeping these symbols as the only symbols in this file
 * prevents utility programs such as "ompi_info" from having to import
 * entire components just to query their version and parameters.
 */

#include "orte_config.h"
#include "orte/orte_constants.h"

#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <signal.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <tm.h>

#include "opal/mca/installdirs/installdirs.h"
#include "opal/threads/condition.h"
#include "opal/event/event.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/show_help.h"
#include "opal/util/path.h"
#include "opal/util/basename.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/runtime/opal_progress.h"

#include "orte/orte_types.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/pls/pls.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/sds/base/base.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/pls/base/pls_private.h"
#include "pls_tm.h"


/*
 * Local functions
 */
static int pls_tm_launch_job(orte_jobid_t jobid);
static int pls_tm_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs);
static int pls_tm_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs);
static int pls_tm_terminate_proc(const orte_process_name_t *name);
static int pls_tm_signal_job(orte_jobid_t jobid, int32_t signal, opal_list_t *attrs);
static int pls_tm_signal_proc(const orte_process_name_t *name, int32_t signal);
static int pls_tm_cancel_operation(void);
static int pls_tm_finalize(void);

static int pls_tm_connect(void);
static int pls_tm_disconnect(void);
static int pls_tm_check_path(char *exe, char **env);

/*
 * Local variables
 */

/*
 * Global variable
 */
orte_pls_base_module_t orte_pls_tm_module = {
    pls_tm_launch_job,
    pls_tm_terminate_job,
    pls_tm_terminate_orteds,
    pls_tm_terminate_proc,
    pls_tm_signal_job,
    pls_tm_signal_proc,
    pls_tm_cancel_operation,
    pls_tm_finalize
};


static int pls_tm_launch_job(orte_jobid_t jobid)
{
    orte_job_map_t *map;
    opal_list_item_t *item;
    size_t num_nodes;
    orte_vpid_t vpid;
    int node_name_index;
    int proc_name_index;
    char *jobid_string;
    char *uri, *param;
    char **env;
    char *var;
    char **argv;
    int argc;
    int rc;
    bool connected = false;
    uint launched = 0, i;
    char *bin_base = NULL, *lib_base = NULL;
    tm_event_t *tm_events = NULL;
    tm_task_id *tm_task_ids = NULL;
    int local_err;
    tm_event_t event;
    opal_list_t daemons;
    orte_pls_daemon_info_t *dmn;
    struct timeval launchstart, launchstop, completionstart, completionstop;
    struct timeval jobstart, jobstop;
    int maxtime = 0, mintime = 99999999, maxiter = 0, miniter = 0, deltat;
    float avgtime = 0.0;
    mode_t current_umask;

    /* check for timing request - get start time if so */
    if (mca_pls_tm_component.timing) {
        if (0 != gettimeofday(&jobstart, NULL)) {
            opal_output(0, "pls_tm: could not obtain job start time");
        }
    }

    /* Query the map for this job.
     * We need the entire mapping for a couple of reasons:
     *  - need the prefix to start with.
     *  - need to know if we are launching on a subset of the allocated nodes
     */
    rc = orte_rmaps.get_job_map(&map, jobid);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* if the user requested that we re-use daemons,
     * launch the procs on any existing, re-usable daemons
     */
    if (orte_pls_base.reuse_daemons) {
        if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(map);
            return rc;
        }
    }

    num_nodes = opal_list_get_size(&map->nodes);
    if (0 == num_nodes) {
        /* must have been launched on existing daemons - just return */
        OBJ_RELEASE(map);
        return ORTE_SUCCESS;
    }

    /*
     * Allocate a range of vpids for the daemons.
     */
    rc = orte_ns.reserve_range(0, num_nodes, &vpid);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* setup the orted triggers for passing their launch info */
    if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_nodes, NULL, NULL))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* setup a list that will contain the info for all the daemons
     * so we can store it on the registry when done
     */
    OBJ_CONSTRUCT(&daemons, opal_list_t);

    /* Allocate a bunch of TM events to use for tm_spawn()ing */
    tm_events = malloc(sizeof(tm_event_t) * num_nodes);
    if (NULL == tm_events) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    tm_task_ids = malloc(sizeof(tm_task_id) * num_nodes);
    if (NULL == tm_task_ids) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /* need integer value for command line parameter */
    asprintf(&jobid_string, "%lu", (unsigned long) jobid);

    /* add the daemon command (as specified by user) */
    argv = opal_argv_split(mca_pls_tm_component.orted, ' ');
    argc = opal_argv_count(argv);
    opal_argv_append(&argc, &argv, "--no-daemonize");

    /* check for debug flags */
    orte_pls_base_mca_argv(&argc, &argv);

    /* proxy information */
    opal_argv_append(&argc, &argv, "--bootproxy");
    opal_argv_append(&argc, &argv, jobid_string);
    opal_argv_append(&argc, &argv, "--name");
    proc_name_index = argc;
    opal_argv_append(&argc, &argv, "");

    /* tell the daemon how many procs are in the daemon's job */
    opal_argv_append(&argc, &argv, "--num_procs");
    asprintf(&param, "%lu", (unsigned long)(vpid + num_nodes));
    opal_argv_append(&argc, &argv, param);
    free(param);

    /* tell the daemon the starting vpid of the daemon's job */
    opal_argv_append(&argc, &argv, "--vpid_start");
    opal_argv_append(&argc, &argv, "0");

    opal_argv_append(&argc, &argv, "--nodename");
    node_name_index = argc;
    opal_argv_append(&argc, &argv, "");

    /* pass along the universe name and location info */
    opal_argv_append(&argc, &argv, "--universe");
    asprintf(&param, "%s@%s:%s", orte_universe_info.uid,
             orte_universe_info.host, orte_universe_info.name);
    opal_argv_append(&argc, &argv, param);
    free(param);

    /* setup ns contact info */
    opal_argv_append(&argc, &argv, "--nsreplica");
    if (NULL != orte_process_info.ns_replica_uri) {
        uri = strdup(orte_process_info.ns_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    asprintf(&param, "\"%s\"", uri);
    opal_argv_append(&argc, &argv, param);
    free(uri);
    free(param);

    /* setup gpr contact info */
    opal_argv_append(&argc, &argv, "--gprreplica");
    if (NULL != orte_process_info.gpr_replica_uri) {
        uri = strdup(orte_process_info.gpr_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    asprintf(&param, "\"%s\"", uri);
    opal_argv_append(&argc, &argv, param);
    free(uri);
    free(param);

    if (mca_pls_tm_component.debug) {
        param = opal_argv_join(argv, ' ');
        if (NULL != param) {
            opal_output(0, "pls:tm: final top-level argv:");
            opal_output(0, "pls:tm:     %s", param);
            free(param);
        }
    }

    rc = pls_tm_connect();
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }
    connected = true;

    /* Figure out the basenames for the libdir and bindir.  There is a
       lengthy comment about this in pls_rsh_module.c explaining all
       the rationale for how / why we're doing this.
     */
    lib_base = opal_basename(opal_install_dirs.libdir);
    bin_base = opal_basename(opal_install_dirs.bindir);

    /* setup environment */
    env = opal_argv_copy(environ);
    var = mca_base_param_environ_variable("seed", NULL, NULL);
    opal_setenv(var, "0", true, &env);

    /* clean out any MCA component selection directives that
     * won't work on remote nodes
     */
    orte_pls_base_purge_mca_params(&env);

    /* add our umask -- see big note in orted.c */
    current_umask = umask(0);
    umask(current_umask);
    asprintf(&var, "0%o", current_umask);
    opal_setenv("ORTE_DAEMON_UMASK_VALUE", var, true, &env);
    free(var);

    /* If we have a prefix, then modify the PATH and
       LD_LIBRARY_PATH environment variables. We only allow
       a single prefix to be specified. Since there will
       always be at least one app_context, we take it from
       there
    */
    if (NULL != map->apps[0]->prefix_dir) {
        char *newenv;

        for (i = 0; NULL != env && NULL != env[i]; ++i) {
            /* Reset PATH */
            if (0 == strncmp("PATH=", env[i], 5)) {
                asprintf(&newenv, "%s/%s:%s",
                         map->apps[0]->prefix_dir, bin_base, env[i] + 5);
                if (mca_pls_tm_component.debug) {
                    opal_output(0, "pls:tm: resetting PATH: %s",
                                newenv);
                }
                opal_setenv("PATH", newenv, true, &env);
                free(newenv);
            }

            /* Reset LD_LIBRARY_PATH */
            else if (0 == strncmp("LD_LIBRARY_PATH=", env[i], 16)) {
                asprintf(&newenv, "%s/%s:%s",
                         map->apps[0]->prefix_dir, lib_base, env[i] + 16);
                if (mca_pls_tm_component.debug) {
                    opal_output(0, "pls:tm: resetting LD_LIBRARY_PATH: %s",
                                newenv);
                }
                opal_setenv("LD_LIBRARY_PATH", newenv, true, &env);
                free(newenv);
            }
        }
    }

    /* Do a quick sanity check to ensure that we can find the
       orted in the PATH */
    if (ORTE_SUCCESS !=
        (rc = pls_tm_check_path(argv[0], env))) {
        ORTE_ERROR_LOG(rc);
        opal_show_help("help-pls-tm.txt", "daemon-not-found",
                       true, argv[0]);
        goto cleanup;
    }

    /* Iterate through each of the nodes and spin
     * up a daemon.
     */
    for (item = opal_list_get_first(&map->nodes);
         item != opal_list_get_end(&map->nodes);
         item = opal_list_get_next(item)) {
        orte_mapped_node_t* node = (orte_mapped_node_t*)item;
        orte_process_name_t* name;
        char* name_string;

        /* new daemon - setup to record its info */
        dmn = OBJ_NEW(orte_pls_daemon_info_t);
        dmn->active_job = jobid;
        opal_list_append(&daemons, &dmn->super);

        /* setup node name */
        free(argv[node_name_index]);
        argv[node_name_index] = strdup(node->nodename);

        /* record the node name in the daemon struct */
        dmn->cell = node->cell;
        dmn->nodename = strdup(node->nodename);

        /* initialize daemons process name */
        rc = orte_ns.create_process_name(&name, node->cell, 0, vpid);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* save it in the daemon struct */
        if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(dmn->name), name, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* setup per-node options */
        if (mca_pls_tm_component.debug ||
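
The listing above ends where page 1 of the viewer cuts off, mid-condition inside the per-node loop. For readers unfamiliar with the PBS/Torque TM interface this module is built on, the following is a minimal, hypothetical sketch (not the actual continuation of pls_tm_module.c) of the launch step the loop is building toward: tm_spawn() posts an asynchronous request to start a process on one node, and tm_poll() waits for a completion event. The argc/argv/env values and the spawn_one helper name are placeholders, and the sketch assumes tm_init() has already been called, which is what pls_tm_connect() does in the module above.

#include <stdio.h>
#include <tm.h>

/* Sketch: launch one process on one TM node and block until the spawn
 * request completes.  Assumes the caller has already called tm_init(). */
static int spawn_one(int argc, char **argv, char **env,
                     tm_node_id where, tm_task_id *task)
{
    tm_event_t event, completed;
    int rc, local_err;

    /* post the asynchronous spawn request; 'event' identifies it */
    rc = tm_spawn(argc, argv, env, where, task, &event);
    if (TM_SUCCESS != rc) {
        fprintf(stderr, "tm_spawn failed: rc=%d\n", rc);
        return rc;
    }

    /* block until an outstanding event completes; only one is pending here */
    rc = tm_poll(TM_NULL_EVENT, &completed, 1, &local_err);
    if (TM_SUCCESS != rc || TM_SUCCESS != local_err) {
        fprintf(stderr, "tm_poll failed: rc=%d local_err=%d\n", rc, local_err);
        return (TM_SUCCESS != rc) ? rc : local_err;
    }
    return TM_SUCCESS;
}

In the real module, one tm_event_t and one tm_task_id per node are kept in the tm_events and tm_task_ids arrays allocated earlier ("Allocate a bunch of TM events to use for tm_spawn()ing"), presumably so that completion events can later be matched back to the daemons they launched.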
