📄 pls_gridengine_module.c
字号:
exit(-1); /* exit instead of return ORTE_ERR_OUT_OF_RESOURCE */ }#endif /* setup node name */ free(argv[node_name_index1]); if (NULL != rmaps_node->username && 0 != strlen (rmaps_node->username)) { asprintf(&argv[node_name_index1], "%s@%s", rmaps_node->username, rmaps_node->nodename); } else { argv[node_name_index1] = strdup(rmaps_node->nodename); } free(argv[node_name_index2]); argv[node_name_index2] = strdup(rmaps_node->nodename); /* initialize daemons process name */ rc = orte_ns.create_process_name(&name, rmaps_node->cell, 0, vpid); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } /* new daemon - setup to record its info */ dmn = OBJ_NEW(orte_pls_daemon_info_t); dmn->active_job = jobid; dmn->cell = rmaps_node->cell; dmn->nodename = strdup(rmaps_node->nodename); if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(dmn->name), name, ORTE_NAME))) { ORTE_ERROR_LOG(rc); goto cleanup; } opal_list_append(&daemons, &dmn->super); #ifdef __WINDOWS__ printf("Unimplemented feature for windows\n"); return ORTE_ERR_NOT_IMPLEMENTED;#else /* fork a child to do qrsh */ pid = fork();#endif if (pid < 0) { rc = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } /* child */ if (pid == 0) { char* name_string; char* var; long fd, fdmax = sysconf(_SC_OPEN_MAX); if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: launching on node %s", rmaps_node->nodename); } /* setting exec_argv and exec_path for qrsh */ exec_argv = &argv[0]; sge_root = getenv("SGE_ROOT"); sge_arch = getenv("ARC"); asprintf(&exec_path, "%s/bin/%s/qrsh", sge_root, sge_arch); exec_path = opal_path_findv(exec_path, X_OK, environ, NULL); if (NULL == exec_path) { opal_show_help("help-pls-gridengine.txt", "bad-qrsh-path", true, exec_path, sge_root, sge_arch); return ORTE_ERR_NOT_FOUND; } if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: exec_argv[0]=%s, exec_path=%s", exec_argv[0], exec_path); } /* setting orted_path for orted */ orted_path = opal_path_findv(exec_argv[orted_index], 0, environ, NULL); if (NULL == orted_path && NULL == prefix_dir) { rc = orte_pls_gridengine_fill_orted_path(&orted_path); if (ORTE_SUCCESS != rc) { return rc; } } else { if (NULL != prefix_dir) { orted_path = opal_os_path( false, prefix_dir, bin_base, "orted", NULL ); } /* If we yet did not fill up the orted_path, do so now */ if (NULL == orted_path) { rc = orte_pls_gridengine_fill_orted_path(&orted_path); if (ORTE_SUCCESS != rc) { return rc; } } } asprintf(&argv[orted_index], orted_path); if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: orted_path=%s", orted_path); } var = getenv("HOME"); if (NULL != var) { if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: changing to directory %s", var); } /* Ignore errors -- what are we going to do? (and we ignore errors on the remote nodes in the fork pls, so this is consistent) */ chdir(var); } /* setup process name */ rc = orte_ns.get_proc_name_string(&name_string, name); if (ORTE_SUCCESS != rc) { opal_output(0, "pls:gridengine: unable to create process name"); exit(-1); } free(argv[proc_name_index]); argv[proc_name_index] = strdup(name_string); if (!mca_pls_gridengine_component.debug) { /* setup stdin */ int fd = open("/dev/null", O_RDWR, 0); dup2(fd, 0); close(fd); } /* close all file descriptors w/ exception of stdin/stdout/stderr */ for(fd=3; fd<fdmax; fd++) close(fd); /* Set signal handlers back to the default. Do this close to the execve() because the event library may (and likely will) reset them. If we don't do this, the event library may have left some set that, at least on some OS's, don't get reset via fork() or exec(). Hence, the orted could be unkillable (for example). */ set_handler_default(SIGTERM); set_handler_default(SIGINT);#ifndef __WINDOWS__ set_handler_default(SIGHUP); set_handler_default(SIGPIPE);#endif set_handler_default(SIGCHLD); /* Unblock all signals, for many of the same reasons that we set the default handlers, above. This is noticable on Linux where the event library blocks SIGTERM, but we don't want that blocked by the orted (or, more specifically, we don't want it to be blocked by the orted and then inherited by the ORTE processes that it forks, making them unkillable by SIGTERM). */#ifndef __WINDOWS__ sigprocmask(0, 0, &sigs); sigprocmask(SIG_UNBLOCK, &sigs, 0);#endif /* exec the daemon */ if (mca_pls_gridengine_component.debug) { param = opal_argv_join(exec_argv, ' '); if (NULL != param) { opal_output(0, "pls:gridengine: executing: %s", param); free(param); } } execve(exec_path, exec_argv, env); opal_output(0, "pls:gridengine: execve failed with errno=%d\n", errno); exit(-1); } else { /* parent */ if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: parent"); } /* setup callback on sigchild - wait until setup above is complete * as the callback can occur in the call to orte_wait_cb */ orte_wait_cb(pid, orte_pls_gridengine_wait_daemon, dmn); vpid++; } free(name); } /* all done, so store the daemon info on the registry */ if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons))) { ORTE_ERROR_LOG(rc); } cleanup: OBJ_RELEASE(map); if (NULL != lib_base) { free(lib_base); } if (NULL != bin_base) { free(bin_base); } free(jobid_string); /* done with this variable */ opal_argv_free(argv); opal_argv_free(env); return rc;}#if 0/** * Query the registry for the gridengine slot count, and update it */static int update_slot_keyval(orte_ras_node_t* ras_node, int* slot_cnt){ int rc, *iptr, ivalue; orte_std_cntr_t num_tokens, i, get_cnt; orte_gpr_value_t** get_values; char **tokens; char *get_keys[] = {"orte-gridengine-slot-cnt", NULL}; orte_gpr_keyval_t *condition; /* get token */ if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&tokens, &num_tokens, ras_node->node_cellid, ras_node->node_name))) { ORTE_ERROR_LOG(rc); return rc; } /* setup condition/filter for query - return only processes that * are assigned to the specified node name */ if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&condition, ORTE_NODE_NAME_KEY, ORTE_STRING, (void*)ras_node->node_name))) { ORTE_ERROR_LOG(rc); return rc; } rc = orte_gpr.get_conditional( ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR, ORTE_NODE_SEGMENT, tokens, get_keys, 1, &condition, &get_cnt, &get_values); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return rc; } /* parse the response */ for(i=0; i<get_cnt; i++) { orte_gpr_value_t* value = get_values[i]; orte_std_cntr_t k; /* looking in each GPR container for the keyval */ for(k=0; k < value->cnt; k++) { orte_gpr_keyval_t* keyval = value->keyvals[k]; orte_data_value_t *put_value; if(strcmp(keyval->key, "orte-gridengine-slot-cnt") == 0) { if (ORTE_SUCCESS != (rc = orte_dss.get( (void**)&iptr, keyval->value, ORTE_INT))) { ORTE_ERROR_LOG(rc); continue; } *slot_cnt = *iptr; free(iptr); if (mca_pls_gridengine_component.debug) { opal_output(0, "pls:gridengine: %s: registry shows PE slots=%d", ras_node->node_name, *slot_cnt); } (*slot_cnt)--; /* account for the current launch */ if (mca_pls_gridengine_component.debug) { opal_output(0,"pls:gridengine: %s: decrementing, PE slots=%d", ras_node->node_name, *slot_cnt); } put_value = OBJ_NEW(orte_data_value_t); if (NULL == put_value) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } ivalue = *slot_cnt; put_value->type = ORTE_INT; put_value->data = &ivalue; /* put the keyvalue in the segment */ if (ORTE_SUCCESS != (rc = orte_gpr.put_1( ORTE_GPR_OVERWRITE|ORTE_GPR_TOKENS_XAND, ORTE_NODE_SEGMENT, tokens, "orte-gridengine-slot-cnt", put_value ))) { ORTE_ERROR_LOG(rc); } continue; } } } for(i=1; i<get_cnt; i++) OBJ_RELEASE(get_values[i]); if (NULL != get_values) free(get_values); opal_argv_free(tokens); return rc;}#endif/** * Query the registry for all nodes participating in the job */int orte_pls_gridengine_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){ int rc; opal_list_t daemons; opal_list_item_t *item; /* construct the list of active daemons on this job */ OBJ_CONSTRUCT(&daemons, opal_list_t); if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* order them to kill their local procs for this job */ if (ORTE_SUCCESS != (rc = orte_pls_base_orted_kill_local_procs(&daemons, jobid, timeout))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } CLEANUP: while (NULL != (item = opal_list_remove_first(&daemons))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); return rc;}int orte_pls_gridengine_terminate_proc(const orte_process_name_t* proc){ return ORTE_ERR_NOT_IMPLEMENTED;}/** * Terminate the orteds for a given job */int orte_pls_gridengine_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs){ int rc; opal_list_t daemons; opal_list_item_t *item; /* construct the list of active daemons on this job */ OBJ_CONSTRUCT(&daemons, opal_list_t); if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* now tell them to die! */ if (ORTE_SUCCESS != (rc = orte_pls_base_orted_exit(&daemons, timeout))) { ORTE_ERROR_LOG(rc); } CLEANUP: while (NULL != (item = opal_list_remove_first(&daemons))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); return rc;}/** * Signal all processes associated with this job */int orte_pls_gridengine_signal_job(orte_jobid_t jobid, int32_t signal, opal_list_t *attrs){ int rc; opal_list_t daemons; opal_list_item_t *item; /* construct the list of active daemons on this job */ OBJ_CONSTRUCT(&daemons, opal_list_t); if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid, attrs))) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&daemons); return rc; } /* order them to pass this signal to their local procs */ if (ORTE_SUCCESS != (rc = orte_pls_base_orted_signal_local_procs(&daemons, signal))) { ORTE_ERROR_LOG(rc); } while (NULL != (item = opal_list_remove_first(&daemons))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); return rc;}/** * Signal a specific process. */int orte_pls_gridengine_signal_proc(const orte_process_name_t* proc, int32_t signal){ return ORTE_ERR_NOT_IMPLEMENTED;}/** * Cancel an operation involving comm to an orted */int orte_pls_gridengine_cancel_operation(void){ int rc; if (ORTE_SUCCESS != (rc = orte_pls_base_orted_cancel_operation())) { ORTE_ERROR_LOG(rc); } return rc;}/** * Finalize */int orte_pls_gridengine_finalize(void){ int rc; /* cleanup any pending recvs */ if (ORTE_SUCCESS != (rc = orte_pls_base_comm_stop())) { ORTE_ERROR_LOG(rc); } return ORTE_SUCCESS;}/** * Set signal handler */static void set_handler_default(int sig){#ifndef __WINDOWS__ struct sigaction act; act.sa_handler = SIG_DFL; act.sa_flags = 0; sigemptyset(&act.sa_mask); sigaction(sig, &act, (struct sigaction *)0);#endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -