osc_pt2pt_component.c

来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 788 行 · 第 1/2 页

C
788
字号
    opal_atomic_mb();    /* start up receive for protocol headers */    OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,                       item, ret);    if (NULL == item) {        free(module->p2p_sc_remote_ranks);        free(module->p2p_sc_remote_active_ranks);        free(module->p2p_fence_coll_results);        free(module->p2p_fence_coll_counts);        free(module->p2p_copy_num_pending_sendreqs);        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);        OBJ_DESTRUCT(&module->p2p_long_msgs);        free(module->p2p_num_pending_sendreqs);        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);        ompi_comm_free(&comm);        OBJ_DESTRUCT(&(module->p2p_acc_lock));        OBJ_DESTRUCT(&(module->p2p_lock));        free(module);        return OMPI_ERROR;    }    buffer = (ompi_osc_pt2pt_buffer_t*) item;    buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;    buffer->cbdata = (void*) module;        ret = MCA_PML_CALL(irecv(buffer->payload,                             mca_osc_pt2pt_component.p2p_c_eager_size,                             MPI_BYTE,                             MPI_ANY_SOURCE,                             CONTROL_MSG_TAG,                             module->p2p_comm,                             &buffer->request));    opal_list_append(&module->p2p_pending_control_sends,                      &buffer->super.super);    return ret;}/* dispatch for callback on message completion */static voidompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *pt2pt_buffer){    int ret;    void *payload, *buffer;    size_t buffer_len;    ompi_osc_pt2pt_module_t *module;    ompi_osc_pt2pt_buffer_t *new_pt2pt_buffer;    opal_free_list_item_t *item;    buffer = pt2pt_buffer->payload;    buffer_len = pt2pt_buffer->status._count;    module = pt2pt_buffer->cbdata;    /* post a new receive message */    /* start up receive for protocol headers */    OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,                       item, ret);    assert(NULL != item);    new_pt2pt_buffer = (ompi_osc_pt2pt_buffer_t*) item;    new_pt2pt_buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;    new_pt2pt_buffer->cbdata = (void*) module;        ret = MCA_PML_CALL(irecv(new_pt2pt_buffer->payload,                             mca_osc_pt2pt_component.p2p_c_eager_size,                             MPI_BYTE,                             MPI_ANY_SOURCE,                             CONTROL_MSG_TAG,                             module->p2p_comm,                             &new_pt2pt_buffer->request));    assert(OMPI_SUCCESS == ret);    opal_list_append(&module->p2p_pending_control_sends,                      &new_pt2pt_buffer->super.super);    assert(buffer_len >=           sizeof(ompi_osc_pt2pt_base_header_t));    /* handle message */    switch (((ompi_osc_pt2pt_base_header_t*) buffer)->hdr_type) {    case OMPI_OSC_PT2PT_HDR_PUT:        {            ompi_osc_pt2pt_send_header_t *header;            /* get our header and payload */            header = (ompi_osc_pt2pt_send_header_t*)                 buffer;            payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            if (!ompi_win_exposure_epoch(module->p2p_win)) {                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {                    /* well, we're definitely in an access epoch now */                    ompi_win_set_mode(module->p2p_win,                                       OMPI_WIN_FENCE |                                       OMPI_WIN_ACCESS_EPOCH |                                      OMPI_WIN_EXPOSE_EPOCH);                }            }            ret = ompi_osc_pt2pt_sendreq_recv_put(module, header, payload);        }        break;    case OMPI_OSC_PT2PT_HDR_ACC:         {            ompi_osc_pt2pt_send_header_t *header;            /* get our header and payload */            header = (ompi_osc_pt2pt_send_header_t*)                 buffer;            payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            if (!ompi_win_exposure_epoch(module->p2p_win)) {                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {                    /* well, we're definitely in an access epoch now */                    ompi_win_set_mode(module->p2p_win,                                       OMPI_WIN_FENCE |                                       OMPI_WIN_ACCESS_EPOCH |                                      OMPI_WIN_EXPOSE_EPOCH);                }            }            /* receive into temporary buffer */            ret = ompi_osc_pt2pt_sendreq_recv_accum(module, header, payload);        }        break;    case OMPI_OSC_PT2PT_HDR_GET:        {            ompi_datatype_t *datatype;            ompi_osc_pt2pt_send_header_t *header;            ompi_osc_pt2pt_replyreq_t *replyreq;            ompi_proc_t *proc;            /* get our header and payload */            header = (ompi_osc_pt2pt_send_header_t*)                 buffer;            payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            if (!ompi_win_exposure_epoch(module->p2p_win)) {                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {                    /* well, we're definitely in an access epoch now */                    ompi_win_set_mode(module->p2p_win,                                       OMPI_WIN_FENCE |                                       OMPI_WIN_ACCESS_EPOCH |                                      OMPI_WIN_EXPOSE_EPOCH);                }            }            /* create or get a pointer to our datatype */            proc = ompi_comm_peer_lookup( module->p2p_comm, header->hdr_origin );            datatype = ompi_osc_pt2pt_datatype_create(proc, &payload);            /* create replyreq sendreq */            ret = ompi_osc_pt2pt_replyreq_alloc_init(module,                                                  header->hdr_origin,                                                  header->hdr_origin_sendreq,                                                  header->hdr_target_disp,                                                  header->hdr_target_count,                                                  datatype,                                                  &replyreq);            /* send replyreq */            ompi_osc_pt2pt_replyreq_send(module, replyreq);            /* sendreq does the right retain, so we can release safely */            OBJ_RELEASE(datatype);        }        break;    case OMPI_OSC_PT2PT_HDR_REPLY:        {            ompi_osc_pt2pt_reply_header_t *header;            ompi_osc_pt2pt_sendreq_t *sendreq;            /* get our header and payload */            header = (ompi_osc_pt2pt_reply_header_t*)                 buffer;            payload = (void*) (header + 1);#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);            }#endif            /* get original sendreq pointer */            sendreq = (ompi_osc_pt2pt_sendreq_t*) header->hdr_origin_sendreq.pval;            module = sendreq->req_module;            /* receive data */            ompi_osc_pt2pt_replyreq_recv(module, sendreq, header, payload);        }        break;    case OMPI_OSC_PT2PT_HDR_POST:        {            ompi_osc_pt2pt_control_header_t *header =                 (ompi_osc_pt2pt_control_header_t*)                 buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1);        }        break;    case OMPI_OSC_PT2PT_HDR_COMPLETE:        {            ompi_osc_pt2pt_control_header_t *header =                 (ompi_osc_pt2pt_control_header_t*)                 buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);            }#endif            assert(module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            /* we've heard from one more place, and have value reqs to               process */            OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1);            OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);        }        break;    case OMPI_OSC_PT2PT_HDR_LOCK_REQ:        {            ompi_osc_pt2pt_control_header_t *header =                 (ompi_osc_pt2pt_control_header_t*)                 buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            if (header->hdr_value[1] > 0) {                ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0],                                             header->hdr_value[1]);            } else {                OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);            }        }        break;    case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:        {            ompi_osc_pt2pt_control_header_t *header =                 (ompi_osc_pt2pt_control_header_t*)                 buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0],                                          header->hdr_value[1]);        }        break;    case OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY:        {            ompi_osc_pt2pt_control_header_t *header =                 (ompi_osc_pt2pt_control_header_t*)                 buffer;#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);            }#endif            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));            OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);        }        break;    default:        opal_output_verbose(5, ompi_osc_base_output,                            "received packet for Window with unknown type");   }    item = &(pt2pt_buffer->super);    OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,                          item);}intompi_osc_pt2pt_request_test(ompi_request_t ** rptr,                            int *completed,                            ompi_status_public_t * status ){    ompi_request_t *request = *rptr;    int ret = OMPI_SUCCESS;#if OMPI_ENABLE_PROGRESS_THREADS == 0    if (request->req_state == OMPI_REQUEST_INACTIVE ||        request->req_complete) {        ret = ompi_request_test(rptr, completed, status);    } else {        *completed = 0;    }#else    ret = ompi_request_test(rptr, completed, status);#endif    return ret;}intompi_osc_pt2pt_progress(void){    int ret, done, count = 0;    void *node;    uint32_t key;    ompi_osc_pt2pt_module_t *module;    opal_list_item_t *item;    ret = opal_hash_table_get_first_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,                                               &key,                                               (void**) &module,                                               &node);    if (OMPI_SUCCESS != ret) return 0;    do {        /* loop through pending requests */        for (item = opal_list_get_first(&module->p2p_pending_control_sends) ;             item != opal_list_get_end(&module->p2p_pending_control_sends) ;             item = opal_list_get_next(item)) {            ompi_osc_pt2pt_buffer_t *buffer =                 (ompi_osc_pt2pt_buffer_t*) item;            ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &buffer->status);            if (OMPI_SUCCESS == ret && done) {                item = opal_list_remove_item(&module->p2p_pending_control_sends,                                              item);                buffer->cbfunc(buffer);                /* it's possible that cbfunc is going to do something                   that calls progress, which means our loop is                   probably hosed up because it's possible that the                   list changed under us.  It's either exit the loop                   through the list or start all over again.  I'm                   going with exit. */                break;            }        }    } while (OMPI_SUCCESS ==             opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,                                                 &key,                                                 (void**) &module,                                                 node,                                                 &node));    return count;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?