nt_ipvishm_nrndv.c

来自「MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程」· C语言 代码 · 共 631 行 · 第 1/2 页

C
631
字号
#include "mpid.h"#include "mpiddev.h"#include "mpimem.h"#include "reqalloc.h"#include "flow.h"#include "chpackflow.h"/* NonBlocking Rendezvous *//* Prototype definitions */int MPID_NT_Rndvn_send ANSI_ARGS((void *, int, int, int, int, int, 								 MPID_Msgrep_t));int MPID_NT_Rndvn_isend ANSI_ARGS((void *, int, int, int, int, int, 								  MPID_Msgrep_t, MPIR_SHANDLE *));int MPID_NT_Rndvn_irecv ANSI_ARGS((MPIR_RHANDLE *, int, void *));int MPID_NT_Rndvn_save ANSI_ARGS((MPIR_RHANDLE *, int, void *));int MPID_NT_Rndvn_unxrecv_start ANSI_ARGS((MPIR_RHANDLE *, void *));int MPID_NT_Rndvn_unxrecv_end ANSI_ARGS((MPIR_RHANDLE *));int MPID_NT_Rndvn_unxrecv_test_end ANSI_ARGS((MPIR_RHANDLE *));int MPID_NT_Rndvn_ok_to_send  ANSI_ARGS((MPID_Aint, MPID_RNDV_T, int));int MPID_NT_Rndvn_ack ANSI_ARGS((void *, int));int MPID_NT_Rndvn_send_test ANSI_ARGS((MPIR_SHANDLE *));int MPID_NT_Rndvn_send_wait ANSI_ARGS((MPIR_SHANDLE *));int MPID_NT_Rndvn_send_test_ack ANSI_ARGS((MPIR_SHANDLE *));int MPID_NT_Rndvn_send_wait_ack ANSI_ARGS((MPIR_SHANDLE *));void MPID_NT_Rndvn_delete ANSI_ARGS((MPID_Protocol *));int MPID_NT_Rndvn_save_self ANSI_ARGS((MPIR_RHANDLE *rhandle,int from,void *in_pkt));int MPID_NT_Rndvn_unxrecv_start_self ANSI_ARGS((MPIR_RHANDLE *rhandle,void *in_runex));/* Globals for this protocol *//* This should be state in the protocol/device ?? */static int CurTag    = 1024;static int TagsInUse = 0;/** This is really the same as the blocking version, since the * nonblocking operations occur only in the data transmission.*/int MPID_NT_Rndvn_isend(						void *buf, 						int len, 						int src_lrank, 						int tag, 						int context_id, 						int dest,						MPID_Msgrep_t msgrep, 						MPIR_SHANDLE  *shandle){    MPID_PKT_REQUEST_SEND_T  pkt;        DEBUG_PRINT_MSG("S Starting Rndvn_isend");#ifdef MPID_PACK_CONTROL    while (!MPID_PACKET_CHECK_OK(dest)) 	{  		MPID_DeviceCheck(MPID_BLOCKING);    }	    MPID_PACKET_ADD_SENT(MPID_MyWorldRank, dest)#endif			pkt.mode	   = MPID_PKT_REQUEST_SEND;    pkt.context_id = context_id;    pkt.lrank	   = src_lrank;    pkt.to         = dest;	pkt.src        = MPID_MyWorldRank; // <----------------------------    pkt.seqnum     = sizeof(MPID_PKT_REQUEST_SEND_T);    pkt.tag		   = tag;    pkt.len	       = len;    MPID_DO_HETERO(pkt.msgrep = (int)msgrep);	    MPID_AINT_SET(pkt.send_id, shandle);	    /* Store info in the request for completing the message */    shandle->is_complete     = 0;    shandle->start	         = buf;    shandle->bytes_as_contig = len;    /* Set the test/wait functions */    shandle->wait	         = MPID_NT_Rndvn_send_wait_ack;    shandle->test            = MPID_NT_Rndvn_send_test_ack;	shandle->finish          = 0; // <----------------------------    /* Store partners rank in request in case message is cancelled */    shandle->partner         = dest;    /* shandle->finish must NOT be set here; it must be cleared/set	when the request is created */    DEBUG_PRINT_BASIC_SEND_PKT("S Sending rndv message",&pkt)	MPID_PKT_PACK(&pkt, sizeof(pkt), dest);    MPID_DRAIN_INCOMING_FOR_TINY(1);    MPID_n_pending++;    MPID_SendControlBlock(&pkt, sizeof(pkt), dest);	    return MPI_SUCCESS;}int MPID_NT_Rndvn_send(					   void *buf, 					   int len, 					   int src_lrank, 					   int tag, 					   int context_id, 					   int dest,					   MPID_Msgrep_t msgrep){    MPIR_SHANDLE shandle;	    DEBUG_INIT_STRUCT(&shandle, sizeof(shandle));    MPIR_SET_COOKIE((&shandle), MPIR_REQUEST_COOKIE);    MPID_SendInit(&shandle);    MPID_NT_Rndvn_isend(buf, len, src_lrank, tag, context_id, dest,	msgrep, &shandle);    DEBUG_TEST_FCN(shandle.wait,"req->wait");    shandle.wait(&shandle);    return MPI_SUCCESS;}/** This is the routine called when a packet of type MPID_PKT_REQUEST_SEND is* seen and the receive has been posted.  Note the use of a nonblocking* receiver BEFORE sending the ack.*/int MPID_NT_Rndvn_irecv(						MPIR_RHANDLE *rhandle, 						int          from, 						void         *in_pkt){    MPID_PKT_REQUEST_SEND_T *pkt = (MPID_PKT_REQUEST_SEND_T *)in_pkt;    int    msglen, err = MPI_SUCCESS;#if defined(MPID_RNDV_SELF)	MPIR_SHANDLE *shandle;#endif    MPID_RNDV_T rtag;	#ifdef MPID_PACK_CONTROL    if (MPID_PACKET_RCVD_GET(pkt->src)) 	{		MPID_SendProtoAck(pkt->to, pkt->src);    }    MPID_PACKET_ADD_RCVD(pkt->to, pkt->src);#endif	    DEBUG_PRINT_MSG("R Starting rndvn irecv");		/* A request packet is a little larger than the basic packet size and 	may need to be unpacked (in the heterogeneous case) */	MPID_PKT_UNPACK((MPID_PKT_HEAD_T *)in_pkt + 1, 		sizeof(MPID_PKT_REQUEST_SEND_T) - sizeof(MPID_PKT_HEAD_T),		from);	    msglen = pkt->len;    /* Check for truncation */    MPID_CHK_MSGLEN(rhandle, msglen, err)    /* Note that if we truncate, We really must receive the message in two 	parts; the part that we can store, and the part that we discard.	This case is not yet handled. */	MPIR_SET_COOKIE((rhandle), MPIR_REQUEST_COOKIE);     rhandle->s.count	  = msglen;    rhandle->s.MPI_TAG	  = pkt->tag;    rhandle->s.MPI_SOURCE = pkt->lrank;    rhandle->s.MPI_ERROR  = err;    rhandle->send_id      = pkt->send_id;	rhandle->from         = from; // <----------------------------#if defined(MPID_RNDV_SELF)	if (from == MPID_MyWorldRank) 	{		DEBUG_PRINT_MSG("R Starting a receive transfer from self");		MPID_AINT_GET(shandle, pkt->send_id);#ifdef MPIR_HAS_COOKIES		if (shandle->cookie != MPIR_REQUEST_COOKIE) 		{			FPRINTF(stderr, "shandle is %lx\n", (long)shandle);			FPRINTF(stderr, "shandle cookie is %lx\n", shandle->cookie);			MPID_Print_shandle(stderr, shandle);			MPID_Abort((struct MPIR_COMMUNICATOR *)0, 1, "MPI internal", 				"Bad address in Rendezvous send (irecv-self)");		}#endif			/* Copy directly from the shandle */		MEMCPY(rhandle->buf, shandle->start, shandle->bytes_as_contig);				shandle->is_complete = 1;		if (shandle->finish) 			(shandle->finish)(shandle);		MPID_n_pending--;				/* Update all of the rhandle information */		rhandle->wait	 = 0;		rhandle->test	 = 0;		rhandle->push	 = 0;				rhandle->is_complete = 1;		if (rhandle->finish) 			(rhandle->finish)(rhandle);		return err;	}#endif	rhandle->send_id = pkt->send_id;#ifdef MPID_PACK_CONTROL    while (!MPID_PACKET_CHECK_OK(from)) 	{  		/* begin while !ok loop */		/* Wait for a protocol ACK packet */		MPID_DeviceCheck(MPID_BLOCKING);    }  /* end while !ok loop */	    MPID_PACKET_ADD_SENT(pkt->to, from)#endif		    DEBUG_PRINT_MSG("Starting a nonblocking receive transfer");	MPID_CreateRecvTransfer(0, 0, from, &rtag);    MPID_StartNBRecvTransfer(rhandle->buf, rhandle->len, from, rtag, rhandle, rhandle->rid);    MPID_NT_Rndvn_ok_to_send(rhandle->send_id, rtag, from);    rhandle->recv_handle = rtag;    rhandle->wait	 = MPID_NT_Rndvn_unxrecv_end;    rhandle->test	 = MPID_NT_Rndvn_unxrecv_test_end;    rhandle->push	 = 0;    /* Must NOT zero finish in case it has already been set */	//rhandle->from        = from; // <----------------------------    rhandle->is_complete = 0;        return err;}/* Save an unexpected message in rhandle.  This is the same asMPID_NT_Rndvb_save except for the "push" function */int MPID_NT_Rndvn_save(					   MPIR_RHANDLE *rhandle, 					   int          from, 					   void         *in_pkt){    MPID_PKT_REQUEST_SEND_T   *pkt = (MPID_PKT_REQUEST_SEND_T *)in_pkt;		/* A request packet is a little larger than the basic packet size and 	may need to be unpacked (in the heterogeneous case) */	MPID_PKT_UNPACK((MPID_PKT_HEAD_T *)in_pkt + 1, 		sizeof(MPID_PKT_REQUEST_SEND_T) - sizeof(MPID_PKT_HEAD_T),		from);    DEBUG_PRINT_MSG("Saving info on unexpected message");#ifdef MPID_PACK_CONTROL	if (MPID_PACKET_RCVD_GET(pkt->src)) 	{		MPID_SendProtoAck(pkt->to, pkt->src);	}	MPID_PACKET_ADD_RCVD(pkt->to, pkt->src);#endif	#if defined(MPID_RNDV_SELF)	if (from == MPID_MyWorldRank) 	{		return MPID_NT_Rndvn_save_self(rhandle, from, in_pkt);	}#endif    rhandle->s.MPI_TAG	  = pkt->tag;    rhandle->s.MPI_SOURCE = pkt->lrank;    rhandle->s.MPI_ERROR  = 0;    rhandle->s.count      = pkt->len;    rhandle->is_complete  = 0;    rhandle->from         = from;    rhandle->partner      = pkt->to;    rhandle->send_id      = pkt->send_id;    MPID_DO_HETERO(rhandle->msgrep = (MPID_Msgrep_t)pkt->msgrep);    /* Need to set the push etc routine to complete this transfer */    rhandle->push         = MPID_NT_Rndvn_unxrecv_start;    return 0;}/** This is an internal routine to return an OK TO SEND packet.* It is the same as the Rndvb version*/int MPID_NT_Rndvn_ok_to_send(							 MPID_Aint   send_id, 							 MPID_RNDV_T rtag, 							 int         from){    MPID_PKT_OK_TO_SEND_T pkt;	    pkt.mode   = MPID_PKT_OK_TO_SEND;    pkt.lrank  = MPID_MyWorldRank;    pkt.to     = from;	pkt.src    = MPID_MyWorldRank; // <----------------------------    pkt.seqnum = sizeof(MPID_PKT_OK_TO_SEND_T);    MPID_AINT_SET(pkt.send_id, send_id);    pkt.recv_handle = rtag;    DEBUG_PRINT_BASIC_SEND_PKT("S Ok send", &pkt);    MPID_PKT_PACK(&pkt, sizeof(pkt), from);    MPID_SendControlBlock(&pkt, sizeof(pkt), from);    return MPI_SUCCESS;}/* * This routine is called when it is time to receive an unexpected* message.  Note that we start a nonblocking receive FIRST.*/int MPID_NT_Rndvn_unxrecv_start(								MPIR_RHANDLE *rhandle, 								void         *in_runex){    MPIR_RHANDLE *runex = (MPIR_RHANDLE *)in_runex;    MPID_RNDV_T rtag;/*#ifdef MPID_PACK_CONTROL    if (MPID_PACKET_RCVD_GET(runex->from)) 	{		MPID_SendProtoAck(runex->partner, runex->from);    }    MPID_PACKET_ADD_RCVD(runex->partner, runex->from);#endif//*/	#ifdef MPID_PACK_CONTROL    while (!MPID_PACKET_CHECK_OK(runex->from)) 	{  		/* begin while !ok loop */		/* Wait for a protocol ACK packet */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?