⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 orte_wait.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
{    if (!cb_enabled) return;    while (1) {        int status;        pid_t ret = internal_waitpid(-1, &status, WNOHANG);        pending_pids_item_t *pending;        registered_cb_item_t *cb;        if (-1 == ret && EINTR == errno) continue;        if (ret <= 0) break;        cb = find_waiting_cb(ret, false);        if (NULL == cb) {            pending = OBJ_NEW(pending_pids_item_t);            pending->pid = ret;            pending->status = status;            opal_list_append(&pending_pids, &pending->super);        } else {            opal_list_remove_item(&registered_cb, &cb->super);            OPAL_THREAD_UNLOCK(&mutex);            cb->callback(cb->pid, status, cb->data);            OPAL_THREAD_LOCK(&mutex);            OBJ_RELEASE(cb);        }    }}static voidtrigger_callback(registered_cb_item_t *cb, pending_pids_item_t *pending){    assert(cb->pid == pending->pid);    cb->callback(cb->pid, pending->status, cb->data);    opal_list_remove_item(&pending_pids, (opal_list_item_t*) pending);    opal_list_remove_item(&registered_cb, (opal_list_item_t*) cb);}static intregister_callback(pid_t pid, orte_wait_fn_t callback, void *data){    registered_cb_item_t *reg_cb;    pending_pids_item_t *pending;    /* register the callback */    reg_cb = find_waiting_cb(pid, true);    if (NULL == reg_cb) return ORTE_ERROR;    if (NULL != reg_cb->callback) return ORTE_EXISTS;    reg_cb->pid = pid;    reg_cb->callback = callback;    reg_cb->data = data;    /* make sure we shouldn't trigger right now */    pending = find_pending_pid(pid, false);    if (NULL != pending) {        trigger_callback(reg_cb, pending);    }    return ORTE_SUCCESS;}static intunregister_callback(pid_t pid){    registered_cb_item_t *reg_cb;    /* register the callback */    reg_cb = find_waiting_cb(pid, false);    if (NULL == reg_cb) return ORTE_ERR_BAD_PARAM;    opal_list_remove_item(&registered_cb, (opal_list_item_t*) reg_cb);    return ORTE_SUCCESS;}static pid_tinternal_waitpid(pid_t pid, int *status, int options){#if  OMPI_THREADS_HAVE_DIFFERENT_PIDS    waitpid_callback_data_t data;    struct timeval tv;    struct opal_event ev;    if (opal_event_progress_thread()) {        /* I already am the progress thread.  no need to event me */        return waitpid(pid, status, options);    }    data.done = false;    data.pid = pid;    data.options = options;    OBJ_CONSTRUCT(&(data.mutex), opal_mutex_t);    OBJ_CONSTRUCT(&(data.cond), opal_condition_t);    OPAL_THREAD_LOCK(&(data.mutex));    tv.tv_sec = 0;    tv.tv_usec = 0;    opal_evtimer_set(&ev, internal_waitpid_callback, &data);    opal_evtimer_add(&ev, &tv);    while (data.done == false) {        opal_condition_wait(&(data.cond), &(data.mutex));    }    OPAL_THREAD_UNLOCK(&(data.mutex));    OBJ_DESTRUCT(&(data.cond));    OBJ_DESTRUCT(&(data.mutex));    *status = data.status;    return data.ret;    #else    return waitpid(pid, status, options);#endif}#if  OMPI_THREADS_HAVE_DIFFERENT_PIDSstatic voidinternal_waitpid_callback(int fd, short event, void *arg){    waitpid_callback_data_t *data = (waitpid_callback_data_t*) arg;    data->ret = waitpid(data->pid, &(data->status), data->options);    data->done = true;    opal_condition_signal(&(data->cond));}#endif#elif defined(__WINDOWS__)typedef struct {    opal_list_item_t super;    pid_t pid;    HANDLE registered_handle;    orte_wait_fn_t callback;    void *data;    int status;} opal_process_handle_t;static void opal_process_handle_construct( opal_object_t* obj ){    opal_process_handle_t* handle = (opal_process_handle_t*)obj;    handle->registered_handle = INVALID_HANDLE_VALUE;    handle->pid               = 0;    handle->callback          = NULL;    handle->data              = NULL;}static void opal_process_handle_destruct( opal_object_t* obj ){    opal_process_handle_t* handle = (opal_process_handle_t*)obj;    if( INVALID_HANDLE_VALUE != handle->registered_handle ) {        if( 0 == UnregisterWait( handle->registered_handle ) ) {            int error = GetLastError();        }        if( 0 == CloseHandle( handle->registered_handle ) ) {            int error = GetLastError();        }        handle->registered_handle = INVALID_HANDLE_VALUE;    }}static OBJ_CLASS_INSTANCE( opal_process_handle_t, opal_list_item_t,                           opal_process_handle_construct, opal_process_handle_destruct );intorte_wait_init(void){    OBJ_CONSTRUCT(&mutex, opal_mutex_t);    OBJ_CONSTRUCT(&registered_cb, opal_list_t);    OBJ_CONSTRUCT(&pending_pids, opal_list_t);    return ORTE_SUCCESS;}intorte_wait_finalize(void){    opal_list_item_t* item;    OPAL_THREAD_LOCK(&mutex);    OPAL_THREAD_UNLOCK(&mutex);    while (NULL != (item = opal_list_remove_first(&pending_pids))) {        OBJ_RELEASE(item);    }    while (NULL != (item = opal_list_remove_first(&registered_cb))) {        OBJ_RELEASE(item);    }    OBJ_DESTRUCT(&mutex);    OBJ_DESTRUCT(&registered_cb);    OBJ_DESTRUCT(&pending_pids);    return ORTE_SUCCESS;}/** * Internal function which find a corresponding process structure * based on the pid. If create is true and the pid does not have a * corresponding process structure, one will be automatically * created and attached to the end of the list. */static opal_process_handle_t* opal_find_pid_in_list(opal_list_t* list, pid_t pid, bool create){    opal_list_item_t *item = NULL;    opal_process_handle_t *handle = NULL;    for (item = opal_list_get_first(list) ;         item != opal_list_get_end(list) ;         item = opal_list_get_next(item)) {        handle = (opal_process_handle_t*) item;        if (handle->pid == pid) {            return handle;        }    }    if (create) {        handle = OBJ_NEW(opal_process_handle_t);        if (NULL == handle) return NULL;        handle->pid = pid;        handle->callback = NULL;        handle->data = NULL;        opal_list_append(list, (opal_list_item_t*)handle);        return handle;    }    return NULL;}#define find_pending_pid(PID, CREATE)  opal_find_pid_in_list( &pending_pids, (PID), (CREATE) )#define find_pending_cb(PID, CREATE)   opal_find_pid_in_list( &registered_cb, (PID), (CREATE) )pid_torte_waitpid(pid_t wpid, int *status, int options){    opal_process_handle_t* pending;    OPAL_THREAD_LOCK(&mutex);    /**     * Is the child already gone ?     */    pending = find_pending_pid( wpid, false );    if( NULL != pending ) {        *status = pending->status;        opal_list_remove_item( &pending_pids, (opal_list_item_t*)pending );        OBJ_RELEASE(pending);        OPAL_THREAD_UNLOCK(&mutex);        return wpid;    }    /**     * Do we have any registered callback for this particular pid ?     */    pending = find_pending_cb( wpid, false );    if( NULL != pending ) {        opal_list_remove_item( &registered_cb, (opal_list_item_t*)pending );        OBJ_RELEASE( pending );    }    /**     * No luck so far. Wait until the process complete ...     */    if( WAIT_OBJECT_0 == WaitForSingleObject( (HANDLE)wpid, INFINITE ) ) {        DWORD exitCode;        /* Process completed. Grab the exit value and return. */        if( 0 == GetExitCodeProcess( (HANDLE)wpid, &exitCode ) ) {            int error = GetLastError();        }        *status = (int)exitCode;    }    OPAL_THREAD_UNLOCK(&mutex);    return wpid;}static void CALLBACK trigger_process_detection( void* lpParameter, BOOLEAN TimerOrWaitFired ){    opal_process_handle_t* handle = (opal_process_handle_t*)lpParameter;    DWORD exitCode;    opal_list_remove_item( &registered_cb, (opal_list_item_t*)handle );    /**     * As this item will never be triggered again, we can safely remove the     * registered handle.     */    if( 0 == UnregisterWait( handle->registered_handle ) ) {		/**		 * If any callback functions associated with the timer have not completed when		 * UnregisterWait is called, UnregisterWait unregisters the wait on the callback		 * functions and fails with the ERROR_IO_PENDING error code. The error code does		 * not indicate that the function has failed, and the function does not need to		 * be called again. If your code requires an error code to set only when the		 * unregister operation has failed, call UnregisterWaitEx instead.		 */        int error = GetLastError();    }    /*if( 0 == CloseHandle( handle->registered_handle ) ) {        int error = GetLastError();    }*/    handle->registered_handle = INVALID_HANDLE_VALUE;    /**     * Get the exit code of the process.     */    if( 0 == GetExitCodeProcess( (HANDLE)handle->pid, &exitCode ) ) {        int error = GetLastError();    }    handle->status = (int)exitCode;    handle->callback(handle->pid, handle->status, handle->data);    OBJ_RELEASE( handle );}intorte_wait_cb(pid_t wpid, orte_wait_fn_t callback, void *data){    opal_process_handle_t* handle;    if (wpid <= 0) return ORTE_ERR_NOT_IMPLEMENTED;    if (NULL == callback) return ORTE_ERR_BAD_PARAM;    OPAL_THREAD_LOCK(&mutex);    handle = find_pending_pid(wpid, false);    if( handle != NULL ) {        opal_list_remove_item( &pending_pids, (opal_list_item_t*)handle );        OBJ_RELEASE(handle);        return ORTE_SUCCESS;    }    handle = find_pending_cb( wpid, true );    handle->pid = wpid;    handle->callback = callback;    handle->data = data;    RegisterWaitForSingleObject( &handle->registered_handle, (HANDLE)handle->pid,                                  trigger_process_detection, (void*)handle, INFINITE, WT_EXECUTEINWAITTHREAD);    OPAL_THREAD_UNLOCK(&mutex);    return OPAL_SUCCESS;}intorte_wait_cb_cancel(pid_t wpid){    opal_process_handle_t* pending;    OPAL_THREAD_LOCK(&mutex);    /**     * Do we have any registered callback for this particular pid ?     */    pending = find_pending_cb( wpid, false );    if( NULL != pending ) {        opal_list_remove_item( &registered_cb, (opal_list_item_t*)pending );        OBJ_RELEASE( pending );        OPAL_THREAD_UNLOCK(&mutex);        return ORTE_SUCCESS;    }    OPAL_THREAD_UNLOCK(&mutex);    return ORTE_ERR_BAD_PARAM;}intorte_wait_cb_disable(void){    OPAL_THREAD_LOCK(&mutex);    cb_enabled = false;    OPAL_THREAD_UNLOCK(&mutex);    return ORTE_SUCCESS;}intorte_wait_cb_enable(void){    OPAL_THREAD_LOCK(&mutex);    cb_enabled = true;    OPAL_THREAD_UNLOCK(&mutex);    return ORTE_SUCCESS;}intorte_wait_kill(int sig){    opal_list_item_t* item;    /**     * First pass. For all registered processes not yet triggered, terminate the     * process. Once this first pass is done, we will try to cleanup the ressources     */    OPAL_THREAD_LOCK(&mutex);    for (item = opal_list_get_first(&registered_cb) ;         item != opal_list_get_end(&registered_cb) ;         item = opal_list_get_next(item)) {        opal_process_handle_t* handle = (opal_process_handle_t*)item;        if( 0 == TerminateProcess( (HANDLE)handle->pid, sig ) ) {            int error = GetLastError();        }    }    /* Give them a chance to complete the kill. */    SleepEx( 1000, TRUE );    /* And now clean all ressources. */    while (NULL != (item = opal_list_remove_first(&registered_cb))) {        OBJ_RELEASE(item);    }    while (NULL != (item = opal_list_remove_first(&pending_pids))) {        OBJ_RELEASE(item);    }    OPAL_THREAD_UNLOCK(&mutex);    return ORTE_SUCCESS;}#else /* no waitpid and not windows */intorte_wait_init(void) {    return ORTE_SUCCESS;}intorte_wait_finalize(void){    return ORTE_SUCCESS;}pid_torte_waitpid(pid_t wpid, int *status, int options){    return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb(pid_t wpid, orte_wait_fn_t callback, void *data){    return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_cancel(pid_t wpid){    return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_disable(void){    return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_cb_enable(void){    return ORTE_ERR_NOT_SUPPORTED;}intorte_wait_kill(int sig){    return ORTE_ERR_NOT_SUPPORTED;}#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -