📄 orted.c
字号:
int rc; if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted: received launch callback", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* pass the data to the orted_local_launcher and get a report on * success or failure of the launch */ if (ORTE_SUCCESS != (rc = orte_odls.launch_local_procs(data, orted_globals.saved_environ))) { /* if there was an error, report it. * NOTE: it is absolutely imperative that we do not cause the orted to EXIT when * this happens!!! If we do, then the HNP will "hang" as the orted will no longer * be around to receive messages telling it what to do in response to the failure */ ORTE_ERROR_LOG(rc); } /* all done - return and let the orted sleep until something happens */ return;}static void signal_callback(int fd, short flags, void *arg){ OPAL_TRACE(1); orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition);}static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, orte_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata){ orte_daemon_cmd_flag_t command; orte_buffer_t answer; int ret; orte_std_cntr_t n; int32_t signal; orte_gpr_notify_data_t *ndat; orte_jobid_t job; OPAL_TRACE(1); OPAL_THREAD_LOCK(&orted_globals.mutex); if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received message from [%ld,%ld,%ld]", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(sender)); } /* unpack the command */ n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) { ORTE_ERROR_LOG(ret); goto CLEANUP; } switch(command) { /**** KILL_LOCAL_PROCS ****/ case ORTE_DAEMON_KILL_LOCAL_PROCS: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received kill_local_procs", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* unpack the jobid - could be JOBID_WILDCARD, which would indicatge * we should kill all local procs. Otherwise, only kill those within * the specified jobid */ n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &job, &n, ORTE_JOBID))) { ORTE_ERROR_LOG(ret); goto CLEANUP; } if (ORTE_SUCCESS != (ret = orte_odls.kill_local_procs(job, true))) { ORTE_ERROR_LOG(ret); } break; /**** SIGNAL_LOCAL_PROCS ****/ case ORTE_DAEMON_SIGNAL_LOCAL_PROCS: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received signal_local_procs", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* get the signal */ n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &signal, &n, ORTE_INT32))) { ORTE_ERROR_LOG(ret); goto CLEANUP; } /* see if they specified a process to signal, or if we * should just signal them all * * NOTE: FOR NOW, WE JUST SIGNAL ALL CHILDREN */ if (ORTE_SUCCESS != (ret = orte_odls.signal_local_procs(NULL, signal))) { ORTE_ERROR_LOG(ret); } break; /**** ADD_LOCAL_PROCS ****/ case ORTE_DAEMON_ADD_LOCAL_PROCS: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received add_local_procs", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* unpack the notify data object */ n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { ORTE_ERROR_LOG(ret); goto CLEANUP; } /* launch the processes */ if (ORTE_SUCCESS != (ret = orte_odls.launch_local_procs(ndat, orted_globals.saved_environ))) { ORTE_ERROR_LOG(ret); } /* cleanup the memory */ OBJ_RELEASE(ndat); break; /**** EXIT COMMAND ****/ case ORTE_DAEMON_EXIT_CMD: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received exit", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* no response to send here - we'll send it when nearly exit'd */ orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition); OPAL_THREAD_UNLOCK(&orted_globals.mutex); return; break; default: ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); break; } CLEANUP: /* send an ack that command is done */ OBJ_CONSTRUCT(&answer, orte_buffer_t); if (0 > orte_rml.send_buffer(sender, &answer, ORTE_RML_TAG_PLS_ORTED_ACK, 0)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); } OBJ_DESTRUCT(&answer); OPAL_THREAD_UNLOCK(&orted_globals.mutex); /* reissue the non-blocking receive */ ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED, ORTE_RML_NON_PERSISTENT, orte_daemon_recv_pls, NULL); if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { ORTE_ERROR_LOG(ret); } return;}static void exit_callback(int fd, short event, void *arg){ /* Trigger the normal exit conditions */ orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition); OPAL_THREAD_UNLOCK(&orted_globals.mutex);}static void halt_vm(void){ int ret; struct timeval tv = { 1, 0 }; opal_event_t* event; opal_list_t attrs; opal_list_item_t *item; /* terminate the vm - this will also wake us up so we can exit */ OBJ_CONSTRUCT(&attrs, opal_list_t); orte_rmgr.add_attribute(&attrs, ORTE_NS_INCLUDE_DESCENDANTS, ORTE_UNDEF, NULL, ORTE_RMGR_ATTR_OVERRIDE); ret = orte_pls.terminate_orteds(0, &orte_abort_timeout, &attrs); while (NULL != (item = opal_list_remove_first(&attrs))) OBJ_RELEASE(item); OBJ_DESTRUCT(&attrs); /* setup a delay to give the orteds time to complete their departure */ if (NULL != (event = (opal_event_t*)malloc(sizeof(opal_event_t)))) { opal_evtimer_set(event, exit_callback, NULL); opal_evtimer_add(event, &tv); }}static void orte_daemon_recv(int status, orte_process_name_t* sender, orte_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata){ orte_buffer_t *answer; orte_daemon_cmd_flag_t command; int ret; orte_std_cntr_t n; char *contact_info; OPAL_TRACE(1); OPAL_THREAD_LOCK(&orted_globals.mutex); if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv: received message from [%ld,%ld,%ld]", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(sender)); } n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) { ORTE_ERROR_LOG(ret); OPAL_THREAD_UNLOCK(&orted_globals.mutex); return; } answer = OBJ_NEW(orte_buffer_t); if (NULL == answer) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto DONE; } switch(command) { /**** EXIT COMMAND ****/ case ORTE_DAEMON_EXIT_CMD: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv: received exit", ORTE_NAME_ARGS(orte_process_info.my_name)); } orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition); break; /**** HALT VM COMMAND ****/ case ORTE_DAEMON_HALT_VM_CMD: if (orted_globals.debug_daemons) { opal_output(0, "[%lu,%lu,%lu] orted_recv: received halt vm", ORTE_NAME_ARGS(orte_process_info.my_name)); } halt_vm(); break; /**** CONTACT QUERY COMMAND ****/ case ORTE_DAEMON_CONTACT_QUERY_CMD: /* send back contact info */ contact_info = orte_rml.get_uri(); if (NULL == contact_info) { ORTE_ERROR_LOG(ORTE_ERROR); goto CLEANUP; } if (ORTE_SUCCESS != (ret = orte_dss.pack(answer, &contact_info, 1, ORTE_STRING))) { ORTE_ERROR_LOG(ret); goto CLEANUP; } if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); } break; /**** HOSTFILE COMMAND ****/ case ORTE_DAEMON_HOSTFILE_CMD: ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); break; /**** SCRIPTFILE COMMAND ****/ case ORTE_DAEMON_SCRIPTFILE_CMD: ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); break; /**** HEARTBEAT COMMAND ****/ case ORTE_DAEMON_HEARTBEAT_CMD: ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); break; default: ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); } CLEANUP: OBJ_RELEASE(answer); DONE: OPAL_THREAD_UNLOCK(&orted_globals.mutex); /* reissue the non-blocking receive */ ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL); if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { ORTE_ERROR_LOG(ret); } return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -