osc_pt2pt_sync.c

来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 622 行 · 第 1/2 页

C
622
字号
    return ret;}intompi_osc_pt2pt_module_post(ompi_group_t *group,                           int assert,                           ompi_win_t *win){    int i;    OBJ_RETAIN(group);    /* BWB - do I need this? */    ompi_group_increment_proc_count(group);    OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));    assert(NULL == P2P_MODULE(win)->p2p_pw_group);    P2P_MODULE(win)->p2p_pw_group = group;        OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));    /* Set our mode to expose w/ post */    ompi_win_remove_mode(win, OMPI_WIN_FENCE);    ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);    /* list how many complete counters we're still waiting on */    OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs),                      ompi_group_size(P2P_MODULE(win)->p2p_pw_group));    /* send a hello counter to everyone in group */    for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {        ompi_osc_pt2pt_control_send(P2P_MODULE(win),                                     group->grp_proc_pointers[i],                                    OMPI_OSC_PT2PT_HDR_POST, 1, 0);    }        return OMPI_SUCCESS;}intompi_osc_pt2pt_module_wait(ompi_win_t *win){    ompi_group_t *group;    while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||           0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {        ompi_osc_pt2pt_progress_long(P2P_MODULE(win));            }    ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);    OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));    group = P2P_MODULE(win)->p2p_pw_group;    P2P_MODULE(win)->p2p_pw_group = NULL;    OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));    /* BWB - do I need this? */    ompi_group_decrement_proc_count(group);    OBJ_RELEASE(group);    return OMPI_SUCCESS;}int ompi_osc_pt2pt_module_test(ompi_win_t *win,                           int *flag){    ompi_group_t *group;    if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||        0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {        ompi_osc_pt2pt_progress_long(P2P_MODULE(win));                if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||            0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {            *flag = 0;            return OMPI_SUCCESS;        }    }    *flag = 1;    ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);    OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));    group = P2P_MODULE(win)->p2p_pw_group;    P2P_MODULE(win)->p2p_pw_group = NULL;    OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));    /* BWB - do I need this? */    ompi_group_decrement_proc_count(group);    OBJ_RELEASE(group);    return OMPI_SUCCESS;}struct ompi_osc_pt2pt_pending_lock_t {    opal_list_item_t super;    ompi_proc_t *proc;    int32_t lock_type;};typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,                   NULL, NULL);intompi_osc_pt2pt_module_lock(int lock_type,                           int target,                           int assert,                           ompi_win_t *win){    ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );    assert(lock_type != 0);    /* set our mode on the window */    ompi_win_remove_mode(win, OMPI_WIN_FENCE);    ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);    opal_output_verbose(50, ompi_osc_base_output,                        "%d sending lock request to %d",                         P2P_MODULE(win)->p2p_comm->c_my_rank, target);    /* generate a lock request */    ompi_osc_pt2pt_control_send(P2P_MODULE(win),                                 proc,                                OMPI_OSC_PT2PT_HDR_LOCK_REQ,                                P2P_MODULE(win)->p2p_comm->c_my_rank,                                lock_type);    /* return */    return OMPI_SUCCESS;}intompi_osc_pt2pt_module_unlock(int target,                             ompi_win_t *win){    int32_t out_count;    opal_list_item_t *item;    int ret;    ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );    while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {        ompi_osc_pt2pt_progress_long(P2P_MODULE(win));            }    P2P_MODULE(win)->p2p_lock_received_ack = 0;    /* start all the requests */    ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));    /* try to start all the requests.  We've copied everything we need       out of pending_sendreqs, so don't need the lock here */    out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs));    /* we want to send all the requests, plus we wait for one more       completion event for the control message ack from the unlocker       saying we're done */    OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count + 1);    /* send the unlock request */    opal_output_verbose(50, ompi_osc_base_output,                        "%d sending unlock request to %d",                         P2P_MODULE(win)->p2p_comm->c_my_rank, target);    ompi_osc_pt2pt_control_send(P2P_MODULE(win),                                 proc,                                OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,                                P2P_MODULE(win)->p2p_comm->c_my_rank,                                out_count);    while (NULL !=            (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {        ompi_osc_pt2pt_sendreq_t *req =             (ompi_osc_pt2pt_sendreq_t*) item;        ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);        if (OMPI_SUCCESS != ret) {            opal_output_verbose(5, ompi_osc_base_output,                                "unlock: failure in starting sendreq (%d).  Will try later.",                                ret);            opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);        }    }    /* wait for all the requests */    while (0 != P2P_MODULE(win)->p2p_num_pending_out) {        ompi_osc_pt2pt_progress_long(P2P_MODULE(win));            }    /* set our mode on the window */    ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);    return OMPI_SUCCESS;}intompi_osc_pt2pt_passive_lock(ompi_osc_pt2pt_module_t *module,                            int32_t origin,                            int32_t lock_type){    bool send_ack = false;    int ret = OMPI_SUCCESS;    ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin );    ompi_osc_pt2pt_pending_lock_t *new_pending;    OPAL_THREAD_LOCK(&(module->p2p_lock));    if (lock_type == MPI_LOCK_EXCLUSIVE) {        if (module->p2p_lock_status == 0) {            module->p2p_lock_status = MPI_LOCK_EXCLUSIVE;            ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);            send_ack = true;        } else {            opal_output_verbose(50, ompi_osc_base_output,                                "%d queuing lock request from %d (%d)",                                 module->p2p_comm->c_my_rank, origin, lock_type);            new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);            new_pending->proc = proc;            new_pending->lock_type = lock_type;            opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));        }    } else if (lock_type == MPI_LOCK_SHARED) {        if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) {            module->p2p_lock_status = MPI_LOCK_SHARED;            module->p2p_shared_count++;            ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);            send_ack = true;        } else {            opal_output_verbose(50, ompi_osc_base_output,                                "queuing lock request from %d (%d)",                                 module->p2p_comm->c_my_rank, origin, lock_type);            new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);            new_pending->proc = proc;            new_pending->lock_type = lock_type;            opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));        }    } else {        ret = OMPI_ERROR;    }    OPAL_THREAD_UNLOCK(&(module->p2p_lock));    if (send_ack) {        opal_output_verbose(50, ompi_osc_base_output,                            "%d sending lock ack to %d",                             module->p2p_comm->c_my_rank, origin);        ompi_osc_pt2pt_control_send(module, proc,                                    OMPI_OSC_PT2PT_HDR_LOCK_REQ,                                    module->p2p_comm->c_my_rank,                                    OMPI_SUCCESS);    }    return OMPI_SUCCESS;}intompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,                              int32_t origin,                              int32_t count){    ompi_osc_pt2pt_pending_lock_t *new_pending = NULL;    ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin );    assert(module->p2p_lock_status != 0);    OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);    while (0 != module->p2p_num_pending_in) {        ompi_osc_pt2pt_progress_long(module);    }    OPAL_THREAD_LOCK(&(module->p2p_lock));    if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) {        ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);        module->p2p_lock_status = 0;    } else {        module->p2p_shared_count--;        if (module->p2p_shared_count == 0) {            ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);            module->p2p_lock_status = 0;        }    }    ompi_osc_pt2pt_control_send(module, proc,                                OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY,                                OMPI_SUCCESS, OMPI_SUCCESS);    /* if we were really unlocked, see if we have more to process */    new_pending = (ompi_osc_pt2pt_pending_lock_t*)         opal_list_remove_first(&(module->p2p_locks_pending));    OPAL_THREAD_UNLOCK(&(module->p2p_lock));    if (NULL != new_pending) {        opal_output_verbose(50, ompi_osc_base_output,                            "sending lock request to proc");        ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);        /* set lock state and generate a lock request */        module->p2p_lock_status = new_pending->lock_type;        ompi_osc_pt2pt_control_send(module,                                    new_pending->proc,                                    OMPI_OSC_PT2PT_HDR_LOCK_REQ,                                    module->p2p_comm->c_my_rank,                                    OMPI_SUCCESS);        OBJ_DESTRUCT(new_pending);    }    return OMPI_SUCCESS;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?