📄 ch3u_request.c
字号:
data_sz = tmpbuf_sz; } rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *) rreq->dev.tmpbuf + rreq->dev.tmpbuf_off); rreq->dev.iov[0].MPID_IOV_LEN = data_sz; rreq->dev.iov_count = 1; MPIU_Assert(rreq->dev.segment_first + data_sz + rreq->dev.tmpbuf_off <= rreq->dev.recv_data_sz); if (rreq->dev.segment_first + data_sz + rreq->dev.tmpbuf_off == rreq->dev.recv_data_sz) { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read the remaining data into the SRBuf"); rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackSRBufComplete; } else { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read more data into the SRBuf"); rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackSRBufReloadIOV; } goto fn_exit; } last = rreq->dev.segment_size; rreq->dev.iov_count = MPID_IOV_LIMIT; MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,(MPIU_DBG_FDEST, "pre-upv: first=" MPIDI_MSG_SZ_FMT ", last=" MPIDI_MSG_SZ_FMT ", iov_n=%d", rreq->dev.segment_first, last, rreq->dev.iov_count)); MPIU_Assert(rreq->dev.segment_first < last); MPIU_Assert(last > 0); MPID_Segment_unpack_vector(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, rreq->dev.iov, &rreq->dev.iov_count); MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,(MPIU_DBG_FDEST, "post-upv: first=" MPIDI_MSG_SZ_FMT ", last=" MPIDI_MSG_SZ_FMT ", iov_n=%d", rreq->dev.segment_first, last, rreq->dev.iov_count)); MPIU_Assert(rreq->dev.iov_count >= 0 && rreq->dev.iov_count <= MPID_IOV_LIMIT); /* --BEGIN ERROR HANDLING-- */ if (rreq->dev.iov_count == 0) { /* If the data can't be unpacked, the we have a mis-match between the datatype and the amount of data received. Adjust the segment info so that the remaining data is received and thrown away. */ rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TYPE, "**dtypemismatch", 0); rreq->status.count = (int)rreq->dev.segment_first; rreq->dev.segment_size = rreq->dev.segment_first; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); goto fn_exit; } /* --END ERROR HANDLING-- */ if (last == rreq->dev.recv_data_sz) { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read the remaining data directly into the user buffer"); /* Eventually, use OnFinal for this instead */ rreq->dev.OnDataAvail = 0; } else if (last == rreq->dev.segment_size || (last - rreq->dev.segment_first) / rreq->dev.iov_count >= MPIDI_IOV_DENSITY_MIN) { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read more data directly into the user buffer"); rreq->dev.segment_first = last; rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReloadIOV; } else { /* Too little data would have been received using an IOV. We will start receiving data into a SRBuf and unpacking it later. */ MPIU_Assert(MPIDI_Request_get_srbuf_flag(rreq) == FALSE); MPIDI_CH3U_SRBuf_alloc(rreq, rreq->dev.segment_size - rreq->dev.segment_first); rreq->dev.tmpbuf_off = 0; /* --BEGIN ERROR HANDLING-- */ if (rreq->dev.tmpbuf_sz == 0) { /* FIXME - we should drain the data off the pipe here, but we don't have a buffer to drain it into. should this be a fatal error? */ MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE,"SRBuf allocation failure"); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); rreq->status.MPI_ERROR = mpi_errno; goto fn_exit; } /* --END ERROR HANDLING-- */ /* fill in the IOV using a recursive call */ mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); } } else { /* receive and toss any extra data that does not fit in the user's buffer */ MPIDI_msg_sz_t data_sz; data_sz = rreq->dev.recv_data_sz - rreq->dev.segment_first; if (!MPIDI_Request_get_srbuf_flag(rreq)) { MPIDI_CH3U_SRBuf_alloc(rreq, data_sz); /* --BEGIN ERROR HANDLING-- */ if (rreq->dev.tmpbuf_sz == 0) { MPIU_DBG_MSG(CH3_CHANNEL,TYPICAL,"SRBuf allocation failure"); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); rreq->status.MPI_ERROR = mpi_errno; goto fn_exit; } /* --END ERROR HANDLING-- */ } if (data_sz <= rreq->dev.tmpbuf_sz) { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read overflow data into the SRBuf and complete"); rreq->dev.iov[0].MPID_IOV_LEN = data_sz; MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_RECV); /* Eventually, use OnFinal for this instead */ rreq->dev.OnDataAvail = 0; } else { MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE, "updating rreq to read overflow data into the SRBuf and reload IOV"); rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.tmpbuf_sz; rreq->dev.segment_first += rreq->dev.tmpbuf_sz; rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReloadIOV; } rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rreq->dev.tmpbuf; rreq->dev.iov_count = 1; } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_REQUEST_LOAD_RECV_IOV); return mpi_errno;}/* * MPIDI_CH3U_Request_unpack_srbuf * * Unpack data from a send/receive buffer into the user buffer. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Request_unpack_srbuf#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Request_unpack_srbuf(MPID_Request * rreq){ MPIDI_msg_sz_t last; int tmpbuf_last; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_SRBUF); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_SRBUF); tmpbuf_last = (int)(rreq->dev.segment_first + rreq->dev.tmpbuf_sz); if (rreq->dev.segment_size < tmpbuf_last) { tmpbuf_last = (int)rreq->dev.segment_size; } last = tmpbuf_last; MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, rreq->dev.tmpbuf); if (last == 0 || last == rreq->dev.segment_first) { /* --BEGIN ERROR HANDLING-- */ /* If no data can be unpacked, then we have a datatype processing problem. Adjust the segment info so that the remaining data is received and thrown away. */ rreq->status.count = (int)rreq->dev.segment_first; rreq->dev.segment_size = rreq->dev.segment_first; rreq->dev.segment_first += tmpbuf_last; rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TYPE, "**dtypemismatch", 0); /* --END ERROR HANDLING-- */ } else if (tmpbuf_last == rreq->dev.segment_size) { /* --BEGIN ERROR HANDLING-- */ if (last != tmpbuf_last) { /* received data was not entirely consumed by unpack() because too few bytes remained to fill the next basic datatype. Note: the segment_first field is set to segment_last so that if this is a truncated message, extra data will be read off the pipe. */ rreq->status.count = (int)last; rreq->dev.segment_size = last; rreq->dev.segment_first = tmpbuf_last; rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TYPE, "**dtypemismatch", 0); } /* --END ERROR HANDLING-- */ } else { rreq->dev.tmpbuf_off = (int)(tmpbuf_last - last); if (rreq->dev.tmpbuf_off > 0) { /* move any remaining data to the beginning of the buffer. Note: memmove() is used since the data regions could overlap. */ memmove(rreq->dev.tmpbuf, (char *) rreq->dev.tmpbuf + (last - rreq->dev.segment_first), rreq->dev.tmpbuf_off); } rreq->dev.segment_first = last; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_SRBUF); return mpi_errno;}/* * MPIDI_CH3U_Request_unpack_uebuf * * Copy/unpack data from an "unexpected eager buffer" into the user buffer. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Request_unpack_uebuf#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Request_unpack_uebuf(MPID_Request * rreq){ int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t userbuf_sz; MPID_Datatype * dt_ptr; MPIDI_msg_sz_t unpack_sz; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_UEBUF); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_UEBUF); MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, userbuf_sz, dt_ptr, dt_true_lb); if (rreq->dev.recv_data_sz <= userbuf_sz) { unpack_sz = rreq->dev.recv_data_sz; } else { /* --BEGIN ERROR HANDLING-- */ MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,(MPIU_DBG_FDEST, "receive buffer overflow; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", buf_sz=" MPIDI_MSG_SZ_FMT, rreq->dev.recv_data_sz, userbuf_sz)); unpack_sz = userbuf_sz; rreq->status.count = (int)userbuf_sz; rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE, "**truncate", "**truncate %d %d", rreq->dev.recv_data_sz, userbuf_sz); /* --END ERROR HANDLING-- */ } if (unpack_sz > 0) { if (dt_contig) { /* TODO - check that amount of data is consistent with datatype. In other words, if we were to use Segment_unpack() would last = unpack? If not we should return an error (unless configured with --enable-fast) */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy((char *)rreq->dev.user_buf + dt_true_lb, rreq->dev.tmpbuf, unpack_sz); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); } else { MPID_Segment seg; MPIDI_msg_sz_t last; MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, &seg, 0); last = unpack_sz; MPID_Segment_unpack(&seg, 0, &last, rreq->dev.tmpbuf); if (last != unpack_sz) { /* --BEGIN ERROR HANDLING-- */ /* received data was not entirely consumed by unpack() because too few bytes remained to fill the next basic datatype */ rreq->status.count = (int)last; rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TYPE, "**dtypemismatch", 0); /* --END ERROR HANDLING-- */ } } } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_REQUEST_UNPACK_UEBUF); return mpi_errno;}/* * Export the function to set a request as completed for use by * the generalized request functions in mpich2/src/pt2pt/greq_complete.c */void MPID_Request_set_completed( MPID_Request *req ){ MPID_REQUEST_SET_COMPLETED(req);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -