osc_pt2pt_sync.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 622 行 · 第 1/2 页
C
622 行
return ret;}intompi_osc_pt2pt_module_post(ompi_group_t *group, int assert, ompi_win_t *win){ int i; OBJ_RETAIN(group); /* BWB - do I need this? */ ompi_group_increment_proc_count(group); OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); assert(NULL == P2P_MODULE(win)->p2p_pw_group); P2P_MODULE(win)->p2p_pw_group = group; OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); /* Set our mode to expose w/ post */ ompi_win_remove_mode(win, OMPI_WIN_FENCE); ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); /* list how many complete counters we're still waiting on */ OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs), ompi_group_size(P2P_MODULE(win)->p2p_pw_group)); /* send a hello counter to everyone in group */ for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) { ompi_osc_pt2pt_control_send(P2P_MODULE(win), group->grp_proc_pointers[i], OMPI_OSC_PT2PT_HDR_POST, 1, 0); } return OMPI_SUCCESS;}intompi_osc_pt2pt_module_wait(ompi_win_t *win){ ompi_group_t *group; while (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) { ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); group = P2P_MODULE(win)->p2p_pw_group; P2P_MODULE(win)->p2p_pw_group = NULL; OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); /* BWB - do I need this? */ ompi_group_decrement_proc_count(group); OBJ_RELEASE(group); return OMPI_SUCCESS;}int ompi_osc_pt2pt_module_test(ompi_win_t *win, int *flag){ ompi_group_t *group; if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) { ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) { *flag = 0; return OMPI_SUCCESS; } } *flag = 1; ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); group = P2P_MODULE(win)->p2p_pw_group; P2P_MODULE(win)->p2p_pw_group = NULL; OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); /* BWB - do I need this? */ ompi_group_decrement_proc_count(group); OBJ_RELEASE(group); return OMPI_SUCCESS;}struct ompi_osc_pt2pt_pending_lock_t { opal_list_item_t super; ompi_proc_t *proc; int32_t lock_type;};typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t, NULL, NULL);intompi_osc_pt2pt_module_lock(int lock_type, int target, int assert, ompi_win_t *win){ ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target ); assert(lock_type != 0); /* set our mode on the window */ ompi_win_remove_mode(win, OMPI_WIN_FENCE); ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS); opal_output_verbose(50, ompi_osc_base_output, "%d sending lock request to %d", P2P_MODULE(win)->p2p_comm->c_my_rank, target); /* generate a lock request */ ompi_osc_pt2pt_control_send(P2P_MODULE(win), proc, OMPI_OSC_PT2PT_HDR_LOCK_REQ, P2P_MODULE(win)->p2p_comm->c_my_rank, lock_type); /* return */ return OMPI_SUCCESS;}intompi_osc_pt2pt_module_unlock(int target, ompi_win_t *win){ int32_t out_count; opal_list_item_t *item; int ret; ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target ); while (0 == P2P_MODULE(win)->p2p_lock_received_ack) { ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } P2P_MODULE(win)->p2p_lock_received_ack = 0; /* start all the requests */ ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win)); /* try to start all the requests. We've copied everything we need out of pending_sendreqs, so don't need the lock here */ out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)); /* we want to send all the requests, plus we wait for one more completion event for the control message ack from the unlocker saying we're done */ OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count + 1); /* send the unlock request */ opal_output_verbose(50, ompi_osc_base_output, "%d sending unlock request to %d", P2P_MODULE(win)->p2p_comm->c_my_rank, target); ompi_osc_pt2pt_control_send(P2P_MODULE(win), proc, OMPI_OSC_PT2PT_HDR_UNLOCK_REQ, P2P_MODULE(win)->p2p_comm->c_my_rank, out_count); while (NULL != (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) { ompi_osc_pt2pt_sendreq_t *req = (ompi_osc_pt2pt_sendreq_t*) item; ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req); if (OMPI_SUCCESS != ret) { opal_output_verbose(5, ompi_osc_base_output, "unlock: failure in starting sendreq (%d). Will try later.", ret); opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item); } } /* wait for all the requests */ while (0 != P2P_MODULE(win)->p2p_num_pending_out) { ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } /* set our mode on the window */ ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS); return OMPI_SUCCESS;}intompi_osc_pt2pt_passive_lock(ompi_osc_pt2pt_module_t *module, int32_t origin, int32_t lock_type){ bool send_ack = false; int ret = OMPI_SUCCESS; ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin ); ompi_osc_pt2pt_pending_lock_t *new_pending; OPAL_THREAD_LOCK(&(module->p2p_lock)); if (lock_type == MPI_LOCK_EXCLUSIVE) { if (module->p2p_lock_status == 0) { module->p2p_lock_status = MPI_LOCK_EXCLUSIVE; ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH); send_ack = true; } else { opal_output_verbose(50, ompi_osc_base_output, "%d queuing lock request from %d (%d)", module->p2p_comm->c_my_rank, origin, lock_type); new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t); new_pending->proc = proc; new_pending->lock_type = lock_type; opal_list_append(&(module->p2p_locks_pending), &(new_pending->super)); } } else if (lock_type == MPI_LOCK_SHARED) { if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) { module->p2p_lock_status = MPI_LOCK_SHARED; module->p2p_shared_count++; ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH); send_ack = true; } else { opal_output_verbose(50, ompi_osc_base_output, "queuing lock request from %d (%d)", module->p2p_comm->c_my_rank, origin, lock_type); new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t); new_pending->proc = proc; new_pending->lock_type = lock_type; opal_list_append(&(module->p2p_locks_pending), &(new_pending->super)); } } else { ret = OMPI_ERROR; } OPAL_THREAD_UNLOCK(&(module->p2p_lock)); if (send_ack) { opal_output_verbose(50, ompi_osc_base_output, "%d sending lock ack to %d", module->p2p_comm->c_my_rank, origin); ompi_osc_pt2pt_control_send(module, proc, OMPI_OSC_PT2PT_HDR_LOCK_REQ, module->p2p_comm->c_my_rank, OMPI_SUCCESS); } return OMPI_SUCCESS;}intompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module, int32_t origin, int32_t count){ ompi_osc_pt2pt_pending_lock_t *new_pending = NULL; ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin ); assert(module->p2p_lock_status != 0); OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count); while (0 != module->p2p_num_pending_in) { ompi_osc_pt2pt_progress_long(module); } OPAL_THREAD_LOCK(&(module->p2p_lock)); if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) { ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH); module->p2p_lock_status = 0; } else { module->p2p_shared_count--; if (module->p2p_shared_count == 0) { ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH); module->p2p_lock_status = 0; } } ompi_osc_pt2pt_control_send(module, proc, OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY, OMPI_SUCCESS, OMPI_SUCCESS); /* if we were really unlocked, see if we have more to process */ new_pending = (ompi_osc_pt2pt_pending_lock_t*) opal_list_remove_first(&(module->p2p_locks_pending)); OPAL_THREAD_UNLOCK(&(module->p2p_lock)); if (NULL != new_pending) { opal_output_verbose(50, ompi_osc_base_output, "sending lock request to proc"); ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH); /* set lock state and generate a lock request */ module->p2p_lock_status = new_pending->lock_type; ompi_osc_pt2pt_control_send(module, new_pending->proc, OMPI_OSC_PT2PT_HDR_LOCK_REQ, module->p2p_comm->c_my_rank, OMPI_SUCCESS); OBJ_DESTRUCT(new_pending); } return OMPI_SUCCESS;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?