📄 pls_bproc.c
字号:
/* -*- C -*-
 *
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */
/**
 * @file:
 * Part of the bproc launcher. See pls_bproc.h for an overview of how it works.
 */

#include "orte_config.h"

#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif /* HAVE_SYS_STAT_H */
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <errno.h>
#include <signal.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif /* HAVE_FCNTL_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif /* HAVE_STRING_H */
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include "opal/mca/installdirs/installdirs.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_list.h"
#include "opal/event/event.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/path.h"
#include "opal/util/os_path.h"
#include "opal/util/show_help.h"
#include "opal/util/trace.h"

#include "orte/dss/dss.h"
#include "orte/util/sys_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/sds/base/base.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/ras/ras.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/schema/schema_types.h"
#include "orte/mca/smr/smr.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/params.h"
#include "orte/mca/pls/base/pls_private.h"
#include "pls_bproc.h"

/* Consulted by orte_pls_bproc_waitpid_daemon_cb: while this is false, a daemon
 * exit is reported back to ourselves with an ORTE_RML_TAG_BPROC message. */
static bool daemons_launched;
static bool bynode;

#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
int orte_pls_bproc_launch_threaded(orte_jobid_t);
#endif

/**
 * Initialization of the bproc module with all the needed function pointers.
 * The first slot is the threaded launch entry point when POSIX threads have
 * different pids, and the plain launch function otherwise.
 */
orte_pls_base_module_t orte_pls_bproc_module = {
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
    orte_pls_bproc_launch_threaded,
#else
    orte_pls_bproc_launch,
#endif
    orte_pls_bproc_terminate_job,
    orte_pls_bproc_terminate_orteds,
    orte_pls_bproc_terminate_proc,
    orte_pls_bproc_signal_job,
    orte_pls_bproc_signal_proc,
    orte_pls_bproc_cancel_operation,
    orte_pls_bproc_finalize
};

/* Forward declarations of the file-local helpers. */
static int orte_pls_bproc_node_list(orte_job_map_t *map,
                                    int *node_array, int * num_nodes,
                                    int num_procs);
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
                                   int node_rank, int app_context);
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data);
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data);
#ifdef MCA_pls_bproc_scyld
/* compatibility functions for scyld bproc and pre 3.2.0 LANL bproc */
static int bproc_vexecmove_io(int nnodes, int *nodes, int *pids,
                              struct bproc_io_t *io, int iolen, const char *cmd,
                              char * const argv[], char * envp[]);
static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd,
                           char * const argv[], char * envp[]);
#endif
static void orte_pls_bproc_setup_env(char *** env);
static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp);
static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
                                     orte_vpid_t vpid_start, int app_context);

/**
 * Creates a list of nodes from a job map that should participate in the next launch cycle.
* @param map a pointer to the job map * @param node_array a pointer to an integer array that will contain the node names * @param num_nodes a pointer to the place where we will store the number of nodes in the array * @param num_procs the number of processes that a node must have to be placed on the list */static int orte_pls_bproc_node_list(orte_job_map_t *map, int *node_array, int *num_nodes, int num_procs){ opal_list_item_t *item; orte_mapped_node_t *node; OPAL_TRACE(1); /* initialize all */ *num_nodes = 0; memset((void*)node_array, -1, sizeof(int) * map->num_nodes); /* build the node list */ for(item = opal_list_get_first(&map->nodes); item != opal_list_get_end(&map->nodes); item = opal_list_get_next(item)) { node = (orte_mapped_node_t*)item; if (node->num_procs >= num_procs) { node_array[(*num_nodes)++] = atoi(node->nodename); } } return ORTE_SUCCESS;}/** * Sets up the bproc io structs for the specified rank on the nodes * * @param jobid * @param io A pointer to an array of 3 bproc_io_t structs * @param node_rank the rank on the node we are setting up the structs for * @param app_context the application context number * @retval ORTE_SUCCESS * @retval error */static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io, int node_rank, int app_context) { char *frontend = NULL, *path = NULL, *job = NULL; int rc, i; OPAL_TRACE(1); /* ensure that system info is set */ orte_sys_info(); if (NULL == orte_system_info.user) { /* error condition */ return ORTE_ERROR; } if (NULL == orte_universe_info.name) { /* error condition */ return ORTE_ERROR; } rc = orte_ns.convert_jobid_to_string(&job, jobid); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } /* build the directory tree the io files will be in */ if (0 > asprintf(&frontend, OPAL_PATH_SEP"tmp"OPAL_PATH_SEP"openmpi-bproc-%s"OPAL_PATH_SEP"%s"OPAL_PATH_SEP"%s-%d"OPAL_PATH_SEP"%d", orte_system_info.user, orte_universe_info.name, job, app_context, node_rank)) { rc = ORTE_ERR_OUT_OF_RESOURCE; 
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto cleanup; } for(i = 0; i < 3; i++) { if(0 > asprintf(&path, "%s"OPAL_PATH_SEP"%d", frontend, i)) { rc = ORTE_ERR_OUT_OF_RESOURCE; ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto cleanup; } if (mca_pls_bproc_component.debug) { opal_output(0, "mpirun bproc io setup. Path: %s\n", path); } io[i].fd = i; io[i].type = BPROC_IO_FILE;#if defined BPROC_API_VERSION && BPROC_API_VERSION >= 4 io[i].flags = 0;#else io[i].send_info = 0;#endif if(0 == i) { io[i].d.file.flags = O_RDONLY; } else { io[i].d.file.flags = O_WRONLY; } io[i].d.file.offset = 0; io[i].d.file.mode = 0; strncpy(io[i].d.file.name, path, 256); free(path); } cleanup: if (NULL != frontend) { free(frontend); } if (NULL != job) { free(job); } return rc;}/** * Callback for orte_wait_cb. This function ONLY gets called for * normal termination, or termination caused by a signal. If the * process abnormally terminates by other than a signal, we go through * another function so it can tell us that it was abnormal. * Bproc doesn't really let us do it through here. * @param wpid the process's pid * @param status tells why the process died * @param data a pointer to the process's name */static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) { orte_process_name_t * proc = (orte_process_name_t*) data; int rc; OPAL_TRACE(1); /* set the state of this process */ if(WIFEXITED(status)) { rc = orte_smr.set_proc_state(proc, ORTE_PROC_STATE_TERMINATED, status); } else { rc = orte_smr.set_proc_state(proc, ORTE_PROC_STATE_ABORTED, status); } if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } free(proc);}/** * Callback for orte_wait_cb for the daemons. If a daemon unexpectedly dies * before we are done launching, we abort the job. 
* @param wpid the daemons's pid * @param status tells why the daemon died * @param data a pointer to the node the daemon was on */static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data) { OPAL_TRACE(1); if(!daemons_launched) { /* if a daemon exits before we are done launching the user apps we send a * message to ourself so we will break out of the receive loop and exit */ orte_buffer_t ack; int rc; int src[4] = {-1, -1}; src[2] = wpid; src[3] = *(int*)data; if(WIFSIGNALED(status)) { src[1] = WTERMSIG(status); } OBJ_CONSTRUCT(&ack, orte_buffer_t); rc = orte_dss.pack(&ack, &src, 4, ORTE_INT); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } rc = mca_oob_send_packed(ORTE_PROC_MY_NAME, &ack, ORTE_RML_TAG_BPROC, 0); if(0 > rc) { ORTE_ERROR_LOG(rc); } } OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock); if(0 < mca_pls_bproc_component.num_daemons) { mca_pls_bproc_component.num_daemons--; } opal_condition_signal(&mca_pls_bproc_component.condition); OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock); if(0 < mca_pls_bproc_component.debug) { opal_output(0, "in orte_pls_bproc_waitpid_daemon_cb, %d daemons left\n", mca_pls_bproc_component.num_daemons); }}#ifdef MCA_pls_bproc_scyld/** * compatibility function for scyld bproc and pre 3.2.0 LANL bproc. 
See the * bproc documentation for details */static int bproc_vexecmove_io(int nnodes, int *nodes, int *pids, struct bproc_io_t *io, int iolen, const char *cmd, char * const argv[], char * envp[]) { int i; char * rank; OPAL_TRACE(1); for(i = 0; i < nnodes; i++) { pids[i] = fork(); if(0 == pids[i]) { /* set BPROC_RANK so the proc can get its name */ if (0 > asprintf(&rank, "%d", i)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); exit(-1); } opal_setenv("BPROC_RANK", rank, true, &envp); bproc_execmove_io(nodes[i], io, iolen, cmd, argv, envp); /* if we get here, there was an error */ opal_show_help("help-pls-bproc.txt", "bproc-vexecmove-launch", true, cmd, nodes[i], errno); ORTE_ERROR_LOG(ORTE_ERROR); exit(-1); } else if(-1 == pids[i]) { opal_show_help("help-pls-bproc.txt", "bproc-vexecmove-fork", true, errno); ORTE_ERROR_LOG(ORTE_ERROR); return -1; } } return nnodes;}/** * compatibility function for scyld bproc and pre 3.2.0 LANL bproc. See the * bproc documentation for details */static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd, char * const argv[], char * envp[]) { return bproc_vexecmove_io(nnodes, nodes, pids, NULL, 0, cmd, argv, envp);}#endif/**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -