📄 ch3u_handle_recv_pkt.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

/*
 * This file contains the dispatch routine called by the ch3 progress
 * engine to process messages.
 *
 * This file is in transition.
 *
 * Where possible, the routines that create and send all packets of
 * a particular type are in the same file that contains the implementation
 * of the handlers for that packet type (for example, the CancelSend
 * packets are created and processed by routines in
 * ch3/src/mpid_cancel_send.c).  This makes it easier to replace or modify
 * functionality within the ch3 device.
 */

/*
 * set_request_info - copy the envelope carried by a packet into a receive
 * request.
 *
 *   rreq_     - request whose status and dev fields are filled in
 *   pkt_      - packet supplying match.rank, match.tag, data_sz,
 *               sender_req_id and seqnum
 *   msg_type_ - value handed to MPIDI_Request_set_msg_type()
 *
 * The expansion is a bare brace block and names rreq_ and pkt_ several
 * times, so the arguments should be free of side effects.
 */
#define set_request_info(rreq_, pkt_, msg_type_)                \
{                                                               \
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank;            \
    (rreq_)->status.MPI_TAG = (pkt_)->match.tag;                \
    (rreq_)->status.count = (pkt_)->data_sz;                    \
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;         \
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;                \
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);          \
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));           \
}

/*
 * MPIDI_CH3U_Handle_recv_pkt()
 *
 * NOTE: Multiple threads may NOT simultaneously call this routine with the
 * same VC.  This constraint eliminates the need to lock the VC.  If
 * simultaneous upcalls are possible, the calling routine is responsible
 * for serializing the calls.
 */

/* The alias below exists so that a separate routine could be provided for
   packets that are received out-of-order.  The CH3 device does not
   currently support that, so the ordered handler is the only handler. */
#define MPIDI_CH3U_Handle_ordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Handle_ordered_recv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Dispatch a received packet to the handler registered for its type.
 *
 *   vc, pkt, buflen, rreqp - passed through unchanged to the handler
 *
 * Returns whatever the selected handler returns.
 *
 * The handler table is filled in by MPIDI_CH3_PktHandler_Init() the first
 * time this routine runs.  That one-time setup is guarded only by a plain
 * static flag.
 */
int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                       MPIDI_msg_sz_t *buflen,
                                       MPID_Request ** rreqp)
{
    static MPIDI_CH3_PktHandler_Fcn *handler_table[MPIDI_CH3_PKT_END_CH3+1];
    static int table_ready = 0;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);

    MPIU_DBG_STMT(CH3_OTHER,VERBOSE,MPIDI_DBG_Print_packet(pkt));

    /* FIXME: We can turn this into something like

       MPIU_Assert(pkt->type >= 0 && pkt->type <= MAX_PACKET_TYPE);
       mpi_errno = MPIDI_CH3_ProgressFunctions[pkt->type](vc,pkt,rreqp);

       in the progress engine itself.  Then this routine is not necessary. */

    /* Populate the dispatch table on first use only. */
    if (!table_ready) {
        MPIDI_CH3_PktHandler_Init( handler_table, MPIDI_CH3_PKT_END_CH3 );
        table_ready = 1;
    }

    /* The table has MPIDI_CH3_PKT_END_CH3+1 slots, so END_CH3 itself is a
       valid index. */
    MPIU_Assert(pkt->type >= 0 && pkt->type <= MPIDI_CH3_PKT_END_CH3);
    mpi_errno = handler_table[pkt->type](vc, pkt, buflen, rreqp);

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);
    return mpi_errno;
}

/*
 * This function is used to receive data from the receive buffer to
 * the user buffer.  If all data for this message has not been
 * received, the request is set up to receive the next data to arrive.
 * In turn, this request is attached to a virtual connection.
 *
 * buflen is an I/O parameter.  The length of the received data is
 * passed in.  The function returns the number of bytes actually
 * processed by this function.
 *
 * complete is an OUTPUT variable.  It is set to TRUE iff all of the
 * data for the request has been received.  This function does not
 * actually complete the request.
*/#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Receive_data_found#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Receive_data_found(MPID_Request *rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete){ int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t userbuf_sz; MPID_Datatype * dt_ptr = NULL; MPIDI_msg_sz_t data_sz; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"posted request found"); MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, userbuf_sz, dt_ptr, dt_true_lb); if (rreq->dev.recv_data_sz <= userbuf_sz) { data_sz = rreq->dev.recv_data_sz; } else { MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST, "receive buffer too small; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", userbuf_sz=" MPIDI_MSG_SZ_FMT, rreq->dev.recv_data_sz, userbuf_sz)); rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE, "**truncate", "**truncate %d %d %d %d", rreq->status.MPI_SOURCE, rreq->status.MPI_TAG, rreq->dev.recv_data_sz, userbuf_sz ); rreq->status.count = userbuf_sz; data_sz = userbuf_sz; } if (dt_contig && data_sz == rreq->dev.recv_data_sz) { /* user buffer is contiguous and large enough to store the entire message. 
However, we haven't yet *read* the data (this code describes how to read the data into the destination) */ /* if all of the data has already been received, unpack it now, otherwise build an iov and let the channel unpack */ if (*buflen >= data_sz) { MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"Copying contiguous data to user buffer"); /* copy data out of the receive buffer */ memcpy((char*)(rreq->dev.user_buf) + dt_true_lb, buf, data_sz); *buflen = data_sz; *complete = TRUE; } else { MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for contiguous read"); rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char*)(rreq->dev.user_buf) + dt_true_lb); rreq->dev.iov[0].MPID_IOV_LEN = data_sz; rreq->dev.iov_count = 1; *buflen = 0; *complete = FALSE; } /* FIXME: We want to set the OnDataAvail to the appropriate function, which depends on whether this is an RMA request or a pt-to-pt request. */ rreq->dev.OnDataAvail = 0; } else { /* user buffer is not contiguous or is too small to hold the entire message */ rreq->dev.segment_ptr = MPID_Segment_alloc( ); /* if (!rreq->dev.segment_ptr) { MPIU_ERR_POP(); } */ MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, rreq->dev.segment_ptr, 0); rreq->dev.segment_first = 0; rreq->dev.segment_size = data_sz; /* if all of the data has already been received, and the message is not truncated, unpack it now, otherwise build an iov and let the channel unpack */ if (data_sz == rreq->dev.recv_data_sz && *buflen >= data_sz) { MPIDI_msg_sz_t last; MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"Copying noncontiguous data to user buffer"); last = data_sz; MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, buf); /* --BEGIN ERROR HANDLING-- */ if (last != data_sz) { /* If the data can't be unpacked, the we have a mismatch between the datatype and the amount of data received. Throw away received data. 
*/ MPIU_ERR_SET(rreq->status.MPI_ERROR, MPI_ERR_TYPE, "**dtypemismatch"); rreq->status.count = (int)rreq->dev.segment_first; *buflen = data_sz; *complete = TRUE; /* FIXME: Set OnDataAvail to 0? If not, why not? */ goto fn_exit; } /* --END ERROR HANDLING-- */ *buflen = data_sz; rreq->dev.OnDataAvail = 0; *complete = TRUE; } else { MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for non-contiguous read"); mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } *buflen = 0; *complete = FALSE; } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND); return mpi_errno;fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Receive_data_unexpected#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Receive_data_unexpected(MPID_Request * rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED); /* FIXME: to improve performance, allocate temporary buffer from a specialized buffer pool. 
*/ /* FIXME: to avoid memory exhaustion, integrate buffer pool management with flow control */ MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"unexpected request allocated"); rreq->dev.tmpbuf = MPIU_Malloc(rreq->dev.recv_data_sz); if (!rreq->dev.tmpbuf) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); } rreq->dev.tmpbuf_sz = rreq->dev.recv_data_sz; /* if all of the data has already been received, copy it now, otherwise build an iov and let the channel copy it */ if (rreq->dev.recv_data_sz <= *buflen) { memcpy(rreq->dev.tmpbuf, buf, rreq->dev.recv_data_sz); *buflen = rreq->dev.recv_data_sz; rreq->dev.recv_pending_count = 1; *complete = TRUE; } else { rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *)rreq->dev.tmpbuf); rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.recv_data_sz; rreq->dev.iov_count = 1; rreq->dev.recv_pending_count = 2; *buflen = 0; *complete = FALSE; } rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED); return mpi_errno;}/* * This function is used to post a receive operation on a request for the * next data to arrive. In turn, this request is attached to a virtual * connection. 
*/#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Post_data_receive_found#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Post_data_receive_found(MPID_Request * rreq){ int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t userbuf_sz; MPID_Datatype * dt_ptr = NULL; MPIDI_msg_sz_t data_sz; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"posted request found"); MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, userbuf_sz, dt_ptr, dt_true_lb); if (rreq->dev.recv_data_sz <= userbuf_sz) { data_sz = rreq->dev.recv_data_sz; } else { MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST, "receive buffer too small; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", userbuf_sz=" MPIDI_MSG_SZ_FMT, rreq->dev.recv_data_sz, userbuf_sz)); rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE, "**truncate", "**truncate %d %d %d %d", rreq->status.MPI_SOURCE, rreq->status.MPI_TAG, rreq->dev.recv_data_sz, userbuf_sz ); rreq->status.count = userbuf_sz; data_sz = userbuf_sz; } if (dt_contig && data_sz == rreq->dev.recv_data_sz) { /* user buffer is contiguous and large enough to store the entire message. However, we haven't yet *read* the data (this code describes how to read the data into the destination) */ MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for contiguous read"); rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char*)(rreq->dev.user_buf) + dt_true_lb); rreq->dev.iov[0].MPID_IOV_LEN = data_sz; rreq->dev.iov_count = 1; /* FIXME: We want to set the OnDataAvail to the appropriate function, which depends on whether this is an RMA request or a pt-to-pt request. 
*/ rreq->dev.OnDataAvail = 0; } else { /* user buffer is not contiguous or is too small to hold the entire message */ int mpi_errno; MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for non-contiguous read"); rreq->dev.segment_ptr = MPID_Segment_alloc( ); /* if (!rreq->dev.segment_ptr) { MPIU_ERR_POP(); } */ MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, rreq->dev.segment_ptr, 0); rreq->dev.segment_first = 0; rreq->dev.segment_size = data_sz; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } }fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Post_data_receive_unexpected#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Post_data_receive_unexpected(MPID_Request * rreq){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED); /* FIXME: to improve performance, allocate temporary buffer from a specialized buffer pool. */ /* FIXME: to avoid memory exhaustion, integrate buffer pool management with flow control */ MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"unexpected request allocated"); rreq->dev.tmpbuf = MPIU_Malloc(rreq->dev.recv_data_sz); if (!rreq->dev.tmpbuf) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); } rreq->dev.tmpbuf_sz = rreq->dev.recv_data_sz; rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rreq->dev.tmpbuf; rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.recv_data_sz; rreq->dev.iov_count = 1; rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete; rreq->dev.recv_pending_count = 2; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED); return mpi_errno;}/* Check if requested lock can be granted. If it can, set win_ptr->current_lock_type to the new lock type and return 1. Else return 0. 
FIXME: MT: This function must be atomic because two threads could be trying to do the same thing, e.g., the main thread in MPI_Win_lock(source=target) and another thread in the progress engine. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -