📄 ch3u_handle_recv_pkt.c
            MPIDI_CH3U_Request_complete(sreq); /* brad : seen this segfault in ssm dynamic process...? */
            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_RNDV_REQ_TO_SEND:
        {
            MPID_Request * rreq;
            int found;
            MPIDI_CH3_Pkt_rndv_req_to_send_t * rts_pkt = &pkt->rndv_req_to_send;

            MPIDI_DBG_PRINTF((30, FCNAME, "received rndv RTS pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d, data_sz=%d",
                              rts_pkt->sender_req_id, rts_pkt->match.rank, rts_pkt->match.tag,
                              rts_pkt->match.context_id, rts_pkt->data_sz));

            rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&rts_pkt->match, &found);
            if (rreq == NULL) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
            }

            set_request_info(rreq, rts_pkt, MPIDI_REQUEST_RNDV_MSG);

            if (found)
            {
                MPID_Request * cts_req;
                MPIDI_CH3_Pkt_t upkt;
                MPIDI_CH3_Pkt_rndv_clr_to_send_t * cts_pkt = &upkt.rndv_clr_to_send;

                MPIDI_DBG_PRINTF((30, FCNAME, "posted request found"));

                /* FIXME: What if the receive user buffer is not big enough to hold the data about to be cleared
                   for sending? */

                MPIDI_DBG_PRINTF((30, FCNAME, "sending rndv CTS packet"));
                MPIDI_Pkt_init(cts_pkt, MPIDI_CH3_PKT_RNDV_CLR_TO_SEND);
                cts_pkt->sender_req_id = rts_pkt->sender_req_id;
                cts_pkt->receiver_req_id = rreq->handle;
                mpi_errno = MPIDI_CH3_iStartMsg(vc, cts_pkt, sizeof(*cts_pkt), &cts_req);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|ctspkt");
                }
                if (cts_req != NULL) {
                    MPID_Request_release(cts_req);
                }
            }
            else
            {
                MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated"));
                MPID_Request_initialized_set(rreq);

                /*
                 * A MPID_Probe() may be waiting for the request we just inserted, so we need to tell the progress
                 * engine to exit.
                 *
                 * FIXME: This will cause MPID_Progress_wait() to return to the MPI layer each time an unexpected
                 * RTS packet is received.  MPID_Probe() should atomically increment a counter and
                 * MPIDI_CH3_Progress_signal_completion() should only be called if that counter is greater than
                 * zero.
                 */
                MPIDI_CH3_Progress_signal_completion();
            }

            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_RNDV_CLR_TO_SEND:
        {
            MPIDI_CH3_Pkt_rndv_clr_to_send_t * cts_pkt = &pkt->rndv_clr_to_send;
            MPID_Request * sreq;
            MPID_Request * rts_sreq;
            MPIDI_CH3_Pkt_t upkt;
            MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &upkt.rndv_send;
            int dt_contig;
            MPI_Aint dt_true_lb;
            MPIDI_msg_sz_t data_sz;
            MPID_Datatype * dt_ptr;
            MPID_IOV iov[MPID_IOV_LIMIT];
            int iov_n;
            int mpi_errno = MPI_SUCCESS;

            MPIDI_DBG_PRINTF((30, FCNAME, "received rndv CTS pkt"));

            MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq);
            MPIU_DBG_PRINTF(("received cts, count=%d\n", sreq->dev.user_count));

            /* Release the RTS request if one exists.  MPID_Request_fetch_and_clear_rts_sreq() needs to be atomic
               to prevent cancel send from cancelling the wrong (future) request.  If
               MPID_Request_fetch_and_clear_rts_sreq() returns a NULL rts_sreq, then MPID_Cancel_send() is
               responsible for releasing the RTS request object. */
            MPIDI_Request_fetch_and_clear_rts_sreq(sreq, &rts_sreq);
            if (rts_sreq != NULL) {
                MPID_Request_release(rts_sreq);
            }

            MPIDI_Pkt_init(rs_pkt, MPIDI_CH3_PKT_RNDV_SEND);
            rs_pkt->receiver_req_id = cts_pkt->receiver_req_id;
            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rs_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*rs_pkt);

            MPIDI_Datatype_get_info(sreq->dev.user_count, sreq->dev.datatype, dt_contig, data_sz, dt_ptr,
                                    dt_true_lb);

            if (dt_contig)
            {
                MPIDI_DBG_PRINTF((30, FCNAME, "sending contiguous rndv data, data_sz=" MPIDI_MSG_SZ_FMT, data_sz));
                sreq->dev.ca = MPIDI_CH3_CA_COMPLETE;
                iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *)sreq->dev.user_buf + dt_true_lb);
                iov[1].MPID_IOV_LEN = data_sz;
                iov_n = 2;
            }
            else
            {
                MPID_Segment_init(sreq->dev.user_buf, sreq->dev.user_count, sreq->dev.datatype,
                                  &sreq->dev.segment, 0);
                iov_n = MPID_IOV_LIMIT - 1;
                sreq->dev.segment_first = 0;
                sreq->dev.segment_size = data_sz;
                mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");
                }
                iov_n += 1;
            }

            mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iov_n);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|senddata");
            }
            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_RNDV_SEND:
        {
            MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &pkt->rndv_send;

            MPIDI_DBG_PRINTF((30, FCNAME, "received rndv send (data) pkt"));
            MPID_Request_get_ptr(rs_pkt->receiver_req_id, *rreqp);
            mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s",
                                     "MPIDI_CH3_PKT_RNDV_SEND");
            }
            break;
        }

        case MPIDI_CH3_PKT_CANCEL_SEND_REQ:
        {
            MPIDI_CH3_Pkt_cancel_send_req_t * req_pkt = &pkt->cancel_send_req;
            MPID_Request * rreq;
            int ack;
            MPIDI_CH3_Pkt_t upkt;
            MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &upkt.cancel_send_resp;
            MPID_Request * resp_sreq;

            MPIDI_DBG_PRINTF((30, FCNAME, "received cancel send req pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
                              req_pkt->sender_req_id, req_pkt->match.rank, req_pkt->match.tag,
                              req_pkt->match.context_id));

            rreq = MPIDI_CH3U_Recvq_FDU(req_pkt->sender_req_id, &req_pkt->match);
            if (rreq != NULL)
            {
                MPIDI_DBG_PRINTF((35, FCNAME, "message cancelled"));
                if (MPIDI_Request_get_msg_type(rreq) == MPIDI_REQUEST_EAGER_MSG && rreq->dev.recv_data_sz > 0) {
                    MPIU_Free(rreq->dev.tmpbuf);
                }
                MPID_Request_release(rreq);
                ack = TRUE;
            }
            else
            {
                MPIDI_DBG_PRINTF((35, FCNAME, "unable to cancel message"));
                ack = FALSE;
            }

            MPIDI_Pkt_init(resp_pkt, MPIDI_CH3_PKT_CANCEL_SEND_RESP);
            resp_pkt->sender_req_id = req_pkt->sender_req_id;
            resp_pkt->ack = ack;
            mpi_errno = MPIDI_CH3_iStartMsg(vc, resp_pkt, sizeof(*resp_pkt), &resp_sreq);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|cancelresp");
            }
            if (resp_sreq != NULL) {
                MPID_Request_release(resp_sreq);
            }

            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_CANCEL_SEND_RESP:
        {
            MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &pkt->cancel_send_resp;
            MPID_Request * sreq;

            MPIDI_DBG_PRINTF((30, FCNAME, "received cancel send resp pkt, sreq=0x%08x, ack=%d",
                              resp_pkt->sender_req_id, resp_pkt->ack));

            MPID_Request_get_ptr(resp_pkt->sender_req_id, sreq);

            if (resp_pkt->ack)
            {
                sreq->status.cancelled = TRUE;

                if (MPIDI_Request_get_msg_type(sreq) == MPIDI_REQUEST_RNDV_MSG ||
                    MPIDI_Request_get_type(sreq) == MPIDI_REQUEST_TYPE_SSEND)
                {
                    int cc;

                    /* decrement the CC one additional time for the CTS/sync ack that is never going to arrive */
                    MPIDI_CH3U_Request_decrement_cc(sreq, &cc);
                }

                MPIDI_DBG_PRINTF((35, FCNAME, "message cancelled"));
            }
            else
            {
                MPIDI_DBG_PRINTF((35, FCNAME, "unable to cancel message"));
            }

            MPIDI_CH3U_Request_complete(sreq);
            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_PUT:
        {
            MPIDI_CH3_Pkt_put_t * put_pkt = &pkt->put;
            MPID_Request *req;

            MPIDI_DBG_PRINTF((30, FCNAME, "received put pkt"));

            if (put_pkt->count == 0)
            {
                MPID_Win *win_ptr;

                /* it's a 0-byte message sent just to decrement the completion counter.  This happens only in
                   post/start/complete/wait sync model; therefore, no need to check lock queue. */
                if (put_pkt->target_win_handle != MPI_WIN_NULL) {
                    MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);
                    /* FIXME: MT: this has to be done atomically */
                    win_ptr->my_counter -= 1;
                }
                MPIDI_CH3_Progress_signal_completion();
                *rreqp = NULL;
            }
            else
            {
                req = MPID_Request_create();
                MPIU_Object_set_ref(req, 1);

                req->dev.user_buf = put_pkt->addr;
                req->dev.user_count = put_pkt->count;
                req->dev.target_win_handle = put_pkt->target_win_handle;
                req->dev.source_win_handle = put_pkt->source_win_handle;

                if (HANDLE_GET_KIND(put_pkt->datatype) == HANDLE_KIND_BUILTIN)
                {
                    MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP);
                    req->dev.datatype = put_pkt->datatype;

                    MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);
                    req->dev.recv_data_sz = type_size * put_pkt->count;

                    *rreqp = req;
                    mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp);
                }
                else
                {
                    /* derived datatype */
                    MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP_DERIVED_DT);
                    req->dev.datatype = MPI_DATATYPE_NULL;

                    req->dev.dtype_info = (MPIDI_RMA_dtype_info *) MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
                    if (! req->dev.dtype_info) {
                        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
                    }

                    req->dev.dataloop = MPIU_Malloc(put_pkt->dataloop_size);
                    if (! req->dev.dataloop) {
                        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
                    }

                    req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)req->dev.dtype_info;
                    req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                    req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)req->dev.dataloop;
                    req->dev.iov[1].MPID_IOV_LEN = put_pkt->dataloop_size;
                    req->dev.iov_count = 2;
                    req->dev.ca = MPIDI_CH3_CA_COMPLETE;

                    *rreqp = req;
                }

                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno != MPI_SUCCESS) {
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                     "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
                }
                /* --END ERROR HANDLING-- */
            }
            break;
        }

        case MPIDI_CH3_PKT_ACCUMULATE:
        {
            MPIDI_CH3_Pkt_accum_t * accum_pkt = &pkt->accum;
            MPID_Request *req;
            MPI_Aint true_lb, true_extent, extent;
            void *tmp_buf;

            MPIDI_DBG_PRINTF((30, FCNAME, "received accumulate pkt"));

            req = MPID_Request_create();
            MPIU_Object_set_ref(req, 1);
            *rreqp = req;

            req->dev.user_count = accum_pkt->count;
            req->dev.op = accum_pkt->op;
            req->dev.real_user_buf = accum_pkt->addr;
            req->dev.target_win_handle = accum_pkt->target_win_handle;
            req->dev.source_win_handle = accum_pkt->source_win_handle;

            if (HANDLE_GET_KIND(accum_pkt->datatype) == HANDLE_KIND_BUILTIN)
            {
                MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP);
                req->dev.datatype = accum_pkt->datatype;

                MPIR_Nest_incr();
                mpi_errno = NMPI_Type_get_true_extent(accum_pkt->datatype, &true_lb, &true_extent);
                MPIR_Nest_decr();
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno) {
                    MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER,
                                         "**fail", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */

                MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
                tmp_buf = MPIU_Malloc(accum_pkt->count * (MPIR_MAX(extent, true_extent)));
                /* --BEGIN ERROR HANDLING-- */
                if (!tmp_buf) {
                    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                                     MPI_ERR_OTHER, "**nomem", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */

                /* adjust for potential negative lower bound in datatype */
                tmp_buf = (void *)((char*)tmp_buf - true_lb);

                req->dev.user_buf = tmp_buf;

                MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
                req->dev.recv_data_sz = type_size * accum_pkt->count;

                mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp);
            }
            else
            {
                MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT);
                req->dev.datatype = MPI_DATATYPE_NULL;

                req->dev.dtype_info = (MPIDI_RMA_dtype_info *) MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
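
The three rendezvous cases above implement a classic RTS/CTS handshake: the receiver answers a request-to-send with a clear-to-send that echoes the sender's request id and adds its own, and the final data packet then carries only the receiver's id so it can be routed straight to the posted receive request. Below is a minimal, self-contained sketch of that exchange; the types and field names (rts_t, cts_t, rndv_send_t) are simplified stand-ins for illustration, not the actual MPICH CH3 packet structures.

/* Sketch of the RTS -> CTS -> data rendezvous exchange handled above.
 * All types here are hypothetical simplifications. */
#include <stdio.h>

typedef struct { int sender_req_id; int data_sz; }            rts_t;
typedef struct { int sender_req_id; int receiver_req_id; }    cts_t;
typedef struct { int receiver_req_id; const char *payload; }  rndv_send_t;

/* Receiver side: consume an RTS, emit a CTS (cf. MPIDI_CH3_PKT_RNDV_REQ_TO_SEND). */
static cts_t handle_rts(const rts_t *rts, int posted_recv_req_id)
{
    cts_t cts = { rts->sender_req_id, posted_recv_req_id };
    return cts;
}

/* Sender side: consume the CTS and build the data packet (cf. MPIDI_CH3_PKT_RNDV_CLR_TO_SEND). */
static rndv_send_t handle_cts(const cts_t *cts, const char *user_buf)
{
    rndv_send_t rs = { cts->receiver_req_id, user_buf };
    return rs;
}

int main(void)
{
    rts_t rts = { 7 /* sender_req_id */, 1 << 20 /* data_sz */ };
    cts_t cts = handle_rts(&rts, 42 /* posted receive's request id */);
    rndv_send_t rs = handle_cts(&cts, "payload");

    /* The data packet carries only the receiver's request id; the real code
     * looks the request up with MPID_Request_get_ptr(). */
    printf("data packet -> recv req %d, payload '%s'\n", rs.receiver_req_id, rs.payload);
    return 0;
}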
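
The FIXME in the unexpected-RTS branch proposes that MPID_Probe() register itself before blocking, so that MPIDI_CH3_Progress_signal_completion() is only invoked when a prober is actually waiting. A rough sketch of that counter, assuming C11 atomics; probe_enter, probe_exit, and on_unexpected_rts are hypothetical names, not MPICH functions.

/* Sketch of the probe counter suggested by the FIXME above. */
#include <stdatomic.h>
#include <stdio.h>

static atomic_int probe_waiters;   /* zero-initialized: no probers waiting */

/* stand-in for MPIDI_CH3_Progress_signal_completion() */
static void signal_progress_completion(void) { puts("progress engine signalled"); }

static void probe_enter(void) { atomic_fetch_add(&probe_waiters, 1); }
static void probe_exit(void)  { atomic_fetch_sub(&probe_waiters, 1); }

/* Called when an unexpected RTS is queued: wake probers only if one is waiting. */
static void on_unexpected_rts(void)
{
    if (atomic_load(&probe_waiters) > 0)
        signal_progress_completion();
}

int main(void)
{
    on_unexpected_rts();   /* no prober waiting: progress engine left alone */
    probe_enter();
    on_unexpected_rts();   /* a prober is waiting: signal it */
    probe_exit();
    return 0;
}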
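
In the CTS handler, the reply is assembled as a gather send: iov[0] holds the packet header and iov[1..] the user payload, so MPIDI_CH3_iSendv() can push header and data to the network in one call. A small illustration of the same layout, using the POSIX struct iovec in place of MPID_IOV and a made-up pkt_header_t:

/* Gather-send layout: header in iov[0], contiguous payload in iov[1]. */
#include <stdio.h>
#include <sys/uio.h>

typedef struct { int type; int receiver_req_id; } pkt_header_t;  /* hypothetical */

int main(void)
{
    pkt_header_t hdr = { 3 /* e.g. a RNDV_SEND-like packet type */, 42 };
    char payload[] = "user buffer contents";

    struct iovec iov[2];
    iov[0].iov_base = &hdr;       /* header first ... */
    iov[0].iov_len  = sizeof(hdr);
    iov[1].iov_base = payload;    /* ... then the (contiguous) data */
    iov[1].iov_len  = sizeof(payload);

    /* writev(fd, iov, 2) would transmit both pieces with a single syscall;
     * here we just report the layout. */
    printf("iov[0]=%zu bytes header, iov[1]=%zu bytes payload\n", iov[0].iov_len, iov[1].iov_len);
    return 0;
}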
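
The accumulate case sizes its temporary buffer as count * MAX(extent, true_extent) and then shifts the base pointer by -true_lb, so that a datatype with a negative lower bound still writes inside the allocation: element i occupies [base + true_lb + i*extent, base + true_lb + i*extent + true_extent). A toy demonstration with made-up datatype parameters (the values below are illustrative, not taken from MPICH):

/* Demonstrates the lower-bound adjustment used in the accumulate case. */
#include <stdio.h>
#include <stdlib.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))

int main(void)
{
    long true_lb = -8, true_extent = 24, extent = 16;  /* hypothetical datatype */
    int count = 4;

    char *alloc = malloc((size_t)(count * MAX(extent, true_extent)));
    if (!alloc) return 1;

    /* After the shift, base + true_lb == alloc, so the first byte the
     * datatype touches is exactly the first byte of the allocation. */
    char *base = alloc - true_lb;

    printf("first touched byte: %p (allocation starts at %p)\n",
           (void *)(base + true_lb), (void *)alloc);
    free(alloc);
    return 0;
}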