/* ibu.vapi.c -- MPICH2 ch3 InfiniBand channel (Mellanox VAPI verbs) code.
 * Scraper note: captured from a web code viewer ("MPI parallel computing
 * C/C++ code, compiles with VC or gcc, usable to build a parallel-computing
 * test environment"); this is page 1 of 4 of a 1,542-line file, so the
 * first and last functions in this chunk are truncated. */
/* --- interior of ibu_writev() ---------------------------------------------
 * NOTE(review): this chunk begins INSIDE ibu_writev(); the function's
 * signature, local declarations, and the head of the enclosing
 * do { ... } while (cur_index < n) loop are on the previous page and are
 * not visible here.  Code below is unchanged; only comments were added. */

    /* Remote RDMA ring is full: report the bytes written so far and return
       success (caller will retry later). */
    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "no more remote packets available."));
    MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit));
    MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS)));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV);
    return IBU_SUCCESS;
    }
    MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",ibu->remote_RDMA_head ,ibu->remote_RDMA_limit));
    /* Grab a local registered send buffer; failure is treated as "wrote no
       more bytes this call", not as an error. */
    mem_ptr = ibuBlockAllocIB(ibu->allocator);
    if (mem_ptr == NULL)
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAlloc returned NULL."));
        *num_bytes_ptr = total;
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV);
        return IBU_SUCCESS;
    }
    /*MPIU_DBG_PRINTF(("iov length: %d\n, n));*/
    /* Fast path: a two-element iov that fits in one eager packet is copied
       straight into the buffer with two memcpy calls. */
    if (2 == n && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < IBU_EAGER_PACKET_SIZE)
    {
        total = iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        msg_size = total + sizeof(ibu_rdma_buf_footer_t);
        /* Data is right-aligned inside the RDMA buffer so the footer lands at
           the fixed tail position the receiver polls. */
        buf = ((ibu_rdma_buf_t *)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size);
        memcpy(buf,iov[0].MPID_IOV_BUF,iov[0].MPID_IOV_LEN);
        memcpy(buf+iov[0].MPID_IOV_LEN,iov[1].MPID_IOV_BUF,iov[1].MPID_IOV_LEN);
        cur_index = n;
    }
    else
    {
        /* General path, pass 1: measure how many iov bytes fit into one
           eager packet (no copying yet). */
        num_avail = IBU_EAGER_PACKET_SIZE;
        for (; msg_calc_cur_index < n && num_avail; )
        {
            len = min (num_avail, msg_calc_cur_len);
            num_avail -= len;
            cur_msg_total += len;
            MPIU_DBG_PRINTF((" Cur index IOV[%d] - Adding 0x%x to msg_size. Total is 0x%x \n",msg_calc_cur_index, len, total));
            if (msg_calc_cur_len == len)
            {
                msg_calc_cur_index++;
                /* NOTE(review): when msg_calc_cur_index has just become n this
                   reads iov[n], one past the end of the caller's array -- TODO
                   confirm the iov is over-allocated or fix the bound check.
                   The copy loop below has the same pattern. */
                msg_calc_cur_len = iov[msg_calc_cur_index].MPID_IOV_LEN;
            }
            else
            {
                msg_calc_cur_len -= len;
            }
        }
        msg_size = cur_msg_total + sizeof(ibu_rdma_buf_footer_t);
        /* set buf pointer to where data should start .
           cpy_index will be equal to cur_index after loop */
        buf = ((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size);
        /* Pass 2: copy the same byte range into the send buffer. */
        num_avail = IBU_EAGER_PACKET_SIZE;
        for (; cur_index < n && num_avail; )
        {
            len = min (num_avail, cur_len);
            num_avail -= len;
            total += len;
            memcpy(buf, cur_buf, len);
            buf += len;
            if (cur_len == len)
            {
                cur_index++;
                cur_len = iov[cur_index].MPID_IOV_LEN;
                cur_buf = iov[cur_index].MPID_IOV_BUF;
            }
            else
            {
                cur_len -= len;
                cur_buf += len;
            }
        }
    }
    /*((ibu_rdma_buf_t*)mem_ptr)->footer.cur_offset = 0;*/
    /* Fill the trailing footer: the receiver detects arrival by polling the
       valid flag, and the limit field piggybacks our local ring state. */
    ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_VALID_RDMA_BUF;
    ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) - 1) % IBU_NUM_OF_RDMA_BUFS; /* Piggybacked update of remote Q state */
    ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size = msg_size; /* Already added size of additional header */
    /* Mellanox END copying data */
    data.len = msg_size;
    MPIU_Assert(data.len);
    /* Data.addr points to Beginning of original buffer */
    data.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size));
    data.lkey = GETLKEY(mem_ptr);
    /*MPIU_Assert(data.lkey == s_lkey);*/
    /* Build the RDMA-write work request targeting the next free slot of the
       peer's ring; the write is right-aligned at the slot's tail. */
    work_req.opcode = VAPI_RDMA_WRITE; /* Mellanox dafna changed from SEND July 11th */
    signaled_type = ibui_signaled_completion(ibu); /* Mellanox dafna July 11th */
    work_req.comp_type = signaled_type;
    work_req.sg_lst_p = &data;
    work_req.sg_lst_len = 1;
    work_req.imm_data = 0;
    work_req.fence = 0;
    work_req.remote_ah = 0;
    work_req.remote_qp = 0;
    work_req.remote_qkey = 0;
    work_req.ethertype = 0;
    work_req.eecn = 0;
    work_req.set_se = 0;
    work_req.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(ibu->remote_RDMA_buf_base + (ibu->remote_RDMA_head + 1));
    work_req.remote_addr -= msg_size;
    work_req.r_key = ibu->remote_RDMA_buf_hndl.rkey;
    work_req.compare_add = 0;
    work_req.swap = 0;
    MPIU_DBG_PRINTF((" work_req remote_addr = %p \n",work_req.remote_addr ));
    /* Mellanox Allocate id_ptr only if wqe is signaled */
    if (signaled_type == VAPI_SIGNALED)
    {
        id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
        *((ibu_work_id_handle_t**)&work_req.id) = id_ptr;
        if (id_ptr == NULL)
        {
            MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlocAlloc returned NULL"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV);
            MPIU_Assert(0);
            return IBU_FAIL;
        }
        id_ptr->ibu = ibu;
        id_ptr->mem = (void*)mem_ptr;
        id_ptr->length = msg_size;
    }
    /*sanity_check_send(&work_req);*/
    /* Small messages are posted inline (HCA copies the data at post time,
       so no completion is needed to recycle the buffer contents). */
    if (msg_size < (unsigned int)ibu->max_inline_size)
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "EVAPI_post_inline_sr(%d bytes)", msg_size));
        status = EVAPI_post_inline_sr( IBU_Process.hca_handle, ibu->qp_handle, &work_req);
    }
    else
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "VAPI_post_sr(%d bytes)", msg_size));
        status = VAPI_post_sr( IBU_Process.hca_handle, ibu->qp_handle, &work_req);
    }
    if (status != VAPI_OK)
    {
        /* Free id_ptr if was signaled and VAPI posting failed */
        if (signaled_type == VAPI_SIGNALED)
        {
            ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);
        }
        MPIU_Internal_error_printf("%s: Error: failed to post ib send, status = %s\n", FCNAME, VAPI_strerror(status));
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV);
        return IBU_FAIL;
    }
    /* Mellanox push entry to send_wqe_fifo */
    entry_type = (signaled_type == VAPI_SIGNALED)? IBU_RDMA_EAGER_SIGNALED : IBU_RDMA_EAGER_UNSIGNALED;
    send_wqe_info_fifo_push(ibu, entry_type , mem_ptr, msg_size);
    /* Mellanox update head of remote RDMA buffer to write to */
    ibu->remote_RDMA_head = (ibu->remote_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;
    /* Mellanox update remote RDMA limit to what was sent in the packet */
    ibu->local_last_updated_RDMA_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS )- 1) % IBU_NUM_OF_RDMA_BUFS;
    /* Mellanox change of print.
       use remote_RDMA_head/remote_RMDA_limit instead of nAvailRemote
       use new local RDMA limit for nUnacked */
    MPIU_DBG_PRINTF(("send posted, nAvailRemote: %d, local_last_updated_RDMA_limit: %d\n", (ibu->remote_RDMA_limit - ibu->remote_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS, ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS)- 1) % IBU_NUM_OF_RDMA_BUFS));
    } while (cur_index < n);
    *num_bytes_ptr = total;
    MPIU_DBG_PRINTF(("exiting ibu_writev\n"));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV);
    return IBU_SUCCESS;
}

#ifdef HAVE_WINDOWS_H
#ifdef USE_DEBUG_ALLOCATION_HOOK
/* CRT debug-heap allocation hook (see _CrtSetAllocHook in ibu_init below):
   when user memory is freed, drop it from the registration cache so a
   recycled virtual address cannot alias a stale pinned-page entry. */
int __cdecl ibu_allocation_hook(int nAllocType, void * pvData, size_t nSize, int nBlockUse, long lRequest, const unsigned char * szFileName, int nLine)
{
    /* nBlockUse = _FREE_BLOCK, _NORMAL_BLOCK, _CRT_BLOCK, _IGNORE_BLOCK, _CLIENT_BLOCK */
    if ( nBlockUse == _CRT_BLOCK ) /* Ignore internal C runtime library allocations */
        return( TRUE );
    /* nAllocType = _HOOK_ALLOC, _HOOK_REALLOC, _HOOK_FREE */
    if (nAllocType == _HOOK_FREE)
    {
        /* remove from cache */
        if ( pvData != NULL )
        {
            ibu_invalidate_memory(pvData, nSize);
        }
    }
    return( TRUE ); /* Allow the memory operation to proceed */
}
#endif
#endif

#undef FUNCNAME
#define FUNCNAME ibui_get_list_of_hcas
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Query VAPI for the installed HCA ids.
 * On VAPI_OK: *num_of_hcas holds the count and *hca_id_buf_p is either NULL
 * (first probe already succeeded) or an MPIU_Malloc'ed array the CALLER must
 * free.  Any other outcome returns VAPI_ERR with *hca_id_buf_p == NULL. */
static VAPI_ret_t ibui_get_list_of_hcas(VAPI_hca_id_t **hca_id_buf_p, u_int32_t *num_of_hcas)
{
    VAPI_hca_id_t *hca_id_buf;
    VAPI_ret_t rc;
    u_int32_t local_num_of_hcas;
    MPIDI_STATE_DECL(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
    MPIDI_FUNC_ENTER(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
    MPIU_DBG_PRINTF(("entering ibui_get_list_of_hcas\n"));
    *hca_id_buf_p = NULL;
    /* Probe with a zero-length buffer just to learn the HCA count;
       VAPI_EAGAIN is the expected "buffer too small" answer. */
    rc = EVAPI_list_hcas(0, &local_num_of_hcas, NULL);
    switch ( rc )
    {
    case VAPI_OK:
        *num_of_hcas = local_num_of_hcas;
        MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
        return VAPI_OK;
    case VAPI_EAGAIN:
        hca_id_buf = MPIU_Malloc(sizeof(VAPI_hca_id_t)*local_num_of_hcas);
        if ( !hca_id_buf )
        {
            MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
            return VAPI_ERR;
        }
        rc = EVAPI_list_hcas(local_num_of_hcas, &local_num_of_hcas, hca_id_buf);
        if ( rc != VAPI_OK )
        {
            MPIU_Free(hca_id_buf);
            MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
            return VAPI_ERR;
        }
        *num_of_hcas = local_num_of_hcas;
        *hca_id_buf_p = hca_id_buf;
        MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
        return VAPI_OK;
    }
    /* Any other VAPI status falls through to a generic failure. */
    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_LIST_OF_HCAS);
    return VAPI_ERR;
}

/* For the style checker: allowing one instance of malloc and free (instead
   of MPIU_Malloc and MPIU_Free) below because they are being passed to a
   function */
/* style: allow:malloc:1 sig:0 */
/* style: allow:free:1 sig:0 */
#undef FUNCNAME
#define FUNCNAME ibu_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* One-time module initialization: pick the first HCA found, open it, query
 * its capabilities, allocate a protection domain, read port 1's LID, and
 * create the work-id block allocator.
 * Returns IBU_SUCCESS, or the failing VAPI status code. */
int ibu_init()
{
    VAPI_ret_t status;
    VAPI_hca_id_t id = "blablabla"; /* placeholder; overwritten below */
    VAPI_hca_id_t *hca_id_buf;
    u_int32_t num_of_hcas;
    VAPI_hca_vendor_t vendor;
    VAPI_hca_cap_t hca_cap;
    MPIDI_STATE_DECL(MPID_STATE_IBU_INIT);
    MPIDI_FUNC_ENTER(MPID_STATE_IBU_INIT);
    MPIU_DBG_PRINTF(("entering ibu_init\n"));
    /* FIXME: This is a temporary solution to prevent cached pointers from
       pointing to old physical memory pages.  A better solution might be to
       add a user hook to free() to remove cached pointers at that time. */
#ifdef MPIDI_CH3_CHANNEL_RNDV
    /* taken from the OSU mvapich source: */
    /* Set glibc/stdlib malloc options to prevent handing
     * memory back to the system (brk) upon free.
     * Also, dont allow MMAP memory for large allocations. */
#ifdef M_TRIM_THRESHOLD
    mallopt(M_TRIM_THRESHOLD, -1);
#endif
#ifdef M_MMAP_MAX
    mallopt(M_MMAP_MAX, 0);
#endif
    /* End of OSU code */
#ifdef HAVE_WINDOWS_H
#ifdef USE_DEBUG_ALLOCATION_HOOK
    _CrtSetAllocHook(ibu_allocation_hook);
#endif
#endif
#endif
    /* Initialize globals */
    IBU_Process.num_send_cqe = 0;
    status = ibui_get_list_of_hcas(&hca_id_buf, &num_of_hcas);
    if (status != VAPI_OK || num_of_hcas == 0)
    {
        MPIU_Internal_error_printf("ibu_init: ibui_get_list_of_hcas failed, status %s, num_of_hcas: %d\n", VAPI_strerror(status),num_of_hcas);
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
        return status;
    }
    /* Always use the first HCA in the list. */
    MPIU_Strncpy(id, hca_id_buf[0], sizeof(VAPI_hca_id_t));
    if ( hca_id_buf )
        MPIU_Free(hca_id_buf);
    status = EVAPI_get_hca_hndl(id, &IBU_Process.hca_handle);
    if (status != VAPI_OK)
    {
        MPIU_Internal_error_printf("ibu_init: EVAPI_get_hca_hndl failed, status %s\n", VAPI_strerror(status));
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
        return status;
    }
    status = VAPI_query_hca_cap(IBU_Process.hca_handle, &vendor, &hca_cap);
    if (status != VAPI_OK)
    {
        MPIU_Internal_error_printf("ibu_init: VAPI_query_hca_cap failed, status %s\n", VAPI_strerror(status));
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
        return status;
    }
    /* Port 1 is hard-wired; the CQ is sized to the HCA maximum. */
    IBU_Process.port = 1;
    IBU_Process.cq_size = hca_cap.max_num_ent_cq;
    MPIU_DBG_PRINTF(("cq size: %d\n", IBU_Process.cq_size));
    /* get a protection domain handle */
    status = VAPI_alloc_pd(IBU_Process.hca_handle, &IBU_Process.pd_handle);
    if (status != VAPI_OK)
    {
        MPIU_Internal_error_printf("ibu_init: VAPI_alloc_pd failed, status %s\n", VAPI_strerror(status));
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
        return status;
    }
    /* get the lid */
    status = VAPI_query_hca_port_prop(IBU_Process.hca_handle, (IB_port_t)1, &IBU_Process.hca_port);
    if (status != VAPI_OK)
    {
        MPIU_Internal_error_printf("ibu_init: VAPI_query_hca_port_prop failed, status %s\n", VAPI_strerror(status));
        MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
        return status;
    }
    IBU_Process.lid = IBU_Process.hca_port.lid;
    /* MPIU_DBG_PRINTF(("infiniband:\n mtu: %d\n msg_size: %d\n", IBU_Process.attr_p->port_static_info_p->mtu, IBU_Process.attr_p->port_static_info_p->msg_size)); */
    /* non infiniband initialization */
    IBU_Process.unex_finished_list = NULL;
    IBU_Process.workAllocator = ibuBlockAllocInit(sizeof(ibu_work_id_handle_t), 256, 256, malloc, free);
    MPIU_DBG_PRINTF(("exiting ibu_init\n"));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT);
    return IBU_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME ibu_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Tear down what ibu_init created (currently only the work-id allocator).
   Always returns IBU_SUCCESS. */
int ibu_finalize()
{
    MPIDI_STATE_DECL(MPID_STATE_IBU_FINALIZE);
    MPIDI_FUNC_ENTER(MPID_STATE_IBU_FINALIZE);
    MPIU_DBG_PRINTF(("entering ibu_finalize\n"));
    ibuBlockAllocFinalize(&IBU_Process.workAllocator);
    MPIU_DBG_PRINTF(("exiting ibu_finalize\n"));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_FINALIZE);
    return IBU_SUCCESS;
}

/* NOTE(review): FooBar continues past the end of this chunk -- its closing
   brace (and any further statements) are on the next page of the file. */
void FooBar(VAPI_hca_hndl_t hca_handle, VAPI_cq_hndl_t cq_handle, void* p)
{
    MPIU_Error_printf("FooBar\n");
/* (scraper residue -- code-viewer keyboard-shortcut help, translated:
 *  copy code Ctrl+C, search code Ctrl+F, fullscreen F11,
 *  increase font Ctrl+=, decrease font Ctrl+-, show shortcuts ?) */