btl_udapl_endpoint.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 1,319 行 · 第 1/4 页
C
1,319 行
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006      Sandia National Laboratories. All rights
 *                         reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc. All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include <sys/time.h>
#include <time.h>
#include "ompi/types.h"
#include "opal/util/show_help.h"
#include "orte/mca/ns/base/base.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/dss/dss.h"
#include "orte/class/orte_pointer_array.h"
#include "ompi/class/ompi_free_list.h"
#include "ompi/mca/mpool/rdma/mpool_rdma.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_udapl.h"
#include "btl_udapl_endpoint.h"
#include "btl_udapl_frag.h"
#include "btl_udapl_mca.h"
#include "btl_udapl_proc.h"

static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint,
    orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata);
static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint,
    size_t size);
void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint);
void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint,
    orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata);
static int mca_btl_udapl_endpoint_finish_eager(mca_btl_udapl_endpoint_t*);
static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t*);
static void mca_btl_udapl_endpoint_connect_eager_rdma(mca_btl_udapl_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_write_eager(mca_btl_base_endpoint_t* endpoint,
    mca_btl_udapl_frag_t* frag);
static void mca_btl_udapl_endpoint_control_send_cb(mca_btl_base_module_t* btl,
    mca_btl_base_endpoint_t* endpoint, mca_btl_base_descriptor_t* descriptor,
    int status);
static int mca_btl_udapl_endpoint_send_eager_rdma(mca_btl_base_endpoint_t* endpoint);

/*
 * Write an eager-sized fragment directly into the peer's eager RDMA region
 * with a single dat_ep_post_rdma_write().
 *
 * The remote slot used is the current endpoint_eager_rdma_remote.head, and
 * the head is advanced before the write is posted. An rdma footer (with
 * active = 1 and size = payload length) is placed after the payload, the
 * uDAPL footer and the alignment pad. The write is positioned so that it
 * ends exactly at the end of the remote slot
 * (sizeof(mca_btl_udapl_frag_eager_rdma_t) + frag->size). Presumably this
 * lets the receiver find the footer at a fixed location (TODO confirm
 * against the receive side).
 *
 * The caller is expected to have already consumed an eager RDMA token
 * (mca_btl_udapl_endpoint_send() does so before calling this).
 *
 * @param endpoint (IN) BTL addressing information
 * @param frag     (IN) Fragment to be transferred
 *
 * @return OMPI_SUCCESS or OMPI_ERROR
 */
static int mca_btl_udapl_endpoint_write_eager(mca_btl_base_endpoint_t* endpoint,
    mca_btl_udapl_frag_t* frag)
{
    DAT_DTO_COOKIE cookie;
    mca_btl_udapl_frag_eager_rdma_t* remote_frag;
    char* remote_buf;
    DAT_RMR_TRIPLET remote_buffer;
    DAT_RETURN dat_rc;
    int pad = 0;
    uint8_t head = endpoint->endpoint_eager_rdma_remote.head;

    /* now that we have the head update it */
    MCA_BTL_UDAPL_RDMA_NEXT_INDEX(endpoint->endpoint_eager_rdma_remote.head);

    MCA_BTL_UDAPL_FRAG_CALC_ALIGNMENT_PAD(pad,
        (frag->segment.seg_len + sizeof(mca_btl_udapl_footer_t)));

    /* set the rdma footer information */
    frag->rdma_ftr = (mca_btl_udapl_rdma_footer_t *)
        ((char *)frag->segment.seg_addr.pval +
        frag->segment.seg_len +
        sizeof(mca_btl_udapl_footer_t) +
        pad);
    frag->rdma_ftr->active = 1;
    frag->rdma_ftr->size = frag->segment.seg_len; /* this is size PML wants;
                                                   * will have to calc
                                                   * alignment at the
                                                   * other end */

    /* find remote fragment to be used */
    remote_frag = (mca_btl_udapl_frag_eager_rdma_t *)
        ((char *)(endpoint->endpoint_eager_rdma_remote.base.pval) +
        (head * mca_btl_udapl_component.udapl_eager_rdma_frag_size));

    /* prep the fragment to be written out */
    frag->type = MCA_BTL_UDAPL_RDMA_WRITE;
    frag->triplet.segment_length = frag->segment.seg_len +
        sizeof(mca_btl_udapl_footer_t) +
        pad +
        sizeof(mca_btl_udapl_rdma_footer_t);

    /* set remote_buf to start of the remote write location;
     * compute by first finding the end of the entire fragment
     * and then working way back
     */
    remote_buf = (char *)remote_frag +
        (sizeof(mca_btl_udapl_frag_eager_rdma_t) + frag->size) -
        frag->triplet.segment_length;

    /* execute transfer with one contiguous write */

    /* establish remote memory region */
    remote_buffer.rmr_context =
        (DAT_RMR_CONTEXT)endpoint->endpoint_eager_rdma_remote.rkey;
    remote_buffer.target_address = (DAT_VADDR)remote_buf;
    remote_buffer.segment_length = frag->triplet.segment_length;

    /* write the data out */
    cookie.as_ptr = frag;
    dat_rc = dat_ep_post_rdma_write(endpoint->endpoint_eager,
        1,
        &(frag->triplet),
        cookie,
        &remote_buffer,
        DAT_COMPLETION_DEFAULT_FLAG);
    if(DAT_SUCCESS != dat_rc) {
        char* major;
        char* minor;

        dat_strerror(dat_rc, (const char**)&major,
            (const char**)&minor);
        BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_post_rdma_write",
            major, minor));
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}

/*
 * Send a fragment to the peer, or queue it until it can be sent.
 *
 * Behavior depends on endpoint->endpoint_state (examined under
 * endpoint_lock):
 *  - CONNECTED, eager-sized frag: first try to take an eager RDMA token
 *    and RDMA-write the frag (mca_btl_udapl_endpoint_write_eager). If none
 *    is available, try to take a send/recv token on the eager connection
 *    and post a send. If neither is available, queue the frag on
 *    endpoint_eager_frags and call opal_progress() after dropping the lock.
 *  - CONNECTED, max-sized frag: try to take a send/recv token on the max
 *    connection and post a send; otherwise queue on endpoint_max_frags and
 *    call opal_progress() after dropping the lock.
 *  - CLOSED: start a connection; on failure mark the endpoint FAILED and
 *    return that error; otherwise queue the frag as below.
 *  - CONN_EAGER / CONN_MAX: queue the frag on the matching list (max-sized
 *    frags also decrement endpoint_max_sends).
 *  - FAILED: return OMPI_ERR_UNREACH.
 *
 * frag->size is asserted to be either the eager or the max fragment size.
 *
 * @param endpoint (IN) BTL addressing information
 * @param frag     (IN) Fragment to be sent
 *
 * @return OMPI_SUCCESS, OMPI_ERROR on a failed post, OMPI_ERR_UNREACH for a
 *         failed endpoint, or the error from starting a connection
 */
int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint,
    mca_btl_udapl_frag_t* frag)
{
    int rc = OMPI_SUCCESS;
    DAT_RETURN dat_rc;
    DAT_DTO_COOKIE cookie;
    bool call_progress = false;

    /* Fix up the segment length before we do anything with the frag */
    frag->triplet.segment_length =
        frag->segment.seg_len + sizeof(mca_btl_udapl_footer_t);

    OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
    switch(endpoint->endpoint_state) {
    case MCA_BTL_UDAPL_CONNECTED:
        /* just send it already.. */
        if(frag->size == mca_btl_udapl_component.udapl_eager_frag_size) {
            if(OPAL_THREAD_ADD32(&endpoint->endpoint_eager_rdma_remote.tokens, -1) < 0) {
                /* no rdma segment available so either send or queue */
                OPAL_THREAD_ADD32(&endpoint->endpoint_eager_rdma_remote.tokens, 1);

                if(OPAL_THREAD_ADD32(&endpoint->endpoint_sr_tokens[BTL_UDAPL_EAGER_CONNECTION], -1) < 0) {
                    /* no send/recv credit either: give it back and queue */
                    OPAL_THREAD_ADD32(&endpoint->endpoint_sr_tokens[BTL_UDAPL_EAGER_CONNECTION], 1);
                    opal_list_append(&endpoint->endpoint_eager_frags,
                        (opal_list_item_t*)frag);
                    call_progress = true;
                } else {
                    cookie.as_ptr = frag;

                    dat_rc = dat_ep_post_send(endpoint->endpoint_eager, 1,
                        &frag->triplet, cookie,
                        DAT_COMPLETION_DEFAULT_FLAG);

                    if(DAT_SUCCESS != dat_rc) {
                        char* major;
                        char* minor;

                        /* report the DAT failure code, not rc (which is
                         * still OMPI_SUCCESS here) */
                        dat_strerror(dat_rc, (const char**)&major,
                            (const char**)&minor);
                        BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_post_send",
                            major, minor));
                        endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED;
                        rc = OMPI_ERROR;
                    }
                }
            } else {
                rc = mca_btl_udapl_endpoint_write_eager(endpoint, frag);
            }
        } else {
            assert(frag->size == mca_btl_udapl_component.udapl_max_frag_size);

            if(OPAL_THREAD_ADD32(&endpoint->endpoint_sr_tokens[BTL_UDAPL_MAX_CONNECTION], -1) < 0) {
                /* no send/recv credit: give it back and queue */
                OPAL_THREAD_ADD32(&endpoint->endpoint_sr_tokens[BTL_UDAPL_MAX_CONNECTION], 1);
                opal_list_append(&endpoint->endpoint_max_frags,
                    (opal_list_item_t*)frag);
                call_progress = true;
            } else {
                cookie.as_ptr = frag;

                dat_rc = dat_ep_post_send(endpoint->endpoint_max, 1,
                    &frag->triplet, cookie,
                    DAT_COMPLETION_DEFAULT_FLAG);

                if(DAT_SUCCESS != dat_rc) {
                    char* major;
                    char* minor;

                    /* report the DAT failure code, not rc (which is still
                     * OMPI_SUCCESS here) */
                    dat_strerror(dat_rc, (const char**)&major,
                        (const char**)&minor);
                    BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_post_send",
                        major, minor));
                    /* NOTE(review): unlike the eager path above, a failed
                     * post here does not mark the endpoint FAILED --
                     * confirm whether that asymmetry is intended. */
                    rc = OMPI_ERROR;
                }
            }
        }
        break;

    case MCA_BTL_UDAPL_CLOSED:
        /* Initiate a new connection, add this send to a queue */
        rc = mca_btl_udapl_start_connect(endpoint);
        if(OMPI_SUCCESS != rc) {
            endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED;
            break;
        }

        /* Fall through on purpose to queue the send */
        /* fall through */
    case MCA_BTL_UDAPL_CONN_EAGER:
    case MCA_BTL_UDAPL_CONN_MAX:
        /* Add this send to a queue */
        if(frag->size == mca_btl_udapl_component.udapl_eager_frag_size) {
            opal_list_append(&endpoint->endpoint_eager_frags,
                (opal_list_item_t*)frag);
        } else {
            assert(frag->size == mca_btl_udapl_component.udapl_max_frag_size);
            OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, -1);
            opal_list_append(&endpoint->endpoint_max_frags,
                (opal_list_item_t*)frag);
        }
        break;

    case MCA_BTL_UDAPL_FAILED:
        rc = OMPI_ERR_UNREACH;
        break;
    }
    OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);

    /* progress outside the lock so queued frags can drain */
    if(call_progress) opal_progress();

    return rc;
}

/*
 * RML send completion callback: releases the buffer that was sent.
 * All other arguments are unused.
 */
static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint,
    orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata)
{
    OBJ_RELEASE(buffer);
}

/*
 * Set uDAPL endpoint parameters as required in ep_param. Accomplished
 * by retrieving the default set of parameters from temporary (dummy)
 * endpoint and then setting any other parameters as required by
 * this BTL.
 *
 * @param btl      (IN) BTL module
 * @param ep_param (IN/OUT) Pointer to a valid endpoint parameter location
 *
 * @return OMPI_SUCCESS or error status on failure
 */
int mca_btl_udapl_endpoint_get_params(mca_btl_udapl_module_t* btl,
    DAT_EP_PARAM* ep_param)
{
    int rc = OMPI_SUCCESS;
    int request_dtos;
    int max_control_messages;
    DAT_EP_HANDLE dummy_ep;
    DAT_EP_ATTR* ep_attr = &((*ep_param).ep_attr);

    /* open dummy endpoint, used to find default endpoint parameters */
    rc = dat_ep_create(btl->udapl_ia,
        btl->udapl_pz,
        btl->udapl_evd_dto,
        btl->udapl_evd_dto,
        btl->udapl_evd_conn,
        NULL,
        &dummy_ep);
    if (rc != DAT_SUCCESS) {
        char* major;
        char* minor;

        dat_strerror(rc, (const char**)&major,
            (const char**)&minor);
        BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_create",
            major, minor));
        /* this could be recoverable, by just using defaults */
        ep_attr = NULL;
        return OMPI_ERROR;
    }

    rc = dat_ep_query(dummy_ep, DAT_EP_FIELD_ALL, ep_param);
    if (rc != DAT_SUCCESS) {
        char* major;
        char* minor;

        dat_strerror(rc, (const char**)&major,
            (const char**)&minor);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?