📄 ibu.ibal.c
字号:
#undef FUNCNAME#define FUNCNAME ibu_destroy_set#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_destroy_set(ibu_set_t set){ ib_api_status_t status; MPIDI_STATE_DECL(MPID_STATE_IBU_DESTROY_SET); MPIDI_FUNC_ENTER(MPID_STATE_IBU_DESTROY_SET); MPIU_DBG_PRINTF(("entering ibu_destroy_set\n")); status = ib_destroy_cq(set, NULL); MPIU_DBG_PRINTF(("exiting ibu_destroy_set\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_DESTROY_SET); return status;}#undef FUNCNAME#define FUNCNAME ibui_buffer_unex_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_buffer_unex_read(ibu_t ibu, void *mem_ptr, unsigned int offset, unsigned int num_bytes){ ibu_unex_read_t *p; MPIDI_STATE_DECL(MPID_STATE_IBUI_BUFFER_UNEX_READ); MPIDI_FUNC_ENTER(MPID_STATE_IBUI_BUFFER_UNEX_READ); MPIU_DBG_PRINTF(("entering ibui_buffer_unex_read\n")); MPIDI_DBG_PRINTF((60, FCNAME, "%d bytes\n", num_bytes)); p = (ibu_unex_read_t *)MPIU_Malloc(sizeof(ibu_unex_read_t)); p->mem_ptr = mem_ptr; p->buf = (unsigned char *)mem_ptr + offset; p->length = num_bytes; p->next = ibu->unex_list; ibu->unex_list = p; MPIU_DBG_PRINTF(("exiting ibui_buffer_unex_read\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_BUFFER_UNEX_READ); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibui_read_unex#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_read_unex(ibu_t ibu){ unsigned int len; ibu_unex_read_t *temp; MPIDI_STATE_DECL(MPID_STATE_IBUI_READ_UNEX); MPIDI_FUNC_ENTER(MPID_STATE_IBUI_READ_UNEX); MPIU_DBG_PRINTF(("entering ibui_read_unex\n")); MPIU_Assert(ibu->unex_list); /* copy the received data */ while (ibu->unex_list) { len = min(ibu->unex_list->length, ibu->read.bufflen); memcpy(ibu->read.buffer, ibu->unex_list->buf, len); /* advance the user pointer */ ibu->read.buffer = (char*)(ibu->read.buffer) + len; ibu->read.bufflen -= len; ibu->read.total += len; if (len != ibu->unex_list->length) { ibu->unex_list->length -= len; ibu->unex_list->buf += len; } else { /* put the receive packet back in the pool */ if (ibu->unex_list->mem_ptr == NULL) { MPIU_Internal_error_printf("ibui_read_unex: mem_ptr == NULL\n"); } MPIU_Assert(ibu->unex_list->mem_ptr != NULL); /* MPIU_Free the unexpected data node */ temp = ibu->unex_list; ibu->unex_list = ibu->unex_list->next; MPIU_Free(temp); } /* check to see if the entire message was received */ if (ibu->read.bufflen == 0) { /* place this ibu in the finished list so it will be completed by ibu_wait */ ibu->state &= ~IBU_READING; ibu->unex_finished_queue = IBU_Process.unex_finished_list; IBU_Process.unex_finished_list = ibu; /* post another receive to replace the consumed one */ /*ibui_post_receive(ibu);*/ MPIDI_DBG_PRINTF((60, FCNAME, "finished read saved in IBU_Process.unex_finished_list\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READ_UNEX); return IBU_SUCCESS; } } MPIU_DBG_PRINTF(("exiting ibui_read_unex\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READ_UNEX); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibui_readv_unex#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibui_readv_unex(ibu_t ibu){ unsigned int num_bytes; ibu_unex_read_t *temp; MPIDI_STATE_DECL(MPID_STATE_IBUI_READV_UNEX); MPIDI_FUNC_ENTER(MPID_STATE_IBUI_READV_UNEX); MPIU_DBG_PRINTF(("entering ibui_readv_unex")); while (ibu->unex_list) { while (ibu->unex_list->length && ibu->read.iovlen) { num_bytes = min(ibu->unex_list->length, ibu->read.iov[ibu->read.index].MPID_IOV_LEN); MPIDI_DBG_PRINTF((60, FCNAME, "copying %d bytes\n", num_bytes)); /* copy the received data */ memcpy(ibu->read.iov[ibu->read.index].MPID_IOV_BUF, ibu->unex_list->buf, num_bytes); ibu->read.total += num_bytes; ibu->unex_list->buf += num_bytes; ibu->unex_list->length -= num_bytes; /* update the iov */ ibu->read.iov[ibu->read.index].MPID_IOV_LEN -= num_bytes; ibu->read.iov[ibu->read.index].MPID_IOV_BUF = (char*)(ibu->read.iov[ibu->read.index].MPID_IOV_BUF) + num_bytes; if (ibu->read.iov[ibu->read.index].MPID_IOV_LEN == 0) { ibu->read.index++; ibu->read.iovlen--; } } if (ibu->unex_list->length == 0) { /* put the receive packet back in the pool */ if (ibu->unex_list->mem_ptr == NULL) { MPIU_Internal_error_printf("ibui_readv_unex: mem_ptr == NULL\n"); } MPIU_Assert(ibu->unex_list->mem_ptr != NULL); MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlockFreeIB(mem_ptr)")); /* MPIU_Free the unexpected data node */ temp = ibu->unex_list; ibu->unex_list = ibu->unex_list->next; MPIU_Free(temp); /* replace the consumed read descriptor */ } if (ibu->read.iovlen == 0) { ibu->state &= ~IBU_READING; ibu->unex_finished_queue = IBU_Process.unex_finished_list; IBU_Process.unex_finished_list = ibu; MPIDI_DBG_PRINTF((60, FCNAME, "finished read saved in IBU_Process.unex_finished_list\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READV_UNEX); return IBU_SUCCESS; } } MPIU_DBG_PRINTF(("exiting ibui_readv_unex\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_READV_UNEX); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_set_vc_ptr#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_set_vc_ptr(ibu_t ibu, void *vc_ptr){ MPIDI_STATE_DECL(MPID_STATE_IBU_SET_USER_PTR); MPIDI_FUNC_ENTER(MPID_STATE_IBU_SET_USER_PTR); MPIU_DBG_PRINTF(("entering ibu_set_vc_ptr\n")); if (ibu == IBU_INVALID_QP) { MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR); return IBU_FAIL; } ibu->vc_ptr = vc_ptr; MPIU_DBG_PRINTF(("exiting ibu_set_vc_ptr\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR); return IBU_SUCCESS;}/* non-blocking functions */#undef FUNCNAME#define FUNCNAME ibu_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_read(ibu_t ibu, void *buf, int len){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READ); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READ); MPIU_DBG_PRINTF(("entering ibu_post_read\n")); ibu->read.total = 0; ibu->read.buffer = buf; ibu->read.bufflen = len; ibu->read.use_iov = FALSE; ibu->state |= IBU_READING; ibu->vc_ptr->ch.reading_pkt = FALSE; ibu->pending_operations++; /* copy any pre-received data into the buffer */ if (ibu->unex_list) ibui_read_unex(ibu); MPIU_DBG_PRINTF(("exiting ibu_post_read\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READ); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_readv(ibu_t ibu, MPID_IOV *iov, int n){#ifdef MPICH_DBG_OUTPUT char str[1024] = "ibu_post_readv: "; char *s; int i;#endif MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READV); MPIU_DBG_PRINTF(("entering ibu_post_readv\n"));#ifdef MPICH_DBG_OUTPUT s = &str[16]; for (i=0; i<n; i++) { s += MPIU_Snprintf(s, 1008, "%d,", iov[i].MPID_IOV_LEN); } MPIDI_DBG_PRINTF((60, FCNAME, "%s\n", str));#endif ibu->read.total = 0; /* This isn't necessary if we require the iov to be valid for the duration of the operation */ /*ibu->read.iov = iov;*/ memcpy(ibu->read.iov, iov, sizeof(MPID_IOV) * n); ibu->read.iovlen = n; ibu->read.index = 0; ibu->read.use_iov = TRUE; ibu->state |= IBU_READING; ibu->vc_ptr->ch.reading_pkt = FALSE; ibu->pending_operations++; /* copy any pre-received data into the iov */ if (ibu->unex_list) ibui_readv_unex(ibu); MPIU_DBG_PRINTF(("exiting ibu_post_readv\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READV); return IBU_SUCCESS;}#if 0#undef FUNCNAME#define FUNCNAME ibu_post_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_write(ibu_t ibu, void *buf, int len){ int num_bytes; MPIDI_STATE_DECL(MPID_STATE_IBU_POST_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_WRITE); MPIU_DBG_PRINTF(("entering ibu_post_write\n")); /* ibu->write.total = 0; ibu->write.buffer = buf; ibu->write.bufflen = len; ibu->write.use_iov = FALSE; ibu->state |= IBU_WRITING; ibu->pending_operations++; */ ibu->state |= IBU_WRITING; num_bytes = ibui_post_write(ibu, buf, len, wfn); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_WRITE); MPIDI_DBG_PRINTF((60, FCNAME, "returning %d\n", num_bytes)); return num_bytes;}#endif#if 0#undef FUNCNAME#define FUNCNAME ibu_post_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_post_writev(ibu_t ibu, MPID_IOV *iov, int n){ int num_bytes; MPIDI_STATE_DECL(MPID_STATE_IBU_POST_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_WRITEV); MPIU_DBG_PRINTF(("entering ibu_post_writev\n")); /* This isn't necessary if we require the iov to be valid for the duration of the operation */ /*ibu->write.iov = iov;*/ /* memcpy(ibu->write.iov, iov, sizeof(MPID_IOV) * n); ibu->write.iovlen = n; ibu->write.index = 0; ibu->write.use_iov = TRUE; */ ibu->state |= IBU_WRITING; /* { char str[1024], *s = str; int i; s += sprintf(s, "ibu_post_writev("); for (i=0; i<n; i++) s += sprintf(s, "%d,", iov[i].MPID_IOV_LEN); sprintf(s, ")\n"); MPIU_DBG_PRINTF(("%s", str)); } */ num_bytes = ibui_post_writev(ibu, iov, n, wfn); MPIU_DBG_PRINTF(("exiting ibu_post_writev\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_WRITEV); return IBU_SUCCESS;}#endif/* extended functions */#undef FUNCNAME#define FUNCNAME ibu_get_lid#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_get_lid(){ MPIDI_STATE_DECL(MPID_STATE_IBU_GET_LID); MPIDI_FUNC_ENTER(MPID_STATE_IBU_GET_LID); MPIDI_FUNC_EXIT(MPID_STATE_IBU_GET_LID); return IBU_Process.lid;}#undef FUNCNAME#define FUNCNAME post_pkt_recv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int post_pkt_recv(MPIDI_VC_t *recv_vc_ptr){ int mpi_errno; void *mem_ptr; ibu_t ibu; ibu_unex_read_t *temp; MPIDI_STATE_DECL(MPID_STATE_IB_POST_PKT_RECV); MPIDI_FUNC_ENTER(MPID_STATE_IB_POST_PKT_RECV); if (recv_vc_ptr->ch.ibu->unex_list == NULL) { recv_vc_ptr->ch.reading_pkt = TRUE; MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV); return MPI_SUCCESS; } ibu = recv_vc_ptr->ch.ibu; recv_vc_ptr->ch.reading_pkt = TRUE; mem_ptr = ibu->unex_list->buf; if (ibu->unex_list->length < sizeof(MPIDI_CH3_Pkt_t)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV); return mpi_errno; } /* This is not correct. It must handle the same cases that ibu_wait does. */ mpi_errno = MPIDI_CH3U_Handle_recv_pkt(recv_vc_ptr, (MPIDI_CH3_Pkt_t*)mem_ptr, &recv_vc_ptr->ch.recv_active); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "infiniband read progress unable to handle incoming packet"); MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV); return mpi_errno; } ibu->unex_list->buf += sizeof(MPIDI_CH3_Pkt_t); ibu->unex_list->length -= sizeof(MPIDI_CH3_Pkt_t); if (ibu->unex_list->length == 0) { /* put the receive packet back in the pool */ if (ibu->unex_list->mem_ptr == NULL) { MPIU_Internal_error_printf("ibui_readv_unex: mem_ptr == NULL\n"); } MPIU_Assert(ibu->unex_list->mem_ptr != NULL); /* MPIU_Free the unexpected data node */ temp = ibu->unex_list; ibu->unex_list = ibu->unex_list->next; MPIU_Free(temp); } if (recv_vc_ptr->ch.recv_active == NULL) { MPIU_DBG_PRINTF(("packet with no data handled.\n")); recv_vc_ptr->ch.reading_pkt = TRUE; } else { /*mpi_errno =*/ ibu_post_readv(ibu, recv_vc_ptr->ch.recv_active->dev.iov, recv_vc_ptr->ch.recv_active->dev.iov_count); } MPIDI_FUNC_EXIT(MPID_STATE_IB_POST_PKT_RECV); return mpi_errno;}#endif /* USE_IB_IBAL */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -