ibu.vapi.c
来自「MPI并行计算的C++代码 可用VC或gcc编译通过 可以用来搭建并行计算试验环境」· C语言 代码 · 共 1,542 行 · 第 1/4 页
C
1,542 行
#ifdef MPICH_DBG_OUTPUT MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t*)ack_pkt);#endif mpi_errno = ibu_write(ibu, ack_pkt, (int)sizeof(MPIDI_CH3_Pkt_t), &num_bytes); /* Mellanox - write with special ack pkt header */ if (mpi_errno != MPI_SUCCESS || num_bytes != sizeof(MPIDI_CH3_Pkt_t)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_ACK_WRITE); return mpi_errno; } MPIU_DBG_PRINTF(("exiting ibui_post_ack_write\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_ACK_WRITE); return mpi_errno;}/* ibu functions *//* Mellanox August 25th*/#undef FUNCNAME#define FUNCNAME ibui_post_rndv_cts_iov_reg_err#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_post_rndv_cts_iov_reg_err(ibu_t ibu, MPID_Request * rreq){ int mpi_errno = MPI_SUCCESS; int num_bytes; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_rndv_reg_error_t* const cts_iov_reg_err_pkt = &upkt.rndv_reg_error; MPIDI_STATE_DECL(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); MPIDI_FUNC_ENTER(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); MPIU_DBG_PRINTF(("entering ibui_post_rndv_cts_iov_reg_err\n")); cts_iov_reg_err_pkt->iov_len = 0; cts_iov_reg_err_pkt->type = MPIDI_CH3_PKT_RNDV_CTS_IOV_REG_ERR; cts_iov_reg_err_pkt->sreq = rreq->dev.sender_req_id; cts_iov_reg_err_pkt->rreq = rreq->handle;#ifdef MPICH_DBG_OUTPUT MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t*)cts_iov_reg_err_pkt);#endif mpi_errno = ibu_write(ibu, cts_iov_reg_err_pkt, (int)sizeof(MPIDI_CH3_Pkt_t), &num_bytes); /* Mellanox - write with special cts registration errorr pkt header */ if (mpi_errno != MPI_SUCCESS || num_bytes != sizeof(MPIDI_CH3_Pkt_t)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); return mpi_errno; } MPIU_DBG_PRINTF(("exiting ibui_post_rndv_cts_iov_reg_err\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); return mpi_errno;}/* Mellanox 
July 12th send_wqe_info_fifo_empty - check if fifo is emptry or not:return TRUE if empty, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_empty#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_empty(int head, int tail){ if (head == tail) { return TRUE; } return FALSE;}/* Mellanox July 12th send_wqe_info_fifo_pop - pop entry to send_wqe_fifo:Update signaled_wqes counter, and advance tail*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_pop#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_pop(ibu_t ibu, ibui_send_wqe_info_t* entry){ int head, tail; head = ibu->send_wqe_info_fifo.head; tail = ibu->send_wqe_info_fifo.tail; if (!send_wqe_info_fifo_empty(head, tail)) { *entry = ibu->send_wqe_info_fifo.entries[tail]; if (entry->RDMA_type == IBU_RDMA_EAGER_SIGNALED || entry->RDMA_type == IBU_RDMA_RDNV_SIGNALED) { ibu->send_wqe_info_fifo.num_of_signaled_wqes--; IBU_Process.num_send_cqe--; } } else { entry = NULL; } ibu->send_wqe_info_fifo.tail = (tail + 1) % IBU_DEFAULT_MAX_WQE;}/* Mellanox July 12th check if send_wqe_fifo if full and cannot be pushed into:return TRUE if full, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_full#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_full(int head, int tail){ if (((head + 1) % IBU_DEFAULT_MAX_WQE) == tail) { return TRUE; } return FALSE;}/* Mellanox July 12th push entry to send_wqe_fifo:Update RDMA type, signaled_wqes counter, length, mem_ptr and advance header*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_push#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_push(ibu_t ibu, ibu_rdma_type_t entry_type, void* mem_ptr, int length){ int head,tail; head = ibu->send_wqe_info_fifo.head; tail = ibu->send_wqe_info_fifo.tail; if (!send_wqe_info_fifo_full(head,tail)) { ibu->send_wqe_info_fifo.entries[head].RDMA_type = entry_type; if (entry_type == IBU_RDMA_EAGER_SIGNALED || entry_type == 
IBU_RDMA_RDNV_SIGNALED ) { ibu->send_wqe_info_fifo.num_of_signaled_wqes++; IBU_Process.num_send_cqe++; } ibu->send_wqe_info_fifo.entries[head].length = length; ibu->send_wqe_info_fifo.entries[head].mem_ptr = mem_ptr; ibu->send_wqe_info_fifo.head = (head + 1) % IBU_DEFAULT_MAX_WQE; }}/* Mellanox July 11th Given the next posted index, determines weather this description postingis singnalled or not.returns signal bit according to VAPI definitions.*/#undef FUNCNAME#define FUNCNAME ibui_signaled_completion#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_signaled_completion(ibu_t ibu){ int signaled = VAPI_UNSIGNALED; if (ibu->send_wqe_info_fifo.head % (IBU_PACKET_COUNT >> 1) == 0) { signaled = VAPI_SIGNALED; } return signaled;}/* Mellanox END ibui_signaled_completion July 11th*/#undef FUNCNAME#define FUNCNAME ibu_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_write(ibu_t ibu, void *buf, int len, int *num_bytes_ptr){ VAPI_ret_t status; VAPI_sg_lst_entry_t data; VAPI_sr_desc_t work_req; void* mem_ptr; unsigned int length, msg_size; int signaled_type = VAPI_UNSIGNALED; int total = 0; ibu_work_id_handle_t *id_ptr = NULL; ibu_rdma_type_t entry_type; MPIDI_STATE_DECL(MPID_STATE_IBU_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITE); MPIU_DBG_PRINTF(("entering ibu_write\n")); while (len) { length = min(len, IBU_EAGER_PACKET_SIZE); len -= length; if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2) { /* Mellanox - check if packet is update limit packet. 
if not - return and enqueue */ if (((MPIDI_CH3_Pkt_t*)buf)->type != MPIDI_CH3_PKT_LMT_UPT) { /*printf("ibu_write: no remote packets available\n");fflush(stdout);*/ MPIDI_DBG_PRINTF((60, FCNAME, "no more remote packets available")); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS))); *num_bytes_ptr = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } MPIDI_DBG_PRINTF((60, FCNAME, "Going to send update limit pkt")); /* Mellanox - check for update limit packet if sending is available*/ if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS ) % IBU_NUM_OF_RDMA_BUFS) < 1) { /* No send is available. Pretend as if sent and ignore ibu_write request */ *num_bytes_ptr = len; MPIDI_DBG_PRINTF((60, FCNAME, "update limit is not available. 
exiting ibu_write")); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS))); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } } MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); mem_ptr = ibuBlockAllocIB(ibu->allocator); MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned address %p\n",mem_ptr)); if (mem_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned NULL\n")); *num_bytes_ptr = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } /* Mellanox - adding the RDMA header */ if (((MPIDI_CH3_Pkt_t*)buf)->type == MPIDI_CH3_PKT_LMT_UPT) { length = 0; /* Only footer will be sent, no other body message */ } msg_size = length + sizeof(ibu_rdma_buf_footer_t); /* Added size of additional header*/ /*((ibu_rdma_buf_t*)mem_ptr)->footer.cur_offset = 0;*/ ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_VALID_RDMA_BUF; ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit = ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) - 1) % IBU_NUM_OF_RDMA_BUFS; /* Piggybacked update of remote Q state */ ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size = msg_size; /* Mellanox - END adding the RDMA header */ memcpy(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size), buf, length); total += length; data.len = msg_size; /*Mellanox descriptor holds additional header */ /* Data.addr points to Beginning of original buffer */ data.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size)); data.lkey = GETLKEY(mem_ptr); /*MPIU_Assert(data.lkey == s_lkey);*/ work_req.opcode = VAPI_RDMA_WRITE; /* Mellanox dafna changed from 
SEND July 11th*/ signaled_type = ibui_signaled_completion(ibu); /* Mellanox dafna July 11th*/ work_req.comp_type = signaled_type; work_req.sg_lst_p = &data; work_req.sg_lst_len = 1; work_req.imm_data = 0; work_req.fence = 0; work_req.remote_ah = 0; work_req.remote_qp = 0; work_req.remote_qkey = 0; work_req.ethertype = 0; work_req.eecn = 0; work_req.set_se = 0; work_req.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(ibu->remote_RDMA_buf_base + (ibu->remote_RDMA_head + 1)); work_req.remote_addr -= msg_size; work_req.r_key = ibu->remote_RDMA_buf_hndl.rkey; work_req.compare_add = 0; work_req.swap = 0; MPIU_DBG_PRINTF((" work_req remote_addr = %p \n",work_req.remote_addr )); /* Mellanox Allocate id_ptr only if wqe is signaled */ if (signaled_type == VAPI_SIGNALED) { id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator); *((ibu_work_id_handle_t**)&work_req.id) = id_ptr; if (id_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlocAlloc returned NULL")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_FAIL; } id_ptr->ibu = ibu; id_ptr->mem = (void*)mem_ptr; id_ptr->length = msg_size; } /*sanity_check_send(&work_req);*/ if (msg_size < ibu->max_inline_size) { MPIDI_DBG_PRINTF((60, FCNAME, "calling EVAPI_post_inline_sr(%d bytes)", msg_size)); status = EVAPI_post_inline_sr( IBU_Process.hca_handle, ibu->qp_handle, &work_req); } else { MPIDI_DBG_PRINTF((60, FCNAME, "calling VAPI_post_sr(%d bytes)", msg_size)); status = VAPI_post_sr( IBU_Process.hca_handle, ibu->qp_handle, &work_req); } if (status != VAPI_OK) { /* Free id_ptr if was signaled and VAPI posting failed */ if (signaled_type == VAPI_SIGNALED) { ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr); } MPIU_Internal_error_printf("%s: Error: failed to post ib send, status = %s\n", FCNAME, VAPI_strerror(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_FAIL; } /* Mellanox push entry to send_wqe_fifo */ entry_type = (signaled_type == VAPI_SIGNALED)? 
IBU_RDMA_EAGER_SIGNALED : IBU_RDMA_EAGER_UNSIGNALED; send_wqe_info_fifo_push(ibu, entry_type, mem_ptr, msg_size); /* Mellanox update head of remote RDMA buffer to write to */ ibu->remote_RDMA_head = (ibu->remote_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox update remote RDMA limit to what was sent in the packet */ ibu->local_last_updated_RDMA_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS )- 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox change of print. use remote_RDMA_head/remote_RMDA_limit instead of nAvailRemote use new local RDMA limit for nUnacked */ MPIU_DBG_PRINTF(("send posted, nAvailRemote: %d, local_last_updated_RDMA_limit: %d\n", (ibu->remote_RDMA_limit - ibu->remote_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS, ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS)- 1) % IBU_NUM_OF_RDMA_BUFS)); buf = (char*)buf + length; } *num_bytes_ptr = total; MPIU_DBG_PRINTF(("exiting ibu_write\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_writev(ibu_t ibu, MPID_IOV *iov, int n, int *num_bytes_ptr){ VAPI_ret_t status; VAPI_sg_lst_entry_t data; VAPI_sr_desc_t work_req; void* mem_ptr; unsigned int len, msg_size; int cur_msg_total, total = 0; unsigned int num_avail; unsigned char *buf; int cur_index, msg_calc_cur_index; /* Mellanox added */ unsigned int cur_len, msg_calc_cur_len; char *cur_buf; int signaled_type = VAPI_UNSIGNALED; ibu_work_id_handle_t *id_ptr = NULL; ibu_rdma_type_t entry_type; MPIDI_STATE_DECL(MPID_STATE_IBU_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITEV); MPIU_DBG_PRINTF(("entering ibu_writev\n")); cur_index = 0; msg_calc_cur_index = 0; cur_len = iov[cur_index].MPID_IOV_LEN; cur_buf = iov[cur_index].MPID_IOV_BUF; msg_calc_cur_len = iov[cur_index].MPID_IOV_LEN; do { cur_msg_total = 0; if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2) { /*printf("ibu_writev: no remote 
packets available\n");fflush(stdout);*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?