📄 ch3_progress.c
字号:
MPIDI_CH3_Pkt_lock_get_unlock_t * lock_get_unlock_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->lock_get_unlock; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_LOCK_GET_UNLOCK not handled (yet) \n"); } break; default: { /* nothing */ } } if (!in_fbox) { MPIDI_VC_t *vc; MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); MPID_nem_mpich2_release_cell (cell, vc); } else { MPID_nem_mpich2_release_fbox (cell); } if(*foundp == FALSE) { /* the cell does not match the request: create one */ /* this is the request that sould be returned ! */ goto make_req; } } else { make_req: rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = tag; rreq->dev.match.rank = source; rreq->dev.match.context_id = context_id; rreq->dev.next = NULL; if (*MPID_Recvq_posted_tail_ptr != NULL) { (*MPID_Recvq_posted_tail_ptr)->dev.next = rreq; } else { *MPID_Recvq_posted_head_ptr = rreq; } *MPID_Recvq_posted_tail_ptr = rreq; MPIDI_POSTED_RECV_ENQUEUE_HOOK (rreq); } } } exit_fn: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_POKE_WITH_MATCHING); return rreq;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_ipoke_with_matching#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request * MPIDI_CH3_Progress_ipoke_with_matching (int source, int tag, MPID_Comm *comm,int context_id,int *foundp, void *buf, int count, MPI_Datatype datatype,MPI_Status * status){ int mpi_errno = MPI_SUCCESS; MPID_Request *rreq = NULL; MPID_nem_cell_ptr_t cell = NULL; int in_fbox; int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t userbuf_sz; MPID_Datatype *dt_ptr; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_IPOKE_WITH_MATCHING); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_IPOKE_WITH_MATCHING); MPIDI_DBG_PRINTF((50, FCNAME, "entering, buf=%p, count=%d, dtype=%d", buf,count,datatype)); *foundp = FALSE ; MPIDI_Datatype_get_info(count, datatype, dt_contig, userbuf_sz, dt_ptr, dt_true_lb); /* handle only contiguous types (for now) and one-cell packets */ if((dt_contig) && (( userbuf_sz <= MPID_NEM__BYPASS_Q_MAX_VAL))) { MPID_nem_mpich2_test_recv_wait (&cell, &in_fbox,1000); if (cell) { char *cell_buf = cell->pkt.mpich2.payload; switch(((MPIDI_CH3_Pkt_t *)cell_buf)->type) { case MPIDI_CH3_PKT_EAGER_SEND: { MPIDI_CH3_Pkt_eager_send_t *eager_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_send; int payload_len = eager_pkt->data_sz; cell_buf += sizeof (MPIDI_CH3_Pkt_t); if(((eager_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((eager_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (eager_pkt->match.context_id == context_id)) { /* cell matches */ *foundp = TRUE; if (payload_len > 0) { if (payload_len <= userbuf_sz) { MPID_NEM_MEMCPY((char *)(buf+ dt_true_lb), cell_buf,payload_len); } else { /* error : truncate */ MPID_NEM_MEMCPY((char *)(buf+dt_true_lb),cell_buf, userbuf_sz); status->MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,"**truncate", "**truncate %d %d %d %d", status->MPI_SOURCE,status->MPI_TAG,payload_len, userbuf_sz ); mpi_errno = status->MPI_ERROR; goto exit_fn; } } } else { /* create a request for the cell, enqueue it on the unexpected queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = eager_pkt->match.tag ; rreq->dev.match.rank = eager_pkt->match.rank; rreq->dev.match.context_id = eager_pkt->match.context_id; rreq->dev.tmpbuf = MPIU_Malloc(userbuf_sz); MPID_NEM_MEMCPY((char *)(rreq->dev.tmpbuf),cell_buf, userbuf_sz); rreq->dev.next = NULL; if (MPIDI_Process.recvq_unexpected_tail != NULL) { MPIDI_Process.recvq_unexpected_tail->dev.next = rreq; } else { MPIDI_Process.recvq_unexpected_head = rreq; } MPIDI_Process.recvq_unexpected_tail = rreq; } } } break; case MPIDI_CH3_PKT_READY_SEND: { MPIDI_CH3_Pkt_ready_send_t *ready_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->ready_send; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_READY_SEND not handled (yet) \n"); } break; case MPIDI_CH3_PKT_EAGER_SYNC_SEND: { MPIDI_CH3_Pkt_eager_send_t *es_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_send; int payload_len = es_pkt->data_sz; cell_buf += sizeof (MPIDI_CH3_Pkt_t); if(((es_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((es_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (es_pkt->match.context_id == context_id)) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_eager_sync_ack_t * const esa_pkt = &upkt.eager_sync_ack; MPID_Request * esa_req = NULL; MPIDI_VC_t *vc; /* cell matches */ *foundp = TRUE; if (payload_len > 0) { if (payload_len <= userbuf_sz) { MPID_NEM_MEMCPY((char *)(buf+ dt_true_lb), cell_buf,payload_len); } else { /* error : truncate */ MPID_NEM_MEMCPY((char *)(buf+dt_true_lb),cell_buf, userbuf_sz); status->MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,"**truncate", "**truncate %d %d %d %d", status->MPI_SOURCE,status->MPI_TAG,payload_len, userbuf_sz ); mpi_errno = status->MPI_ERROR; goto exit_fn; } } /* send Ack back */ MPIDI_Pkt_init(esa_pkt, MPIDI_CH3_PKT_EAGER_SYNC_ACK); esa_pkt->sender_req_id = es_pkt->sender_req_id; if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } mpi_errno = MPIDI_CH3_iStartMsg(vc, esa_pkt, sizeof(*esa_pkt), &esa_req); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,"**ch3|syncack", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ if (esa_req != NULL) { MPID_Request_release(esa_req); } } else { /* create a request for the cell, enqueue it on the unexpected queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = es_pkt->match.tag ; rreq->dev.match.rank = es_pkt->match.rank; rreq->dev.match.context_id = es_pkt->match.context_id; rreq->dev.tmpbuf = MPIU_Malloc(userbuf_sz); MPID_NEM_MEMCPY((char *)(rreq->dev.tmpbuf),cell_buf, userbuf_sz); rreq->dev.next = NULL; if (MPIDI_Process.recvq_unexpected_tail != NULL) { MPIDI_Process.recvq_unexpected_tail->dev.next = rreq; } else { MPIDI_Process.recvq_unexpected_head = rreq; } MPIDI_Process.recvq_unexpected_tail = rreq; MPIDI_Request_set_sync_send_flag(rreq,TRUE); } } } break; case MPIDI_CH3_PKT_EAGER_SYNC_ACK: { MPIDI_CH3_Pkt_eager_sync_ack_t * esa_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_sync_ack; MPID_Request * sreq; MPID_Request_get_ptr(esa_pkt->sender_req_id, sreq); MPIDI_CH3U_Request_complete(sreq); } break; case MPIDI_CH3_PKT_RNDV_REQ_TO_SEND: { /* this case in currently disabled since cells are smaller than eager msgs, but ... */ MPIDI_CH3_Pkt_rndv_req_to_send_t *rts_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->rndv_req_to_send; rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.next = NULL; } if(((rts_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((rts_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (rts_pkt->match.context_id == context_id)) { *foundp = TRUE; rreq->dev.match.tag = tag; rreq->dev.match.rank = source; rreq->dev.match.context_id = context_id; rreq->comm = comm; MPIR_Comm_add_ref(comm); rreq->dev.user_buf = buf; rreq->dev.user_count = count; rreq->dev.datatype = datatype; set_request_info(rreq,rts_pkt, MPIDI_REQUEST_RNDV_MSG); } else { /* enqueue rreq on the unexp queue */ rreq->dev.match.tag = rts_pkt->match.tag; rreq->dev.match.rank = rts_pkt->match.rank; rreq->dev.match.context_id = rts_pkt->match.context_id; if (MPIDI_Process.recvq_unexpected_tail != NULL) { MPIDI_Process.recvq_unexpected_tail->dev.next = rreq; } else { MPIDI_Process.recvq_unexpected_head = rreq; } MPIDI_Process.recvq_unexpected_tail = rreq; } } break; case MPIDI_CH3_PKT_RNDV_CLR_TO_SEND: { MPIDI_CH3_Pkt_rndv_clr_to_send_t *cts_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->rndv_clr_to_send; MPID_Request *sreq; MPID_Request *rts_sreq; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &upkt.rndv_send; int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t data_sz; MPID_Datatype *dt_ptr; MPID_IOV iov[MPID_IOV_LIMIT]; int iov_n; MPIDI_VC_t *vc; MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq); MPIDI_Request_fetch_and_clear_rts_sreq(sreq, &rts_sreq); if (rts_sreq != NULL) { MPID_Request_release(rts_sreq); } MPIDI_Pkt_init(rs_pkt, MPIDI_CH3_PKT_RNDV_SEND); rs_pkt->receiver_req_id = cts_pkt->receiver_req_id; iov[0].MPID_IOV_BUF = (void*)rs_pkt; iov[0].MPID_IOV_LEN = sizeof(*rs_pkt); MPIDI_Datatype_get_info(sreq->dev.user_count, sreq->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb); if (dt_contig) { sreq->dev.OnDataAvail = 0; iov[1].MPID_IOV_BUF = (char *)sreq->dev.user_buf + dt_true_lb; iov[1].MPID_IOV_LEN = data_sz; iov_n = 2; } else { MPID_Segment_init(sreq->dev.user_buf, sreq->dev.user_count, sreq->dev.datatype, &sreq->dev.segment,0); iov_n = MPID_IOV_LIMIT - 1; sreq->dev.segment_first = 0; sreq->dev.segment_size = data_sz; mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|loadsendiov", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ iov_n += 1; } if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iov_n); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|senddata", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ } break; case MPIDI_CH3_PKT_RNDV_SEND: { /* this case can't happen since there is a posted request for the recv */ /* this code is only active when both queues are empty */ } break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -