📄 ch3u_eager.c
字号:
not consume all data) or a too-short buffer. We need to distinguish between these two types. */ rreq->status.count = (int)last; if (rreq->dev.recv_data_sz <= userbuf_sz) { MPIU_ERR_SETSIMPLE(rreq->status.MPI_ERROR,MPI_ERR_TYPE, "**dtypemismatch"); } /* --END ERROR HANDLING-- */ } rreq->dev.OnDataAvail = 0; } } else { MPIDI_msg_sz_t data_sz; /* This is easy; copy the data into a temporary buffer. To begin with, we use the same temporary location as is used in receiving eager unexpected data. */ /* FIXME: When eagershort is enabled, provide a preallocated space for short messages (which is used even if eager short is not used), since we don't want to have a separate check to figure out which buffer we're using (or perhaps we should have a free-buffer-pointer, which can be null if it isn't a buffer that we've allocated). */ /* printf( "Allocating into tmp\n" ); fflush(stdout); */ data_sz = rreq->dev.recv_data_sz; rreq->dev.tmpbuf = MPIU_Malloc(data_sz); if (!rreq->dev.tmpbuf) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); } rreq->dev.tmpbuf_sz = data_sz; /* Copy the payload. We could optimize this if data_sz & 0x3 == 0 (copy (data_sz >> 2) ints, inline that since data size is currently limited to 4 ints */ { unsigned char const * restrict p = (unsigned char *)eagershort_pkt->data; unsigned char * restrict bufp = (unsigned char *)rreq->dev.tmpbuf; int i; for (i=0; i<data_sz; i++) { *bufp++ = *p++; } } /* printf( "Unexpected eager short\n" ); fflush(stdout); */ /* These next two indicate that once matched, there is one more step (the unpack into the user buffer) to perform. 
*/ rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete; rreq->dev.recv_pending_count = 1; } if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGERSHORT_SEND"); } } /* The semantics of the packet handlers is that a returned request means that additional actions are required on the request */ /* We also signal completion (without this, the progress engine may fail to return from a Progress_wait; the probe-unexp test failed without this Progress_signal_completion call) */ MPIDI_CH3_Progress_signal_completion(); fn_fail: return mpi_errno;}#endif/* Send a contiguous eager message that can be cancelled (e.g., a nonblocking eager send). We'll want to optimize (and possibly inline) this Make sure that buf is at the beginning of the data to send; adjust by adding dt_true_lb if necessary */#undef FUNCNAME#define FUNCNAME MPIDI_EagerContigIsend#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_EagerContigIsend( MPID_Request **sreq_p, MPIDI_CH3_Pkt_type_t reqtype, const void * buf, MPIDI_msg_sz_t data_sz, int rank, int tag, MPID_Comm * comm, int context_offset ){ int mpi_errno = MPI_SUCCESS; MPIDI_VC_t * vc; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_eager_send_t * const eager_pkt = &upkt.eager_send; MPID_Request *sreq = *sreq_p; MPID_IOV iov[MPID_IOV_LIMIT]; MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST, "sending contiguous eager message, data_sz=" MPIDI_MSG_SZ_FMT, data_sz)); sreq->dev.OnDataAvail = 0; MPIDI_Pkt_init(eager_pkt, reqtype); eager_pkt->match.rank = comm->rank; eager_pkt->match.tag = tag; eager_pkt->match.context_id = comm->context_id + context_offset; eager_pkt->sender_req_id = sreq->handle; eager_pkt->data_sz = data_sz; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)eager_pkt; iov[0].MPID_IOV_LEN = sizeof(*eager_pkt); iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) buf; iov[1].MPID_IOV_LEN = data_sz; MPIDI_Comm_get_vc(comm, rank, &vc); MPIDI_VC_FAI_send_seqnum(vc, 
seqnum); MPIDI_Pkt_set_seqnum(eager_pkt, seqnum); MPIDI_Request_set_seqnum(sreq, seqnum); MPIU_DBG_MSGPKT(vc,tag,eager_pkt->match.context_id,rank,data_sz,"EagerIsend"); mpi_errno = MPIU_CALL(MPIDI_CH3,iSendv(vc, sreq, iov, 2 )); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Object_set_ref(sreq, 0); MPIDI_CH3_Request_destroy(sreq); *sreq_p = NULL; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|eagermsg", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ fn_exit: return mpi_errno;}/* * Here are the routines that are called by the progress engine to handle * the various rendezvous message requests (cancel of sends is in * mpid_cancel_send.c). */ #define set_request_info(rreq_, pkt_, msg_type_) \{ \ (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank; \ (rreq_)->status.MPI_TAG = (pkt_)->match.tag; \ (rreq_)->status.count = (pkt_)->data_sz; \ (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id; \ (rreq_)->dev.recv_data_sz = (pkt_)->data_sz; \ MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum); \ MPIDI_Request_set_msg_type((rreq_), (msg_type_)); \}/* FIXME: This is not optimized for short messages, which should have the data in the same packet when the data is particularly short (e.g., one 8 byte long word) */int MPIDI_CH3_PktHandler_EagerSend( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_eager_send_t * eager_pkt = &pkt->eager_send; MPID_Request * rreq; int found; int complete; char *data_buf; MPIDI_msg_sz_t data_len; int mpi_errno = MPI_SUCCESS; MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST, "received eager send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", eager_pkt->sender_req_id, eager_pkt->match.rank, eager_pkt->match.tag, eager_pkt->match.context_id)); MPIU_DBG_MSGPKT(vc,eager_pkt->match.tag,eager_pkt->match.context_id, eager_pkt->match.rank,eager_pkt->data_sz, "ReceivedEager"); rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&eager_pkt->match, 
&found); if (rreq == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq"); } set_request_info(rreq, eager_pkt, MPIDI_REQUEST_EAGER_MSG); data_len = ((*buflen - sizeof(MPIDI_CH3_Pkt_t) >= rreq->dev.recv_data_sz) ? rreq->dev.recv_data_sz : *buflen - sizeof(MPIDI_CH3_Pkt_t)); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); if (rreq->dev.recv_data_sz == 0) { /* return the number of bytes processed in this function */ *buflen = sizeof(MPIDI_CH3_Pkt_t); MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } else { if (found) { mpi_errno = MPIDI_CH3U_Receive_data_found( rreq, data_buf, &data_len, &complete ); } else { mpi_errno = MPIDI_CH3U_Receive_data_unexpected( rreq, data_buf, &data_len, &complete ); } if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SEND"); } /* return the number of bytes processed in this function */ *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len; if (complete) { MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } else { *rreqp = rreq; } } fn_fail: return mpi_errno;}int MPIDI_CH3_PktHandler_ReadySend( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_ready_send_t * ready_pkt = &pkt->ready_send; MPID_Request * rreq; int found; int complete; char *data_buf; MPIDI_msg_sz_t data_len; int mpi_errno = MPI_SUCCESS; MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST, "received ready send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d", ready_pkt->sender_req_id, ready_pkt->match.rank, ready_pkt->match.tag, ready_pkt->match.context_id)); MPIU_DBG_MSGPKT(vc,ready_pkt->match.tag,ready_pkt->match.context_id, ready_pkt->match.rank,ready_pkt->data_sz, "ReceivedReady"); rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&ready_pkt->match, &found); if (rreq == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomemreq"); } set_request_info(rreq, ready_pkt, MPIDI_REQUEST_EAGER_MSG); data_len = ((*buflen - sizeof(MPIDI_CH3_Pkt_t) 
>= rreq->dev.recv_data_sz) ? rreq->dev.recv_data_sz : *buflen - sizeof(MPIDI_CH3_Pkt_t)); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); if (found) { if (rreq->dev.recv_data_sz == 0) { /* return the number of bytes processed in this function */ *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;; MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } else { mpi_errno = MPIDI_CH3U_Receive_data_found(rreq, data_buf, &data_len, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_READY_SEND"); } /* return the number of bytes processed in this function */ *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len; if (complete) { MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } else { *rreqp = rreq; } } } else { /* FIXME: an error packet should be sent back to the sender indicating that the ready-send failed. On the send side, the error handler for the communicator can be invoked even if the ready-send request has already completed. */ /* We need to consume any outstanding associated data and mark the request with an error. */ rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**rsendnomatch", "**rsendnomatch %d %d", ready_pkt->match.rank, ready_pkt->match.tag); rreq->status.count = 0; if (rreq->dev.recv_data_sz > 0) { /* force read of extra data */ *rreqp = rreq; rreq->dev.segment_first = 0; rreq->dev.segment_size = 0; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } } else { /* mark data transfer as complete and decrement CC */ MPIDI_CH3U_Request_complete(rreq); *rreqp = NULL; } /* we didn't process anything but the header in this case */ *buflen = sizeof(MPIDI_CH3_Pkt_t); } fn_fail: return mpi_errno;}/* * Define the routines that can print out the cancel packets if * debugging is enabled. 
*/
#ifdef MPICH_DBG_OUTPUT
/*
 * Print the fields of an eager-send packet with MPIU_DBG_PRINTF.
 *
 * fp is not used by this routine.  Always returns MPI_SUCCESS.
 */
int MPIDI_CH3_PktPrint_EagerSend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    MPIU_DBG_PRINTF((" type ......... EAGER_SEND\n"));
    MPIU_DBG_PRINTF((" sender_reqid . 0x%08X\n", pkt->eager_send.sender_req_id));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->eager_send.match.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->eager_send.match.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->eager_send.match.rank));
    /* NOTE(review): data_sz is printed with %d; confirm that the field's
       type matches, or switch to MPIDI_MSG_SZ_FMT */
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->eager_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->eager_send.seqnum));
#endif
    return MPI_SUCCESS;
}

#if defined(USE_EAGER_SHORT)
/*
 * Print the fields of an eager-short packet with MPIU_DBG_PRINTF, followed
 * by a hex dump of at most the first 32 bytes of the inline data.
 *
 * fp is not used by this routine.  Always returns MPI_SUCCESS.
 */
int MPIDI_CH3_PktPrint_EagerShortSend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    int datalen;
    unsigned char *p = (unsigned char *)pkt->eagershort_send.data;

    MPIU_DBG_PRINTF((" type ......... EAGERSHORT_SEND\n"));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->eagershort_send.match.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->eagershort_send.match.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->eagershort_send.match.rank));
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->eagershort_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->eagershort_send.seqnum));
#endif
    datalen = pkt->eagershort_send.data_sz;
    if (datalen > 0) {
        /* Two hex digits per byte for up to 32 bytes, plus the
           terminating null */
        char databytes[64+1];
        int i;

        if (datalen > 32) {
            datalen = 32;
        }
        for (i=0; i<datalen; i++) {
            /* The size passed is the space remaining in databytes,
               including the slot for the terminating null, so the last
               byte still gets both of its digits (assuming MPIU_Snprintf
               follows the snprintf size convention -- confirm).  %02x
               zero-pads so every byte occupies exactly two digits. */
            MPIU_Snprintf( &databytes[2*i], sizeof(databytes) - 2*i,
                           "%02x", p[i] );
        }
        MPIU_DBG_PRINTF((" data ......... %s\n", databytes));
    }
    return MPI_SUCCESS;
}
#endif /* defined(USE_EAGER_SHORT) */

/*
 * Print the fields of a ready-send packet with MPIU_DBG_PRINTF.
 *
 * fp is not used by this routine.  Always returns MPI_SUCCESS.
 */
int MPIDI_CH3_PktPrint_ReadySend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    MPIU_DBG_PRINTF((" type ......... READY_SEND\n"));
    MPIU_DBG_PRINTF((" sender_reqid . 0x%08X\n", pkt->ready_send.sender_req_id));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->ready_send.match.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->ready_send.match.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->ready_send.match.rank));
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->ready_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->ready_send.seqnum));
#endif
    return MPI_SUCCESS;
}
#endif /* MPICH_DBG_OUTPUT */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -