
📄 ch3u_handle_recv_pkt.c

📁 C/C++ code for MPI parallel computing. It compiles with VC or gcc and can be used to set up a parallel-computing test environment (a minimal test program is sketched below).
💻 C
📖 Page 1 of 4
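As a quick check that a parallel test environment built from this package works, the sketch below is a minimal MPI program. The file name hello_mpi.c and the mpicc/mpiexec invocations are assumptions about a typical MPICH-style installation, not part of this package; with VC, compile the same source against the installation's MPI include and library paths.

/* Minimal sketch (assumed file name hello_mpi.c) for verifying an MPI setup.
 * Build and run, for example:
 *   mpicc hello_mpi.c -o hello_mpi
 *   mpiexec -n 4 ./hello_mpi
 */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, size;

    MPI_Init(&argc, &argv);               /* initialize the MPI runtime */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); /* this process's rank */
    MPI_Comm_size(MPI_COMM_WORLD, &size); /* total number of processes */
    printf("hello from rank %d of %d\n", rank, size);
    MPI_Finalize();                       /* shut the runtime down */
    return 0;
}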
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
#include "mpidimpl.h"

#define set_request_info(rreq_, pkt_, msg_type_)                \
{                                                               \
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank;            \
    (rreq_)->status.MPI_TAG = (pkt_)->match.tag;                \
    (rreq_)->status.count = (pkt_)->data_sz;                    \
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;         \
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;                \
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);          \
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));           \
}

#ifdef MPIDI_CH3_CHANNEL_RNDV
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Handle_recv_rndv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3U_Handle_recv_rndv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                    MPID_Request ** rreqp, int *foundp)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *rreq;
    MPIDI_CH3_Pkt_rndv_req_to_send_t * rts_pkt = &pkt->rndv_req_to_send;

    rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&rts_pkt->match, foundp);
    if (rreq == NULL) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }

    set_request_info(rreq, rts_pkt, MPIDI_REQUEST_RNDV_MSG);

    if (!*foundp)
    {
        MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated"));
        MPID_Request_initialized_set(rreq);

        /*
         * An MPID_Probe() may be waiting for the request we just inserted,
         * so we need to tell the progress engine to exit.
         *
         * FIXME: This will cause MPID_Progress_wait() to return to the MPI
         * layer each time an unexpected RTS packet is received.
         * MPID_Probe() should atomically increment a counter and
         * MPIDI_CH3_Progress_signal_completion() should only be called if
         * that counter is greater than zero.
         */
        MPIDI_CH3_Progress_signal_completion();
    }

    /* return the request */
    *rreqp = rreq;

 fn_fail:
    return mpi_errno;
}
#endif

/*
 * MPIDI_CH3U_Handle_recv_pkt()
 *
 * NOTE: Multiple threads may NOT simultaneously call this routine with the
 * same VC.  This constraint eliminates the need to lock the VC.  If
 * simultaneous upcalls are a possibility, the calling routine is responsible
 * for serializing the calls.
 */
int MPIDI_CH3U_Handle_unordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp);
int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp);

#if defined(MPIDI_CH3_MSGS_UNORDERED)
#define MPIDI_CH3U_Handle_unordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt
#else
#define MPIDI_CH3U_Handle_ordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt
#endif

#if defined(MPIDI_CH3_MSGS_UNORDERED)

#define MPIDI_CH3U_Pkt_send_container_alloc() (MPIU_Malloc(sizeof(MPIDI_CH3_Pkt_send_container_t)))
#define MPIDI_CH3U_Pkt_send_container_free(pc_) MPIU_Free(pc_)

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Handle_unordered_recv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3U_Handle_unordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT);

    MPIDI_DBG_PRINTF((10, FCNAME, "entering"));

    *rreqp = NULL;

    switch(pkt->type)
    {
        case MPIDI_CH3_PKT_EAGER_SEND:
        case MPIDI_CH3_PKT_EAGER_SYNC_SEND:
        case MPIDI_CH3_PKT_READY_SEND:
        case MPIDI_CH3_PKT_RNDV_REQ_TO_SEND:
        {
            MPIDI_CH3_Pkt_send_t * send_pkt = (MPIDI_CH3_Pkt_send_t *) pkt;
            MPIDI_CH3_Pkt_send_container_t * pc_cur;
            MPIDI_CH3_Pkt_send_container_t * pc_last;

            MPIDI_DBG_PRINTF((30, FCNAME, "received (potentially) out-of-order send pkt"));
            MPIDI_DBG_PRINTF((30, FCNAME, "rank=%d, tag=%d, context=%d seqnum=%d",
                              send_pkt->match.rank, send_pkt->match.tag, send_pkt->match.context_id, send_pkt->seqnum));
            MPIDI_DBG_PRINTF((30, FCNAME, "vc - seqnum_send=%d seqnum_recv=%d reorder_msg_queue=0x%08lx",
                              vc->seqnum_send, vc->seqnum_recv, (unsigned long) vc->msg_reorder_queue));

            if (send_pkt->seqnum == vc->seqnum_recv)
            {
                mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno != MPI_SUCCESS)
                {
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */

                vc->seqnum_recv++;

                pc_cur = vc->msg_reorder_queue;
                while (pc_cur != NULL && vc->seqnum_recv == pc_cur->pkt.seqnum)
                {
                    pkt = (MPIDI_CH3_Pkt_t *) &pc_cur->pkt;
                    mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno != MPI_SUCCESS)
                    {
                        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                         "**ch3|pktordered", 0);
                        goto fn_exit;
                    }
                    /* --END ERROR HANDLING-- */

                    vc->seqnum_recv++;
                    pc_last = pc_cur;
                    pc_cur = pc_cur->next;
                    MPIDI_CH3U_Pkt_send_container_free(pc_last);
                }

                vc->msg_reorder_queue = pc_cur;
            }
            else
            {
                MPIDI_CH3_Pkt_send_container_t * pc_new;

                /* allocate container and copy packet */
                pc_new = MPIDI_CH3U_Pkt_send_container_alloc();
                /* --BEGIN ERROR HANDLING-- */
                if (pc_new == NULL)
                {
                    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                     "**ch3|nopktcontainermem", 0);
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */
                pc_new->pkt = *send_pkt;

                /* insert packet into reorder queue */
                pc_last = NULL;
                pc_cur = vc->msg_reorder_queue;
                while (pc_cur != NULL)
                {
                    /* the current recv seqnum is subtracted from both seqnums prior to the comparison so as to remove any
                       wrap-around effects */
                    if (pc_new->pkt.seqnum - vc->seqnum_recv < pc_cur->pkt.seqnum - vc->seqnum_recv)
                    {
                        break;
                    }

                    pc_last = pc_cur;
                    pc_cur = pc_cur->next;
                }

                if (pc_last == NULL)
                {
                    pc_new->next = pc_cur;
                    vc->msg_reorder_queue = pc_new;
                }
                else
                {
                    pc_new->next = pc_cur;
                    pc_last->next = pc_new;
                }
            }

            break;
        }

        case MPIDI_CH3_PKT_CANCEL_SEND_REQ:
        {
            /* --BEGIN ERROR HANDLING-- */
            /* FIXME: processing send cancel requests requires that we be aware of pkts in the reorder queue */
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**ch3|ooocancelreq", 0);
            goto fn_exit;
            break;
            /* --END ERROR HANDLING-- */
        }

        default:
        {
            mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp);
            break;
        }
    }

  fn_exit:
    MPIDI_DBG_PRINTF((10, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT);
    return mpi_errno;
}
#endif

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Handle_ordered_recv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                       MPID_Request ** rreqp)
{
    int type_size;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);

    MPIDI_DBG_PRINTF((10, FCNAME, "entering"));
    MPIDI_DBG_Print_packet(pkt);

    switch(pkt->type)
    {
        /* FIXME: This is not optimized for short messages, which
           should have the data in the same packet when the data is
           particularly short (e.g., one 8 byte long word) */
        case MPIDI_CH3_PKT_EAGER_SEND:
        {
            MPIDI_CH3_Pkt_eager_send_t * eager_pkt = &pkt->eager_send;
            MPID_Request * rreq;
            int found;

            MPIDI_DBG_PRINTF((30, FCNAME, "received eager send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
                              eager_pkt->sender_req_id, eager_pkt->match.rank, eager_pkt->match.tag, eager_pkt->match.context_id));

            rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&eager_pkt->match, &found);
            if (rreq == NULL) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
            }

            set_request_info(rreq, eager_pkt, MPIDI_REQUEST_EAGER_MSG);
            *rreqp = rreq;

            /* FIXME: What is the logic here?  On an eager receive, the data
               should be available already, and we should be optimizing for
               short messages */
            mpi_errno = MPIDI_CH3U_Post_data_receive(found, rreqp);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SEND");
            }
            break;
        }

        case MPIDI_CH3_PKT_READY_SEND:
        {
            MPIDI_CH3_Pkt_ready_send_t * ready_pkt = &pkt->ready_send;
            MPID_Request * rreq;
            int found;

            MPIDI_DBG_PRINTF((30, FCNAME, "received ready send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
                              ready_pkt->sender_req_id, ready_pkt->match.rank, ready_pkt->match.tag, ready_pkt->match.context_id));

            rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&ready_pkt->match, &found);
            if (rreq == NULL) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
            }

            set_request_info(rreq, ready_pkt, MPIDI_REQUEST_EAGER_MSG);
            *rreqp = rreq;

            if (found)
            {
                mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                         "**ch3|postrecv %s", "MPIDI_CH3_PKT_READY_SEND");
                }
            }
            else
            {
                /* FIXME: an error packet should be sent back to the sender indicating that the ready-send failed.  On the
                   send side, the error handler for the communicator can be invoked even if the ready-send request has
                   already completed. */

                /* We need to consume any outstanding associated data and mark the request with an error. */

                MPID_Request_initialized_set(rreq);
                rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                              "**rsendnomatch", "**rsendnomatch %d %d", ready_pkt->match.rank,
                                                              ready_pkt->match.tag);
                rreq->status.count = 0;
                if (rreq->dev.recv_data_sz > 0)
                {
                    /* force read of extra data */
                    rreq->dev.segment_first = 0;
                    rreq->dev.segment_size = 0;
                    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
                    if (mpi_errno != MPI_SUCCESS) {
                        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
                    }
                }
                else
                {
                    /* mark data transfer as complete and decrement CC */
                    MPIDI_CH3U_Request_complete(rreq);
                    *rreqp = NULL;
                }
            }
            break;
        }

        case MPIDI_CH3_PKT_EAGER_SYNC_SEND:
        {
            MPIDI_CH3_Pkt_eager_send_t * es_pkt = &pkt->eager_send;
            MPID_Request * rreq;
            int found;

            MPIDI_DBG_PRINTF((30, FCNAME, "received eager sync send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
                              es_pkt->sender_req_id, es_pkt->match.rank, es_pkt->match.tag, es_pkt->match.context_id));

            rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&es_pkt->match, &found);
            if (rreq == NULL) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
            }

            set_request_info(rreq, es_pkt, MPIDI_REQUEST_EAGER_MSG);
            *rreqp = rreq;

            mpi_errno = MPIDI_CH3U_Post_data_receive(found, rreqp);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SYNC_SEND");
            }

            if (found)
            {
                MPIDI_CH3_Pkt_t upkt;
                MPIDI_CH3_Pkt_eager_sync_ack_t * const esa_pkt = &upkt.eager_sync_ack;
                MPID_Request * esa_req;

                MPIDI_DBG_PRINTF((30, FCNAME, "sending eager sync ack"));

                MPIDI_Pkt_init(esa_pkt, MPIDI_CH3_PKT_EAGER_SYNC_ACK);
                esa_pkt->sender_req_id = rreq->dev.sender_req_id;
                mpi_errno = MPIDI_CH3_iStartMsg(vc, esa_pkt, sizeof(*esa_pkt), &esa_req);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|syncack");
                }
                if (esa_req != NULL) {
                    MPID_Request_release(esa_req);
                }
            }
            else
            {
                MPIDI_Request_set_sync_send_flag(rreq, TRUE);
            }

            break;
        }

        case MPIDI_CH3_PKT_EAGER_SYNC_ACK:
        {
            MPIDI_CH3_Pkt_eager_sync_ack_t * esa_pkt = &pkt->eager_sync_ack;
            MPID_Request * sreq;

            MPIDI_DBG_PRINTF((30, FCNAME, "received eager sync ack pkt, sreq=0x%08x", esa_pkt->sender_req_id));

            MPID_Request_get_ptr(esa_pkt->sender_req_id, sreq);
            /* decrement CC (but don't mark data transfer as complete since the transfer could still be in progress) */
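The reorder-queue insertion loop in MPIDI_CH3U_Handle_unordered_recv_pkt subtracts vc->seqnum_recv from both sequence numbers before comparing, so that wrap-around of the sequence counter does not break the ordering. Below is a minimal sketch of that comparison in isolation; the 16-bit width and the helper name seq_before are assumptions for illustration only, since the actual seqnum type is not shown on this page.

/* Wrap-around-safe ordering test: subtracting the current receive seqnum
 * from both candidates turns the circular ordering into a plain unsigned
 * comparison. */
#include <stdio.h>
#include <stdint.h>

/* returns nonzero if seqnum a comes before seqnum b, relative to base */
static int seq_before(uint16_t base, uint16_t a, uint16_t b)
{
    return (uint16_t)(a - base) < (uint16_t)(b - base);
}

int main(void)
{
    /* 0xFFFE still precedes 0x0003 when the counter has wrapped past 0xFFF0 */
    printf("%d\n", seq_before(0xFFF0, 0xFFFE, 0x0003));  /* prints 1 */
    printf("%d\n", seq_before(0xFFF0, 0x0003, 0xFFFE));  /* prints 0 */
    return 0;
}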

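The MPIDI_CH3_PKT_READY_SEND branch above completes the request with a **rsendnomatch error when no matching receive has been posted. At the MPI level this reflects the rule that a ready-mode send may only start after the matching receive exists. The sketch below shows one correct usage pattern, assuming exactly two ranks in MPI_COMM_WORLD; the barrier is just one simple way to guarantee the receive is posted first.

/* Ready-send usage sketch (assumes 2 ranks): post the receive, synchronize,
 * then issue MPI_Rsend.  Without the pre-posted receive, the receiver side
 * would take the **rsendnomatch error path shown in the listing. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, val = 0;
    MPI_Request req;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 1) {
        /* post the receive before the sender is allowed to start */
        MPI_Irecv(&val, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req);
    }
    MPI_Barrier(MPI_COMM_WORLD);    /* guarantees the receive is posted */
    if (rank == 0) {
        val = 42;
        MPI_Rsend(&val, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Wait(&req, MPI_STATUS_IGNORE);
        printf("rank 1 received %d\n", val);
    }

    MPI_Finalize();
    return 0;
}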