/*
 * osc_pt2pt_component.c -- excerpt (page 1 of 2, 788 lines total) of an
 * Open MPI point-to-point one-sided communication (osc) component.
 * NOTE: the excerpt begins mid-function; the opening of the first
 * function on this page is on the preceding (missing) page.
 */
opal_atomic_mb(); /* start up receive for protocol headers */ OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, item, ret); if (NULL == item) { free(module->p2p_sc_remote_ranks); free(module->p2p_sc_remote_active_ranks); free(module->p2p_fence_coll_results); free(module->p2p_fence_coll_counts); free(module->p2p_copy_num_pending_sendreqs); OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs); OBJ_DESTRUCT(&module->p2p_long_msgs); free(module->p2p_num_pending_sendreqs); OBJ_DESTRUCT(&module->p2p_pending_sendreqs); ompi_comm_free(&comm); OBJ_DESTRUCT(&(module->p2p_acc_lock)); OBJ_DESTRUCT(&(module->p2p_lock)); free(module); return OMPI_ERROR; } buffer = (ompi_osc_pt2pt_buffer_t*) item; buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb; buffer->cbdata = (void*) module; ret = MCA_PML_CALL(irecv(buffer->payload, mca_osc_pt2pt_component.p2p_c_eager_size, MPI_BYTE, MPI_ANY_SOURCE, CONTROL_MSG_TAG, module->p2p_comm, &buffer->request)); opal_list_append(&module->p2p_pending_control_sends, &buffer->super.super); return ret;}/* dispatch for callback on message completion */static voidompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *pt2pt_buffer){ int ret; void *payload, *buffer; size_t buffer_len; ompi_osc_pt2pt_module_t *module; ompi_osc_pt2pt_buffer_t *new_pt2pt_buffer; opal_free_list_item_t *item; buffer = pt2pt_buffer->payload; buffer_len = pt2pt_buffer->status._count; module = pt2pt_buffer->cbdata; /* post a new receive message */ /* start up receive for protocol headers */ OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, item, ret); assert(NULL != item); new_pt2pt_buffer = (ompi_osc_pt2pt_buffer_t*) item; new_pt2pt_buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb; new_pt2pt_buffer->cbdata = (void*) module; ret = MCA_PML_CALL(irecv(new_pt2pt_buffer->payload, mca_osc_pt2pt_component.p2p_c_eager_size, MPI_BYTE, MPI_ANY_SOURCE, CONTROL_MSG_TAG, module->p2p_comm, &new_pt2pt_buffer->request)); assert(OMPI_SUCCESS == ret); 
opal_list_append(&module->p2p_pending_control_sends, &new_pt2pt_buffer->super.super); assert(buffer_len >= sizeof(ompi_osc_pt2pt_base_header_t)); /* handle message */ switch (((ompi_osc_pt2pt_base_header_t*) buffer)->hdr_type) { case OMPI_OSC_PT2PT_HDR_PUT: { ompi_osc_pt2pt_send_header_t *header; /* get our header and payload */ header = (ompi_osc_pt2pt_send_header_t*) buffer; payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); if (!ompi_win_exposure_epoch(module->p2p_win)) { if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) { /* well, we're definitely in an access epoch now */ ompi_win_set_mode(module->p2p_win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); } } ret = ompi_osc_pt2pt_sendreq_recv_put(module, header, payload); } break; case OMPI_OSC_PT2PT_HDR_ACC: { ompi_osc_pt2pt_send_header_t *header; /* get our header and payload */ header = (ompi_osc_pt2pt_send_header_t*) buffer; payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); if (!ompi_win_exposure_epoch(module->p2p_win)) { if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) { /* well, we're definitely in an access epoch now */ ompi_win_set_mode(module->p2p_win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); } } /* receive into temporary buffer */ ret = ompi_osc_pt2pt_sendreq_recv_accum(module, header, payload); } break; case OMPI_OSC_PT2PT_HDR_GET: { ompi_datatype_t *datatype; ompi_osc_pt2pt_send_header_t *header; ompi_osc_pt2pt_replyreq_t *replyreq; ompi_proc_t *proc; /* get our header and payload */ header = 
(ompi_osc_pt2pt_send_header_t*) buffer; payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); if (!ompi_win_exposure_epoch(module->p2p_win)) { if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) { /* well, we're definitely in an access epoch now */ ompi_win_set_mode(module->p2p_win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); } } /* create or get a pointer to our datatype */ proc = ompi_comm_peer_lookup( module->p2p_comm, header->hdr_origin ); datatype = ompi_osc_pt2pt_datatype_create(proc, &payload); /* create replyreq sendreq */ ret = ompi_osc_pt2pt_replyreq_alloc_init(module, header->hdr_origin, header->hdr_origin_sendreq, header->hdr_target_disp, header->hdr_target_count, datatype, &replyreq); /* send replyreq */ ompi_osc_pt2pt_replyreq_send(module, replyreq); /* sendreq does the right retain, so we can release safely */ OBJ_RELEASE(datatype); } break; case OMPI_OSC_PT2PT_HDR_REPLY: { ompi_osc_pt2pt_reply_header_t *header; ompi_osc_pt2pt_sendreq_t *sendreq; /* get our header and payload */ header = (ompi_osc_pt2pt_reply_header_t*) buffer; payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header); }#endif /* get original sendreq pointer */ sendreq = (ompi_osc_pt2pt_sendreq_t*) header->hdr_origin_sendreq.pval; module = sendreq->req_module; /* receive data */ ompi_osc_pt2pt_replyreq_recv(module, sendreq, header, payload); } break; case OMPI_OSC_PT2PT_HDR_POST: { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { 
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1); } break; case OMPI_OSC_PT2PT_HDR_COMPLETE: { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); }#endif assert(module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); /* we've heard from one more place, and have value reqs to process */ OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1); OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]); } break; case OMPI_OSC_PT2PT_HDR_LOCK_REQ: { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); if (header->hdr_value[1] > 0) { ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0], header->hdr_value[1]); } else { OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1); } } break; case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ: { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0], header->hdr_value[1]); } break; case OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY: { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if 
(header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); }#endif assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1); } break; default: opal_output_verbose(5, ompi_osc_base_output, "received packet for Window with unknown type"); } item = &(pt2pt_buffer->super); OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, item);}intompi_osc_pt2pt_request_test(ompi_request_t ** rptr, int *completed, ompi_status_public_t * status ){ ompi_request_t *request = *rptr; int ret = OMPI_SUCCESS;#if OMPI_ENABLE_PROGRESS_THREADS == 0 if (request->req_state == OMPI_REQUEST_INACTIVE || request->req_complete) { ret = ompi_request_test(rptr, completed, status); } else { *completed = 0; }#else ret = ompi_request_test(rptr, completed, status);#endif return ret;}intompi_osc_pt2pt_progress(void){ int ret, done, count = 0; void *node; uint32_t key; ompi_osc_pt2pt_module_t *module; opal_list_item_t *item; ret = opal_hash_table_get_first_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules, &key, (void**) &module, &node); if (OMPI_SUCCESS != ret) return 0; do { /* loop through pending requests */ for (item = opal_list_get_first(&module->p2p_pending_control_sends) ; item != opal_list_get_end(&module->p2p_pending_control_sends) ; item = opal_list_get_next(item)) { ompi_osc_pt2pt_buffer_t *buffer = (ompi_osc_pt2pt_buffer_t*) item; ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &buffer->status); if (OMPI_SUCCESS == ret && done) { item = opal_list_remove_item(&module->p2p_pending_control_sends, item); buffer->cbfunc(buffer); /* it's possible that cbfunc is going to do something that calls progress, which means our loop is probably hosed up because it's possible that the list changed under us. It's either exit the loop through the list or start all over again. I'm going with exit. 
*/ break; } } } while (OMPI_SUCCESS == opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules, &key, (void**) &module, node, &node)); return count;}
/* end of excerpt -- page 1 of 2 (788 lines total); the remainder of the
 * file is on the following page. */