⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3u_handle_recv_pkt.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

/*
 * This file contains the dispatch routine called by the ch3 progress
 * engine to process messages.
 *
 * This file is in transition.
 *
 * Where possible, the routines that create and send all packets of
 * a particular type are in the same file that contains the implementation
 * of the handlers for that packet type (for example, the CancelSend
 * packets are created and processed by routines in
 * ch3/src/mpid_cancel_send.c).  This makes it easier to replace or modify
 * functionality within the ch3 device.
 */

/*
 * set_request_info - copy the envelope of a received packet into a request.
 *
 *   rreq_     - pointer to the receive request to fill in
 *   pkt_      - pointer to the packet header; its match.rank, match.tag,
 *               data_sz, sender_req_id and seqnum members are read
 *   msg_type_ - message type handed to MPIDI_Request_set_msg_type()
 *
 * The status source/tag/count, the sender request id and the expected
 * receive size are stored directly; the sequence number and message type
 * are stored through the MPIDI_Request_set_* helpers.
 *
 * Each argument is expanded several times, so do not pass expressions
 * with side effects.  The expansion is a plain brace block (not a
 * do { } while (0) statement).
 */
#define set_request_info(rreq_, pkt_, msg_type_)                \
{                                                               \
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank;            \
    (rreq_)->status.MPI_TAG = (pkt_)->match.tag;                \
    (rreq_)->status.count = (pkt_)->data_sz;                    \
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;         \
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;                \
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);          \
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));           \
}

/*
 * MPIDI_CH3U_Handle_recv_pkt()
 *
 * NOTE: Multiple threads may NOT simultaneously call this routine with the
 * same VC.  This constraint eliminates the need to lock the VC.  If
 * simultaneous upcalls are possible, the calling routine is responsible
 * for serializing the calls.
 */

/* This code and definition is used to allow us to provide a routine
   that handles packets that are received out-of-order.  However, we
   currently do not support that in the CH3 device.
*/#define MPIDI_CH3U_Handle_ordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt #undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Handle_ordered_recv_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, 				       MPIDI_msg_sz_t *buflen, MPID_Request ** rreqp){    int mpi_errno = MPI_SUCCESS;    static MPIDI_CH3_PktHandler_Fcn *pktArray[MPIDI_CH3_PKT_END_CH3+1];    static int needsInit = 1;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);    MPIU_DBG_STMT(CH3_OTHER,VERBOSE,MPIDI_DBG_Print_packet(pkt));    /* FIXME: We can turn this into something like       MPIU_Assert(pkt->type >= 0 && pkt->type <= MAX_PACKET_TYPE);       mpi_errno = MPIDI_CH3_ProgressFunctions[pkt->type](vc,pkt,rreqp);              in the progress engine itself.  Then this routine is not necessary.    */    if (needsInit) {	MPIDI_CH3_PktHandler_Init( pktArray, MPIDI_CH3_PKT_END_CH3 );	needsInit = 0;    }    MPIU_Assert(pkt->type  >= 0 && pkt->type <= MPIDI_CH3_PKT_END_CH3);    mpi_errno = pktArray[pkt->type](vc, pkt, buflen, rreqp);    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);    return mpi_errno;}/*  * This function is used to receive data from the receive buffer to * the user buffer.  If all data for this message has not been * received, the request is set up to receive the next data to arrive. * In turn, this request is attached to a virtual connection. * * buflen is an I/O parameter.  The length of the received data is * passed in.  The function returns the number of bytes actually * processed by this function. * * complete is an OUTPUT variable.  It is set to TRUE iff all of the * data for the request has been received.  This function does not * actually complete the request. 
*/#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Receive_data_found#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Receive_data_found(MPID_Request *rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete){    int dt_contig;    MPI_Aint dt_true_lb;    MPIDI_msg_sz_t userbuf_sz;    MPID_Datatype * dt_ptr = NULL;    MPIDI_msg_sz_t data_sz;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND);    MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"posted request found");	    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, 			    dt_contig, userbuf_sz, dt_ptr, dt_true_lb);		    if (rreq->dev.recv_data_sz <= userbuf_sz) {	data_sz = rreq->dev.recv_data_sz;    }    else {	MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,               "receive buffer too small; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", userbuf_sz="					    MPIDI_MSG_SZ_FMT,				 rreq->dev.recv_data_sz, userbuf_sz));	rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS,                      MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,		     "**truncate", "**truncate %d %d %d %d", 		     rreq->status.MPI_SOURCE, rreq->status.MPI_TAG, 		     rreq->dev.recv_data_sz, userbuf_sz );	rreq->status.count = userbuf_sz;	data_sz = userbuf_sz;    }    if (dt_contig && data_sz == rreq->dev.recv_data_sz)    {	/* user buffer is contiguous and large enough to store the	   entire message.  
However, we haven't yet *read* the data 	   (this code describes how to read the data into the destination) */        /* if all of the data has already been received, unpack it           now, otherwise build an iov and let the channel unpack */        if (*buflen >= data_sz)        {            MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"Copying contiguous data to user buffer");            /* copy data out of the receive buffer */            memcpy((char*)(rreq->dev.user_buf) + dt_true_lb, buf, data_sz);            *buflen = data_sz;            *complete = TRUE;        }        else        {            MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for contiguous read");                        rreq->dev.iov[0].MPID_IOV_BUF =                 (MPID_IOV_BUF_CAST)((char*)(rreq->dev.user_buf) + dt_true_lb);            rreq->dev.iov[0].MPID_IOV_LEN = data_sz;            rreq->dev.iov_count = 1;            *buflen = 0;            *complete = FALSE;        }        	/* FIXME: We want to set the OnDataAvail to the appropriate 	   function, which depends on whether this is an RMA 	   request or a pt-to-pt request. 
*/	rreq->dev.OnDataAvail = 0;    }    else {	/* user buffer is not contiguous or is too small to hold	   the entire message */        	rreq->dev.segment_ptr = MPID_Segment_alloc( );	/* if (!rreq->dev.segment_ptr) { MPIU_ERR_POP(); } */ 	MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, 			  rreq->dev.datatype, rreq->dev.segment_ptr, 0);	rreq->dev.segment_first = 0;	rreq->dev.segment_size  = data_sz;        /* if all of the data has already been received, and the           message is not truncated, unpack it now, otherwise build an           iov and let the channel unpack */        if (data_sz == rreq->dev.recv_data_sz && *buflen >= data_sz)        {            MPIDI_msg_sz_t last;            MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"Copying noncontiguous data to user buffer");            last = data_sz;            MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, 				&last, buf);            /* --BEGIN ERROR HANDLING-- */            if (last != data_sz)            {                /* If the data can't be unpacked, the we have a                   mismatch between the datatype and the amount of                   data received.  Throw away received data. */                MPIU_ERR_SET(rreq->status.MPI_ERROR, MPI_ERR_TYPE, "**dtypemismatch");                rreq->status.count = (int)rreq->dev.segment_first;                *buflen = data_sz;                *complete = TRUE;		/* FIXME: Set OnDataAvail to 0?  If not, why not? 
*/                goto fn_exit;            }            /* --END ERROR HANDLING-- */            *buflen = data_sz;            rreq->dev.OnDataAvail = 0;            *complete = TRUE;        }        else        {               MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for non-contiguous read");            mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);            if (mpi_errno != MPI_SUCCESS) {                MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_OTHER,                                         "**ch3|loadrecviov");            }            *buflen = 0;            *complete = FALSE;        }    } fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_FOUND);    return mpi_errno;fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Receive_data_unexpected#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Receive_data_unexpected(MPID_Request * rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete){    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED);    /* FIXME: to improve performance, allocate temporary buffer from a        specialized buffer pool. 
*/    /* FIXME: to avoid memory exhaustion, integrate buffer pool management       with flow control */    MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"unexpected request allocated");        rreq->dev.tmpbuf = MPIU_Malloc(rreq->dev.recv_data_sz);    if (!rreq->dev.tmpbuf) {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");    }    rreq->dev.tmpbuf_sz = rreq->dev.recv_data_sz;        /* if all of the data has already been received, copy it       now, otherwise build an iov and let the channel copy it */    if (rreq->dev.recv_data_sz <= *buflen)    {        memcpy(rreq->dev.tmpbuf, buf, rreq->dev.recv_data_sz);        *buflen = rreq->dev.recv_data_sz;        rreq->dev.recv_pending_count = 1;        *complete = TRUE;    }    else    {        rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *)rreq->dev.tmpbuf);        rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.recv_data_sz;        rreq->dev.iov_count = 1;        rreq->dev.recv_pending_count = 2;        *buflen = 0;        *complete = FALSE;    }    rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete; fn_fail:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECEIVE_DATA_UNEXPECTED);    return mpi_errno;}/*  * This function is used to post a receive operation on a request for the  * next data to arrive.  In turn, this request is attached to a virtual * connection. 
*/#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Post_data_receive_found#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Post_data_receive_found(MPID_Request * rreq){    int dt_contig;    MPI_Aint dt_true_lb;    MPIDI_msg_sz_t userbuf_sz;    MPID_Datatype * dt_ptr = NULL;    MPIDI_msg_sz_t data_sz;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND);    MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"posted request found");	    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, 			    dt_contig, userbuf_sz, dt_ptr, dt_true_lb);		    if (rreq->dev.recv_data_sz <= userbuf_sz) {	data_sz = rreq->dev.recv_data_sz;    }    else {	MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,               "receive buffer too small; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", userbuf_sz="					    MPIDI_MSG_SZ_FMT,				 rreq->dev.recv_data_sz, userbuf_sz));	rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS,                      MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,		     "**truncate", "**truncate %d %d %d %d", 		     rreq->status.MPI_SOURCE, rreq->status.MPI_TAG, 		     rreq->dev.recv_data_sz, userbuf_sz );	rreq->status.count = userbuf_sz;	data_sz = userbuf_sz;    }    if (dt_contig && data_sz == rreq->dev.recv_data_sz)    {	/* user buffer is contiguous and large enough to store the	   entire message.  However, we haven't yet *read* the data 	   (this code describes how to read the data into the destination) */	MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for contiguous read");	rreq->dev.iov[0].MPID_IOV_BUF = 	    (MPID_IOV_BUF_CAST)((char*)(rreq->dev.user_buf) + dt_true_lb);	rreq->dev.iov[0].MPID_IOV_LEN = data_sz;	rreq->dev.iov_count = 1;	/* FIXME: We want to set the OnDataAvail to the appropriate 	   function, which depends on whether this is an RMA 	   request or a pt-to-pt request. 
*/	rreq->dev.OnDataAvail = 0;    }    else {	/* user buffer is not contiguous or is too small to hold	   the entire message */	int mpi_errno;		MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"IOV loaded for non-contiguous read");	rreq->dev.segment_ptr = MPID_Segment_alloc( );	/* if (!rreq->dev.segment_ptr) { MPIU_ERR_POP(); } */	MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, 			  rreq->dev.datatype, rreq->dev.segment_ptr, 0);	rreq->dev.segment_first = 0;	rreq->dev.segment_size = data_sz;	mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);	if (mpi_errno != MPI_SUCCESS) {	    MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_OTHER,				     "**ch3|loadrecviov");	}    }fn_fail:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_FOUND);    return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Post_data_receive_unexpected#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Post_data_receive_unexpected(MPID_Request * rreq){    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED);    /* FIXME: to improve performance, allocate temporary buffer from a        specialized buffer pool. 
*/    /* FIXME: to avoid memory exhaustion, integrate buffer pool management       with flow control */    MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"unexpected request allocated");        rreq->dev.tmpbuf = MPIU_Malloc(rreq->dev.recv_data_sz);    if (!rreq->dev.tmpbuf) {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");    }    rreq->dev.tmpbuf_sz = rreq->dev.recv_data_sz;        rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rreq->dev.tmpbuf;    rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.recv_data_sz;    rreq->dev.iov_count = 1;    rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete;    rreq->dev.recv_pending_count = 2; fn_fail:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE_UNEXPECTED);    return mpi_errno;}/* Check if requested lock can be granted. If it can, set    win_ptr->current_lock_type to the new lock type and return 1. Else return 0.   FIXME: MT: This function must be atomic because two threads could be trying    to do the same thing, e.g., the main thread in MPI_Win_lock(source=target)    and another thread in the progress engine. */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -