📄 pls_poe_module.c
字号:
} OBJ_DESTRUCT(&mapping_list); if (mca_pls_poe_component.verbose > 10) opal_output(0, "%s: --- END rc(%d) ---\n", __FUNCTION__, rc); return rc;}#endif/**poe_wait_job - call back when POE finish@param pid pid@param status status@param cbdata call back data@return error number*/static void poe_wait_job(pid_t pid, int status, void* cbdata){ orte_job_map_t *map; opal_list_item_t *item, *item2; int rc; /* query allocation for the job */ rc = orte_rmaps.get_job_map(&map, mca_pls_poe_component.jobid); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } for(item = opal_list_get_first(&map->nodes); item != opal_list_get_end(&map->nodes); item = opal_list_get_next(item)) { orte_mapped_node_t* node = (orte_mapped_node_t*) item; for (item2 = opal_list_get_first(&node->procs); item2 != opal_list_get_end(&node->procs); item2 = opal_list_get_next(item2)) { orte_mapped_proc_t* proc = (orte_mapped_proc_t*)item2; orte_session_dir_finalize(&(proc->name)); rc = orte_smr.set_proc_state(&(proc->name), ORTE_PROC_STATE_ABORTED, status); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } } }}/**poe_create_cmd_file - create POE command file@param cfp command file pointer [IN]@param context context [IN]@param proc proc [IN]@param vpid_start vpid start [IN]@param vpid_range vpid range [IN]@return error number*/static int poe_create_cmd_file( FILE *cfp, orte_app_context_t* context, orte_mapped_proc_t* proc, orte_vpid_t vpid_start, orte_vpid_t vpid_range){ int i; char* param; char* uri; char **environ_copy; /* setup base environment */ environ_copy = NULL; param = mca_base_param_environ_variable("rmgr","bootproxy","jobid"); opal_unsetenv(param, &environ_copy); /* setup universe info */ if(NULL != orte_universe_info.name) { param = mca_base_param_environ_variable("universe", NULL, NULL); asprintf(&uri, "%s@%s:%s", orte_universe_info.uid, orte_universe_info.host, orte_universe_info.name); opal_setenv(param, uri, true, &environ_copy); free(param); free(uri); } /* setup ns contact info */ if(NULL != orte_process_info.ns_replica_uri) { uri = strdup(orte_process_info.ns_replica_uri); } else { uri = orte_rml.get_uri(); } param = mca_base_param_environ_variable("ns","replica","uri"); opal_setenv(param, uri, true, &environ_copy); free(param); free(uri); /* setup gpr contact info */ if(NULL != orte_process_info.gpr_replica_uri) { uri = strdup(orte_process_info.gpr_replica_uri); } else { uri = orte_rml.get_uri(); } param = mca_base_param_environ_variable("gpr","replica","uri"); opal_setenv(param, uri, true, &environ_copy); free(param); free(uri); /* push name into environment */ orte_ns_nds_env_put(&proc->name, vpid_start, vpid_range, &environ_copy); if (context->argv == NULL) { context->argv = malloc(sizeof(char*)*2); context->argv[0] = strdup(context->app); context->argv[1] = NULL; } i=0; fprintf(cfp,"%s",mca_pls_poe_component.env); while(environ_copy[i]!=NULL) { fprintf(cfp," %s",environ_copy[i++]); } opal_argv_free(environ_copy); fprintf(cfp," %s",context->app); i=1; while(context->argv[i]!=NULL) { fprintf(cfp," %s",context->argv[i++]); } /* POE will upset if the file doesn't contain end of line. */ fprintf(cfp,"\n"); return ORTE_SUCCESS;}/**poe_launch_interactive - launch an interactive job@param jobid JOB Identifier [IN]@return error number*/static inline int poe_launch_interactive_job(orte_jobid_t jobid){ orte_job_map_t *map; opal_list_item_t *item, *item2; orte_vpid_t vpid_start, vpid_range; orte_std_cntr_t num_nodes, num_procs; FILE *hfp, *cfp; char** argv; int argc; int rc, pid; sigset_t sigs; if( (NULL==(mca_pls_poe_component.cmdfile=tempnam(NULL,NULL))) || (NULL==(cfp=fopen(mca_pls_poe_component.cmdfile,"w"))) ) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } mca_pls_poe_component.jobid = jobid; /* get the map for this job */ rc = orte_rmaps.get_job_map(&map, jobid); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } num_nodes = opal_list_get_size(&map->nodes); if(!strncmp(mca_pls_poe_component.resource_allocation,"hostfile",8)) { /* Create a temporary hostlist file if user specify */ if( (NULL==(mca_pls_poe_component.hostfile=tempnam(NULL,NULL))) || (NULL==(hfp=fopen(mca_pls_poe_component.hostfile,"w"))) ) { return ORTE_ERR_OUT_OF_RESOURCE; } for(item = opal_list_get_first(&map->nodes); item != opal_list_get_end(&map->nodes); item = opal_list_get_next(item)) { orte_mapped_node_t* node = (orte_mapped_node_t*)item; fprintf(hfp,"%s\n",node->nodename); } fclose(hfp); } rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, &vpid_range); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } /* Create a temporary POE command file */ num_procs = 0; for(item = opal_list_get_first(&map->nodes); item != opal_list_get_end(&map->nodes); item = opal_list_get_next(item)) { orte_mapped_node_t* node = (orte_mapped_node_t*)item; for (item2 = opal_list_get_first(&node->procs); item2 != opal_list_get_end(&node->procs); item2 = opal_list_get_next(item2)) { orte_mapped_proc_t* proc = (orte_mapped_proc_t*)item2; rc = poe_create_cmd_file(cfp, map->apps[proc->app_idx], proc, vpid_start, vpid_range); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } num_procs++; } } fclose(cfp); /* Generate POE command line */ argv = opal_argv_copy(mca_pls_poe_component.argv); argc = mca_pls_poe_component.argc; if(!strncmp(mca_pls_poe_component.resource_allocation,"hostfile",8)) { opal_argv_append(&argc, &argv, "-hostfile"); opal_argv_append(&argc, &argv, mca_pls_poe_component.hostfile); opal_argv_append(&argc, &argv, "-resd"); opal_argv_append(&argc, &argv, "no"); rc=poe_argv_append_int(&argc, &argv, num_nodes, 1, "-nodes"); if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; } } opal_argv_append(&argc, &argv, "-pgmmodel"); opal_argv_append(&argc, &argv, "mpmd"); opal_argv_append(&argc, &argv, "-cmdfile"); opal_argv_append(&argc, &argv, mca_pls_poe_component.cmdfile); opal_argv_append(&argc, &argv, "-labelio"); opal_argv_append(&argc, &argv, mca_pls_poe_component.mp_labelio); opal_argv_append(&argc, &argv, "-stdoutmode"); opal_argv_append(&argc, &argv, mca_pls_poe_component.mp_stdoutmode); rc=poe_argv_append_int(&argc, &argv, num_procs, 1, "-procs"); if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; } rc=poe_argv_append_int(&argc, &argv, mca_pls_poe_component.mp_retry, 0, "-retry"); if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; } rc=poe_argv_append_int(&argc, &argv, mca_pls_poe_component.mp_retrycount, 0, "-retrycount"); if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; } rc=poe_argv_append_int(&argc, &argv, mca_pls_poe_component.mp_infolevel, 0, "-infolevel"); if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; } if(mca_pls_poe_component.verbose>10) { opal_output(0, "POE cmdline %s\n", opal_argv_join(argv, ' ')); } /* Start job with POE */ pid = fork(); if(pid < 0) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } if(pid == 0) { poe_set_handler_default(SIGTERM); poe_set_handler_default(SIGINT); poe_set_handler_default(SIGHUP); poe_set_handler_default(SIGCHLD); poe_set_handler_default(SIGPIPE); sigprocmask(0, 0, &sigs); sigprocmask(SIG_UNBLOCK, &sigs, 0); execv(mca_pls_poe_component.path, argv); opal_output(0, "orte_pls_poe: execv failed with errno=%d\n", errno); exit(-1); } else { orte_wait_cb(pid, poe_wait_job, NULL); }cleanup: OBJ_RELEASE(map); return rc;}/**pls_poe_launch - launch a POE job@warning current support interactive class only!.@param jobid JOB Identifier [IN]@return error number*/static int pls_poe_launch_job(orte_jobid_t jobid){ if(0 == strncmp(mca_pls_poe_component.class,"interactive",11)) { return poe_launch_interactive_job(jobid); } return ORTE_ERR_NOT_IMPLEMENTED;}static int pls_poe_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){ return ORTE_ERR_NOT_IMPLEMENTED;}static int pls_poe_terminate_proc(const orte_process_name_t *name){ return ORTE_ERR_NOT_IMPLEMENTED;}static int pls_poe_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){ return ORTE_ERR_NOT_IMPLEMENTED;}static int pls_poe_signal_job(orte_jobid_t jobid, int32_t signal, opal_list_t *attrs){ return ORTE_ERR_NOT_IMPLEMENTED;}static int pls_poe_signal_proc(const orte_process_name_t *name, int32_t signal){ return ORTE_ERR_NOT_IMPLEMENTED;}/**pls_poe_finalize - clean up temporary files@return error number*/static int pls_poe_finalize(void){ unlink(mca_pls_poe_component.cmdfile); unlink(mca_pls_poe_component.hostfile); return ORTE_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -