📄 odls_bproc.c
字号:
} if (tcgetattr(aslave, &term_attrs) < 0) { rc = ORTE_ERROR; ORTE_ERROR_LOG(rc); goto cleanup; } term_attrs.c_lflag &= ~ (ECHO | ECHOE | ECHOK | ECHOCTL | ECHOKE | ECHONL); term_attrs.c_iflag &= ~ (ICRNL | INLCR | ISTRIP | INPCK | IXON); term_attrs.c_oflag &= ~ (OCRNL | ONLCR); if (tcsetattr(aslave, TCSANOW, &term_attrs) == -1) { rc = ORTE_ERROR; ORTE_ERROR_LOG(rc); goto cleanup; } orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE, ORTE_IOF_STDOUT, amaster); goto stderr_fifo_setup;stdout_fifo_setup:#endif if (0 != mkfifo(fd_link_path, S_IRWXU)) { perror("odls_bproc mkfifo failed"); rc = ORTE_ERROR; goto cleanup; } fd = open(fd_link_path, O_RDWR); if (-1 == fd) { perror("odls_bproc open failed"); rc = ORTE_ERROR; goto cleanup; } orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE, ORTE_IOF_STDOUT, fd);#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)stderr_fifo_setup:#endif free(fd_link_path); fd_link_path = NULL; /* setup the stderr FIFO. Always a fifo */ fd_link_path = opal_os_path( false, path_prefix, "2", NULL ); if (NULL == fd_link_path) { rc = ORTE_ERROR; ORTE_ERROR_LOG(rc); goto cleanup; } if (0 != mkfifo(fd_link_path, S_IRWXU)) { perror("odls_bproc mkfifo failed"); rc = ORTE_ERROR; goto cleanup; } fd = open(fd_link_path, O_RDWR); if (-1 == fd) { perror("odls_bproc open failed"); rc = ORTE_ERROR; goto cleanup; } orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE, ORTE_IOF_STDERR, fd);cleanup: if (NULL != path_prefix) { free(path_prefix); } if (NULL != fd_link_path) { free(fd_link_path); } return rc;}/* this entire function gets called within a GPR compound command, * so the subscription actually doesn't get done until the orted * executes the compound command */int orte_odls_bproc_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc){ char *segment; orte_gpr_value_t *values[1]; orte_gpr_subscription_t *subs, sub=ORTE_GPR_SUBSCRIPTION_EMPTY; orte_gpr_trigger_t *trigs, trig=ORTE_GPR_TRIGGER_EMPTY; char* keys[] = { ORTE_PROC_NAME_KEY, ORTE_PROC_APP_CONTEXT_KEY, ORTE_NODE_NAME_KEY, }; int num_keys = 3; int i, rc; /* get the job segment name */ if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) { ORTE_ERROR_LOG(rc); return rc; } /* attach ourselves to the "standard" orted trigger */ if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&(trig.name), ORTED_LAUNCH_STAGE_GATE_TRIGGER, job))) { ORTE_ERROR_LOG(rc); free(segment); return rc; } /* ask for return of all data required for launching local processes */ subs = ⊂ sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG; if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&(sub.name), ORTED_LAUNCH_STG_SUB, job))) { ORTE_ERROR_LOG(rc); free(segment); free(trig.name); return rc; } sub.cnt = 1; sub.values = values; if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, segment, num_keys, 0))) { ORTE_ERROR_LOG(rc); free(segment); free(sub.name); free(trig.name); return rc; } for (i=0; i < num_keys; i++) { if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[i]), keys[i], ORTE_UNDEF, NULL))) { ORTE_ERROR_LOG(rc); free(segment); free(sub.name); free(trig.name); OBJ_RELEASE(values[0]); return rc; } } sub.cbfunc = cbfunc; trigs = &trig; /* do the subscription */ if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { ORTE_ERROR_LOG(rc); } free(segment); free(sub.name); free(trig.name); OBJ_RELEASE(values[0]); return rc;}/** * Setup io for the current node, then tell orterun we are ready for the actual * processes. * @retval ORTE_SUCCESS * @retval error */intorte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ){ odls_bproc_child_t *child; opal_list_item_t* item; orte_gpr_value_t *value, **values; orte_gpr_keyval_t *kval; char *node_name; int rc; orte_std_cntr_t i, j, kv, kv2, *sptr; int src = 0; orte_buffer_t *ack; bool connect_stdin; orte_jobid_t jobid; int cycle = 0; /* first, retrieve the job number we are to launch from the * returned data - we can extract the jobid directly from the * subscription name we created */ if (ORTE_SUCCESS != (rc = orte_schema.extract_jobid_from_std_trigger_name(&jobid, data->target))) { ORTE_ERROR_LOG(rc); return rc; } /** * hack for bproc4, change process group so that we do not receive signals * from the parent/front-end process, as bproc4 does not currently allow the * process to intercept the signal */ setpgid(0,0); /* loop through the returned data to find the global info and * the info for processes going onto this node */ values = (orte_gpr_value_t**)(data->values)->addr; for (j=0, i=0; i < data->cnt && j < (data->values)->size; j++) { /* loop through all returned values */ if (NULL != values[j]) { i++; value = values[j]; /* this must have come from one of the process containers, so it must * contain data for a proc structure - see if it belongs to this node */ for (kv=0; kv < value->cnt; kv++) { kval = value->keyvals[kv]; if (strcmp(kval->key, ORTE_NODE_NAME_KEY) == 0) { /* Most C-compilers will bark if we try to directly compare the string in the * kval data area against a regular string, so we need to "get" the data * so we can access it */ if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&node_name, kval->value, ORTE_STRING))) { ORTE_ERROR_LOG(rc); return rc; } /* if this is our node...must also protect against a zero-length string */ if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) { /* ...harvest the info into a new child structure */ child = OBJ_NEW(odls_bproc_child_t); for (kv2 = 0; kv2 < value->cnt; kv2++) { kval = value->keyvals[kv2]; if(strcmp(kval->key, ORTE_PROC_NAME_KEY) == 0) { /* copy the name into the child object */ if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(child->name), kval->value->data, ORTE_NAME))) { ORTE_ERROR_LOG(rc); return rc; } continue; } if(strcmp(kval->key, ORTE_PROC_APP_CONTEXT_KEY) == 0) { if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) { ORTE_ERROR_LOG(rc); return rc; } child->app_idx = *sptr; /* save the index into the app_context objects */ continue; } } /* kv2 */ /* protect operation on the global list of children */ OPAL_THREAD_LOCK(&mca_odls_bproc_component.mutex); opal_list_append(&mca_odls_bproc_component.children, &child->super); opal_condition_signal(&mca_odls_bproc_component.cond); OPAL_THREAD_UNLOCK(&mca_odls_bproc_component.mutex); } } } /* for kv */ } /* for j */ } /* set up the io files for our children */ for(item = opal_list_get_first(&mca_odls_bproc_component.children); item != opal_list_get_end(&mca_odls_bproc_component.children); item = opal_list_get_next(item)) { child = (odls_bproc_child_t *) item; if(0 < mca_odls_bproc_component.debug) { opal_output(0, "orte_odls_bproc_launch: setting up io for " "[%lu,%lu,%lu] proc rank %lu\n", ORTE_NAME_ARGS((child->name)), child->name->vpid); } /* only setup to forward stdin if it is rank 0, otherwise connect * to /dev/null */ if(0 == child->name->vpid) { connect_stdin = true; } else { connect_stdin = false; } rc = odls_bproc_setup_stdio(child->name, cycle, jobid, child->app_idx, connect_stdin); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } cycle++; } /* message to indicate that we are ready */ ack = OBJ_NEW(orte_buffer_t); rc = orte_dss.pack(ack, &src, 1, ORTE_INT); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } rc = mca_oob_send_packed_nb(ORTE_PROC_MY_HNP, ack, ORTE_RML_TAG_BPROC, 0, odls_bproc_send_cb, NULL); if (0 > rc) { ORTE_ERROR_LOG(rc); goto cleanup; } rc = ORTE_SUCCESS;cleanup: return rc;}/** * Function to terminate a job. Since this component only runs on remote nodes * and doesn't actually launch any processes, this function is not needed * so is a noop. */int orte_odls_bproc_kill_local_procs(orte_jobid_t job, bool set_state){ orte_iof.iof_flush(); return ORTE_SUCCESS;}/** * Function to signal a process. Since this component only runs on remote nodes * and doesn't actually launch any processes, this function is not needed * so is a noop. * @param proc the process's name * @param signal The signal to send * @retval ORTE_SUCCESS */int orte_odls_bproc_signal_local_procs(const orte_process_name_t* proc, int32_t signal){ orte_iof.iof_flush(); return ORTE_SUCCESS;}/** * Finalizes the bproc module. Cleanup tmp directory/files * used for I/O forwarding. * @retval ORTE_SUCCESS */int orte_odls_bproc_finalize(void){ orte_iof.iof_flush(); odls_bproc_remove_dir(); orte_session_dir_finalize(orte_process_info.my_name); return ORTE_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -