⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pls_rsh_module.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising a cross-section of industry and research representatives).
💻 C
📖 第 1 页 / 共 4 页
字号:
            }            /* close all file descriptors w/ exception of stdin/stdout/stderr */            for(fd=3; fd<fdmax; fd++)                close(fd);            /* Set signal handlers back to the default.  Do this close                to the execve() because the event library may (and likely                will) reset them.  If we don't do this, the event                library may have left some set that, at least on some                OS's, don't get reset via fork() or exec().  Hence, the                orted could be unkillable (for example). */            set_handler_default(SIGTERM);            set_handler_default(SIGINT);            set_handler_default(SIGHUP);            set_handler_default(SIGPIPE);            set_handler_default(SIGCHLD);                        /* Unblock all signals, for many of the same reasons that                we set the default handlers, above.  This is noticable                on Linux where the event library blocks SIGTERM, but we                don't want that blocked by the orted (or, more                specifically, we don't want it to be blocked by the                orted and then inherited by the ORTE processes that it                forks, making them unkillable by SIGTERM). 
*/            sigprocmask(0, 0, &sigs);            sigprocmask(SIG_UNBLOCK, &sigs, 0);                        /* setup environment */            env = opal_argv_copy(environ);            var = mca_base_param_environ_variable("seed",NULL,NULL);            opal_setenv(var, "0", true, &env);            /* exec the daemon */            if (mca_pls_rsh_component.debug) {                param = opal_argv_join(exec_argv, ' ');                if (NULL != param) {                    char* env_array = opal_argv_join( env, ' ' );                    opal_output(0, "pls:rsh: executing: (%s) %s [%s]",                                exec_path, param, env_array);                    free(param); free(env_array);                }            }            execve(exec_path, exec_argv, env);            opal_output(0, "pls:rsh: execv of %s failed with errno=%s(%d)\n",                        exec_path, strerror(errno), errno);            exit(-1);        } else { /* father */            OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);            /* JJH Bug:             * If we are in '--debug-daemons' we keep the ssh connection              * alive for the span of the run. If we use this option              * AND we launch on more than "num_concurrent" machines             * then we will deadlock. No connections are terminated              * until the job is complete, no job is started             * since all the orteds are waiting for all the others             * to come online, and the others ore not launched because             * we are waiting on those that have started to terminate             * their ssh tunnels. 
:(             */            if (mca_pls_rsh_component.num_children++ >=                mca_pls_rsh_component.num_concurrent) {                opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);            }            OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);                        /* setup callback on sigchild - wait until setup above is complete             * as the callback can occur in the call to orte_wait_cb             */            orte_wait_cb(pid, orte_pls_rsh_wait_daemon, dmn);            /* if required - add delay to avoid problems w/ X11 authentication */            if (mca_pls_rsh_component.debug && mca_pls_rsh_component.delay) {                sleep(mca_pls_rsh_component.delay);            }            vpid++;        }        free(name);    }        /* all done, so store the daemon info on the registry */    if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&active_daemons))) {        ORTE_ERROR_LOG(rc);    }cleanup:    OBJ_RELEASE(map);    if (NULL != lib_base) {        free(lib_base);    }    if (NULL != bin_base) {        free(bin_base);    }    if (NULL != jobid_string) free(jobid_string);  /* done with this variable */    if (NULL != argv) opal_argv_free(argv);    return rc;}/** * Terminate all processes for a given job */int orte_pls_rsh_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){    int rc;    opal_list_t daemons;    opal_list_item_t *item;        OPAL_TRACE(1);        /* construct the list of active daemons on this job */    OBJ_CONSTRUCT(&daemons, opal_list_t);    if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) {        ORTE_ERROR_LOG(rc);        goto CLEANUP;    }        /* order them to kill their local procs for this job */    if (ORTE_SUCCESS != (rc = orte_pls_base_orted_kill_local_procs(&daemons, jobid, timeout))) {        ORTE_ERROR_LOG(rc);        goto CLEANUP;    }    CLEANUP:    while (NULL != (item = 
opal_list_remove_first(&daemons))) {        OBJ_RELEASE(item);    }    OBJ_DESTRUCT(&daemons);    return rc;}/*** Terminate the orteds for a given job */int orte_pls_rsh_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){    int rc;    opal_list_t daemons;    opal_list_item_t *item;        OPAL_TRACE(1);        /* construct the list of active daemons on this job */    OBJ_CONSTRUCT(&daemons, opal_list_t);    if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) {        ORTE_ERROR_LOG(rc);        goto CLEANUP;    }        /* now tell them to die! */    if (ORTE_SUCCESS != (rc = orte_pls_base_orted_exit(&daemons, timeout))) {        ORTE_ERROR_LOG(rc);    }    CLEANUP:    while (NULL != (item = opal_list_remove_first(&daemons))) {        OBJ_RELEASE(item);    }    OBJ_DESTRUCT(&daemons);    return rc;}/* * Terminate a specific process */int orte_pls_rsh_terminate_proc(const orte_process_name_t* proc){    OPAL_TRACE(1);        return ORTE_ERR_NOT_IMPLEMENTED;}int orte_pls_rsh_signal_job(orte_jobid_t jobid, int32_t signal, opal_list_t *attrs){    int rc;    opal_list_t daemons;    opal_list_item_t *item;        OPAL_TRACE(1);        /* construct the list of active daemons on this job */    OBJ_CONSTRUCT(&daemons, opal_list_t);    if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) {        ORTE_ERROR_LOG(rc);        OBJ_DESTRUCT(&daemons);        return rc;    }        /* order them to pass this signal to their local procs */    if (ORTE_SUCCESS != (rc = orte_pls_base_orted_signal_local_procs(&daemons, signal))) {        ORTE_ERROR_LOG(rc);    }        while (NULL != (item = opal_list_remove_first(&daemons))) {        OBJ_RELEASE(item);    }    OBJ_DESTRUCT(&daemons);    return rc;}int orte_pls_rsh_signal_proc(const orte_process_name_t* proc, int32_t signal){    OPAL_TRACE(1);        return ORTE_ERR_NOT_IMPLEMENTED;}/** * Cancel an operation involving comm to an orted */int 
orte_pls_rsh_cancel_operation(void){    int rc;        OPAL_TRACE(1);        if (ORTE_SUCCESS != (rc = orte_pls_base_orted_cancel_operation())) {        ORTE_ERROR_LOG(rc);    }        return rc;}int orte_pls_rsh_finalize(void){    int rc;        /* cleanup any pending recvs */    if (ORTE_SUCCESS != (rc = orte_pls_base_comm_stop())) {        ORTE_ERROR_LOG(rc);    }    return rc;}/** * Handle threading issues. */#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADSstruct orte_pls_rsh_stack_t {    opal_condition_t cond;    opal_mutex_t mutex;    bool complete;    orte_jobid_t jobid;    int rc;};typedef struct orte_pls_rsh_stack_t orte_pls_rsh_stack_t;static void orte_pls_rsh_stack_construct(orte_pls_rsh_stack_t* stack){    OBJ_CONSTRUCT(&stack->mutex, opal_mutex_t);    OBJ_CONSTRUCT(&stack->cond, opal_condition_t);    stack->rc = 0;    stack->complete = false;}static void orte_pls_rsh_stack_destruct(orte_pls_rsh_stack_t* stack){    OBJ_DESTRUCT(&stack->mutex);    OBJ_DESTRUCT(&stack->cond);}static OBJ_CLASS_INSTANCE(    orte_pls_rsh_stack_t,    opal_object_t,    orte_pls_rsh_stack_construct,    orte_pls_rsh_stack_destruct);static void orte_pls_rsh_launch_cb(int fd, short event, void* args){    orte_pls_rsh_stack_t *stack = (orte_pls_rsh_stack_t*)args;    OPAL_THREAD_LOCK(&stack->mutex);    stack->rc = orte_pls_rsh_launch(stack->jobid);    stack->complete = true;    opal_condition_signal(&stack->cond);    OPAL_THREAD_UNLOCK(&stack->mutex);}static int orte_pls_rsh_launch_threaded(orte_jobid_t jobid){    struct timeval tv = { 0, 0 };    struct opal_event event;    struct orte_pls_rsh_stack_t stack;    OBJ_CONSTRUCT(&stack, orte_pls_rsh_stack_t);    stack.jobid = jobid;    if( opal_event_progress_thread() ) {        stack.rc = orte_pls_rsh_launch( jobid );    } else {        opal_evtimer_set(&event, orte_pls_rsh_launch_cb, &stack);        opal_evtimer_add(&event, &tv);        OPAL_THREAD_LOCK(&stack.mutex);        while 
(stack.complete == false) {            opal_condition_wait(&stack.cond, &stack.mutex);        }        OPAL_THREAD_UNLOCK(&stack.mutex);    }    OBJ_DESTRUCT(&stack);    return stack.rc;}#endifstatic void set_handler_default(int sig){    struct sigaction act;    act.sa_handler = SIG_DFL;    act.sa_flags = 0;    sigemptyset(&act.sa_mask);    sigaction(sig, &act, (struct sigaction *)0);}static orte_pls_rsh_shell_t find_shell(char *shell) {    int i         = 0;    char *sh_name = NULL;    sh_name = rindex(shell, '/');    /* skip the '/' */    ++sh_name;    for (i = 0; i < (int)(sizeof (orte_pls_rsh_shell_name) /                          sizeof(orte_pls_rsh_shell_name[0])); ++i) {        if (0 == strcmp(sh_name, orte_pls_rsh_shell_name[i])) {            return i;        }    }    /* We didn't find it */    return ORTE_PLS_RSH_SHELL_UNKNOWN;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -