📄 ibu_wait.vapi.c
字号:
printf("posting a read of %d buffers\n", recv_vc_ptr->ch.recv_active->dev.iov_count); for (z=0; z<recv_vc_ptr->ch.recv_active->dev.iov_count; z++) { printf(" [%d].len = %d\n", z, recv_vc_ptr->ch.recv_active->dev.iov[z].MPID_IOV_LEN); } fflush(stdout); */ /*mpi_errno =*/ ibu_post_readv(ibu, recv_vc_ptr->ch.recv_active->dev.iov, recv_vc_ptr->ch.recv_active->dev.iov_count); } mem_ptr = (unsigned char *)mem_ptr + sizeof(MPIDI_CH3_Pkt_t); num_bytes -= sizeof(MPIDI_CH3_Pkt_t); if (num_bytes == 0) { *num_bytes_ptr = num_bytes; *op_ptr = IBU_OP_TIMEOUT; *vc_pptr = ibu->vc_ptr; MPIU_DBG_PRINTFX(("exiting ibu_wait num_byte == 0\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; break; } if (recv_vc_ptr->ch.recv_active == NULL) { MPIU_DBG_PRINTF(("pkt handled with %d bytes remaining to be buffered.\n", num_bytes)); ibui_buffer_unex_read(ibu, mem_ptr_orig, sizeof(MPIDI_CH3_Pkt_t), num_bytes); break; } pkt_offset = sizeof(MPIDI_CH3_Pkt_t); } MPIDI_DBG_PRINTF((60, FCNAME, "read %d bytes\n", num_bytes)); /*MPIDI_DBG_PRINTF((60, FCNAME, "ibu_wait(recv finished %d bytes)", num_bytes));*/ if (!(ibu->state & IBU_READING)) { MPIU_DBG_PRINTF(("a:buffering %d bytes.\n", num_bytes)); ibui_buffer_unex_read(ibu, mem_ptr_orig, pkt_offset, num_bytes); break; } MPIDI_DBG_PRINTF((60, FCNAME, "read update, total = %d + %d = %d\n", ibu->read.total, num_bytes, ibu->read.total + num_bytes)); if (ibu->read.use_iov) { iter_ptr = mem_ptr; while (num_bytes && ibu->read.iovlen > 0) { if ((int)ibu->read.iov[ibu->read.index].MPID_IOV_LEN <= num_bytes) { /* copy the received data */ memcpy(ibu->read.iov[ibu->read.index].MPID_IOV_BUF, iter_ptr, ibu->read.iov[ibu->read.index].MPID_IOV_LEN); iter_ptr += ibu->read.iov[ibu->read.index].MPID_IOV_LEN; /* update the iov */ num_bytes -= ibu->read.iov[ibu->read.index].MPID_IOV_LEN; ibu->read.index++; ibu->read.iovlen--; } else { /* copy the received data */ memcpy(ibu->read.iov[ibu->read.index].MPID_IOV_BUF, iter_ptr, num_bytes); iter_ptr += num_bytes; /* update the iov */ ibu->read.iov[ibu->read.index].MPID_IOV_LEN -= num_bytes; ibu->read.iov[ibu->read.index].MPID_IOV_BUF = (char*)(ibu->read.iov[ibu->read.index].MPID_IOV_BUF) + num_bytes; num_bytes = 0; } } offset = (long)((unsigned char*)iter_ptr - (unsigned char*)mem_ptr); ibu->read.total += offset; if (num_bytes == 0) { /* put the receive packet back in the pool */ if (mem_ptr == NULL) MPIU_Internal_error_printf("ibu_wait: read mem_ptr == NULL\n"); MPIU_Assert(mem_ptr != NULL); } else { /* save the unused but received data */ MPIU_DBG_PRINTF(("b:buffering %d bytes (offset,pkt = %d,%d).\n", num_bytes, offset, pkt_offset)); ibui_buffer_unex_read(ibu, mem_ptr_orig, offset + pkt_offset, num_bytes); } if (ibu->read.iovlen == 0) { if (recv_vc_ptr->ch.recv_active->kind < MPID_LAST_REQUEST_KIND) { ibu->state &= ~IBU_READING; *num_bytes_ptr = ibu->read.total; *op_ptr = IBU_OP_READ; *vc_pptr = ibu->vc_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { MPIDI_DBG_PRINTF((60, FCNAME, "closing ibu after iov read completed.")); ibu = IBU_INVALID_QP; } MPIU_DBG_PRINTFX(("exiting ibu_wait 6\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; }#ifdef MPIDI_CH3_CHANNEL_RNDV else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_RTS_IOV_READ_REQUEST) { int found; MPIU_DBG_PRINTF(("received rts iov_read.\n")); mpi_errno = MPIDI_CH3U_Handle_recv_rndv_pkt(recv_vc_ptr, &recv_vc_ptr->ch.recv_active->ch.pkt, &rreq, &found); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "ibu read progress unable to handle incoming rts(get) packet"); MPIU_DBG_PRINTFX(("exiting ibu_wait v\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ for (i=0; i<recv_vc_ptr->ch.recv_active->dev.rdma_iov_count; i++) { rreq->dev.rdma_iov[i].MPID_IOV_BUF = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_BUF; rreq->dev.rdma_iov[i].MPID_IOV_LEN = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_LEN; rreq->ch.remote_iov_mem[i] = recv_vc_ptr->ch.recv_active->ch.remote_iov_mem[i]; } rreq->dev.rdma_iov_count = recv_vc_ptr->ch.recv_active->dev.rdma_iov_count; rreq->dev.rdma_request = recv_vc_ptr->ch.recv_active->dev.rdma_request; if (found) { mpi_errno = MPIDI_CH3U_Post_data_receive(found, &rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ } rreq = recv_vc_ptr->ch.recv_active; recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.reading_pkt = TRUE; /* free the request used to receive the rts packet and iov data */ MPIU_Object_set_ref(rreq, 0); MPIDI_CH3_Request_destroy(rreq); } else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_READ_REQUEST) { MPIU_DBG_PRINTF(("received iov_read.\n")); rreq = recv_vc_ptr->ch.recv_active; /* A new sender's iov has arrived so set the offset back to zero. */ rreq->ch.req->ch.siov_offset = 0; mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq->ch.req); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "ibu read progress unable to handle incoming rts(get) iov"); MPIU_DBG_PRINTFX(("exiting ibu_wait v\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.reading_pkt = TRUE; /* free the request used to receive the iov data */ MPIU_Object_set_ref(rreq, 0); MPIDI_CH3_Request_destroy(rreq); } else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_WRITE_REQUEST) { MPIU_DBG_PRINTF(("received iov_write.\n")); /* A new receiver's iov has arrived so set the offset back to zero. */ recv_vc_ptr->ch.recv_active->ch.req->ch.riov_offset = 0; /* Check rndv status. If failed - need to send CANCEL_IOV packet and wait for confirmation */ if (recv_vc_ptr->ch.recv_active->ch.req->ch.rndv_status == IBU_RNDV_SUCCESS) { mpi_errno = MPIDI_CH3I_rdma_writev(recv_vc_ptr, recv_vc_ptr->ch.recv_active->ch.req); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIU_DBG_PRINTFX(("exiting ibu_wait w\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ } else { /* switch to EAGER sending with special packet type (set in switch routine)*/ MPIDI_CH3_Pkt_t pkt; MPIDI_CH3_Pkt_rndv_eager_send_t * rndv_eager_pkt = &pkt.rndv_eager_send; MPIU_DBG_PRINTF(("sending eager packet instead of rndv failed sender reloading\n")); /* send new eager packet to the receiver */#if defined(MPID_USE_SEQUENCE_NUMBERS) rndv_eager_pkt->seqnum = recv_vc_ptr->seqnum_send;#endif rndv_eager_pkt->match.rank = (recv_vc_ptr->ch.recv_active->ch.req)->comm->rank; rndv_eager_pkt->match.tag = (recv_vc_ptr->ch.recv_active->ch.req)->dev.match.tag; rndv_eager_pkt->match.context_id = (recv_vc_ptr->ch.recv_active->ch.req)->dev.match.context_id; pkt.rndv_eager_send.sender_req_id = (recv_vc_ptr->ch.recv_active->ch.req)->handle; pkt.rndv_eager_send.receiver_req_id = (recv_vc_ptr->ch.recv_active->ch.req)->dev.rdma_request; pkt.rndv_eager_send.type = MPIDI_CH3_PKT_RNDV_EAGER_SEND; mpi_errno = MPIDI_CH3I_Switch_rndv_to_eager(recv_vc_ptr, recv_vc_ptr->ch.recv_active->ch.req, &pkt); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIU_DBG_PRINTFX(("exiting ibu_wait w\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* return from the wait */ MPID_Request_release(recv_vc_ptr->ch.recv_active); recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.reading_pkt = TRUE; *num_bytes_ptr = 0; *vc_pptr = recv_vc_ptr; *op_ptr = IBU_OP_WAKEUP; MPIU_DBG_PRINTFX(("exiting ibu_wait x\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; break; }#endif /* MPIDI_CH3_CHANNEL_RNDV */ else { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "invalid request type", recv_vc_ptr->ch.recv_active->kind); MPIU_DBG_PRINTFX(("exiting ibu_wait y\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } } } else { if ((unsigned int)num_bytes > ibu->read.bufflen) { /* copy the received data */ memcpy(ibu->read.buffer, mem_ptr, ibu->read.bufflen); ibu->read.total = ibu->read.bufflen; MPIU_DBG_PRINTF(("c:buffering %d bytes.\n", num_bytes - ibu->read.bufflen)); ibui_buffer_unex_read(ibu, mem_ptr_orig, ibu->read.bufflen + pkt_offset, num_bytes - ibu->read.bufflen); ibu->read.bufflen = 0; } else { /* copy the received data */ memcpy(ibu->read.buffer, mem_ptr, num_bytes); ibu->read.total += num_bytes; /* advance the user pointer */ ibu->read.buffer = (char*)(ibu->read.buffer) + num_bytes; ibu->read.bufflen -= num_bytes; } if (ibu->read.bufflen == 0) { ibu->state &= ~IBU_READING; *num_bytes_ptr = ibu->read.total; *op_ptr = IBU_OP_READ; *vc_pptr = ibu->vc_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { MPIDI_DBG_PRINTF((60, FCNAME, "closing ibu after simple read completed.")); ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); MPIU_DBG_PRINTFX(("exiting ibu_wait 7\n")); return MPI_SUCCESS; } } break; default: if (completion_data.status != VAPI_SUCCESS) { MPIU_Internal_error_printf("%s: unknown completion status = %s != VAPI_SUCCESS\n", FCNAME, VAPI_strerror(completion_data.status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", VAPI_strerror(completion_data.status)); MPIU_DBG_PRINTFX(("exiting ibu_wait 42\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } MPIU_Internal_error_printf("%s: unknown ib opcode: %s\n", FCNAME, op2str(completion_data.opcode)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", op2str(completion_data.opcode)); MPIU_DBG_PRINTFX(("exiting ibu_wait z\n")); return mpi_errno; break; } } MPIU_DBG_PRINTFX(("exiting ibu_wait 8\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS;}#endif /* USE_IB_VAPI */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -