📄 ch3u_handle_recv_pkt.c
字号:
MPIDI_DBG_PRINTF((30, FCNAME, "received rndv RTS pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", rts_pkt->sender_req_id, rts_pkt->match.rank, rts_pkt->match.tag, rts_pkt->match.context_id)); rreq = MPIDI_CH3U_Request_FDP_or_AEU(&rts_pkt->match, &found); assert(rreq != NULL); set_request_info(rreq, rts_pkt, MPIDI_REQUEST_RNDV_MSG); if (found) { MPID_Request * cts_req; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_rndv_clr_to_send_t * cts_pkt = &upkt.rndv_clr_to_send; MPIDI_DBG_PRINTF((30, FCNAME, "posted request found")); /* FIXME: What if the receive user buffer is not big enough to hold the data about to be cleared for sending? */ MPIDI_DBG_PRINTF((30, FCNAME, "sending rndv CTS packet")); cts_pkt->type = MPIDI_CH3_PKT_RNDV_CLR_TO_SEND; cts_pkt->sender_req_id = rts_pkt->sender_req_id; cts_pkt->receiver_req_id = rreq->handle; cts_req = MPIDI_CH3_iStartMsg(vc, cts_pkt, sizeof(*cts_pkt)); if (cts_req != NULL) { MPID_Request_release(cts_req); } } else { MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated")); } break; } case MPIDI_CH3_PKT_RNDV_CLR_TO_SEND: { MPIDI_CH3_Pkt_rndv_clr_to_send_t * cts_pkt = &pkt->rndv_clr_to_send; MPID_Request * sreq; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &upkt.rndv_send; int dt_contig; MPIDI_msg_sz_t data_sz; MPID_IOV iov[MPID_IOV_LIMIT]; int iov_n; int mpi_errno = MPI_SUCCESS; MPIDI_DBG_PRINTF((30, FCNAME, "received rndv CTS pkt")); MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq); /* release the RTS request if one exists */ /* FIXME - MT: this needs to be atomic to prevent cancel send from cancelling the wrong request */ if (sreq->partner_request) { MPID_Request_release(sreq->partner_request); sreq->partner_request = NULL; } rs_pkt->type = MPIDI_CH3_PKT_RNDV_SEND; rs_pkt->receiver_req_id = cts_pkt->receiver_req_id; iov[0].MPID_IOV_BUF = (void*)rs_pkt; iov[0].MPID_IOV_LEN = sizeof(*rs_pkt); MPIDI_CH3U_Datatype_get_info(sreq->ch3.user_count, sreq->ch3.datatype, dt_contig, data_sz); if (dt_contig) { MPIDI_DBG_PRINTF((30, FCNAME, "sending contiguous rndv data, data_sz=" MPIDI_MSG_SZ_FMT, data_sz)); sreq->ch3.ca = MPIDI_CH3_CA_COMPLETE; iov[1].MPID_IOV_BUF = sreq->ch3.user_buf; iov[1].MPID_IOV_LEN = data_sz; iov_n = 2; } else { MPID_Segment_init(sreq->ch3.user_buf, sreq->ch3.user_count, sreq->ch3.datatype, &sreq->ch3.segment); iov_n = MPID_IOV_LIMIT - 1; sreq->ch3.segment_first = 0; sreq->ch3.segment_size = data_sz; mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n); if (mpi_errno != MPI_SUCCESS) { MPIDI_ERR_PRINTF((FCNAME, "MPIDI_CH3_PKT_RNDV_CLR_TO_SEND failed to load IOV")); abort(); } iov_n += 1; } MPIDI_CH3_iSendv(vc, sreq, iov, iov_n); break; } case MPIDI_CH3_PKT_RNDV_SEND: { MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &pkt->rndv_send; MPID_Request * rreq; MPIDI_DBG_PRINTF((30, FCNAME, "received rndv send (data) pkt")); MPID_Request_get_ptr(rs_pkt->receiver_req_id, rreq); post_data_receive(vc, rreq, TRUE); break; } case MPIDI_CH3_PKT_CANCEL_SEND_REQ: { MPIDI_CH3_Pkt_cancel_send_req_t * req_pkt = &pkt->cancel_send_req; MPID_Request * rreq; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &upkt.cancel_send_resp; MPID_Request * resp_sreq; MPIDI_DBG_PRINTF((30, FCNAME, "received cancel send req pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", req_pkt->sender_req_id, req_pkt->match.rank, req_pkt->match.tag, req_pkt->match.context_id)); rreq = MPIDI_CH3U_Request_FDU(req_pkt->sender_req_id, &req_pkt->match); if (rreq != NULL) { MPIDI_DBG_PRINTF((35, FCNAME, "message cancelled")); if (MPIDI_Request_get_msg_type(rreq) == MPIDI_REQUEST_EAGER_MSG && rreq->ch3.recv_data_sz > 0) { MPIU_Free(rreq->ch3.tmpbuf); } MPID_Request_release(rreq); resp_pkt->ack = TRUE; } else { MPIDI_DBG_PRINTF((35, FCNAME, "unable to cancel message")); resp_pkt->ack = FALSE; } resp_pkt->type = MPIDI_CH3_PKT_CANCEL_SEND_RESP; resp_pkt->sender_req_id = req_pkt->sender_req_id; resp_sreq = MPIDI_CH3_iStartMsg(vc, resp_pkt, sizeof(*resp_pkt)); if (resp_sreq != NULL) { MPID_Request_release(resp_sreq); } break; } case MPIDI_CH3_PKT_CANCEL_SEND_RESP: { MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &pkt->cancel_send_resp; MPID_Request * sreq; MPIDI_DBG_PRINTF((30, FCNAME, "received cancel send resp pkt, sreq=0x%08x, ack=%d", resp_pkt->sender_req_id, resp_pkt->ack)); MPID_Request_get_ptr(resp_pkt->sender_req_id, sreq); if (resp_pkt->ack) { sreq->status.cancelled = TRUE; if (MPIDI_Request_get_msg_type(sreq) == MPIDI_REQUEST_RNDV_MSG || MPIDI_Request_get_type(sreq) == MPIDI_REQUEST_TYPE_SSEND) { int cc; /* decrement the CC one additional time for the CTS/sync ack that is never going to arrive */ MPIDI_CH3U_Request_decrement_cc(sreq, &cc); } MPIDI_DBG_PRINTF((35, FCNAME, "message cancelled")); } else { MPIDI_DBG_PRINTF((35, FCNAME, "unable to cancel message")); } MPIDI_CH3U_Request_complete(sreq); break; } case MPIDI_CH3_PKT_PUT: { MPIDI_DBG_PRINTF((30, FCNAME, "received put pkt")); MPIDI_ERR_PRINTF((FCNAME, "MPIDI_CH3_PKT_PUT UMIMPLEMENTED")); abort(); break; } case MPIDI_CH3_PKT_FLOW_CNTL_UPDATE: { MPIDI_DBG_PRINTF((30, FCNAME, "received flow control update pkt")); MPIDI_ERR_PRINTF((FCNAME, "MPIDI_CH3_PKT_FLOW_CNTL_UPDATE UMIMPLEMENTED")); abort(); break; } default: { MPIDI_ERR_PRINTF((FCNAME, "packet type %d not implemented.\n", pkt->type)); abort(); } } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);}#undef FUNCNAME#define FUNCNAME post_data_receive#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static void post_data_receive(MPIDI_VC * vc, MPID_Request * rreq, int found){ int dt_contig; MPIDI_msg_sz_t userbuf_sz; MPIDI_msg_sz_t data_sz; if (rreq->ch3.recv_data_sz == 0) { MPIDI_DBG_PRINTF((30, FCNAME, "null message, %s, decrementing completion counter", (found ? "posted request found" : "unexpected request allocated"))); /* mark data transfer as complete adn decrment CC */ rreq->ch3.iov_count = 0; MPIDI_CH3U_Request_complete(rreq); goto fn_exit; } if (found) { MPIDI_DBG_PRINTF((30, FCNAME, "posted request found")); MPIDI_CH3U_Datatype_get_info(rreq->ch3.user_count, rreq->ch3.datatype, dt_contig, userbuf_sz); if (rreq->ch3.recv_data_sz <= userbuf_sz) { data_sz = rreq->ch3.recv_data_sz; } else { MPIDI_DBG_PRINTF((35, FCNAME, "receive buffer too small; message truncated, msg_sz=" MPIDI_MSG_SZ_FMT ", userbuf_sz=" MPIDI_MSG_SZ_FMT, rreq->ch3.recv_data_sz, userbuf_sz)); rreq->status.MPI_ERROR = MPI_ERR_TRUNCATE; rreq->status.count = userbuf_sz; data_sz = userbuf_sz; } if (dt_contig && data_sz == rreq->ch3.recv_data_sz) { /* user buffer is contiguous and large enough to store the entire message */ MPIDI_DBG_PRINTF((35, FCNAME, "IOV loaded for contiguous read")); rreq->ch3.iov[0].MPID_IOV_BUF = rreq->ch3.user_buf; rreq->ch3.iov[0].MPID_IOV_LEN = data_sz; rreq->ch3.iov_count = 1; rreq->ch3.ca = MPIDI_CH3_CA_COMPLETE; } else { /* user buffer is not contiguous or is too small to hold the entire message */ int mpi_errno; MPIDI_DBG_PRINTF((35, FCNAME, "IOV loaded for non-contiguous read")); MPID_Segment_init(rreq->ch3.user_buf, rreq->ch3.user_count, rreq->ch3.datatype, &rreq->ch3.segment); rreq->ch3.segment_first = 0; rreq->ch3.segment_size = data_sz; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIDI_ERR_PRINTF((FCNAME, "failed to load IOV")); abort(); } } } else /* if (!found) */ { /* TODO: to improve performance, allocate temporary buffer from a specialized buffer pool. */ MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated")); rreq->ch3.tmpbuf = MPIU_Malloc(rreq->ch3.recv_data_sz); rreq->ch3.tmpbuf_sz = rreq->ch3.recv_data_sz; rreq->ch3.iov[0].MPID_IOV_BUF = rreq->ch3.tmpbuf; rreq->ch3.iov[0].MPID_IOV_LEN = rreq->ch3.recv_data_sz; rreq->ch3.iov_count = 1; rreq->ch3.ca = MPIDI_CH3_CA_COMPLETE; } MPIDI_DBG_PRINTF((35, FCNAME, "posting iRead")); MPIDI_CH3_iRead(vc, rreq); fn_exit: ;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -