📄 pls_bproc.c
字号:
/* Check that the cwd is sane. We have to chdir there in to check the executable, because the executable could have been specified as a relative path to the wdir */ rc = orte_rmgr.check_context_cwd(map->apps[i], true); if (ORTE_SUCCESS != rc) { goto cleanup; } /* Check that the app exists and is executable */ rc = orte_rmgr.check_context_app(map->apps[i]); if (ORTE_SUCCESS != rc) { goto cleanup; } /* Return to the original dir */ if (0 != chdir(cwd_save)) { rc = ORTE_ERR_IN_ERRNO; goto cleanup; } } /* For Bproc, we need to know how many slots were allocated on each * node so the spawned processes can computer their name. Only Bproc * needs to do this, so we choose not to modify the mapped_node struct * to hold this info - bproc can go get it. * * Since Bproc also requires that the slots allocated on each node * be the same, we really only need to lookup a single node. So grab * the data for the first node on the map * * RHC: Unfortunately, the user may have passed these nodes to us * via a hostfile or -host argument. In that case, we cannot trust * that the slots allocated on each node are the same - and we get * erratic behavior if they don't. Until we can verify that Bproc * now supports clusters with differing numbers of slots on each node, * we have to protect the system by erroring out. 
So - even though this * will slow down the launch on large clusters - we have to get the * allocation and check to ensure that all the slots match */ OBJ_CONSTRUCT(&nodelist, opal_list_t); if (ORTE_SUCCESS != (rc = orte_ras.node_query(&nodelist))) { ORTE_ERROR_LOG(rc); goto cleanup; } if (NULL == (ras_node = (orte_ras_node_t*)opal_list_remove_first(&nodelist))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); rc = ORTE_ERR_NOT_FOUND; goto cleanup; } num_slots = ras_node->node_slots; OBJ_RELEASE(ras_node); while (NULL != (ras_node = (orte_ras_node_t*)opal_list_remove_first(&nodelist))) { if (num_slots != ras_node->node_slots) { /* mismatch - error out */ opal_show_help("help-pls-bproc.txt", "mismatched-slots", true); ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED); rc = ORTE_ERR_NOT_SUPPORTED; goto cleanup; } OBJ_RELEASE(ras_node); } OBJ_DESTRUCT(&nodelist); if(0 < mca_pls_bproc_component.debug) { opal_output(0, "pls_bproc: --- starting to launch procs ---"); } /* save the daemon environment */ daemon_env = opal_argv_copy(map->apps[0]->env); /* for each application context, setup its env */ for(i=0; i < map->num_apps; i++) { orte_pls_bproc_setup_env(&map->apps[i]->env); } /* tell the smr which nodes to monitor so we can be notified when the node's state changes, useful for aborting when a bproc node up and dies */ if (ORTE_SUCCESS != (rc = orte_smr.begin_monitoring(map, orte_pls_bproc_node_failed, NULL))) { ORTE_ERROR_LOG(rc); goto cleanup; } /* launch the daemons on all nodes which have processes assigned to them */ rc = orte_pls_bproc_launch_daemons(map, &daemon_env); opal_argv_free(daemon_env); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } vpid_launch = map->vpid_start; /* for each application context launch the app */ for(context=0; context < map->num_apps; context++) { rc = orte_rmgr.check_context_cwd(map->apps[context], true); if (ORTE_SUCCESS != rc) { goto cleanup; } rc = orte_pls_bproc_launch_app(map, num_slots, vpid_launch, context); if(ORTE_SUCCESS != rc) { 
ORTE_ERROR_LOG(rc); goto cleanup; } vpid_launch += map->apps[context]->num_procs; }cleanup: chdir(cwd_save); OBJ_RELEASE(map); if (mca_pls_bproc_component.do_not_launch) { /* indicate that we failed to launch, but do so silently */ return ORTE_ERR_SILENT; } return rc;}/** * Terminate all processes associated with this job */int orte_pls_bproc_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs) { pid_t* pids; orte_std_cntr_t i, num_pids; int rc; OPAL_TRACE(1); if(0 < mca_pls_bproc_component.debug) { opal_output(0, "orte_pls_bproc: terminating job %ld", jobid); } /* kill application process */ if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pids(jobid, &pids, &num_pids, attrs))) return rc; for(i=0; i<num_pids; i++) { if(mca_pls_bproc_component.debug) { opal_output(0, "orte_pls_bproc: killing proc: %d\n", pids[i]); } kill(pids[i], mca_pls_bproc_component.terminate_sig); } if(NULL != pids) free(pids); /* dont kill daemons - mpirun will do this for us */ return ORTE_SUCCESS;}/*** Terminate the orteds for a given job */int orte_pls_bproc_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){ int rc; opal_list_t daemons; opal_list_item_t *item; OPAL_TRACE(1); /* construct the list of active daemons on this job */ OBJ_CONSTRUCT(&daemons, opal_list_t); if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* now tell them to die! */ if (ORTE_SUCCESS != (rc = orte_pls_base_orted_exit(&daemons, timeout))) { ORTE_ERROR_LOG(rc); }CLEANUP: while (NULL != (item = opal_list_remove_first(&daemons))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); return rc;}/** * Terminate a specific process. 
*/int orte_pls_bproc_terminate_proc(const orte_process_name_t* proc_name) { int rc; pid_t pid; OPAL_TRACE(1); if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pid(proc_name, &pid))) return rc; if(kill(pid, mca_pls_bproc_component.terminate_sig) != 0) { switch(errno) { case EINVAL: return ORTE_ERR_BAD_PARAM; case ESRCH: return ORTE_ERR_NOT_FOUND; case EPERM: return ORTE_ERR_PERM; default: return ORTE_ERROR; } } return ORTE_SUCCESS;}/** * Signal all processes associated with this job */int orte_pls_bproc_signal_job(orte_jobid_t jobid, int32_t signal, opal_list_t *attrs) { pid_t* pids; orte_std_cntr_t i, num_pids; int rc; OPAL_TRACE(1); /* signal application process */ if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pids(jobid, &pids, &num_pids, attrs))) return rc; for(i=0; i<num_pids; i++) { if(mca_pls_bproc_component.debug) { opal_output(0, "orte_pls_bproc: signaling proc: %d\n", pids[i]); } kill(pids[i], (int)signal); } if(NULL != pids) free(pids); /** dont signal daemons - this is strictly for signalling application processes */ return ORTE_SUCCESS;}/** * Signal a specific process. */int orte_pls_bproc_signal_proc(const orte_process_name_t* proc_name, int32_t signal) { int rc; pid_t pid; OPAL_TRACE(1); if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pid(proc_name, &pid))) return rc; if(kill(pid, (int)signal) != 0) { switch(errno) { case EINVAL: return ORTE_ERR_BAD_PARAM; case ESRCH: return ORTE_ERR_NOT_FOUND; case EPERM: return ORTE_ERR_PERM; default: return ORTE_ERROR; } } return ORTE_SUCCESS;}/** * Cancel an operation involving comm to an orted */int orte_pls_bproc_cancel_operation(void){ int rc; OPAL_TRACE(1); if (ORTE_SUCCESS != (rc = orte_pls_base_orted_cancel_operation())) { ORTE_ERROR_LOG(rc); } return rc;}/** * Module cleanup */int orte_pls_bproc_finalize(void){ return ORTE_SUCCESS;}/* * Handle threading issues. 
*/
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS

/*
 * State shared between orte_pls_bproc_launch_threaded() and the event
 * callback that performs the launch on its behalf.
 */
struct orte_pls_bproc_stack_t {
    /* The class below is registered with opal_object_t as its parent, so the
     * object header has to be the first member. Without it the header aliased
     * `cond`, and constructing `cond` re-tagged the object so that
     * OBJ_DESTRUCT() on the stack never reached the stack's own destructor. */
    opal_object_t super;
    opal_condition_t cond;   /* signalled by the callback once the launch returns */
    opal_mutex_t mutex;      /* protects `complete` and pairs with `cond` */
    bool complete;           /* set to true by the callback when it is done */
    orte_jobid_t jobid;      /* job to launch; filled in by the waiter */
    int rc;                  /* result of orte_pls_bproc_launch() */
};
typedef struct orte_pls_bproc_stack_t orte_pls_bproc_stack_t;

/* Constructor: set up the synchronisation members and clear the result fields. */
static void orte_pls_bproc_stack_construct(orte_pls_bproc_stack_t* stack)
{
    OBJ_CONSTRUCT(&stack->mutex, opal_mutex_t);
    OBJ_CONSTRUCT(&stack->cond, opal_condition_t);
    stack->rc = 0;
    stack->complete = false;
}

/* Destructor: tear down the synchronisation members. */
static void orte_pls_bproc_stack_destruct(orte_pls_bproc_stack_t* stack)
{
    OBJ_DESTRUCT(&stack->mutex);
    OBJ_DESTRUCT(&stack->cond);
}

static OBJ_CLASS_INSTANCE(
    orte_pls_bproc_stack_t,
    opal_object_t,
    orte_pls_bproc_stack_construct,
    orte_pls_bproc_stack_destruct);

/*
 * Event callback: run the real launch for the job recorded in the stack,
 * store its return code, then mark the stack complete and wake the waiter.
 *
 * fd and event are not used; args is the orte_pls_bproc_stack_t set up by
 * orte_pls_bproc_launch_threaded().
 */
static void orte_pls_bproc_launch_cb(int fd, short event, void* args)
{
    orte_pls_bproc_stack_t *stack = (orte_pls_bproc_stack_t*)args;

    stack->rc = orte_pls_bproc_launch(stack->jobid);

    OPAL_THREAD_LOCK(&stack->mutex);
    stack->complete = true;
    opal_condition_signal(&stack->cond);
    OPAL_THREAD_UNLOCK(&stack->mutex);
}

/*
 * Launch a job by way of the event system.
 *
 * Registers a zero-delay timer event whose callback performs the launch,
 * then blocks on the stack's condition until the callback reports
 * completion.
 *
 * Returns the value orte_pls_bproc_launch() returned for jobid.
 */
int orte_pls_bproc_launch_threaded(orte_jobid_t jobid)
{
    struct timeval tv = { 0, 0 };
    struct opal_event event;
    struct orte_pls_bproc_stack_t stack;
    int rc;

    OBJ_CONSTRUCT(&stack, orte_pls_bproc_stack_t);

    stack.jobid = jobid;
    opal_evtimer_set(&event, orte_pls_bproc_launch_cb, &stack);
    opal_evtimer_add(&event, &tv);

    OPAL_THREAD_LOCK(&stack.mutex);
    while (!stack.complete) {
        opal_condition_wait(&stack.cond, &stack.mutex);
    }
    OPAL_THREAD_UNLOCK(&stack.mutex);

    /* copy the result out before the object is torn down */
    rc = stack.rc;
    OBJ_DESTRUCT(&stack);
    return rc;
}

#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -