📄 orte_wait.c
字号:
{ if (!cb_enabled) return; while (1) { int status; pid_t ret = internal_waitpid(-1, &status, WNOHANG); pending_pids_item_t *pending; registered_cb_item_t *cb; if (-1 == ret && EINTR == errno) continue; if (ret <= 0) break; cb = find_waiting_cb(ret, false); if (NULL == cb) { pending = OBJ_NEW(pending_pids_item_t); pending->pid = ret; pending->status = status; opal_list_append(&pending_pids, &pending->super); } else { opal_list_remove_item(®istered_cb, &cb->super); OPAL_THREAD_UNLOCK(&mutex); cb->callback(cb->pid, status, cb->data); OPAL_THREAD_LOCK(&mutex); OBJ_RELEASE(cb); } }}static voidtrigger_callback(registered_cb_item_t *cb, pending_pids_item_t *pending){ assert(cb->pid == pending->pid); cb->callback(cb->pid, pending->status, cb->data); opal_list_remove_item(&pending_pids, (opal_list_item_t*) pending); opal_list_remove_item(®istered_cb, (opal_list_item_t*) cb);}static intregister_callback(pid_t pid, orte_wait_fn_t callback, void *data){ registered_cb_item_t *reg_cb; pending_pids_item_t *pending; /* register the callback */ reg_cb = find_waiting_cb(pid, true); if (NULL == reg_cb) return ORTE_ERROR; if (NULL != reg_cb->callback) return ORTE_EXISTS; reg_cb->pid = pid; reg_cb->callback = callback; reg_cb->data = data; /* make sure we shouldn't trigger right now */ pending = find_pending_pid(pid, false); if (NULL != pending) { trigger_callback(reg_cb, pending); } return ORTE_SUCCESS;}static intunregister_callback(pid_t pid){ registered_cb_item_t *reg_cb; /* register the callback */ reg_cb = find_waiting_cb(pid, false); if (NULL == reg_cb) return ORTE_ERR_BAD_PARAM; opal_list_remove_item(®istered_cb, (opal_list_item_t*) reg_cb); return ORTE_SUCCESS;}static pid_tinternal_waitpid(pid_t pid, int *status, int options){#if OMPI_THREADS_HAVE_DIFFERENT_PIDS waitpid_callback_data_t data; struct timeval tv; struct opal_event ev; if (opal_event_progress_thread()) { /* I already am the progress thread. no need to event me */ return waitpid(pid, status, options); } data.done = false; data.pid = pid; data.options = options; OBJ_CONSTRUCT(&(data.mutex), opal_mutex_t); OBJ_CONSTRUCT(&(data.cond), opal_condition_t); OPAL_THREAD_LOCK(&(data.mutex)); tv.tv_sec = 0; tv.tv_usec = 0; opal_evtimer_set(&ev, internal_waitpid_callback, &data); opal_evtimer_add(&ev, &tv); while (data.done == false) { opal_condition_wait(&(data.cond), &(data.mutex)); } OPAL_THREAD_UNLOCK(&(data.mutex)); OBJ_DESTRUCT(&(data.cond)); OBJ_DESTRUCT(&(data.mutex)); *status = data.status; return data.ret; #else return waitpid(pid, status, options);#endif}#if OMPI_THREADS_HAVE_DIFFERENT_PIDSstatic voidinternal_waitpid_callback(int fd, short event, void *arg){ waitpid_callback_data_t *data = (waitpid_callback_data_t*) arg; data->ret = waitpid(data->pid, &(data->status), data->options); data->done = true; opal_condition_signal(&(data->cond));}#endif#elif defined(__WINDOWS__)typedef struct { opal_list_item_t super; pid_t pid; HANDLE registered_handle; orte_wait_fn_t callback; void *data; int status;} opal_process_handle_t;static void opal_process_handle_construct( opal_object_t* obj ){ opal_process_handle_t* handle = (opal_process_handle_t*)obj; handle->registered_handle = INVALID_HANDLE_VALUE; handle->pid = 0; handle->callback = NULL; handle->data = NULL;}static void opal_process_handle_destruct( opal_object_t* obj ){ opal_process_handle_t* handle = (opal_process_handle_t*)obj; if( INVALID_HANDLE_VALUE != handle->registered_handle ) { if( 0 == UnregisterWait( handle->registered_handle ) ) { int error = GetLastError(); } if( 0 == CloseHandle( handle->registered_handle ) ) { int error = GetLastError(); } handle->registered_handle = INVALID_HANDLE_VALUE; }}static OBJ_CLASS_INSTANCE( opal_process_handle_t, opal_list_item_t, opal_process_handle_construct, opal_process_handle_destruct );intorte_wait_init(void){ OBJ_CONSTRUCT(&mutex, opal_mutex_t); OBJ_CONSTRUCT(®istered_cb, opal_list_t); OBJ_CONSTRUCT(&pending_pids, opal_list_t); return ORTE_SUCCESS;}intorte_wait_finalize(void){ opal_list_item_t* item; OPAL_THREAD_LOCK(&mutex); OPAL_THREAD_UNLOCK(&mutex); while (NULL != (item = opal_list_remove_first(&pending_pids))) { OBJ_RELEASE(item); } while (NULL != (item = opal_list_remove_first(®istered_cb))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&mutex); OBJ_DESTRUCT(®istered_cb); OBJ_DESTRUCT(&pending_pids); return ORTE_SUCCESS;}/** * Internal function which find a corresponding process structure * based on the pid. If create is true and the pid does not have a * corresponding process structure, one will be automatically * created and attached to the end of the list. */static opal_process_handle_t* opal_find_pid_in_list(opal_list_t* list, pid_t pid, bool create){ opal_list_item_t *item = NULL; opal_process_handle_t *handle = NULL; for (item = opal_list_get_first(list) ; item != opal_list_get_end(list) ; item = opal_list_get_next(item)) { handle = (opal_process_handle_t*) item; if (handle->pid == pid) { return handle; } } if (create) { handle = OBJ_NEW(opal_process_handle_t); if (NULL == handle) return NULL; handle->pid = pid; handle->callback = NULL; handle->data = NULL; opal_list_append(list, (opal_list_item_t*)handle); return handle; } return NULL;}#define find_pending_pid(PID, CREATE) opal_find_pid_in_list( &pending_pids, (PID), (CREATE) )#define find_pending_cb(PID, CREATE) opal_find_pid_in_list( ®istered_cb, (PID), (CREATE) )pid_torte_waitpid(pid_t wpid, int *status, int options){ opal_process_handle_t* pending; OPAL_THREAD_LOCK(&mutex); /** * Is the child already gone ? */ pending = find_pending_pid( wpid, false ); if( NULL != pending ) { *status = pending->status; opal_list_remove_item( &pending_pids, (opal_list_item_t*)pending ); OBJ_RELEASE(pending); OPAL_THREAD_UNLOCK(&mutex); return wpid; } /** * Do we have any registered callback for this particular pid ? */ pending = find_pending_cb( wpid, false ); if( NULL != pending ) { opal_list_remove_item( ®istered_cb, (opal_list_item_t*)pending ); OBJ_RELEASE( pending ); } /** * No luck so far. Wait until the process complete ... */ if( WAIT_OBJECT_0 == WaitForSingleObject( (HANDLE)wpid, INFINITE ) ) { DWORD exitCode; /* Process completed. Grab the exit value and return. */ if( 0 == GetExitCodeProcess( (HANDLE)wpid, &exitCode ) ) { int error = GetLastError(); } *status = (int)exitCode; } OPAL_THREAD_UNLOCK(&mutex); return wpid;}static void CALLBACK trigger_process_detection( void* lpParameter, BOOLEAN TimerOrWaitFired ){ opal_process_handle_t* handle = (opal_process_handle_t*)lpParameter; DWORD exitCode; opal_list_remove_item( ®istered_cb, (opal_list_item_t*)handle ); /** * As this item will never be triggered again, we can safely remove the * registered handle. */ if( 0 == UnregisterWait( handle->registered_handle ) ) { /** * If any callback functions associated with the timer have not completed when * UnregisterWait is called, UnregisterWait unregisters the wait on the callback * functions and fails with the ERROR_IO_PENDING error code. The error code does * not indicate that the function has failed, and the function does not need to * be called again. If your code requires an error code to set only when the * unregister operation has failed, call UnregisterWaitEx instead. */ int error = GetLastError(); } /*if( 0 == CloseHandle( handle->registered_handle ) ) { int error = GetLastError(); }*/ handle->registered_handle = INVALID_HANDLE_VALUE; /** * Get the exit code of the process. */ if( 0 == GetExitCodeProcess( (HANDLE)handle->pid, &exitCode ) ) { int error = GetLastError(); } handle->status = (int)exitCode; handle->callback(handle->pid, handle->status, handle->data); OBJ_RELEASE( handle );}intorte_wait_cb(pid_t wpid, orte_wait_fn_t callback, void *data){ opal_process_handle_t* handle; if (wpid <= 0) return ORTE_ERR_NOT_IMPLEMENTED; if (NULL == callback) return ORTE_ERR_BAD_PARAM; OPAL_THREAD_LOCK(&mutex); handle = find_pending_pid(wpid, false); if( handle != NULL ) { opal_list_remove_item( &pending_pids, (opal_list_item_t*)handle ); OBJ_RELEASE(handle); return ORTE_SUCCESS; } handle = find_pending_cb( wpid, true ); handle->pid = wpid; handle->callback = callback; handle->data = data; RegisterWaitForSingleObject( &handle->registered_handle, (HANDLE)handle->pid, trigger_process_detection, (void*)handle, INFINITE, WT_EXECUTEINWAITTHREAD); OPAL_THREAD_UNLOCK(&mutex); return OPAL_SUCCESS;}intorte_wait_cb_cancel(pid_t wpid){ opal_process_handle_t* pending; OPAL_THREAD_LOCK(&mutex); /** * Do we have any registered callback for this particular pid ? */ pending = find_pending_cb( wpid, false ); if( NULL != pending ) { opal_list_remove_item( ®istered_cb, (opal_list_item_t*)pending ); OBJ_RELEASE( pending ); OPAL_THREAD_UNLOCK(&mutex); return ORTE_SUCCESS; } OPAL_THREAD_UNLOCK(&mutex); return ORTE_ERR_BAD_PARAM;}intorte_wait_cb_disable(void){ OPAL_THREAD_LOCK(&mutex); cb_enabled = false; OPAL_THREAD_UNLOCK(&mutex); return ORTE_SUCCESS;}intorte_wait_cb_enable(void){ OPAL_THREAD_LOCK(&mutex); cb_enabled = true; OPAL_THREAD_UNLOCK(&mutex); return ORTE_SUCCESS;}intorte_wait_kill(int sig){ opal_list_item_t* item; /** * First pass. For all registered processes not yet triggered, terminate the * process. Once this first pass is done, we will try to cleanup the ressources */ OPAL_THREAD_LOCK(&mutex); for (item = opal_list_get_first(®istered_cb) ; item != opal_list_get_end(®istered_cb) ; item = opal_list_get_next(item)) { opal_process_handle_t* handle = (opal_process_handle_t*)item; if( 0 == TerminateProcess( (HANDLE)handle->pid, sig ) ) { int error = GetLastError(); } } /* Give them a chance to complete the kill. */ SleepEx( 1000, TRUE ); /* And now clean all ressources. */ while (NULL != (item = opal_list_remove_first(®istered_cb))) { OBJ_RELEASE(item); } while (NULL != (item = opal_list_remove_first(&pending_pids))) { OBJ_RELEASE(item); } OPAL_THREAD_UNLOCK(&mutex); return ORTE_SUCCESS;}#else /* no waitpid and not windows */intorte_wait_init(void) { return ORTE_SUCCESS;}intorte_wait_finalize(void){ return ORTE_SUCCESS;}pid_torte_waitpid(pid_t wpid, int *status, int options){ return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb(pid_t wpid, orte_wait_fn_t callback, void *data){ return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_cancel(pid_t wpid){ return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_disable(void){ return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_enable(void){ return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_kill(int sig){ return ORTE_ERR_NOT_SUPPORTED;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -