ibu.vapi.c

来自「mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环」· C语言 代码 · 共 1,542 行 · 第 1/4 页

C
1,542
字号
#ifdef MPICH_DBG_OUTPUT    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t*)ack_pkt);#endif    mpi_errno = ibu_write(ibu, ack_pkt, (int)sizeof(MPIDI_CH3_Pkt_t), &num_bytes); /* Mellanox - write with special ack pkt header */    if (mpi_errno != MPI_SUCCESS || num_bytes != sizeof(MPIDI_CH3_Pkt_t))    {	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);	MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_ACK_WRITE);	return mpi_errno;    }    MPIU_DBG_PRINTF(("exiting ibui_post_ack_write\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_ACK_WRITE);    return mpi_errno;}/* ibu functions *//* Mellanox August 25th*/#undef FUNCNAME#define FUNCNAME ibui_post_rndv_cts_iov_reg_err#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_post_rndv_cts_iov_reg_err(ibu_t ibu, MPID_Request * rreq){    int mpi_errno = MPI_SUCCESS;    int num_bytes;    MPIDI_CH3_Pkt_t upkt;    MPIDI_CH3_Pkt_rndv_reg_error_t* const cts_iov_reg_err_pkt = &upkt.rndv_reg_error;    MPIDI_STATE_DECL(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR);    MPIDI_FUNC_ENTER(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR);    MPIU_DBG_PRINTF(("entering ibui_post_rndv_cts_iov_reg_err\n"));    cts_iov_reg_err_pkt->iov_len = 0;    cts_iov_reg_err_pkt->type = MPIDI_CH3_PKT_RNDV_CTS_IOV_REG_ERR;    cts_iov_reg_err_pkt->sreq = rreq->dev.sender_req_id;    cts_iov_reg_err_pkt->rreq = rreq->handle;#ifdef MPICH_DBG_OUTPUT    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t*)cts_iov_reg_err_pkt);#endif    mpi_errno = ibu_write(ibu, cts_iov_reg_err_pkt, (int)sizeof(MPIDI_CH3_Pkt_t), &num_bytes); /* Mellanox - write with special cts registration errorr pkt header */    if (mpi_errno != MPI_SUCCESS || num_bytes != sizeof(MPIDI_CH3_Pkt_t))    {	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);	MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR);	return mpi_errno;    }    MPIU_DBG_PRINTF(("exiting ibui_post_rndv_cts_iov_reg_err\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_POST_RNDV_CTS_IOV_REG_ERR);    return mpi_errno;}/* Mellanox July 12th send_wqe_info_fifo_empty - check if fifo is emptry or not:return TRUE if empty, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_empty#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_empty(int head, int tail){	    if (head == tail)    {	return TRUE;    }    return FALSE;}/* Mellanox July 12th send_wqe_info_fifo_pop - pop entry to send_wqe_fifo:Update signaled_wqes counter, and advance tail*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_pop#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_pop(ibu_t ibu, ibui_send_wqe_info_t* entry){    int head, tail;    head = ibu->send_wqe_info_fifo.head;    tail = ibu->send_wqe_info_fifo.tail;    if (!send_wqe_info_fifo_empty(head, tail))    {	*entry = ibu->send_wqe_info_fifo.entries[tail];	if (entry->RDMA_type == IBU_RDMA_EAGER_SIGNALED || 	    entry->RDMA_type == IBU_RDMA_RDNV_SIGNALED)	{	    ibu->send_wqe_info_fifo.num_of_signaled_wqes--;	    IBU_Process.num_send_cqe--;	}    }    else    {	entry = NULL;    }    ibu->send_wqe_info_fifo.tail = (tail + 1) % IBU_DEFAULT_MAX_WQE;}/* Mellanox July 12th check if send_wqe_fifo if full and cannot be pushed into:return TRUE if full, FALSE otherwise*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_full#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int send_wqe_info_fifo_full(int head, int tail){    if (((head + 1) % IBU_DEFAULT_MAX_WQE) == tail)    {	return TRUE;    }    return FALSE;}/* Mellanox July 12th push entry to send_wqe_fifo:Update RDMA type, signaled_wqes counter, length, mem_ptr and advance header*/#undef FUNCNAME#define FUNCNAME send_wqe_info_fifo_push#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void send_wqe_info_fifo_push(ibu_t ibu, ibu_rdma_type_t entry_type, void* mem_ptr, int length){    int head,tail;    head = ibu->send_wqe_info_fifo.head;    tail = ibu->send_wqe_info_fifo.tail;    if (!send_wqe_info_fifo_full(head,tail))    {	ibu->send_wqe_info_fifo.entries[head].RDMA_type = entry_type;	if (entry_type == IBU_RDMA_EAGER_SIGNALED ||	    entry_type == IBU_RDMA_RDNV_SIGNALED ) 	{	    ibu->send_wqe_info_fifo.num_of_signaled_wqes++;	    IBU_Process.num_send_cqe++;	}	ibu->send_wqe_info_fifo.entries[head].length = length;	ibu->send_wqe_info_fifo.entries[head].mem_ptr = mem_ptr;	ibu->send_wqe_info_fifo.head = (head + 1) % IBU_DEFAULT_MAX_WQE;    }}/* Mellanox July 11th Given the next posted index, determines weather this description postingis singnalled or not.returns signal bit according to VAPI definitions.*/#undef FUNCNAME#define FUNCNAME ibui_signaled_completion#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_signaled_completion(ibu_t ibu){    int signaled = VAPI_UNSIGNALED;	    if (ibu->send_wqe_info_fifo.head % (IBU_PACKET_COUNT >> 1) == 0)    {	signaled = VAPI_SIGNALED;    }    return signaled;}/* Mellanox END ibui_signaled_completion July 11th*/#undef FUNCNAME#define FUNCNAME ibu_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_write(ibu_t ibu, void *buf, int len, int *num_bytes_ptr){    VAPI_ret_t status;    VAPI_sg_lst_entry_t data;    VAPI_sr_desc_t work_req;    void* mem_ptr;    unsigned int length, msg_size;    int signaled_type = VAPI_UNSIGNALED;    int total = 0;    ibu_work_id_handle_t *id_ptr = NULL;    ibu_rdma_type_t entry_type;    MPIDI_STATE_DECL(MPID_STATE_IBU_WRITE);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITE);    MPIU_DBG_PRINTF(("entering ibu_write\n"));    while (len)    {	length = min(len, IBU_EAGER_PACKET_SIZE);	len -= length;	if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2)	{	    /* Mellanox - check if packet is update limit packet. if not - return and enqueue */	    if (((MPIDI_CH3_Pkt_t*)buf)->type != MPIDI_CH3_PKT_LMT_UPT)	    {		/*printf("ibu_write: no remote packets available\n");fflush(stdout);*/		MPIDI_DBG_PRINTF((60, FCNAME, "no more remote packets available"));		MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",		    ibu->remote_RDMA_head ,ibu->remote_RDMA_limit));		MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ",		    ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS)));		*num_bytes_ptr = total;		MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);		return IBU_SUCCESS;	    }	    MPIDI_DBG_PRINTF((60, FCNAME, "Going to send update limit pkt"));	    /* Mellanox - check for update limit packet if sending is available*/	    if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head)  + IBU_NUM_OF_RDMA_BUFS ) % IBU_NUM_OF_RDMA_BUFS) < 1)	    {		/* No send is available. Pretend as if sent and ignore ibu_write request */		*num_bytes_ptr = len;		MPIDI_DBG_PRINTF((60, FCNAME, "update limit is not available. exiting ibu_write"));		MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",		    ibu->remote_RDMA_head ,ibu->remote_RDMA_limit));		MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head - limit = %d ",		    ((ibu->remote_RDMA_head - ibu->remote_RDMA_limit + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS)));		MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);		return IBU_SUCCESS;	    }	}	MPIDI_DBG_PRINTF((60, FCNAME, "ibu->remote_RDMA_head = %d ibu->remote_RDMA_limit = %d .",ibu->remote_RDMA_head ,ibu->remote_RDMA_limit));	mem_ptr = ibuBlockAllocIB(ibu->allocator);	MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned address %p\n",mem_ptr));	if (mem_ptr == NULL)	{	    MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockAllocIB returned NULL\n"));	    *num_bytes_ptr = total;	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);	    return IBU_SUCCESS;	}	/* Mellanox - adding the RDMA header */	if (((MPIDI_CH3_Pkt_t*)buf)->type == MPIDI_CH3_PKT_LMT_UPT)	{	    length = 0; /* Only footer will be sent, no other body message */	}	msg_size = length + sizeof(ibu_rdma_buf_footer_t); /* Added size of additional header*/	/*((ibu_rdma_buf_t*)mem_ptr)->footer.cur_offset = 0;*/	((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_VALID_RDMA_BUF;	((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit = 	    ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) - 1) % IBU_NUM_OF_RDMA_BUFS; /* Piggybacked update of remote Q state */	((ibu_rdma_buf_t*)mem_ptr)->footer.total_size = msg_size;	/* Mellanox - END adding the RDMA header */	memcpy(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size), buf, length);	total += length;	data.len = msg_size; /*Mellanox descriptor holds additional header */	/* Data.addr points to Beginning of original buffer */	data.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size));	data.lkey = GETLKEY(mem_ptr);	/*MPIU_Assert(data.lkey == s_lkey);*/	work_req.opcode = VAPI_RDMA_WRITE; /* Mellanox dafna changed from SEND July 11th*/	signaled_type = ibui_signaled_completion(ibu); /* Mellanox dafna July 11th*/	work_req.comp_type = signaled_type;	work_req.sg_lst_p = &data;	work_req.sg_lst_len = 1;	work_req.imm_data = 0;	work_req.fence = 0;	work_req.remote_ah = 0;	work_req.remote_qp = 0;	work_req.remote_qkey = 0;	work_req.ethertype = 0;	work_req.eecn = 0;	work_req.set_se = 0;	work_req.remote_addr	= (VAPI_virt_addr_t)(MT_virt_addr_t)(ibu->remote_RDMA_buf_base + (ibu->remote_RDMA_head + 1));	work_req.remote_addr   -= msg_size;	work_req.r_key		= ibu->remote_RDMA_buf_hndl.rkey;	work_req.compare_add = 0;	work_req.swap = 0;	MPIU_DBG_PRINTF((" work_req remote_addr = %p  \n",work_req.remote_addr ));	/* Mellanox Allocate id_ptr only if wqe is signaled  */	if (signaled_type == VAPI_SIGNALED)	{	    id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);	    *((ibu_work_id_handle_t**)&work_req.id) = id_ptr;	    if (id_ptr == NULL)	    {		MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlocAlloc returned NULL"));		MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);		return IBU_FAIL;	    }	    id_ptr->ibu = ibu;	    id_ptr->mem = (void*)mem_ptr;	    id_ptr->length = msg_size;	}	/*sanity_check_send(&work_req);*/	if (msg_size < ibu->max_inline_size)	{	    MPIDI_DBG_PRINTF((60, FCNAME, "calling EVAPI_post_inline_sr(%d bytes)", msg_size));	    status = EVAPI_post_inline_sr( IBU_Process.hca_handle,		ibu->qp_handle, 		&work_req);	}	else	{	    MPIDI_DBG_PRINTF((60, FCNAME, "calling VAPI_post_sr(%d bytes)", msg_size));	    status = VAPI_post_sr( IBU_Process.hca_handle,		ibu->qp_handle, 		&work_req);	}	if (status != VAPI_OK)	{	    /* Free id_ptr if was signaled and VAPI posting failed */	    if (signaled_type == VAPI_SIGNALED)	    {		ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);	    }	    MPIU_Internal_error_printf("%s: Error: failed to post ib send, status = %s\n", FCNAME, VAPI_strerror(status));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);	    return IBU_FAIL;	}	/* Mellanox push entry to send_wqe_fifo */	entry_type = (signaled_type == VAPI_SIGNALED)? IBU_RDMA_EAGER_SIGNALED : IBU_RDMA_EAGER_UNSIGNALED;	send_wqe_info_fifo_push(ibu, entry_type, mem_ptr, msg_size);	/* Mellanox update head of remote RDMA buffer to write to */	ibu->remote_RDMA_head = (ibu->remote_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;	/* Mellanox update remote RDMA limit to what was sent in the packet */	ibu->local_last_updated_RDMA_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS )- 1) % IBU_NUM_OF_RDMA_BUFS;	/* Mellanox change of print. use remote_RDMA_head/remote_RMDA_limit instead of nAvailRemote 	use new local RDMA limit for nUnacked */	MPIU_DBG_PRINTF(("send posted, nAvailRemote: %d, local_last_updated_RDMA_limit: %d\n", 	    (ibu->remote_RDMA_limit - ibu->remote_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS, 	    ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS)- 1) % IBU_NUM_OF_RDMA_BUFS));	buf = (char*)buf + length;    }    *num_bytes_ptr = total;    MPIU_DBG_PRINTF(("exiting ibu_write\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE);    return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_writev(ibu_t ibu, MPID_IOV *iov, int n, int *num_bytes_ptr){    VAPI_ret_t status;    VAPI_sg_lst_entry_t data;    VAPI_sr_desc_t work_req;    void* mem_ptr;    unsigned int len, msg_size;    int cur_msg_total, total = 0;    unsigned int num_avail;    unsigned char *buf;    int cur_index, msg_calc_cur_index; /* Mellanox added */    unsigned int cur_len, msg_calc_cur_len;    char *cur_buf;    int signaled_type = VAPI_UNSIGNALED;    ibu_work_id_handle_t *id_ptr = NULL;    ibu_rdma_type_t entry_type;    MPIDI_STATE_DECL(MPID_STATE_IBU_WRITEV);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITEV);    MPIU_DBG_PRINTF(("entering ibu_writev\n"));    cur_index          = 0;    msg_calc_cur_index = 0;    cur_len            = iov[cur_index].MPID_IOV_LEN;    cur_buf            = iov[cur_index].MPID_IOV_BUF;    msg_calc_cur_len   = iov[cur_index].MPID_IOV_LEN;    do    {	cur_msg_total = 0;	if ((((ibu->remote_RDMA_limit - ibu->remote_RDMA_head) + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS) < 2)	{	    /*printf("ibu_writev: no remote packets available\n");fflush(stdout);*/

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?