ch3u_recvq.c

来自「mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环」· C语言 代码 · 共 554 行

C
554
字号
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpidimpl.h"/* FIXME:  * Are locks required here?  Not when the current code uses the "one big lock" * approach.  In addition, some routines can be implemented in a lock-free * fashion (because the user is required to guarantee non-conflicting  * accesses, such as doing a probe and a receive that matches in different  * threads).   * * There are a lot of routines here.  Do we really need them all? *  * The search criteria can be implemented in a single 64 bit compare on * systems with efficient 64-bit operations.  The rank and contextid can also * in many cases be combined into a single 32-bit word for the comparison * (in which case the message info should be stored in the queue in a  * naturally aligned, 64-bit word. *  */#if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)#define MPIDI_Recvq_lock() MPID_Thread_lock(&MPIDI_Process.recvq_mutex)#define MPIDI_Recvq_unlock() MPID_Thread_unlock(&MPIDI_Process.recvq_mutex)#else#define MPIDI_Recvq_lock()#define MPIDI_Recvq_unlock()#endifstatic MPID_Request * recvq_posted_head = 0;static MPID_Request * recvq_posted_tail = 0;static MPID_Request * recvq_unexpected_head = 0;static MPID_Request * recvq_unexpected_tail = 0;/* * MPIDI_CH3U_Recvq_FU() * * Search for a matching request in the unexpected receive queue.  Return  * true if one is found, false otherwise.  If the status arguement is * not MPI_STATUS_IGNORE, return information about the request in that * parameter.  This routine is used by mpid_probe and mpid_iprobe. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Recvq_FU(int source, int tag, int context_id, MPI_Status *s){    MPID_Request * rreq;    int found = 0;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FU);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FU);    MPIDI_Recvq_lock();    if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE)    {	rreq = recvq_unexpected_head;	while(rreq != NULL)	{	    if (rreq->dev.match.context_id == context_id && 		rreq->dev.match.rank == source && rreq->dev.match.tag == tag)	    {		break;	    }	    	    rreq = rreq->dev.next;	}    }    else    {	MPIDI_Message_match match;	MPIDI_Message_match mask;	match.context_id = context_id;	mask.context_id = ~0;	if (tag == MPI_ANY_TAG)	{	    match.tag = 0;	    mask.tag = 0;	}	else	{	    match.tag = tag;	    mask.tag = ~0;	}	if (source == MPI_ANY_SOURCE)	{	    match.rank = 0;	    mask.rank = 0;	}	else	{	    match.rank = source;	    mask.rank = ~0;	}		rreq = recvq_unexpected_head;	while (rreq != NULL)	{	    if (rreq->dev.match.context_id == match.context_id && 		(rreq->dev.match.rank & mask.rank) == match.rank &&		(rreq->dev.match.tag & mask.tag) == match.tag)	    {		break;	    }	    	    rreq = rreq->dev.next;	}    }    /* Save the information about the request before releasing the        queue */    if (rreq) {	if (s != MPI_STATUS_IGNORE) {	    /* Avoid setting "extra" fields like MPI_ERROR */	    s->MPI_SOURCE = rreq->status.MPI_SOURCE;	    s->MPI_TAG    = rreq->status.MPI_TAG;	    s->count      = rreq->status.count;	    s->cancelled  = rreq->status.cancelled;	}	found = 1;    }    MPIDI_Recvq_unlock();    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FU);    return found;}/* * MPIDI_CH3U_Recvq_FDU() * * Find a request in the unexpected queue and dequeue it; otherwise return NULL. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id, MPIDI_Message_match * match){    MPID_Request * rreq;    MPID_Request * prev_rreq;    MPID_Request * cur_rreq;    MPID_Request * matching_prev_rreq;    MPID_Request * matching_cur_rreq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);    matching_prev_rreq = NULL;    matching_cur_rreq = NULL;    prev_rreq = NULL;        MPIDI_Recvq_lock();    {	cur_rreq = recvq_unexpected_head;	while(cur_rreq != NULL)	{	    if (cur_rreq->dev.sender_req_id == sreq_id && 		cur_rreq->dev.match.context_id == match->context_id &&		cur_rreq->dev.match.rank == match->rank && 		cur_rreq->dev.match.tag == match->tag)	    {		matching_prev_rreq = prev_rreq;		matching_cur_rreq = cur_rreq;	    }	    	    prev_rreq = cur_rreq;	    cur_rreq = cur_rreq->dev.next;	}	if (matching_cur_rreq != NULL)	{	    if (matching_prev_rreq != NULL)	    {		matching_prev_rreq->dev.next = matching_cur_rreq->dev.next;	    }	    else	    {		recvq_unexpected_head = matching_cur_rreq->dev.next;	    }		    if (matching_cur_rreq->dev.next == NULL)	    {		recvq_unexpected_tail = matching_prev_rreq;	    }	    rreq = matching_cur_rreq;	}	else	{	    rreq = NULL;	}    }    MPIDI_Recvq_unlock();    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);    return rreq;}/* * MPIDI_CH3U_Recvq_FDU_or_AEP() * * Atomically find a request in the unexpected queue and dequeue it, or  * allocate a new request and enqueue it in the posted queue */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDU_or_AEP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag, 					   int context_id, int * foundp){    int found;    MPID_Request * rreq, *prev_rreq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);    MPIDI_Recvq_lock();    {	/* Optimize this loop for an empty unexpected receive queue */	rreq = recvq_unexpected_head;	if (rreq) {	    prev_rreq = NULL;	    if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) {		do { 		    if (rreq->dev.match.context_id == context_id && 			rreq->dev.match.rank == source && 			rreq->dev.match.tag == tag) {			if (prev_rreq != NULL) {			    prev_rreq->dev.next = rreq->dev.next;			}			else {			    recvq_unexpected_head = rreq->dev.next;			}			if (rreq->dev.next == NULL) {			    recvq_unexpected_tail = prev_rreq;			}			found = TRUE;			goto lock_exit;		    } 	    		prev_rreq = rreq;		rreq      = rreq->dev.next;		} while (rreq);	    }	    else {		MPIDI_Message_match match;		MPIDI_Message_match mask;		match.context_id = context_id;		mask.context_id = ~0;		if (tag == MPI_ANY_TAG) {		    match.tag = 0;		    mask.tag = 0;		}		else {		    match.tag = tag;		    mask.tag = ~0;		}		if (source == MPI_ANY_SOURCE) {		    match.rank = 0;		    mask.rank = 0;		}		else {		    match.rank = source;		    mask.rank = ~0;		}			do {		    if (rreq->dev.match.context_id == match.context_id && 			(rreq->dev.match.rank & mask.rank) == match.rank &&			(rreq->dev.match.tag & mask.tag) == match.tag) {			if (prev_rreq != NULL) {			    prev_rreq->dev.next = rreq->dev.next;			}			else {			    recvq_unexpected_head = rreq->dev.next;			}			if (rreq->dev.next == NULL) {			    recvq_unexpected_tail = prev_rreq;			}			found = TRUE;			goto lock_exit;		    }	    		    prev_rreq = rreq;		    rreq = rreq->dev.next;		} while (rreq);	    }	}	/* A matching request was not found in the unexpected queue, so we 	   need to allocate a new request and add it to the posted queue */	rreq = MPID_Request_create();	if (rreq != NULL)	{	    MPIU_Object_set_ref(rreq, 2);	    rreq->kind = MPID_REQUEST_RECV;	    rreq->dev.match.tag = tag;	    rreq->dev.match.rank = source;	    rreq->dev.match.context_id = context_id;	    rreq->dev.next = NULL;		    if (recvq_posted_tail != NULL)	    {		recvq_posted_tail->dev.next = rreq;	    }	    else	    {		recvq_posted_head = rreq;	    }	    recvq_posted_tail = rreq;	}    	found = FALSE;    }  lock_exit:    MPIDI_Recvq_unlock();    /* FIXME: Do we need this in the single threaded case? */    if (!found)    { 	MPID_Request_initialized_clear(rreq);    }    else    {	MPID_Request_initialized_wait(rreq);    }    *foundp = found;        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);    return rreq;}/* * MPIDI_CH3U_Recvq_DP() * * Given an existing request, dequeue that request from the posted queue, or return NULL if the request was not in the posted * queued */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_DP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Recvq_DP(MPID_Request * rreq){    int found;    MPID_Request * cur_rreq;    MPID_Request * prev_rreq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_DP);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_DP);        found = FALSE;    prev_rreq = NULL;        MPIDI_Recvq_lock();    {	cur_rreq = recvq_posted_head;	while (cur_rreq != NULL)	{	    if (cur_rreq == rreq)	    {		if (prev_rreq != NULL)		{		    prev_rreq->dev.next = cur_rreq->dev.next;		}		else		{		    recvq_posted_head = cur_rreq->dev.next;		}		if (cur_rreq->dev.next == NULL)		{		    recvq_posted_tail = prev_rreq;		}	    		found = TRUE;		break;	    }	    	    prev_rreq = cur_rreq;	    cur_rreq = cur_rreq->dev.next;	}    }    MPIDI_Recvq_unlock();    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_DP);    return found;}/* FIXME: No-one uses this routine.  */#if 0/* * MPIDI_CH3U_Recvq_FDP * * Locate a request in the posted queue and dequeue it, or return NULL. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDP(MPIDI_Message_match * match){    MPID_Request * rreq;    MPID_Request * prev_rreq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP);        prev_rreq = NULL;    MPIDI_Recvq_lock();    {	rreq = recvq_posted_head;	while (rreq != NULL)	{	    if ((rreq->dev.match.context_id == match->context_id) &&		(rreq->dev.match.rank == match->rank || 		 rreq->dev.match.rank == MPI_ANY_SOURCE) &&		(rreq->dev.match.tag == match->tag || 		 rreq->dev.match.tag == MPI_ANY_TAG))	    {		if (prev_rreq != NULL)		{		    prev_rreq->dev.next = rreq->dev.next;		}		else		{		    recvq_posted_head = rreq->dev.next;		}		if (rreq->dev.next == NULL)		{		    recvq_posted_tail = prev_rreq;		}		break;	    }	    	    prev_rreq = rreq;	    rreq = rreq->dev.next;	}    }    MPIDI_Recvq_unlock();    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP);    return rreq;}#endif /* Unused routine *//* * MPIDI_CH3U_Recvq_FDP_or_AEU() * * Locate a request in the posted queue and dequeue it, or allocate a new  * request and enqueue it in the unexpected queue */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDP_or_AEU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match, 					   int * foundp){    int found;    MPID_Request * rreq;    MPID_Request * prev_rreq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);        prev_rreq = NULL;    MPIDI_Recvq_lock();    {	rreq = recvq_posted_head;	while (rreq != NULL)	{	    if ((rreq->dev.match.context_id == match->context_id) &&		(rreq->dev.match.rank == match->rank || 		 rreq->dev.match.rank == MPI_ANY_SOURCE) &&		(rreq->dev.match.tag == match->tag || 		 rreq->dev.match.tag == MPI_ANY_TAG))	    {		if (prev_rreq != NULL)		{		    prev_rreq->dev.next = rreq->dev.next;		}		else		{		    recvq_posted_head = rreq->dev.next;		}		if (rreq->dev.next == NULL)		{		    recvq_posted_tail = prev_rreq;		}		found = TRUE;		goto lock_exit;	    }	    	    prev_rreq = rreq;	    rreq = rreq->dev.next;	}	/* A matching request was not found in the posted queue, so we 	   need to allocate a new request and add it to the unexpected queue */	rreq = MPID_Request_create();	if (rreq != NULL)	{	    MPIU_Object_set_ref(rreq, 2);	    rreq->kind = MPID_REQUEST_RECV;	    rreq->dev.match = *match;	    rreq->dev.next = NULL;		    if (recvq_unexpected_tail != NULL)	    {		recvq_unexpected_tail->dev.next = rreq;	    }	    else	    {		recvq_unexpected_head = rreq;	    }	    recvq_unexpected_tail = rreq;	}    	found = FALSE;    }  lock_exit:    MPIDI_Recvq_unlock();    if (!found)    { 	MPID_Request_initialized_clear(rreq);    }    else    {	MPID_Request_initialized_wait(rreq);    }    *foundp = found;    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);    return rreq;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?