⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 odls_bproc.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
    }    if (tcgetattr(aslave, &term_attrs) < 0) {        rc = ORTE_ERROR;        ORTE_ERROR_LOG(rc);        goto cleanup;    }    term_attrs.c_lflag &= ~ (ECHO | ECHOE | ECHOK |                             ECHOCTL | ECHOKE | ECHONL);    term_attrs.c_iflag &= ~ (ICRNL | INLCR | ISTRIP | INPCK | IXON);    term_attrs.c_oflag &= ~ (OCRNL | ONLCR);    if (tcsetattr(aslave, TCSANOW, &term_attrs) == -1) {        rc = ORTE_ERROR;        ORTE_ERROR_LOG(rc);        goto cleanup;    }     orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,                         ORTE_IOF_STDOUT, amaster);    goto stderr_fifo_setup;stdout_fifo_setup:#endif    if (0 != mkfifo(fd_link_path, S_IRWXU)) {         perror("odls_bproc mkfifo failed");         rc = ORTE_ERROR;         goto cleanup;    }    fd = open(fd_link_path, O_RDWR);    if (-1 == fd) {        perror("odls_bproc open failed");        rc = ORTE_ERROR;        goto cleanup;    }    orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,                         ORTE_IOF_STDOUT, fd);#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)stderr_fifo_setup:#endif    free(fd_link_path);    fd_link_path = NULL;    /* setup the stderr FIFO.  Always a fifo */    fd_link_path = opal_os_path( false, path_prefix, "2", NULL );    if (NULL == fd_link_path) {        rc = ORTE_ERROR;        ORTE_ERROR_LOG(rc);        goto cleanup;    }    if (0 != mkfifo(fd_link_path, S_IRWXU)) {         perror("odls_bproc mkfifo failed");         rc = ORTE_ERROR;         goto cleanup;    }    fd = open(fd_link_path, O_RDWR);    if (-1 == fd) {        perror("odls_bproc open failed");        rc = ORTE_ERROR;        goto cleanup;    }    orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,                         ORTE_IOF_STDERR, fd);cleanup:    if (NULL != path_prefix) {       free(path_prefix);    }    if (NULL != fd_link_path) {        free(fd_link_path);    }    return rc;}/* this entire function gets called within a GPR compound command, * so the subscription actually doesn't get done until the orted * executes the compound command */int orte_odls_bproc_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc){    char *segment;    orte_gpr_value_t *values[1];    orte_gpr_subscription_t *subs, sub=ORTE_GPR_SUBSCRIPTION_EMPTY;    orte_gpr_trigger_t *trigs, trig=ORTE_GPR_TRIGGER_EMPTY;    char* keys[] = {        ORTE_PROC_NAME_KEY,        ORTE_PROC_APP_CONTEXT_KEY,        ORTE_NODE_NAME_KEY,    };    int num_keys = 3;    int i, rc;        /* get the job segment name */    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {        ORTE_ERROR_LOG(rc);        return rc;    }        /* attach ourselves to the "standard" orted trigger */    if (ORTE_SUCCESS !=        (rc = orte_schema.get_std_trigger_name(&(trig.name),                                               ORTED_LAUNCH_STAGE_GATE_TRIGGER, job))) {        ORTE_ERROR_LOG(rc);        free(segment);        return rc;    }        /* ask for return of all data required for launching local processes */    subs = &sub;    sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;    if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&(sub.name),                                                                    ORTED_LAUNCH_STG_SUB,                                                                    job))) {        ORTE_ERROR_LOG(rc);        free(segment);        free(trig.name);        return rc;    }    sub.cnt = 1;    sub.values = values;        if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,                                                    segment, num_keys, 0))) {        ORTE_ERROR_LOG(rc);        free(segment);        free(sub.name);        free(trig.name);        return rc;    }    for (i=0; i < num_keys; i++) {        if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[i]),                                                         keys[i], ORTE_UNDEF, NULL))) {            ORTE_ERROR_LOG(rc);            free(segment);            free(sub.name);            free(trig.name);            OBJ_RELEASE(values[0]);            return rc;        }    }        sub.cbfunc = cbfunc;        trigs = &trig;         /* do the subscription */    if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {        ORTE_ERROR_LOG(rc);    }    free(segment);    free(sub.name);    free(trig.name);    OBJ_RELEASE(values[0]);        return rc;}/** * Setup io for the current node, then tell orterun we are ready for the actual * processes. * @retval ORTE_SUCCESS * @retval error */intorte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ){    odls_bproc_child_t *child;    opal_list_item_t* item;    orte_gpr_value_t *value, **values;    orte_gpr_keyval_t *kval;    char *node_name;    int rc;    orte_std_cntr_t i, j, kv, kv2, *sptr;    int src = 0;    orte_buffer_t *ack;    bool connect_stdin;    orte_jobid_t jobid;    int cycle = 0;    /* first, retrieve the job number we are to launch from the     * returned data - we can extract the jobid directly from the     * subscription name we created     */    if (ORTE_SUCCESS != (rc = orte_schema.extract_jobid_from_std_trigger_name(&jobid, data->target))) {        ORTE_ERROR_LOG(rc);        return rc;    }    /**     * hack for bproc4, change process group so that we do not receive signals     * from the parent/front-end process, as bproc4 does not currently allow the     * process to intercept the signal    */    setpgid(0,0);    /* loop through the returned data to find the global info and     * the info for processes going onto this node     */    values = (orte_gpr_value_t**)(data->values)->addr;    for (j=0, i=0; i < data->cnt && j < (data->values)->size; j++) {  /* loop through all returned values */        if (NULL != values[j]) {            i++;            value = values[j];            /* this must have come from one of the process containers, so it must            * contain data for a proc structure - see if it belongs to this node            */            for (kv=0; kv < value->cnt; kv++) {                kval = value->keyvals[kv];                if (strcmp(kval->key, ORTE_NODE_NAME_KEY) == 0) {                    /* Most C-compilers will bark if we try to directly compare the string in the                    * kval data area against a regular string, so we need to "get" the data                    * so we can access it */                    if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&node_name, kval->value, ORTE_STRING))) {                        ORTE_ERROR_LOG(rc);                        return rc;                    }                    /* if this is our node...must also protect against a zero-length string  */                    if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) {                        /* ...harvest the info into a new child structure */                        child = OBJ_NEW(odls_bproc_child_t);                        for (kv2 = 0; kv2 < value->cnt; kv2++) {                            kval = value->keyvals[kv2];                            if(strcmp(kval->key, ORTE_PROC_NAME_KEY) == 0) {                                /* copy the name into the child object */                                if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(child->name), kval->value->data, ORTE_NAME))) {                                    ORTE_ERROR_LOG(rc);                                    return rc;                                }                                continue;                            }                            if(strcmp(kval->key, ORTE_PROC_APP_CONTEXT_KEY) == 0) {                                if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) {                                    ORTE_ERROR_LOG(rc);                                    return rc;                                }                                child->app_idx = *sptr;  /* save the index into the app_context objects */                                continue;                            }                        } /* kv2 */                        /* protect operation on the global list of children */                        OPAL_THREAD_LOCK(&mca_odls_bproc_component.mutex);                        opal_list_append(&mca_odls_bproc_component.children, &child->super);                        opal_condition_signal(&mca_odls_bproc_component.cond);                        OPAL_THREAD_UNLOCK(&mca_odls_bproc_component.mutex);                    }                }            } /* for kv */        } /* for j */    }    /* set up the io files for our children */    for(item =  opal_list_get_first(&mca_odls_bproc_component.children);        item != opal_list_get_end(&mca_odls_bproc_component.children);        item =  opal_list_get_next(item)) {        child = (odls_bproc_child_t *) item;        if(0 < mca_odls_bproc_component.debug) {            opal_output(0, "orte_odls_bproc_launch: setting up io for "                            "[%lu,%lu,%lu] proc rank %lu\n",                            ORTE_NAME_ARGS((child->name)),                            child->name->vpid);        }        /* only setup to forward stdin if it is rank 0, otherwise connect            * to /dev/null */        if(0 == child->name->vpid) {            connect_stdin = true;        } else {            connect_stdin = false;        }        rc = odls_bproc_setup_stdio(child->name, cycle,                                    jobid, child->app_idx,                                    connect_stdin);        if (ORTE_SUCCESS != rc) {            ORTE_ERROR_LOG(rc);            goto cleanup;        }        cycle++;    }    /* message to indicate that we are ready */    ack = OBJ_NEW(orte_buffer_t);    rc = orte_dss.pack(ack, &src, 1, ORTE_INT);    if(ORTE_SUCCESS != rc) {        ORTE_ERROR_LOG(rc);    }    rc = mca_oob_send_packed_nb(ORTE_PROC_MY_HNP, ack, ORTE_RML_TAG_BPROC, 0,        odls_bproc_send_cb, NULL);    if (0 > rc) {        ORTE_ERROR_LOG(rc);        goto cleanup;    }    rc = ORTE_SUCCESS;cleanup:    return rc;}/** * Function to terminate a job. Since this component only runs on remote nodes * and doesn't actually launch any processes, this function is not needed * so is a noop. */int orte_odls_bproc_kill_local_procs(orte_jobid_t job, bool set_state){    orte_iof.iof_flush();    return ORTE_SUCCESS;}/** * Function to signal a process. Since this component only runs on remote nodes * and doesn't actually launch any processes, this function is not needed * so is a noop. * @param proc the process's name * @param signal The signal to send * @retval ORTE_SUCCESS */int orte_odls_bproc_signal_local_procs(const orte_process_name_t* proc, int32_t signal){    orte_iof.iof_flush();    return ORTE_SUCCESS;}/** * Finalizes the bproc module. Cleanup tmp directory/files * used for I/O forwarding. * @retval ORTE_SUCCESS */int orte_odls_bproc_finalize(void){    orte_iof.iof_flush();    odls_bproc_remove_dir();    orte_session_dir_finalize(orte_process_info.my_name);    return ORTE_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -