ibu.vapi.c

来自「mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环」· C语言 代码 · 共 1,542 行 · 第 1/4 页

C
1,542
字号
    fflush(stdout);}void FooBar2(VAPI_hca_hndl_t hca_handle, VAPI_event_record_t *event, void *p){    MPIU_Error_printf("FooBar2\n");    fflush(stdout);}#undef FUNCNAME#define FUNCNAME ibu_create_set#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_create_set(ibu_set_t *set){    VAPI_ret_t status;    VAPI_cqe_num_t max_cq_entries = IBU_Process.cq_size;    MPIDI_STATE_DECL(MPID_STATE_IBU_CREATE_SET);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_CREATE_SET);    MPIU_DBG_PRINTF(("entering ibu_create_set\n"));    /* create the completion queue */    status = VAPI_create_cq(	IBU_Process.hca_handle, 	IBU_Process.cq_size,	set,	&max_cq_entries);    if (status != VAPI_OK)    {	MPIU_Internal_error_printf("ibu_create_set: VAPI_create_cq failed, error %s\n", VAPI_strerror(status));	MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET);	return status;    }    MPIU_DBG_PRINTF(("exiting ibu_create_set\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET);    return status;}#undef FUNCNAME#define FUNCNAME ibu_destroy_set#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_destroy_set(ibu_set_t set){    VAPI_ret_t status;    MPIDI_STATE_DECL(MPID_STATE_IBU_DESTROY_SET);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_DESTROY_SET);    MPIU_DBG_PRINTF(("entering ibu_destroy_set\n"));    status = VAPI_destroy_cq(IBU_Process.hca_handle, set);    MPIU_DBG_PRINTF(("exiting ibu_destroy_set\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_DESTROY_SET);    return status;}#undef FUNCNAME#define FUNCNAME ibui_buffer_unex_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_buffer_unex_read(ibu_t ibu, void *mem_ptr, unsigned int offset, unsigned int num_bytes){    ibu_unex_read_t *p;    MPIDI_STATE_DECL(MPID_STATE_IBUI_BUFFER_UNEX_READ);    MPIDI_FUNC_ENTER(MPID_STATE_IBUI_BUFFER_UNEX_READ);    MPIU_DBG_PRINTF(("entering ibui_buffer_unex_read\n"));    MPIDI_DBG_PRINTF((60, FCNAME, "%d bytes\n", num_bytes));    p = (ibu_unex_read_t *)MPIU_Malloc(sizeof(ibu_unex_read_t));    p->mem_ptr = mem_ptr;    p->buf = (unsigned char *)mem_ptr + offset;    p->length = num_bytes;    p->next = ibu->unex_list;    ibu->unex_list = p;    MPIU_DBG_PRINTF(("exiting ibui_buffer_unex_read\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_BUFFER_UNEX_READ);    return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibui_read_unex#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_read_unex(ibu_t ibu){    unsigned int len;    ibu_unex_read_t *temp;    MPIDI_STATE_DECL(MPID_STATE_IBUI_READ_UNEX);    MPIDI_FUNC_ENTER(MPID_STATE_IBUI_READ_UNEX);    MPIU_DBG_PRINTF(("entering ibui_read_unex\n"));    MPIU_Assert(ibu->unex_list);    /* copy the received data */    while (ibu->unex_list)    {	len = min(ibu->unex_list->length, ibu->read.bufflen);	memcpy(ibu->read.buffer, ibu->unex_list->buf, len);	/* advance the user pointer */	ibu->read.buffer = (char*)(ibu->read.buffer) + len;	ibu->read.bufflen -= len;	ibu->read.total += len;	if (len != ibu->unex_list->length)	{	    ibu->unex_list->length -= len;	    ibu->unex_list->buf += len;	}	else	{	    /* put the receive packet back in the pool */	    if (ibu->unex_list->mem_ptr == NULL)	    {		MPIU_Internal_error_printf("ibui_read_unex: mem_ptr == NULL\n");	    }	    MPIU_Assert(ibu->unex_list->mem_ptr != NULL);	    /* free the unexpected data node */	    temp = ibu->unex_list;	    ibu->unex_list = ibu->unex_list->next;	    MPIU_Free(temp);	}	/* check to see if the entire message was received */	if (ibu->read.bufflen == 0)	{	    /* place this ibu in the finished list so it will be completed by ibu_wait */	    ibu->state &= ~IBU_READING;	    ibu->unex_finished_queue = IBU_Process.unex_finished_list;	    IBU_Process.unex_finished_list = ibu;	    MPIDI_DBG_PRINTF((60, FCNAME, "finished read saved in IBU_Process.unex_finished_list\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READ_UNEX);	    return IBU_SUCCESS;	}    }    MPIU_DBG_PRINTF(("exiting ibui_read_unex\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READ_UNEX);    return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibui_readv_unex#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_readv_unex(ibu_t ibu){    unsigned int num_bytes;    ibu_unex_read_t *temp;    MPIDI_STATE_DECL(MPID_STATE_IBUI_READV_UNEX);    MPIDI_FUNC_ENTER(MPID_STATE_IBUI_READV_UNEX);    MPIU_DBG_PRINTF(("entering ibui_readv_unex"));    while (ibu->unex_list)    {	while (ibu->unex_list->length && ibu->read.iovlen)	{	    num_bytes = min(ibu->unex_list->length, ibu->read.iov[ibu->read.index].MPID_IOV_LEN);	    MPIDI_DBG_PRINTF((60, FCNAME, "copying %d bytes\n", num_bytes));	    /* copy the received data */	    memcpy(ibu->read.iov[ibu->read.index].MPID_IOV_BUF, ibu->unex_list->buf, num_bytes);	    ibu->read.total += num_bytes;	    ibu->unex_list->buf += num_bytes;	    ibu->unex_list->length -= num_bytes;	    /* update the iov */	    ibu->read.iov[ibu->read.index].MPID_IOV_LEN -= num_bytes;	    ibu->read.iov[ibu->read.index].MPID_IOV_BUF = 		(char*)(ibu->read.iov[ibu->read.index].MPID_IOV_BUF) + num_bytes;	    if (ibu->read.iov[ibu->read.index].MPID_IOV_LEN == 0)	    {		ibu->read.index++;		ibu->read.iovlen--;	    }	}	if (ibu->unex_list->length == 0)	{	    /* put the receive packet back in the pool */	    if (ibu->unex_list->mem_ptr == NULL)	    {		MPIU_Internal_error_printf("ibui_readv_unex: mem_ptr == NULL\n");	    }	    MPIU_Assert(ibu->unex_list->mem_ptr != NULL);	    MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockFreeIB(mem_ptr)"));	    /* free the unexpected data node */	    temp = ibu->unex_list;	    ibu->unex_list = ibu->unex_list->next;	    MPIU_Free(temp);	}	if (ibu->read.iovlen == 0)	{	    ibu->state &= ~IBU_READING;	    ibu->unex_finished_queue = IBU_Process.unex_finished_list;	    IBU_Process.unex_finished_list = ibu;	    MPIDI_DBG_PRINTF((60, FCNAME, "finished read saved in IBU_Process.unex_finished_list\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READV_UNEX);	    return IBU_SUCCESS;	}    }    MPIU_DBG_PRINTF(("exiting ibui_readv_unex\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READV_UNEX);    return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_set_vc_ptr#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_set_vc_ptr(ibu_t ibu, void *vc_ptr){    MPIDI_STATE_DECL(MPID_STATE_IBU_SET_USER_PTR);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_SET_USER_PTR);    MPIU_DBG_PRINTF(("entering ibu_set_vc_ptr\n"));    if (ibu == IBU_INVALID_QP)    {	MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR);	return IBU_FAIL;    }    ibu->vc_ptr = vc_ptr;    MPIU_DBG_PRINTF(("exiting ibu_set_vc_ptr\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR);    return IBU_SUCCESS;}/* non-blocking functions */#undef FUNCNAME#define FUNCNAME ibu_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_read(ibu_t ibu, void *buf, int len){    MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READ);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READ);    MPIU_DBG_PRINTF(("entering ibu_post_read\n"));    ibu->read.total = 0;    ibu->read.buffer = buf;    ibu->read.bufflen = len;    ibu->read.use_iov = FALSE;    ibu->state |= IBU_READING;    ibu->vc_ptr->ch.reading_pkt = FALSE;    ibu->pending_operations++;    /* copy any pre-received data into the buffer */    if (ibu->unex_list)	ibui_read_unex(ibu);    MPIU_DBG_PRINTF(("exiting ibu_post_read\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READ);    return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_readv(ibu_t ibu, MPID_IOV *iov, int n){#ifdef MPICH_DBG_OUTPUT    char str[1024] = "ibu_post_readv: ";    char *s;    int i;#endif    MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READV);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READV);    MPIU_DBG_PRINTF(("entering ibu_post_readv\n"));#ifdef MPICH_DBG_OUTPUT    s = &str[16];    for (i=0; i<n; i++)    {	s += MPIU_Snprintf(s, 1008, "%d,", iov[i].MPID_IOV_LEN);    }    MPIDI_DBG_PRINTF((60, FCNAME, "%s\n", str));#endif    ibu->read.total = 0;    /* This isn't necessary if we require the iov to be valid for the duration of the operation */    /*ibu->read.iov = iov;*/    memcpy(ibu->read.iov, iov, sizeof(MPID_IOV) * n);    ibu->read.iovlen = n;    ibu->read.index = 0;    ibu->read.use_iov = TRUE;    ibu->state |= IBU_READING;    ibu->vc_ptr->ch.reading_pkt = FALSE;    ibu->pending_operations++;    /* copy any pre-received data into the iov */    if (ibu->unex_list)	ibui_readv_unex(ibu);    MPIU_DBG_PRINTF(("exiting ibu_post_readv\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READV);    return IBU_SUCCESS;}/* extended functions */#undef FUNCNAME#define FUNCNAME ibu_get_lid#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_get_lid(){    MPIDI_STATE_DECL(MPID_STATE_IBU_GET_LID);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_GET_LID);    MPIDI_FUNC_EXIT(MPID_STATE_IBU_GET_LID);    return IBU_Process.lid;}#undef FUNCNAME#define FUNCNAME post_pkt_recv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int post_pkt_recv(MPIDI_VC_t *recv_vc_ptr){    int mpi_errno;    void *mem_ptr;    ibu_t ibu;    ibu_unex_read_t *temp;    MPIDI_STATE_DECL(MPID_STATE_IB_POST_PKT_RECV);    MPIDI_FUNC_ENTER(MPID_STATE_IB_POST_PKT_RECV);    if (recv_vc_ptr->ch.ibu->unex_list == NULL)    {	recv_vc_ptr->ch.reading_pkt = TRUE;			MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV);	return MPI_SUCCESS;    }    ibu = recv_vc_ptr->ch.ibu;    recv_vc_ptr->ch.reading_pkt = TRUE;    mem_ptr = ibu->unex_list->buf;    if (ibu->unex_list->length < sizeof(MPIDI_CH3_Pkt_t))    {	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);	MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV);	return mpi_errno;    }    /* This is not correct.  It must handle the same cases that ibu_wait does. */    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(recv_vc_ptr, (MPIDI_CH3_Pkt_t*)mem_ptr, &recv_vc_ptr->ch.recv_active);    if (mpi_errno != MPI_SUCCESS)    {	mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "infiniband read progress unable to handle incoming packet");	MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV);	return mpi_errno;    }    ibu->unex_list->buf += sizeof(MPIDI_CH3_Pkt_t);    ibu->unex_list->length -= sizeof(MPIDI_CH3_Pkt_t);    if (ibu->unex_list->length == 0)    {	/* put the receive packet back in the pool */	if (ibu->unex_list->mem_ptr == NULL)	{	    MPIU_Internal_error_printf("ibui_readv_unex: mem_ptr == NULL\n");	}	MPIU_Assert(ibu->unex_list->mem_ptr != NULL);	/* free the unexpected data node */	temp = ibu->unex_list;	ibu->unex_list = ibu->unex_list->next;	MPIU_Free(temp);    }    if (recv_vc_ptr->ch.recv_active == NULL)    {	MPIU_DBG_PRINTF(("packet with no data handled.\n"));	recv_vc_ptr->ch.reading_pkt = TRUE;    }    else    {	/*mpi_errno =*/ ibu_post_readv(ibu, recv_vc_ptr->ch.recv_active->dev.iov, recv_vc_ptr->ch.recv_active->dev.iov_count);    }    MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV);    return mpi_errno;}#endif /* USE_IB_VAPI */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?