📄 ch3u_handle_recv_pkt.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"

/*
 * set_request_info(rreq_, pkt_, msg_type_)
 *
 * Copies the matching/size information carried by an incoming send-type
 * packet into a receive request: status MPI_SOURCE/MPI_TAG/count, the
 * sender's request id, the expected receive data size, the packet's
 * sequence number, and the message type.
 *
 * Wrapped in do { } while (0) so the expansion is a single statement and
 * stays correct inside an unbraced if/else.  Both rreq_ and pkt_ are
 * evaluated several times, so they must be side-effect free.
 */
#define set_request_info(rreq_, pkt_, msg_type_)                \
do {                                                            \
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank;            \
    (rreq_)->status.MPI_TAG = (pkt_)->match.tag;                \
    (rreq_)->status.count = (pkt_)->data_sz;                    \
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;         \
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;                \
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);          \
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));           \
} while (0)

#ifdef MPIDI_CH3_CHANNEL_RNDV

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Handle_recv_rndv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3U_Handle_recv_rndv_pkt()
 *
 * Handles a rendezvous request-to-send packet.  Compiled only when
 * MPIDI_CH3_CHANNEL_RNDV is defined.
 *
 * The RTS packet's match information is used to find a matching posted
 * receive or to allocate an unexpected-queue entry
 * (MPIDI_CH3U_Recvq_FDP_or_AEU).  The request is filled from the packet and
 * tagged MPIDI_REQUEST_RNDV_MSG.
 *
 *   vc     - virtual connection the packet arrived on (not used here)
 *   pkt    - incoming packet; read through its rndv_req_to_send member
 *   rreqp  - out: the matched/allocated request (set only on success)
 *   foundp - out: nonzero if a posted receive matched, zero if a new
 *            unexpected request was allocated
 *
 * Returns MPI_SUCCESS, or an "**nomemreq" error if no request could be
 * obtained.
 */
int MPIDI_CH3U_Handle_recv_rndv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp, int *foundp)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *rreq;
    MPIDI_CH3_Pkt_rndv_req_to_send_t * rts_pkt = &pkt->rndv_req_to_send;

    rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&rts_pkt->match, foundp);
    if (rreq == NULL) {
        /* records the error and jumps to fn_fail, the only label here */
        MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq");
    }

    set_request_info(rreq, rts_pkt, MPIDI_REQUEST_RNDV_MSG);

    if (!*foundp)
    {
        MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated"));

        MPID_Request_initialized_set(rreq);

        /*
         * An MPID_Probe() may be waiting for the request we just inserted,
         * so we need to tell the progress engine to exit.
         *
         * FIXME: This will cause MPID_Progress_wait() to return to the MPI
         * layer each time an unexpected RTS packet is received.
         * MPID_Probe() should atomically increment a counter and
         * MPIDI_CH3_Progress_signal_completion() should only be called if
         * that counter is greater than zero.
         */
        MPIDI_CH3_Progress_signal_completion();
    }

    /* return the request */
    *rreqp = rreq;

  fn_fail:
    return mpi_errno;
}

#endif

/*
 * MPIDI_CH3U_Handle_recv_pkt()
 *
 * NOTE: Multiple threads may NOT simultaneously call this routine with the
 * same VC.  This constraint eliminates the need to lock the VC.
If simultaneous upcalls are a possible, the calling routine for serializing the calls. */int MPIDI_CH3U_Handle_unordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt);int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt);#if defined(MPIDI_CH3_MSGS_UNORDERED)#define MPIDI_CH3U_Handle_unordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt#else#define MPIDI_CH3U_Handle_ordered_recv_pkt MPIDI_CH3U_Handle_recv_pkt #endif#if defined(MPIDI_CH3_MSGS_UNORDERED)#define MPIDI_CH3U_Pkt_send_container_alloc() (MPIU_Malloc(sizeof(MPIDI_CH3_Pkt_send_container_t)))#define MPIDI_CH3U_Pkt_send_container_free(pc_) MPIU_Free(pc_)#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Handle_unordered_recv_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Handle_unordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT); MPIDI_DBG_PRINTF((10, FCNAME, "entering")); rreqp = NULL; switch(pkt->type) { case MPIDI_CH3_PKT_EAGER_SEND: case MPIDI_CH3_PKT_EAGER_SYNC_SEND: case MPIDI_CH3_PKT_READY_SEND: case MPIDI_CH3_PKT_RNDV_REQ_TO_SEND: { MPIDI_CH3_Pkt_send_t * send_pkt = (MPIDI_CH3_Pkt_send_t *) pkt; MPIDI_CH3_Pkt_send_container_t * pc_cur; MPIDI_CH3_Pkt_send_container_t * pc_last; MPIDI_DBG_PRINTF((30, FCNAME, "received (potentially) out-of-order send pkt")); MPIDI_DBG_PRINTF((30, FCNAME, "rank=%d, tag=%d, context=%d seqnum=%d", send_pkt->match.rank, send_pkt->match.tag, send_pkt->match.context_id, send_pkt->seqnum)); MPIDI_DBG_PRINTF((30, FCNAME, "vc - seqnum_send=%d seqnum_recv=%d reorder_msg_queue=0x%08lx", vc->seqnum_send, vc->seqnum_recv, (unsigned long) vc->msg_reorder_queue)); if (send_pkt->seqnum == vc->seqnum_recv) { mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { goto fn_exit; } /* --END ERROR 
HANDLING-- */ vc->seqnum_recv++; pc_cur = vc->msg_reorder_queue; while(pc_cur != NULL && vc->seqnum_recv == pc_cur->pkt.seqnum) { pkt = (MPIDI_CH3_Pkt_t *) &pc_cur->pkt; mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|pktordered", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ vc->seqnum_recv++; pc_last = pc_cur; pc_cur = pc_cur->next; MPIDI_CH3U_Pkt_send_container_free(pc_last); } vc->msg_reorder_queue = pc_cur; } else { MPIDI_CH3_Pkt_send_container_t * pc_new; /* allocate container and copy packet */ pc_new = MPIDI_CH3U_Pkt_send_container_alloc(); /* --BEGIN ERROR HANDLING-- */ if (pc_new == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|nopktcontainermem", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ pc_new->pkt = *send_pkt; /* insert packet into reorder queue */ pc_last = NULL; pc_cur = vc->msg_reorder_queue; while (pc_cur != NULL) { /* the current recv seqnum is subtracted from both the seqnums prior to comparision so as to remove any wrap around effects. 
*/ if (pc_new->pkt.seqnum - vc->seqnum_recv < pc_cur->pkt.seqnum - vc->seqnum_recv) { break; } pc_last = pc_cur; pc_cur = pc_cur->next; } if (pc_last == NULL) { pc_new->next = pc_cur; vc->msg_reorder_queue = pc_new; } else { pc_new->next = pc_cur; pc_last->next = pc_new; } } break; } case MPIDI_CH3_PKT_CANCEL_SEND_REQ: { /* --BEGIN ERROR HANDLING-- */ /* FIXME: processing send cancel requests requires that we be aware of pkts in the reorder queue */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|ooocancelreq", 0); goto fn_exit; break; /* --END ERROR HANDLING-- */ } default: { mpi_errno = MPIDI_CH3U_Handle_ordered_recv_pkt(vc, pkt, rreqp); break; } } fn_exit: MPIDI_DBG_PRINTF((10, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_UNORDERED_RECV_PKT); return mpi_errno;}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Handle_ordered_recv_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Handle_ordered_recv_pkt(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, MPID_Request ** rreqp){ int type_size; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT); MPIDI_DBG_PRINTF((10, FCNAME, "entering")); MPIDI_DBG_Print_packet(pkt); switch(pkt->type) { /* FIXME: This is not optimized for short messages, which should have the data in the same packet when the data is particularly short (e.g., one 8 byte long word) */ case MPIDI_CH3_PKT_EAGER_SEND: { MPIDI_CH3_Pkt_eager_send_t * eager_pkt = &pkt->eager_send; MPID_Request * rreq; int found; MPIDI_DBG_PRINTF((30, FCNAME, "received eager send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", eager_pkt->sender_req_id, eager_pkt->match.rank, eager_pkt->match.tag, eager_pkt->match.context_id)); rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&eager_pkt->match, &found); if (rreq == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq"); } 
set_request_info(rreq, eager_pkt, MPIDI_REQUEST_EAGER_MSG); *rreqp = rreq; /* FIXME: What is the logic here? On an eager receive, the data should be available already, and we should be optimizing for short messages */ mpi_errno = MPIDI_CH3U_Post_data_receive(found, rreqp); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SEND"); } break; } case MPIDI_CH3_PKT_READY_SEND: { MPIDI_CH3_Pkt_ready_send_t * ready_pkt = &pkt->ready_send; MPID_Request * rreq; int found; MPIDI_DBG_PRINTF((30, FCNAME, "received ready send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", ready_pkt->sender_req_id, ready_pkt->match.rank, ready_pkt->match.tag, ready_pkt->match.context_id)); rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&ready_pkt->match, &found); if (rreq == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq"); } set_request_info(rreq, ready_pkt, MPIDI_REQUEST_EAGER_MSG); *rreqp = rreq; if (found) { mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_READY_SEND"); } } else { /* FIXME: an error packet should be sent back to the sender indicating that the ready-send failed. On the send side, the error handler for the communicator can be invoked even if the ready-send request has already completed. */ /* We need to consume any outstanding associated data and mark the request with an error. 
*/ MPID_Request_initialized_set(rreq); rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**rsendnomatch", "**rsendnomatch %d %d", ready_pkt->match.rank, ready_pkt->match.tag); rreq->status.count = 0; if (rreq->dev.recv_data_sz > 0) { /* force read of extra data */ rreq->dev.segment_first = 0; rreq->dev.segment_size = 0; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } } else { /* mark data transfer as complete and decrement CC */ MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } } break; } case MPIDI_CH3_PKT_EAGER_SYNC_SEND: { MPIDI_CH3_Pkt_eager_send_t * es_pkt = &pkt->eager_send; MPID_Request * rreq; int found; MPIDI_DBG_PRINTF((30, FCNAME, "received eager sync send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", es_pkt->sender_req_id, es_pkt->match.rank, es_pkt->match.tag, es_pkt->match.context_id)); rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&es_pkt->match, &found); if (rreq == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq"); } set_request_info(rreq, es_pkt, MPIDI_REQUEST_EAGER_MSG); *rreqp = rreq; mpi_errno = MPIDI_CH3U_Post_data_receive(found, rreqp); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SYNC_SEND"); } if (found) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_eager_sync_ack_t * const esa_pkt = &upkt.eager_sync_ack; MPID_Request * esa_req; MPIDI_DBG_PRINTF((30, FCNAME, "sending eager sync ack")); MPIDI_Pkt_init(esa_pkt, MPIDI_CH3_PKT_EAGER_SYNC_ACK); esa_pkt->sender_req_id = rreq->dev.sender_req_id; mpi_errno = MPIDI_CH3_iStartMsg(vc, esa_pkt, sizeof(*esa_pkt), &esa_req); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|syncack"); } if (esa_req != NULL) { MPID_Request_release(esa_req); } } else { MPIDI_Request_set_sync_send_flag(rreq, TRUE); } break; } case 
MPIDI_CH3_PKT_EAGER_SYNC_ACK: { MPIDI_CH3_Pkt_eager_sync_ack_t * esa_pkt = &pkt->eager_sync_ack; MPID_Request * sreq; MPIDI_DBG_PRINTF((30, FCNAME, "received eager sync ack pkt, sreq=0x%08x", esa_pkt->sender_req_id)); MPID_Request_get_ptr(esa_pkt->sender_req_id, sreq); /* decrement CC (but don't mark data transfer as complete since the transfer could still be in progress) */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -