ch3u_handle_recv_pkt.c
/* (excerpt resumes inside the MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK case of the
   packet-type switch in MPIDI_CH3U_Handle_ordered_recv_pkt) */

            *rreqp = req;
            mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, rreqp);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS)
            {
                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                 FCNAME, __LINE__, MPI_ERR_OTHER,
                                 "**ch3|postrecv", "**ch3|postrecv %s",
                                 "MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK");
            }
            /* --END ERROR HANDLING-- */
            break;
        }

        case MPIDI_CH3_PKT_LOCK_GET_UNLOCK:
        {
            MPIDI_CH3_Pkt_lock_get_unlock_t * lock_get_unlock_pkt =
                &pkt->lock_get_unlock;
            MPID_Win *win_ptr;

            MPIDI_DBG_PRINTF((30, FCNAME, "received lock_get_unlock pkt"));

            MPID_Win_get_ptr(lock_get_unlock_pkt->target_win_handle, win_ptr);

            if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr,
                                    lock_get_unlock_pkt->lock_type) == 1)
            {
                /* do the get. for this optimization, only basic datatypes
                   are supported. */
                MPIDI_CH3_Pkt_t upkt;
                MPIDI_CH3_Pkt_get_resp_t * get_resp_pkt = &upkt.get_resp;
                MPID_Request *req;
                MPID_IOV iov[MPID_IOV_LIMIT];

                req = MPID_Request_create();
                req->dev.target_win_handle =
                    lock_get_unlock_pkt->target_win_handle;
                req->dev.source_win_handle =
                    lock_get_unlock_pkt->source_win_handle;
                req->dev.single_op_opt = 1;
                req->dev.ca = MPIDI_CH3_CA_COMPLETE;
                MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RESP);
                req->kind = MPID_REQUEST_SEND;

                MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
                get_resp_pkt->request_handle =
                    lock_get_unlock_pkt->request_handle;

                iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
                iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);

                iov[1].MPID_IOV_BUF =
                    (MPID_IOV_BUF_CAST) lock_get_unlock_pkt->addr;
                MPID_Datatype_get_size_macro(lock_get_unlock_pkt->datatype,
                                             type_size);
                iov[1].MPID_IOV_LEN = lock_get_unlock_pkt->count * type_size;

                mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, 2);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno != MPI_SUCCESS)
                {
                    MPIU_Object_set_ref(req, 0);
                    MPIDI_CH3_Request_destroy(req);
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                     FCNAME, __LINE__, MPI_ERR_OTHER,
                                     "**ch3|rmamsg", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */
            }
            else
            {
                /* queue the information */
                MPIDI_Win_lock_queue *curr_ptr, *prev_ptr, *new_ptr;

                /* FIXME: MT: This may need to be done atomically. */

                curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue;
                prev_ptr = curr_ptr;
                while (curr_ptr != NULL)
                {
                    prev_ptr = curr_ptr;
                    curr_ptr = curr_ptr->next;
                }

                new_ptr = (MPIDI_Win_lock_queue *)
                    MPIU_Malloc(sizeof(MPIDI_Win_lock_queue));
                if (!new_ptr) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
                }
                new_ptr->pt_single_op = (MPIDI_PT_single_op *)
                    MPIU_Malloc(sizeof(MPIDI_PT_single_op));
                if (new_ptr->pt_single_op == NULL) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
                }

                if (prev_ptr != NULL)
                    prev_ptr->next = new_ptr;
                else
                    win_ptr->lock_queue = new_ptr;

                new_ptr->next = NULL;
                new_ptr->lock_type = lock_get_unlock_pkt->lock_type;
                new_ptr->source_win_handle =
                    lock_get_unlock_pkt->source_win_handle;
                new_ptr->vc = vc;

                new_ptr->pt_single_op->type = MPIDI_RMA_GET;
                new_ptr->pt_single_op->addr = lock_get_unlock_pkt->addr;
                new_ptr->pt_single_op->count = lock_get_unlock_pkt->count;
                new_ptr->pt_single_op->datatype = lock_get_unlock_pkt->datatype;
                new_ptr->pt_single_op->data = NULL;
                new_ptr->pt_single_op->request_handle =
                    lock_get_unlock_pkt->request_handle;
                new_ptr->pt_single_op->data_recd = 0;
            }

            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_CLOSE:
        {
            MPIDI_CH3_Pkt_close_t * close_pkt = &pkt->close;

            if (vc->state == MPIDI_VC_STATE_LOCAL_CLOSE)
            {
                MPIDI_CH3_Pkt_t upkt;
                MPIDI_CH3_Pkt_close_t * resp_pkt = &upkt.close;
                MPID_Request * resp_sreq;

                MPIDI_Pkt_init(resp_pkt, MPIDI_CH3_PKT_CLOSE);
                resp_pkt->ack = TRUE;

                MPIDI_DBG_PRINTF((30, FCNAME, "sending close(TRUE) to %d",
                                  vc->pg_rank));
                mpi_errno = MPIDI_CH3_iStartMsg(vc, resp_pkt,
                                                sizeof(*resp_pkt), &resp_sreq);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER,
                                        "**ch3|send_close_ack");
                }

                if (resp_sreq != NULL)
                {
                    MPID_Request_release(resp_sreq);
                }
            }

            if (close_pkt->ack == FALSE)
            {
                if (vc->state == MPIDI_VC_STATE_LOCAL_CLOSE)
                {
                    MPIDI_DBG_PRINTF((30, FCNAME,
                        "received close(FALSE) from %d, moving to CLOSE_ACKED.",
                        vc->pg_rank));
                    MPIU_DBG_PrintVCState2(vc, MPIDI_VC_STATE_CLOSE_ACKED);
                    MPIU_DBG_MSG(CH3_CONNECT, TYPICAL,
                                 "Setting state to VC_STATE_CLOSE_ACKED");
                    vc->state = MPIDI_VC_STATE_CLOSE_ACKED;
                }
                else /* (vc->state == MPIDI_VC_STATE_ACTIVE) */
                {
                    MPIDI_DBG_PRINTF((30, FCNAME,
                        "received close(FALSE) from %d, moving to REMOTE_CLOSE.",
                        vc->pg_rank));
                    MPIU_DBG_PrintVCState2(vc, MPIDI_VC_STATE_REMOTE_CLOSE);
                    MPIU_DBG_MSG(CH3_CONNECT, TYPICAL,
                                 "Setting state to VC_STATE_REMOTE_CLOSE");
                    vc->state = MPIDI_VC_STATE_REMOTE_CLOSE;
                }
            }
            else
            {
                MPIDI_DBG_PRINTF((30, FCNAME,
                    "received close(TRUE) from %d, moving to CLOSE_ACKED.",
                    vc->pg_rank));
                MPIU_Assert(vc->state == MPIDI_VC_STATE_LOCAL_CLOSE ||
                            vc->state == MPIDI_VC_STATE_CLOSE_ACKED);
                MPIU_DBG_PrintVCState2(vc, MPIDI_VC_STATE_CLOSE_ACKED);
                MPIU_DBG_MSG(CH3_CONNECT, TYPICAL,
                             "Setting state to VC_STATE_CLOSE_ACKED");
                vc->state = MPIDI_VC_STATE_CLOSE_ACKED;
                mpi_errno = MPIDI_CH3_Connection_terminate(vc);
            }

            *rreqp = NULL;
            break;
        }

        case MPIDI_CH3_PKT_FLOW_CNTL_UPDATE:
        {
            /* --BEGIN ERROR HANDLING-- */
            MPIDI_DBG_PRINTF((30, FCNAME, "received flow control update pkt"));
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                             FCNAME, __LINE__, MPI_ERR_INTERN,
                             "**ch3|flowcntlpkt", 0);
            *rreqp = NULL;
            break;
            /* --END ERROR HANDLING-- */
        }

        default:
        {
            /* --BEGIN ERROR HANDLING-- */
            *rreqp = NULL;
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_INTERN,
                                 "**ch3|unknownpkt", "**ch3|unknownpkt %d",
                                 pkt->type);
            break;
            /* --END ERROR HANDLING-- */
        }
    }

 fn_fail:
    MPIDI_DBG_PRINTF((10, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_ORDERED_RECV_PKT);
    return mpi_errno;
}
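When the lock cannot be granted, the LOCK_GET_UNLOCK case above appends the deferred operation at the tail of the window's singly linked lock queue (win_ptr->lock_queue). Below is a minimal standalone sketch of that tail-append, using illustrative stand-in types rather than MPICH's MPIDI_Win_lock_queue:

/* --- illustrative sketch, not part of ch3u_handle_recv_pkt.c --- */
#include <assert.h>
#include <stdlib.h>

/* Stand-in for MPIDI_Win_lock_queue: a singly linked list whose head
   lives in the window object (win_ptr->lock_queue above). */
typedef struct lock_queue_entry {
    int lock_type;
    struct lock_queue_entry *next;
} lock_queue_entry;

/* Append at the tail, as the queueing branch above walks
   curr_ptr/prev_ptr to the end before linking in new_ptr. */
static lock_queue_entry *queue_append(lock_queue_entry **head, int lock_type)
{
    lock_queue_entry *prev = NULL, *curr = *head;
    lock_queue_entry *entry = malloc(sizeof *entry);
    if (entry == NULL)
        return NULL;             /* the real code raises **nomem here */
    entry->lock_type = lock_type;
    entry->next = NULL;
    while (curr != NULL) {       /* find the tail */
        prev = curr;
        curr = curr->next;
    }
    if (prev != NULL)
        prev->next = entry;      /* non-empty queue: link after the tail */
    else
        *head = entry;           /* empty queue: entry becomes the head */
    return entry;
}

int main(void)
{
    lock_queue_entry *head = NULL;
    queue_append(&head, 1);
    queue_append(&head, 2);
    assert(head->lock_type == 1 && head->next->lock_type == 2);
    return 0;
}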
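The MPIDI_CH3_PKT_CLOSE case is effectively a small state machine over the virtual connection: close(ack=FALSE) moves a LOCAL_CLOSE connection to CLOSE_ACKED (both sides closed simultaneously) and an ACTIVE one to REMOTE_CLOSE, while close(ack=TRUE) moves the connection to CLOSE_ACKED and terminates it. The following standalone sketch condenses just those transitions; the enum and function names are illustrative, not MPICH's:

/* --- illustrative sketch, not part of ch3u_handle_recv_pkt.c --- */
#include <assert.h>
#include <stdio.h>

/* Stand-ins for the MPIDI_VC_STATE_* values used above. */
typedef enum {
    VC_ACTIVE,
    VC_LOCAL_CLOSE,   /* we sent close(FALSE) and await the peer */
    VC_REMOTE_CLOSE,  /* peer sent close(FALSE) before we started closing */
    VC_CLOSE_ACKED    /* both sides agree the connection may terminate */
} vc_state_t;

/* Mirror of the transitions in the MPIDI_CH3_PKT_CLOSE case. */
static vc_state_t on_close_pkt(vc_state_t state, int ack)
{
    if (!ack) {
        /* close(FALSE): a close request from the peer */
        return (state == VC_LOCAL_CLOSE) ? VC_CLOSE_ACKED : VC_REMOTE_CLOSE;
    }
    /* close(TRUE): the peer acknowledged our close request */
    assert(state == VC_LOCAL_CLOSE || state == VC_CLOSE_ACKED);
    return VC_CLOSE_ACKED;  /* caller would now terminate the connection */
}

int main(void)
{
    /* Both sides close simultaneously: LOCAL_CLOSE + close(FALSE). */
    assert(on_close_pkt(VC_LOCAL_CLOSE, 0) == VC_CLOSE_ACKED);
    /* Passive side: ACTIVE + close(FALSE). */
    assert(on_close_pkt(VC_ACTIVE, 0) == VC_REMOTE_CLOSE);
    /* Initiator receives the ack. */
    assert(on_close_pkt(VC_LOCAL_CLOSE, 1) == VC_CLOSE_ACKED);
    puts("close-protocol transitions OK");
    return 0;
}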
/* FIXME: What does this routine do? */
/* This function is used in conjunction with MPIDI_CH3_iStartRndvTransfer */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Post_data_receive
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3U_Post_data_receive(int found, MPID_Request ** rreqp)
{
    int dt_contig;
    MPI_Aint dt_true_lb;
    MPIDI_msg_sz_t userbuf_sz;
    MPID_Datatype * dt_ptr;
    MPIDI_msg_sz_t data_sz;
    MPID_Request * rreq = *rreqp;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE);
    MPIDI_DBG_PRINTF((30, FCNAME, "entering"));

    if (rreq->dev.recv_data_sz == 0)
    {
        MPIDI_DBG_PRINTF((30, FCNAME,
            "null message, %s, decrementing completion counter",
            (found ? "posted request found"
                   : "unexpected request allocated")));
        /* mark data transfer as complete and decrement CC */
        MPIDI_CH3U_Request_complete(rreq);
        *rreqp = NULL;
        goto fn_exit;
    }

    if (found)
    {
        MPIDI_DBG_PRINTF((30, FCNAME, "posted request found"));

        MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype,
                                dt_contig, userbuf_sz, dt_ptr, dt_true_lb);

        if (rreq->dev.recv_data_sz <= userbuf_sz)
        {
            data_sz = rreq->dev.recv_data_sz;
        }
        else
        {
            MPIDI_DBG_PRINTF((35, FCNAME,
                "receive buffer too small; message truncated, msg_sz="
                MPIDI_MSG_SZ_FMT ", userbuf_sz=" MPIDI_MSG_SZ_FMT,
                rreq->dev.recv_data_sz, userbuf_sz));
            rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS,
                MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,
                "**truncate", "**truncate %d %d %d %d",
                rreq->status.MPI_SOURCE, rreq->status.MPI_TAG,
                rreq->dev.recv_data_sz, userbuf_sz);
            rreq->status.count = userbuf_sz;
            data_sz = userbuf_sz;
        }

        if (dt_contig && data_sz == rreq->dev.recv_data_sz)
        {
            /* user buffer is contiguous and large enough to store the
               entire message */
            /* FIXME: So why don't we move it *now*? */
            MPIDI_DBG_PRINTF((35, FCNAME, "IOV loaded for contiguous read"));
            rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)
                ((char *) (rreq->dev.user_buf) + dt_true_lb);
            rreq->dev.iov[0].MPID_IOV_LEN = data_sz;
            rreq->dev.iov_count = 1;
            rreq->dev.ca = MPIDI_CH3_CA_COMPLETE;
        }
        else
        {
            /* user buffer is not contiguous or is too small to hold the
               entire message */
            int mpi_errno;
            MPIDI_DBG_PRINTF((35, FCNAME,
                              "IOV loaded for non-contiguous read"));
            MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count,
                              rreq->dev.datatype, &rreq->dev.segment, 0);
            rreq->dev.segment_first = 0;
            rreq->dev.segment_size = data_sz;
            mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS)
            {
                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                 FCNAME, __LINE__, MPI_ERR_OTHER,
                                 "**ch3|loadrecviov", 0);
                goto fn_exit;
            }
            /* --END ERROR HANDLING-- */
        }
    }
    else /* if (!found) */
    {
        /* TODO: to improve performance, allocate the temporary buffer from a
           specialized buffer pool. */
        MPIDI_DBG_PRINTF((30, FCNAME, "unexpected request allocated"));
        rreq->dev.tmpbuf = MPIU_Malloc(rreq->dev.recv_data_sz);
        /* FIXME: No test for malloc failure! */
        rreq->dev.tmpbuf_sz = rreq->dev.recv_data_sz;
        rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) rreq->dev.tmpbuf;
        rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.recv_data_sz;
        rreq->dev.iov_count = 1;
        rreq->dev.ca = MPIDI_CH3_CA_UNPACK_UEBUF_AND_COMPLETE;
        rreq->dev.recv_pending_count = 2;
        MPID_Request_initialized_set(rreq);
    }

  fn_exit:
    MPIDI_DBG_PRINTF((30, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_POST_DATA_RECEIVE);
    return mpi_errno;
}
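For a posted ("found") receive, MPIDI_CH3U_Post_data_receive clamps the incoming message to the user buffer: it delivers min(recv_data_sz, userbuf_sz) bytes and records MPI_ERR_TRUNCATE in the request status when the sender's data is larger. A minimal sketch of that clamping rule, with illustrative stand-in types rather than MPICH's request structures:

/* --- illustrative sketch, not part of ch3u_handle_recv_pkt.c --- */
#include <assert.h>
#include <stddef.h>

/* Stand-ins; MPICH uses rreq->dev.recv_data_sz, the datatype's unpacked
   size, and rreq->status.MPI_ERROR instead. */
typedef struct {
    size_t count;   /* number of bytes actually delivered */
    int truncated;  /* stands in for status.MPI_ERROR == MPI_ERR_TRUNCATE */
} recv_status_t;

/* Clamp the incoming message to the user buffer, as in the "found" branch. */
static size_t clamp_recv(size_t recv_data_sz, size_t userbuf_sz,
                         recv_status_t *status)
{
    size_t data_sz;
    if (recv_data_sz <= userbuf_sz) {
        data_sz = recv_data_sz;
        status->truncated = 0;
    } else {
        /* message larger than the buffer: deliver userbuf_sz bytes and
           flag truncation, as Post_data_receive does with **truncate */
        data_sz = userbuf_sz;
        status->truncated = 1;
    }
    status->count = data_sz;
    return data_sz;
}

int main(void)
{
    recv_status_t st;
    assert(clamp_recv(64, 128, &st) == 64 && !st.truncated);
    assert(clamp_recv(256, 128, &st) == 128 && st.truncated);
    return 0;
}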
/* Check if requested lock can be granted. If it can, set
   win_ptr->current_lock_type to the new lock type and return 1.
   Else return 0.

   FIXME: MT: This function must be atomic because two threads could be
   trying to do the same thing, e.g., the main thread in
   MPI_Win_lock(source=target) and another thread in the progress engine. */
int MPIDI_CH3I_Try_acquire_win_lock(MPID_Win *win_ptr, int requested_lock)
{
    int existing_lock;

    existing_lock = win_ptr->current_lock_type;

    /* Locking Rules:

       Requested    Existing            Action
       ---------    --------            ------
       Shared       Exclusive           Queue it
       Shared       NoLock/Shared       Grant it
       Exclusive    NoLock              Grant it
       Exclusive    Exclusive/Shared    Queue it
    */

    if (((requested_lock == MPI_LOCK_SHARED) &&
         ((existing_lock == MPID_LOCK_NONE) ||
          (existing_lock == MPI_LOCK_SHARED)))
        ||
        ((requested_lock == MPI_LOCK_EXCLUSIVE) &&
         (existing_lock == MPID_LOCK_NONE)))
    {
        /* grant lock. set new lock type on window */
        win_ptr->current_lock_type = requested_lock;

        /* if shared lock, incr. ref. count */
        if (requested_lock == MPI_LOCK_SHARED)
            win_ptr->shared_lock_ref_cnt++;

        return 1;
    }
    else
    {
        /* do not grant lock */
        return 0;
    }
}

int MPIDI_CH3I_Send_lock_granted_pkt(MPIDI_VC_t *vc, MPI_Win source_win_handle)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_granted_t *lock_granted_pkt = &upkt.lock_granted;
    MPID_Request *req;
    int mpi_errno;

    /* send lock granted packet */
    MPIDI_Pkt_init(lock_granted_pkt, MPIDI_CH3_PKT_LOCK_GRANTED);
    lock_granted_pkt->source_win_handle = source_win_handle;

    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_granted_pkt,
                                    sizeof(*lock_granted_pkt), &req);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME,
                                         __LINE__, MPI_ERR_OTHER,
                                         "**ch3|rmamsg", 0);
        return mpi_errno;
    }
    /* --END ERROR HANDLING-- */

    if (req != NULL)
    {
        MPID_Request_release(req);
    }

    return mpi_errno;
}
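The decision table in MPIDI_CH3I_Try_acquire_win_lock reduces to one compatibility predicate: a shared request is granted unless an exclusive lock is held, and an exclusive request is granted only when no lock is held. A standalone sketch of that predicate with stand-in lock constants (the real MPI_LOCK_SHARED/MPI_LOCK_EXCLUSIVE come from mpi.h, MPID_LOCK_NONE from MPICH internals):

/* --- illustrative sketch, not part of ch3u_handle_recv_pkt.c --- */
#include <assert.h>

/* Stand-in lock-type values (the real ones come from mpi.h / MPICH). */
enum { LOCK_NONE, LOCK_SHARED, LOCK_EXCLUSIVE };

/* Same decision table as MPIDI_CH3I_Try_acquire_win_lock:
   Requested    Existing            Action
   Shared       Exclusive           Queue it
   Shared       NoLock/Shared       Grant it
   Exclusive    NoLock              Grant it
   Exclusive    Exclusive/Shared    Queue it */
static int lock_compatible(int requested, int existing)
{
    if (requested == LOCK_SHARED)
        return existing == LOCK_NONE || existing == LOCK_SHARED;
    if (requested == LOCK_EXCLUSIVE)
        return existing == LOCK_NONE;
    return 0;
}

int main(void)
{
    assert( lock_compatible(LOCK_SHARED,    LOCK_NONE));
    assert( lock_compatible(LOCK_SHARED,    LOCK_SHARED));
    assert(!lock_compatible(LOCK_SHARED,    LOCK_EXCLUSIVE));
    assert( lock_compatible(LOCK_EXCLUSIVE, LOCK_NONE));
    assert(!lock_compatible(LOCK_EXCLUSIVE, LOCK_SHARED));
    assert(!lock_compatible(LOCK_EXCLUSIVE, LOCK_EXCLUSIVE));
    return 0;
}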