📄 ibu.ibal.c
字号:
MPIDI_FUNC_ENTER(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); MPIU_DBG_PRINTF(("entering ibui_post_rndv_cts_iov_reg_err\n")); cts_iov_reg_err_pkt->iov_len = 0; cts_iov_reg_err_pkt->type = MPIDI_CH3_PKT_RNDV_CTS_IOV_REG_ERR; cts_iov_reg_err_pkt->sreq = rreq->dev.sender_req_id; cts_iov_reg_err_pkt->rreq = rreq->handle;#ifdef MPICH_DBG_OUTPUT MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t*)cts_iov_reg_err_pkt);#endif mpi_errno = ibu_write(ibu, cts_iov_reg_err_pkt, (int)sizeof(MPIDI_CH3_Pkt_t), &num_bytes); /* Mellanox - write with special cts registration errorr pkt header */ if (mpi_errno != MPI_SUCCESS || num_bytes != sizeof(MPIDI_CH3_Pkt_t)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); return mpi_errno; } MPIU_DBG_PRINTF(("exiting ibui_post_rndv_cts_iov_reg_err\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR); return mpi_errno;}/* Mellanox July 12th send_wqe_info_fifo_empty - check if fifo is emptry or not:return TRUE if empty, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_empty#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_empty(int head, int tail){ if (head == tail) { return TRUE; } return FALSE;}/* Mellanox July 12th send_wqe_info_fifo_pop - pop entry to send_wqe_fifo:Update signaled_wqes counter, and advance tail*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_pop#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_pop(ibu_t ibu, ibui_send_wqe_info_t* entry){ int head, tail; head = ibu->send_wqe_info_fifo.head; tail = ibu->send_wqe_info_fifo.tail; if (!send_wqe_info_fifo_empty(head, tail)) { *entry = ibu->send_wqe_info_fifo.entries[tail]; if (entry->RDMA_type == IBU_RDMA_EAGER_SIGNALED || entry->RDMA_type == IBU_RDMA_RDNV_SIGNALED) { ibu->send_wqe_info_fifo.num_of_signaled_wqes--; IBU_Process.num_send_cqe--; } } else { entry = NULL; } ibu->send_wqe_info_fifo.tail = (tail + 1) % IBU_DEFAULT_MAX_WQE;}/* Mellanox July 12th check if send_wqe_fifo if full and cannot be pushed into:return TRUE if full, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_full#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_full(int head, int tail){ if (((head + 1) % IBU_DEFAULT_MAX_WQE) == tail) { return TRUE; } return FALSE;}/* Mellanox July 12th push entry to send_wqe_fifo:Update RDMA type, signaled_wqes counter, length, mem_ptr and advance header*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_push#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_push(ibu_t ibu, ibu_rdma_type_t entry_type, void* mem_ptr, int length){ int head,tail; head = ibu->send_wqe_info_fifo.head; tail = ibu->send_wqe_info_fifo.tail; if (!send_wqe_info_fifo_full(head,tail)) { ibu->send_wqe_info_fifo.entries[head].RDMA_type = entry_type; if (entry_type == IBU_RDMA_EAGER_SIGNALED || entry_type == IBU_RDMA_RDNV_SIGNALED ) { ibu->send_wqe_info_fifo.num_of_signaled_wqes++; IBU_Process.num_send_cqe++; } ibu->send_wqe_info_fifo.entries[head].length = length; ibu->send_wqe_info_fifo.entries[head].mem_ptr = mem_ptr; ibu->send_wqe_info_fifo.head = (head + 1) % IBU_DEFAULT_MAX_WQE; }}/* Mellanox July 11th Given the next posted index, determines whether this description postingis singnalled or not.returns signal bit according to API definitions.*/#undef FUNCNAME#define FUNCNAME ibui_signaled_completion#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)ib_send_opt_t ibui_signaled_completion(ibu_t ibu){ int signaled = 0x00000000 /* unsignalled */; if (ibu->send_wqe_info_fifo.head % (IBU_PACKET_COUNT >> 1) == 0) { signaled = IB_SEND_OPT_SIGNALED; } return signaled;}//Mellanox, dafna April 11th finished added funcions#undef FUNCNAME#define FUNCNAME ibu_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_write(ibu_t ibu, void *buf, int len, int *num_bytes_ptr){ ib_api_status_t status; ib_local_ds_t data; ib_send_wr_t work_req; void *mem_ptr; //Mellanox, dafna April 11th added msg_size unsigned int length, msg_size; int signaled_type = 0x00000000; /*UNSIGNALED*/ int total = 0; ibu_work_id_handle_t *id_ptr = NULL; ibu_rdma_type_t entry_type; /* Mellanox, dafna April 11th added entry_type */ MPIDI_STATE_DECL(MPID_STATE_IBU_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITE); MPIU_DBG_PRINTF(("entering ibu_write\n")); while (len) { length = min(len, IBU_EAGER_PACKET_SIZE); /*Mellanox, dafna April 11th replaced IBU_PACKET_SIZE*/ len -= length; /* Mellanox, dafna April 11th replaced nAvailRemote with rmote_RDMA_limit */ if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2) { /* Mellanox - check if packet is update limit packet. if not - return and enqueue */ if (((MPIDI_CH3_Pkt_t*)buf)->type != MPIDI_CH3_PKT_LMT_UPT) { /*printf("ibu_write: no remote packets available\n");fflush(stdout);*/ MPIDI_DBG_PRINTF((60, FCNAME, "no more remote packets available")); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS))); *num_bytes_ptr = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } MPIDI_DBG_PRINTF((60, FCNAME, "Going to send update limit pkt")); /* Mellanox - check for update limit packet if sending is available*/ if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS ) % IBU_NUM_OF_RDMA_BUFS) < 1) { /* No send is available. Pretend as if sent and ignore ibu_write request */ *num_bytes_ptr = len; MPIDI_DBG_PRINTF((60, FCNAME, "update limit is not available. exiting ibu_write")); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS))); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } } MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); mem_ptr = ibuBlockAllocIB(ibu->allocator); MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned address %p\n",mem_ptr)); if (mem_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned NULL\n")); *num_bytes_ptr = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS; } /* Mellanox - adding the RDMA header */ if (((MPIDI_CH3_Pkt_t*)buf)->type == MPIDI_CH3_PKT_LMT_UPT) { length = 0; /* Only footer will be sent, no other body message */ } msg_size = length + sizeof(ibu_rdma_buf_footer_t); /* Added size of additional header*/ ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_VALID_RDMA_BUF; ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit = ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) - 1) % IBU_NUM_OF_RDMA_BUFS; /* Piggybacked update of remote Q state */ ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size = msg_size; /* Mellanox - END adding the RDMA header */ memcpy(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size), buf, length); total += length; data.length = msg_size; /*Mellanox descriptor holds additional header */ /*Mellanox, dafna April 11th length*/; /* Data.vaddr points to Beginning of original buffer */ data.vaddr = (uint64_t)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size)) /*Mellanox, dafna April 11th mem_ptr*/; data.lkey = GETLKEY(mem_ptr); work_req.p_next = NULL; work_req.wr_type = WR_RDMA_WRITE /*Mellanox, dafna April 11th SEND*/; signaled_type = ibui_signaled_completion(ibu); work_req.send_opt = signaled_type /*Mellanox, dafna April 11th IB_SEND_OPT_SIGNALED*/; work_req.num_ds = 1; work_req.ds_array = &data; work_req.immediate_data = 0; /* Mellanox, dafna April 11th added remote_ops parameters */ work_req.remote_ops.vaddr = (uint64_t)(ibu->remote_RDMA_buf_base + (ibu->remote_RDMA_head + 1)); work_req.remote_ops.vaddr -= msg_size; work_req.remote_ops.rkey = ibu->remote_RDMA_buf_hndl.rkey; work_req.remote_ops.atomic1 = 0x0; work_req.remote_ops.atomic2 = 0x0; MPIU_DBG_PRINTF((" work_req remote_addr = %p \n",work_req.remote_ops.vaddr )); /* Mellanox, dafna April 11th added signaling type */ /* Allocate id_ptr only if wqe is signaled */ if (signaled_type == IB_SEND_OPT_SIGNALED) { id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator/*Mellanox, dafna April 11th g_workAllocator*/); *((ibu_work_id_handle_t**)&work_req.wr_id) = id_ptr; if (id_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlocAlloc returned NULL")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_FAIL; } id_ptr->ibu = ibu; id_ptr->mem = (void*)mem_ptr; id_ptr->length = msg_size; /* Mellanox, dafna April 11th added msg_size*/ } if (msg_size < ibu->max_inline_size) { MPIDI_DBG_PRINTF((60, FCNAME, "calling ib_post_inline_sr(%d bytes)", msg_size)); work_req.send_opt |= IB_SEND_OPT_INLINE; status = ib_post_send(ibu->qp_handle, &work_req, NULL); } else { MPIDI_DBG_PRINTF((60, FCNAME, "calling ib_post_send(%d bytes)", length)); status = ib_post_send(ibu->qp_handle, &work_req, NULL); } if (status != IB_SUCCESS) { if (signaled_type == IB_SEND_OPT_SIGNALED) { ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr); } MPIU_Internal_error_printf("%s: Error: failed to post ib send, status = %s\n", FCNAME, ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_FAIL; } /* Mellanox push entry to send_wqe_fifo */ entry_type = (signaled_type == IB_SEND_OPT_SIGNALED)? IBU_RDMA_EAGER_SIGNALED : IBU_RDMA_EAGER_UNSIGNALED; send_wqe_info_fifo_push(ibu, entry_type, mem_ptr, msg_size); /* Mellanox update head of remote RDMA buffer to write to */ ibu->remote_RDMA_head = (ibu->remote_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox update remote RDMA limit to what was sent in the packet */ ibu->local_last_updated_RDMA_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS )- 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox change of print. use remote_RDMA_head/remote_RMDA_limit instead of nAvailRemote use new local RDMA limit for nUnacked */ MPIU_DBG_PRINTF(("send posted, nAvailRemote: %d, local_last_updated_RDMA_limit: %d\n", (ibu->remote_RDMA_limit - ibu->remote_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS, ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS)- 1) % IBU_NUM_OF_RDMA_BUFS)); //Mellanox, dafna April 11th removed ibu->nAvailRemote--; buf = (char*)buf + length; } *num_bytes_ptr = total; MPIU_DBG_PRINTF(("exiting ibu_write\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_writev(ibu_t ibu, MPID_IOV *iov, int n, int *num_bytes_ptr){ ib_api_status_t status; ib_local_ds_t data; ib_send_wr_t work_req; void *mem_ptr; unsigned int len, msg_size; int cur_msg_total, total = 0; unsigned int num_avail; unsigned char *buf; int cur_index, msg_calc_cur_index; /* Added by Mellanox, dafna April 11th (msg_calc)*/ unsigned int cur_len, msg_calc_cur_len; /* Added by Mellanox, dafna April 11th (msg_calc)*/ unsigned char *cur_buf; int signaled_type = 0x00000000; /*UNSIGNALED*/ ibu_work_id_handle_t *id_ptr = NULL; ibu_rdma_type_t entry_type; /* Added by Mellanox, dafna April 11th */ MPIDI_STATE_DECL(MPID_STATE_IBU_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITEV); MPIU_DBG_PRINTF(("entering ibu_writev\n")); cur_index = 0; msg_calc_cur_index = 0; cur_len = iov[cur_index].MPID_IOV_LEN; cur_buf = iov[cur_index].MPID_IOV_BUF; msg_calc_cur_len = iov[cur_index].MPID_IOV_LEN; do { cur_msg_total = 0; if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2) { *num_bytes_ptr = total; MPIDI_DBG_PRINTF((60, FCNAME, "no more remote packets available.")); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .", ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ", ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS))); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); return IBU_SUCCESS; } MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",ibu->remote_RDMA_head ,ibu->remote_RDMA_limit)); mem_ptr = ibuBlockAllocIB(ibu->allocator); if (mem_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAlloc returned NULL.")); *num_bytes_ptr = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); return IBU_SUCCESS; } /*MPIU_DBG_PRINTF(("iov length: %d\n, n));*/ if (2 == n && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < IBU_EAGER_PACKET_SIZE) { total = iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN; msg_size = total + sizeof(ibu_rdma_buf_footer_t); buf = ((ibu_rdma_buf_t *)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size); memcpy(buf,iov[0].MPID_IOV_BUF,iov[0].MPID_IOV_LEN); memcpy(buf+iov[0].MPID_IOV_LEN,iov[1].MPID_IOV_BUF,iov[1].MPID_IOV_LEN); cur_index =n; } else { num_avail = IBU_EAGER_PACKET_SIZE; for (; msg_calc_cur_index < n && num_avail; ) { len = min (num_avail, msg_calc_cur_len); num_avail -= len; cur_msg_total += len; MPIU_DBG_PRINTF((" Cur index IOV[%d] - Adding 0x%x to msg_size. Total is 0x%x \n",msg_calc_cur_index, len, total)); if (msg_calc_cur_len == len) { msg_calc_cur_index++; msg_calc_cur_len = iov[msg_calc_cur_index].MPID_IOV_LEN; } else { msg_calc_cur_len -= len; } } msg_size = cur_msg_total + sizeof(ibu_rdma_buf_footer_t); /* set buf pointer to where data should start . cpy_index will be equal to cur_index after loop*/ buf = ((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size); num_avail = IBU_EAGER_PACKET_SIZE; for (; cur_index < n && num_avail; ) { len = min (num_avail, cur_len); num_avail -= len; total += len; memcpy(buf, cur_buf, len);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -