📄 ch3_progress.c
字号:
{ if (payload_len <= userbuf_sz) { MPID_NEM_MEMCPY((char *)(buf+ dt_true_lb), cell_buf,payload_len); } else { /* error : truncate */ MPID_NEM_MEMCPY((char *)(buf+dt_true_lb),cell_buf, userbuf_sz); status->MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,"**truncate", "**truncate %d %d %d %d", status->MPI_SOURCE,status->MPI_TAG,payload_len, userbuf_sz ); mpi_errno = status->MPI_ERROR; goto exit_fn; } } /* send Ack back */ MPIDI_Pkt_init(esa_pkt, MPIDI_CH3_PKT_EAGER_SYNC_ACK); esa_pkt->sender_req_id = es_pkt->sender_req_id; if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } mpi_errno = MPIDI_CH3_iStartMsg(vc, esa_pkt, sizeof(*esa_pkt), &esa_req); MPIU_ERR_CHKANDJUMP (mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|syncack"); if (esa_req != NULL) { MPID_Request_release(esa_req); } } else { /* create a request for the cell, enqueue it on the unexpected queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = es_pkt->match.tag ; rreq->dev.match.rank = es_pkt->match.rank; rreq->dev.match.context_id = es_pkt->match.context_id; rreq->dev.tmpbuf = MPIU_Malloc(userbuf_sz); MPID_NEM_MEMCPY((char *)(rreq->dev.tmpbuf),cell_buf, userbuf_sz); rreq->dev.next = NULL; if (*MPID_Recvq_unexpected_tail_ptr != NULL) { (*MPID_Recvq_unexpected_tail_ptr)->dev.next = rreq; } else { *MPID_Recvq_unexpected_head_ptr = rreq; } *MPID_Recvq_unexpected_tail_ptr = rreq; MPIDI_Request_set_sync_send_flag(rreq,TRUE); } } } break; case MPIDI_CH3_PKT_EAGER_SYNC_ACK: { MPIDI_CH3_Pkt_eager_sync_ack_t * esa_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_sync_ack; MPID_Request * sreq; MPID_Request_get_ptr(esa_pkt->sender_req_id, sreq); MPIDI_CH3U_Request_complete(sreq); } break; case MPIDI_CH3_PKT_RNDV_REQ_TO_SEND: { /* this case in currently disabled since cells are smaller than eager msgs, but ... */ MPIDI_CH3_Pkt_rndv_req_to_send_t *rts_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->rndv_req_to_send; rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.next = NULL; } if(((rts_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((rts_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (rts_pkt->match.context_id == context_id)) { *foundp = TRUE; rreq->dev.match.tag = tag; rreq->dev.match.rank = source; rreq->dev.match.context_id = context_id; rreq->comm = comm; MPIR_Comm_add_ref(comm); rreq->dev.user_buf = buf; rreq->dev.user_count = count; rreq->dev.datatype = datatype; set_request_info(rreq,rts_pkt, MPIDI_REQUEST_RNDV_MSG); } else { /* enqueue rreq on the unexp queue */ rreq->dev.match.tag = rts_pkt->match.tag; rreq->dev.match.rank = rts_pkt->match.rank; rreq->dev.match.context_id = rts_pkt->match.context_id; if (*MPID_Recvq_unexpected_tail_ptr != NULL) { (*MPID_Recvq_unexpected_tail_ptr)->dev.next = rreq; } else { *MPID_Recvq_unexpected_head_ptr = rreq; } *MPID_Recvq_unexpected_tail_ptr = rreq; } } break; case MPIDI_CH3_PKT_RNDV_CLR_TO_SEND: { MPIDI_CH3_Pkt_rndv_clr_to_send_t *cts_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->rndv_clr_to_send; MPID_Request *sreq; MPID_Request *rts_sreq; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_rndv_send_t * rs_pkt = &upkt.rndv_send; int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t data_sz; MPID_Datatype *dt_ptr; MPID_IOV iov[MPID_IOV_LIMIT]; int iov_n; MPIDI_VC_t *vc; MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq); MPIDI_Request_fetch_and_clear_rts_sreq(sreq, &rts_sreq); if (rts_sreq != NULL) { MPID_Request_release(rts_sreq); } MPIDI_Pkt_init(rs_pkt, MPIDI_CH3_PKT_RNDV_SEND); rs_pkt->receiver_req_id = cts_pkt->receiver_req_id; iov[0].MPID_IOV_BUF = (void*)rs_pkt; iov[0].MPID_IOV_LEN = sizeof(*rs_pkt); MPIDI_Datatype_get_info(sreq->dev.user_count, sreq->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb); if (dt_contig) { sreq->dev.OnDataAvail = 0; iov[1].MPID_IOV_BUF = (char *)sreq->dev.user_buf + dt_true_lb; iov[1].MPID_IOV_LEN = data_sz; iov_n = 2; } else { MPID_Segment_init(sreq->dev.user_buf, sreq->dev.user_count, sreq->dev.datatype, &sreq->dev.segment,0); iov_n = MPID_IOV_LIMIT - 1; sreq->dev.segment_first = 0; sreq->dev.segment_size = data_sz; mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|loadsendiov", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ iov_n += 1; } if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iov_n); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|senddata", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ } break; case MPIDI_CH3_PKT_RNDV_SEND: { /* this case can't happen since there is a posted request for the recv */ /* this code is only active when both queues are empty */ } break; case MPIDI_CH3_PKT_CANCEL_SEND_REQ: { MPIDI_CH3_Pkt_cancel_send_req_t * req_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->cancel_send_req; MPID_Request * rreq; int ack; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &upkt.cancel_send_resp; MPID_Request * resp_sreq; MPIDI_VC_t *vc; rreq = MPIDI_CH3U_Recvq_FDU(req_pkt->sender_req_id, &req_pkt->match); if (rreq != NULL) { if (MPIDI_Request_get_msg_type(rreq) == MPIDI_REQUEST_EAGER_MSG && rreq->dev.recv_data_sz > 0) { MPIU_Free(rreq->dev.tmpbuf); } MPID_Request_release(rreq); ack = TRUE; } else { ack = FALSE; } MPIDI_Pkt_init(resp_pkt, MPIDI_CH3_PKT_CANCEL_SEND_RESP); resp_pkt->sender_req_id = req_pkt->sender_req_id; resp_pkt->ack = ack; if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } mpi_errno = MPIDI_CH3_iStartMsg(vc, resp_pkt, sizeof(*resp_pkt), &resp_sreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**\ch3|cancelresp", 0); goto exit_fn; } /* --END ERROR HANDLING-- */ if (resp_sreq != NULL) { MPID_Request_release(resp_sreq); } } break; case MPIDI_CH3_PKT_CANCEL_SEND_RESP: { MPIDI_CH3_Pkt_cancel_send_resp_t * resp_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->cancel_send_resp; MPID_Request * sreq; MPID_Request_get_ptr(resp_pkt->sender_req_id, sreq); if (resp_pkt->ack) { sreq->status.cancelled = TRUE; if (MPIDI_Request_get_msg_type(sreq) == MPIDI_REQUEST_RNDV_MSG || MPIDI_Request_get_type(sreq) == MPIDI_REQUEST_TYPE_SSEND) { int cc; MPIDI_CH3U_Request_decrement_cc(sreq, &cc); } } else { MPIDI_DBG_PRINTF((35, FCNAME, "unable to cancel message")); } MPIDI_CH3U_Request_complete(sreq); } break; case MPIDI_CH3_PKT_PUT: { MPIDI_CH3_Pkt_put_t * put_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->put; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_PUT not handled (yet) \n"); } break; case MPIDI_CH3_PKT_ACCUMULATE: { MPIDI_CH3_Pkt_accum_t * accum_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->accum; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_ACCUMULATE not handled (yet) \n"); } break; case MPIDI_CH3_PKT_GET: { MPIDI_CH3_Pkt_get_t * get_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->get; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_GET not handled (yet) \n"); } break; case MPIDI_CH3_PKT_GET_RESP: { MPIDI_CH3_Pkt_get_resp_t * get_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->get_resp; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_GET_RESP not handled (yet) \n"); } break; case MPIDI_CH3_PKT_LOCK: { MPIDI_CH3_Pkt_lock_t * lock_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->lock; MPID_Win *win_ptr; MPIDI_VC_t *vc; if (in_fbox) { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE (cell), &vc); } else { MPIDI_PG_Get_vc (MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE (cell), &vc); } MPID_Win_get_ptr(lock_pkt->target_win_handle, win_ptr); if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr,lock_pkt->lock_type) == 1) { mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc,lock_pkt->source_win_handle); } else { /* queue the lock information */ MPIDI_Win_lock_queue *curr_ptr, *prev_ptr, *new_ptr; /* FIXME: MT: This may need to be done atomically. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; prev_ptr = curr_ptr; while (curr_ptr != NULL) { prev_ptr = curr_ptr; curr_ptr = curr_ptr->next; } new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue)); /* --BEGIN ERROR HANDLING-- */ if (!new_ptr) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0 ); goto exit_fn; } /* --END ERROR HANDLING-- */ if (prev_ptr != NULL) prev_ptr->next = new_ptr; else win_ptr->lock_queue = new_ptr; new_ptr->next = NULL; new_ptr->lock_type = lock_pkt->lock_type; new_ptr->source_win_handle = lock_pkt->source_win_handle; new_ptr->vc = vc; new_ptr->pt_single_op = NULL; } } break; case MPIDI_CH3_PKT_LOCK_GRANTED: { MPIDI_CH3_Pkt_lock_granted_t * lock_granted_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->lock_granted; MPID_Win *win_ptr; MPID_Win_get_ptr(lock_granted_pkt->source_win_handle, win_ptr); win_ptr->lock_granted = 1; MPIDI_CH3_Progress_signal_completion(); } break; case MPIDI_CH3_PKT_PT_RMA_DONE: { MPIDI_CH3_Pkt_pt_rma_done_t * pt_rma_done_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->pt_rma_done; MPID_Win *win_ptr; MPID_Win_get_ptr(pt_rma_done_pkt->source_win_handle, win_ptr); win_ptr->lock_granted = 0; MPIDI_CH3_Progress_signal_completion(); } break; case MPIDI_CH3_PKT_LOCK_PUT_UNLOCK: { MPIDI_CH3_Pkt_lock_put_unlock_t * lock_put_unlock_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->lock_put_unlock; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_LOCK_PUT_UNLOCK not handled (yet) \n"); } break; case MPIDI_CH3_PKT_LOCK_GET_UNLOCK: {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -