📄 orte_setup_hnp.c
字号:
tf_flag = true; if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[3]), ORTE_RDS_FE_SSH, ORTE_BOOL, &tf_flag))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(value); return rc; } /* Place value in GPR */ rc = orte_gpr.put(1, &value); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(value); return rc; } OBJ_RELEASE(value); free(cellname); can_launch = true; } if (!can_launch || ORTE_CELLID_MAX == cellid) { return ORTE_ERR_UNREACH; } /* get the user's name on the headnode */ if (NULL == username) { uid = strdup(orte_system_info.user); } else { uid = strdup(username); } /* SETUP TO LAUNCH PROBE */ /* setup the conditioned wait and mutex variables */ OBJ_CONSTRUCT(&orte_setup_hnp_mutex, opal_mutex_t); OBJ_CONSTRUCT(&orte_setup_hnp_condition, opal_condition_t); /* get a jobid for the probe */ rc = orte_ns.create_jobid(&jobid, NULL); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return rc; } /* get a vpid for the probe */ rc = orte_ns.reserve_range(jobid, 1, &vpid); if (ORTE_SUCCESS != rc ) { ORTE_ERROR_LOG(rc); return rc; } /* initialize probe's process name... */ rc = orte_ns.create_process_name(&(orte_setup_hnp_cbdata.name), cellid, jobid, vpid); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return rc; } /* ...and get string representation */ rc = orte_ns.get_proc_name_string(&name_string, orte_setup_hnp_cbdata.name); if (ORTE_SUCCESS != rc ) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* setup callback data on sigchild */ if (NULL != target_cluster) { orte_setup_hnp_cbdata.target_cluster = strdup(target_cluster); } else { orte_setup_hnp_cbdata.target_cluster = NULL; } orte_setup_hnp_cbdata.headnode = strdup(headnode); orte_setup_hnp_cbdata.jobid = jobid; /* get name of probe application - just in case user specified something different */ id = mca_base_param_register_string("orteprobe",NULL,NULL,NULL,"orteprobe"); mca_base_param_lookup_string(id, &orteprobe); /* get rsh/ssh launch mechanism parameters */ id = mca_base_param_register_string("pls","rsh","agent",NULL,"ssh"); mca_base_param_lookup_string(id, ¶m); /* Initialize the argv array */ argv = opal_argv_split(param, ' '); argc = opal_argv_count(argv); if (argc <= 0) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM; goto CLEANUP; } free(param); /* setup the path */ path = opal_path_findv(argv[0], 0, environ, NULL); /* add the username and nodename */ opal_argv_append(&argc, &argv, "-l"); opal_argv_append(&argc, &argv, uid); opal_argv_append(&argc, &argv, hn); /* add the probe application */ opal_argv_append(&argc, &argv, orteprobe); /* tell the probe it's name */ opal_argv_append(&argc, &argv, "--name"); opal_argv_append(&argc, &argv, name_string); /* setup probe's ns contact info */ opal_argv_append(&argc, &argv, "--nsreplica"); if(NULL != orte_process_info.ns_replica_uri) { uri = strdup(orte_process_info.ns_replica_uri); } else { uri = orte_rml.get_uri(); } asprintf(¶m, "\"%s\"", uri); opal_argv_append(&argc, &argv, param); free(param); free(uri); /* setup probe's gpr contact info */ opal_argv_append(&argc, &argv, "--gprreplica"); if(NULL != orte_process_info.gpr_replica_uri) { uri = strdup(orte_process_info.gpr_replica_uri); } else { uri = orte_rml.get_uri(); } asprintf(¶m, "\"%s\"", uri); opal_argv_append(&argc, &argv, param); free(param); free(uri); /* tell the probe who to report to */ uri = orte_rml.get_uri(); asprintf(¶m, "\"%s\"", uri); opal_argv_append(&argc, &argv, "--requestor"); opal_argv_append(&argc, &argv, param); free(param); free(uri); /* pass along any parameters for the head node process * in case one needs to be created */ id = mca_base_param_register_string("scope",NULL,NULL,NULL,"public"); mca_base_param_lookup_string(id, ¶m); opal_argv_append(&argc, &argv, "--scope"); opal_argv_append(&argc, &argv, param); free(param); id = mca_base_param_register_int("persistent",NULL,NULL,NULL,(int)false); mca_base_param_lookup_int(id, &intparam); if (intparam) { opal_argv_append(&argc, &argv, "--persistent"); } /* issue the non-blocking recv to get the probe's findings */ rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PROBE, 0, orte_setup_hnp_recv, NULL); if(rc < 0) { ORTE_ERROR_LOG(rc); goto CLEANUP; }#ifndef __WINDOWS__ /* fork a child to exec the rsh/ssh session */ orte_setup_hnp_rc = ORTE_SUCCESS; pid = fork(); if (pid < 0) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } if (pid == 0) { /* child */ /* exec the probe launch */ execv(path, argv); ORTE_ERROR_LOG(ORTE_ERROR); opal_output(0, "orte_setup_hnp: execv failed with errno=%d\n", errno); return ORTE_ERROR; } else { /* parent */ orte_wait_cb(pid, orte_setup_hnp_wait, &orte_setup_hnp_cbdata); /* block until a timeout occurs or probe dies/calls back */ gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec + 1000000; ts.tv_nsec = 0; OPAL_THREAD_LOCK(&orte_setup_hnp_mutex); opal_condition_timedwait(&orte_setup_hnp_condition, &orte_setup_hnp_mutex, &ts); OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex); if (ORTE_SUCCESS == orte_setup_hnp_rc) { /* Remember if we were infrastructure or not */ id = mca_base_param_find("orte", NULL, "infrastructure"); mca_base_param_lookup_int(id, &intparam); if ( ((int)true) != intparam) { infrastructure = false; } /* need to restart the local system so it can connect to the remote daemon. */ if (ORTE_SUCCESS != (rc = orte_restart(orte_setup_hnp_cbdata.name, orte_setup_hnp_orted_uri))) { /** can't use ORTE_ERROR_LOG here as it may no longer be valid. Since we may * have gotten part way through the shutdown/restart process, we can't have * any idea of our current state - all we can really do at this point is * abort */ fprintf(stderr, "orte_setup_hnp: aborted during restart of local process\n"); } /* * ...and we are now ready to go! */ return ORTE_SUCCESS; } return orte_setup_hnp_rc; }#else ORTE_ERROR_LOG(ORTE_ERROR); opal_output(0, "This function has not been implemented in windows yet, file %s line %d\n", __FILE__, __LINE__); abort();#endifCLEANUP: return rc;}static void orte_setup_hnp_recv(int status, orte_process_name_t* sender, orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata){ orte_std_cntr_t n=1; int rc; OPAL_THREAD_LOCK(&orte_setup_hnp_mutex); if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &orte_setup_hnp_orted_uri, &n, ORTE_STRING))) { ORTE_ERROR_LOG(rc); orte_setup_hnp_rc = rc; opal_condition_signal(&orte_setup_hnp_condition); OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex); return; } orte_setup_hnp_rc = ORTE_SUCCESS; opal_condition_signal(&orte_setup_hnp_condition); OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);}static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata){ orte_setup_hnp_cb_data_t *data; OPAL_THREAD_LOCK(&orte_setup_hnp_mutex); data = (orte_setup_hnp_cb_data_t*)cbdata; /* if ssh exited abnormally, print something useful to the user and cleanup * the registry entries for the HNP jobid. This should somehow be pushed up to the calling level, but we don't really have a way to do that just yet. */ if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { /* tell the user something went wrong */ opal_output(0, "ERROR: The probe on head node %s of the %s cluster failed to start as expected.", data->headnode, data->target_cluster); opal_output(0, "ERROR: There may be more information available from"); opal_output(0, "ERROR: the remote shell (see above)."); if (WIFEXITED(status)) { opal_output(0, "ERROR: The probe exited unexpectedly with status %d.", WEXITSTATUS(status)); } else if (WIFSIGNALED(status)) {#ifdef WCOREDUMP if (WCOREDUMP(status)) { opal_output(0, "The probe received a signal %d (with core).", WTERMSIG(status)); } else { opal_output(0, "The probe received a signal %d.", WTERMSIG(status)); }#else opal_output(0, "The probe received a signal %d.", WTERMSIG(status));#endif /* WCOREDUMP */ } else { opal_output(0, "No extra status information is available: %d.", status); } } opal_condition_signal(&orte_setup_hnp_condition); OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -