📄 pls_bproc.c
字号:
ORTE_ERROR_LOG(rc); orte_pls_bproc_terminate_job(map->job, &orte_abort_timeout, NULL); goto cleanup; } } } /* indicate that the daemons have now launched */ daemons_launched = true; if (orte_pls_base.timing) { if (0 != gettimeofday(&launchstop, NULL)) { opal_output(0, "pls_bproc: could not obtain stop time"); } else { opal_output(0, "pls_bproc: total job launch time is %ld usec", (launchstop.tv_sec - joblaunchstart.tv_sec)*1000000 + (launchstop.tv_usec - joblaunchstart.tv_usec)); } } cleanup: if(NULL != argv) { opal_argv_free(argv); } if(NULL != pids) { free(pids); } if(NULL != orted_path) { free(orted_path); } while (NULL != (item = opal_list_remove_first(&daemons))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); return rc;}static intorte_pls_bproc_node_failed(orte_gpr_notify_message_t *msg) { orte_jobid_t job; /* respond to a node failure reported by the smr. We know that * this function will only be called when one or more nodes in * our allocation fails, so we just need to respond to it. The * complication is that the failure could occur in any of several * states: * (a) before we start to launch the daemons * (b) while we are launching the daemons * (c) after the daemons are launched, while we are launching the app * (d) during app launch * (e) after app launch, but before completion * (f) while the app is finalizing * (g) while we are cleaning up after the app has finalized */ printf("mpirun has detected a dead node within the job and is terminating\n"); /* extract the jobid from the returned data */ orte_schema.extract_jobid_from_std_trigger_name(&job, msg->target); /* terminate all jobs in the in the job family */ orte_pls_bproc_terminate_job(job, &orte_abort_timeout, NULL); /* kill the daemons */ orte_pls_bproc_terminate_job(0, &orte_abort_timeout, NULL); /* shouldn't ever get here.. 
*/ exit(1); }/** * Launches the application processes * @param cellid the cellid of the job * @param jobid the jobid of the job * @param map a pointer to the mapping of this application * @param num_processes the number of processes in this job * @param vpid_start the starting vpid for this app context * @param global_vpid_start the starting vpid for the user's processes * @param app_context the application context number * @param node_array the node array for this context * @param node_array_len the length of the node array * @retval ORTE_SUCCESS * @retval error */static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots, orte_vpid_t vpid_start, int app_context) { int *node_array, num_nodes, cycle; int rc, i, j, stride; orte_std_cntr_t num_processes; int *pids = NULL; char *var, *param; orte_process_name_t * proc_name; struct bproc_io_t bproc_io[3]; char **env; int dbg; OPAL_TRACE(1); /* point to the env array for this app_context */ env = opal_argv_copy(map->apps[app_context]->env); /* set up app context */ asprintf(¶m, "%d", app_context); var = mca_base_param_environ_variable("pls", "bproc", "app_context"); opal_setenv(var, param, true, &env); free(param); free(var); /* set the app_context number into the environment for the attributes */ var = mca_base_param_environ_variable("orte","app","num"); asprintf(¶m, "%ld", (long)app_context); opal_setenv(var, param, true, &env); free(param); free(var); /* set the vpid-to-vpid stride based on the mapping mode */ if (bynode) { /* we are mapping by node, so we want to set the stride * length (i.e., the step size between vpids that is used * to compute the process name) to 1 */ stride = 1; } else { /* we are mapping by slot, so we want to set the stride * length (i.e., the step size between vpids that is used * to compute the process name) to the number of slots */ stride = num_slots; } /* and push that value into the process' environment */ asprintf(¶m, "%ld", (long)stride); var = 
mca_base_param_environ_variable("pls", "bproc", "stride"); opal_setenv(var, param, true, &env); free(param); free(var); /* set up the node_array to handle the launch */ node_array = (int*)malloc(map->num_nodes * sizeof(int)); if (NULL == node_array) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* initialize the cycle count. Computing the process name under Bproc * is a complex matter when mapping by slot as Bproc's inherent * methodology is to do everything by node. When mapping by slot, the * first num_slots number of launch cycles all have a vpid_start that * will differ by one - i.e., the processes on a given node will have * vpids that differ by only one. * * However, when we oversubscribe, we enter into a cyclic arrangement. * During each cycle, the above description of how names are assigned * is accurate. However, each cycle (i.e., each collection of num_nodes * processes that we launch) will have a vpid start that is offset by * num_slots * num_nodes. 
We have to compensate for that here when we * calculate and pass the vpid_start param so that the processes can * correctly compute their name */ cycle = 1; /* launch the processes */ i = 1; num_processes = map->vpid_range; rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } opal_output_verbose(1, orte_pls_base.pls_output, "launching app %s", map->apps[app_context]->app); while(0 != num_nodes) { if (0 < mca_pls_bproc_component.debug) { opal_output_verbose(1, orte_pls_base.pls_output, "\tlaunching cycle %d", i); for (dbg=0; dbg<num_nodes; dbg++) { opal_output_verbose(1, orte_pls_base.pls_output, "\t\tlaunching on node %d", node_array[dbg]); } } /* setup environment so the procs can figure out their names */ rc = orte_ns_nds_bproc_put(ORTE_PROC_MY_NAME->cellid, map->job, vpid_start, map->vpid_start, num_processes, &env); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } rc = orte_pls_bproc_setup_io(map->job, bproc_io, i - 1, app_context); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } if(0 < mca_pls_bproc_component.debug) { opal_output(0, "pls_bproc: launching %d processes:", num_nodes); } /* allocate space for bproc to return the pids */ pids = (int*)malloc(num_nodes * sizeof(int)); if (NULL == pids) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } if (mca_pls_bproc_component.do_not_launch) { for (j=0; j < num_nodes; j++) pids[j] = j+1; rc = num_nodes; } else { rc = bproc_vexecmove_io(num_nodes, node_array, pids, bproc_io, 3, map->apps[app_context]->app, map->apps[app_context]->argv, env); } if(0 < mca_pls_bproc_component.debug) { opal_output(0, "pls_bproc: %d processes launched. 
First pid: %d", rc, *pids); } if(rc != num_nodes) { opal_show_help("help-pls-bproc.txt", "proc-launch-number", true, num_nodes, rc, map->apps[app_context]->app); rc = ORTE_ERROR; goto cleanup; } for(j = 0; j < num_nodes; j++) { if(0 >= pids[j]) { opal_show_help("help-pls-bproc.txt", "proc-launch-bad-pid", true, node_array[j], pids[j], errno, map->apps[app_context]->app); rc = ORTE_ERROR; ORTE_ERROR_LOG(rc); goto cleanup; } else { rc = orte_ns.create_process_name(&proc_name, ORTE_PROC_MY_NAME->cellid, map->job, vpid_start + j*stride); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } orte_pls_bproc_set_proc_pid(proc_name, pids[j], node_array[j]); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } if (!mca_pls_bproc_component.do_not_launch) { rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } } } } free(pids); pids = NULL; i++; if (bynode) { /* we are mapping by node, so the vpid_start must increment by * the number of nodes */ vpid_start += num_nodes; } else { /* we are mapping by slot. Here is where we need to check our * cyclic condition - if we are at the end of a cycle, then * we need to increment the vpid_start by num_slots*num_nodes. * Otherwise, we just increment it by one. */ if (cycle == num_slots) { /* end of cycle condition */ vpid_start += num_slots * num_nodes - 1; cycle = 1; } else { vpid_start += 1; cycle++; } } rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } }cleanup: if(NULL != pids) { free(pids); } free(node_array); if (NULL != env) opal_argv_free(env); return rc;}/** * The main bproc launcher. See pls_bproc.h for a high level overview of how * the bproc launching works. * Here we: * -# Launch the deamons on the backend nodes. * -# The daemons setup files for io forwarding then connect back to us to * tells us they are ready for the actual apps. 
 * -# Launch the apps on the backend nodes
 *
 * @param jobid the jobid of the job to launch
 * @retval ORTE_SUCCESS
 * @retval error
 */
int orte_pls_bproc_launch(orte_jobid_t jobid) {
    orte_job_map_t* map;
    orte_mapped_node_t *map_node;
    orte_vpid_t vpid_launch;
    int rc;
    int num_slots;
    int context;
    int i;
    char cwd_save[OMPI_PATH_MAX + 1];
    orte_ras_node_t *ras_node;
    char **daemon_env;
    opal_list_t nodelist;

    OPAL_TRACE(1);

    /* make sure the pls_bproc receive function has been started */
    if (ORTE_SUCCESS != (rc = orte_pls_bproc_comm_start())) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* save the current working directory */
    if (NULL == getcwd(cwd_save, sizeof(cwd_save))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    /* defensive NUL-termination (getcwd terminates on success anyway) */
    cwd_save[sizeof(cwd_save) - 1] = '\0';

    /* get the job map */
    if(ORTE_SUCCESS != (rc = orte_rmaps.get_job_map(&map, jobid))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* set the mapping mode: module-level flag consumed by launch_app */
    if (NULL != map->mapping_mode &&
        0 == strcmp("bynode", map->mapping_mode)) {
        bynode = true;
    } else {
        bynode = false;
    }

    /* check all of the app_contexts for sanity */
    for (i=0; i < map->num_apps; i++) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -