ch3u_recvq.c
来自「mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环」· C语言 代码 · 共 554 行
C
554 行
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpidimpl.h"/* FIXME: * Are locks required here? Not when the current code uses the "one big lock" * approach. In addition, some routines can be implemented in a lock-free * fashion (because the user is required to guarantee non-conflicting * accesses, such as doing a probe and a receive that matches in different * threads). * * There are a lot of routines here. Do we really need them all? * * The search criteria can be implemented in a single 64 bit compare on * systems with efficient 64-bit operations. The rank and contextid can also * in many cases be combined into a single 32-bit word for the comparison * (in which case the message info should be stored in the queue in a * naturally aligned, 64-bit word. * */#if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)#define MPIDI_Recvq_lock() MPID_Thread_lock(&MPIDI_Process.recvq_mutex)#define MPIDI_Recvq_unlock() MPID_Thread_unlock(&MPIDI_Process.recvq_mutex)#else#define MPIDI_Recvq_lock()#define MPIDI_Recvq_unlock()#endifstatic MPID_Request * recvq_posted_head = 0;static MPID_Request * recvq_posted_tail = 0;static MPID_Request * recvq_unexpected_head = 0;static MPID_Request * recvq_unexpected_tail = 0;/* * MPIDI_CH3U_Recvq_FU() * * Search for a matching request in the unexpected receive queue. Return * true if one is found, false otherwise. If the status arguement is * not MPI_STATUS_IGNORE, return information about the request in that * parameter. This routine is used by mpid_probe and mpid_iprobe. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Recvq_FU(int source, int tag, int context_id, MPI_Status *s){ MPID_Request * rreq; int found = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FU); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FU); MPIDI_Recvq_lock(); if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) { rreq = recvq_unexpected_head; while(rreq != NULL) { if (rreq->dev.match.context_id == context_id && rreq->dev.match.rank == source && rreq->dev.match.tag == tag) { break; } rreq = rreq->dev.next; } } else { MPIDI_Message_match match; MPIDI_Message_match mask; match.context_id = context_id; mask.context_id = ~0; if (tag == MPI_ANY_TAG) { match.tag = 0; mask.tag = 0; } else { match.tag = tag; mask.tag = ~0; } if (source == MPI_ANY_SOURCE) { match.rank = 0; mask.rank = 0; } else { match.rank = source; mask.rank = ~0; } rreq = recvq_unexpected_head; while (rreq != NULL) { if (rreq->dev.match.context_id == match.context_id && (rreq->dev.match.rank & mask.rank) == match.rank && (rreq->dev.match.tag & mask.tag) == match.tag) { break; } rreq = rreq->dev.next; } } /* Save the information about the request before releasing the queue */ if (rreq) { if (s != MPI_STATUS_IGNORE) { /* Avoid setting "extra" fields like MPI_ERROR */ s->MPI_SOURCE = rreq->status.MPI_SOURCE; s->MPI_TAG = rreq->status.MPI_TAG; s->count = rreq->status.count; s->cancelled = rreq->status.cancelled; } found = 1; } MPIDI_Recvq_unlock(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FU); return found;}/* * MPIDI_CH3U_Recvq_FDU() * * Find a request in the unexpected queue and dequeue it; otherwise return NULL. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id, MPIDI_Message_match * match){ MPID_Request * rreq; MPID_Request * prev_rreq; MPID_Request * cur_rreq; MPID_Request * matching_prev_rreq; MPID_Request * matching_cur_rreq; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU); matching_prev_rreq = NULL; matching_cur_rreq = NULL; prev_rreq = NULL; MPIDI_Recvq_lock(); { cur_rreq = recvq_unexpected_head; while(cur_rreq != NULL) { if (cur_rreq->dev.sender_req_id == sreq_id && cur_rreq->dev.match.context_id == match->context_id && cur_rreq->dev.match.rank == match->rank && cur_rreq->dev.match.tag == match->tag) { matching_prev_rreq = prev_rreq; matching_cur_rreq = cur_rreq; } prev_rreq = cur_rreq; cur_rreq = cur_rreq->dev.next; } if (matching_cur_rreq != NULL) { if (matching_prev_rreq != NULL) { matching_prev_rreq->dev.next = matching_cur_rreq->dev.next; } else { recvq_unexpected_head = matching_cur_rreq->dev.next; } if (matching_cur_rreq->dev.next == NULL) { recvq_unexpected_tail = matching_prev_rreq; } rreq = matching_cur_rreq; } else { rreq = NULL; } } MPIDI_Recvq_unlock(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU); return rreq;}/* * MPIDI_CH3U_Recvq_FDU_or_AEP() * * Atomically find a request in the unexpected queue and dequeue it, or * allocate a new request and enqueue it in the posted queue */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDU_or_AEP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag, int context_id, int * foundp){ int found; MPID_Request * rreq, *prev_rreq; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP); MPIDI_Recvq_lock(); { /* Optimize this loop for an empty unexpected receive queue */ rreq = recvq_unexpected_head; if (rreq) { prev_rreq = NULL; if (tag != MPI_ANY_TAG && source != MPI_ANY_SOURCE) { do { if (rreq->dev.match.context_id == context_id && rreq->dev.match.rank == source && rreq->dev.match.tag == tag) { if (prev_rreq != NULL) { prev_rreq->dev.next = rreq->dev.next; } else { recvq_unexpected_head = rreq->dev.next; } if (rreq->dev.next == NULL) { recvq_unexpected_tail = prev_rreq; } found = TRUE; goto lock_exit; } prev_rreq = rreq; rreq = rreq->dev.next; } while (rreq); } else { MPIDI_Message_match match; MPIDI_Message_match mask; match.context_id = context_id; mask.context_id = ~0; if (tag == MPI_ANY_TAG) { match.tag = 0; mask.tag = 0; } else { match.tag = tag; mask.tag = ~0; } if (source == MPI_ANY_SOURCE) { match.rank = 0; mask.rank = 0; } else { match.rank = source; mask.rank = ~0; } do { if (rreq->dev.match.context_id == match.context_id && (rreq->dev.match.rank & mask.rank) == match.rank && (rreq->dev.match.tag & mask.tag) == match.tag) { if (prev_rreq != NULL) { prev_rreq->dev.next = rreq->dev.next; } else { recvq_unexpected_head = rreq->dev.next; } if (rreq->dev.next == NULL) { recvq_unexpected_tail = prev_rreq; } found = TRUE; goto lock_exit; } prev_rreq = rreq; rreq = rreq->dev.next; } while (rreq); } } /* A matching request was not found in the unexpected queue, so we need to allocate a new request and add it to the posted queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = tag; rreq->dev.match.rank = source; rreq->dev.match.context_id = context_id; rreq->dev.next = NULL; if (recvq_posted_tail != NULL) { recvq_posted_tail->dev.next = rreq; } else { recvq_posted_head = rreq; } recvq_posted_tail = rreq; } found = FALSE; } lock_exit: MPIDI_Recvq_unlock(); /* FIXME: Do we need this in the single threaded case? */ if (!found) { MPID_Request_initialized_clear(rreq); } else { MPID_Request_initialized_wait(rreq); } *foundp = found; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP); return rreq;}/* * MPIDI_CH3U_Recvq_DP() * * Given an existing request, dequeue that request from the posted queue, or return NULL if the request was not in the posted * queued */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_DP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Recvq_DP(MPID_Request * rreq){ int found; MPID_Request * cur_rreq; MPID_Request * prev_rreq; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_DP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_DP); found = FALSE; prev_rreq = NULL; MPIDI_Recvq_lock(); { cur_rreq = recvq_posted_head; while (cur_rreq != NULL) { if (cur_rreq == rreq) { if (prev_rreq != NULL) { prev_rreq->dev.next = cur_rreq->dev.next; } else { recvq_posted_head = cur_rreq->dev.next; } if (cur_rreq->dev.next == NULL) { recvq_posted_tail = prev_rreq; } found = TRUE; break; } prev_rreq = cur_rreq; cur_rreq = cur_rreq->dev.next; } } MPIDI_Recvq_unlock(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_DP); return found;}/* FIXME: No-one uses this routine. */#if 0/* * MPIDI_CH3U_Recvq_FDP * * Locate a request in the posted queue and dequeue it, or return NULL. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDP#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDP(MPIDI_Message_match * match){ MPID_Request * rreq; MPID_Request * prev_rreq; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP); prev_rreq = NULL; MPIDI_Recvq_lock(); { rreq = recvq_posted_head; while (rreq != NULL) { if ((rreq->dev.match.context_id == match->context_id) && (rreq->dev.match.rank == match->rank || rreq->dev.match.rank == MPI_ANY_SOURCE) && (rreq->dev.match.tag == match->tag || rreq->dev.match.tag == MPI_ANY_TAG)) { if (prev_rreq != NULL) { prev_rreq->dev.next = rreq->dev.next; } else { recvq_posted_head = rreq->dev.next; } if (rreq->dev.next == NULL) { recvq_posted_tail = prev_rreq; } break; } prev_rreq = rreq; rreq = rreq->dev.next; } } MPIDI_Recvq_unlock(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP); return rreq;}#endif /* Unused routine *//* * MPIDI_CH3U_Recvq_FDP_or_AEU() * * Locate a request in the posted queue and dequeue it, or allocate a new * request and enqueue it in the unexpected queue */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Recvq_FDP_or_AEU#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match, int * foundp){ int found; MPID_Request * rreq; MPID_Request * prev_rreq; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU); prev_rreq = NULL; MPIDI_Recvq_lock(); { rreq = recvq_posted_head; while (rreq != NULL) { if ((rreq->dev.match.context_id == match->context_id) && (rreq->dev.match.rank == match->rank || rreq->dev.match.rank == MPI_ANY_SOURCE) && (rreq->dev.match.tag == match->tag || rreq->dev.match.tag == MPI_ANY_TAG)) { if (prev_rreq != NULL) { prev_rreq->dev.next = rreq->dev.next; } else { recvq_posted_head = rreq->dev.next; } if (rreq->dev.next == NULL) { recvq_posted_tail = prev_rreq; } found = TRUE; goto lock_exit; } prev_rreq = rreq; rreq = rreq->dev.next; } /* A matching request was not found in the posted queue, so we need to allocate a new request and add it to the unexpected queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match = *match; rreq->dev.next = NULL; if (recvq_unexpected_tail != NULL) { recvq_unexpected_tail->dev.next = rreq; } else { recvq_unexpected_head = rreq; } recvq_unexpected_tail = rreq; } found = FALSE; } lock_exit: MPIDI_Recvq_unlock(); if (!found) { MPID_Request_initialized_clear(rreq); } else { MPID_Request_initialized_wait(rreq); } *foundp = found; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU); return rreq;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?