📄 ch3u_handle_recv_pkt.c
字号:
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETRESP); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received get response pkt"); data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); MPID_Request_get_ptr(get_resp_pkt->request_handle, req); MPID_Datatype_get_size_macro(req->dev.datatype, type_size); req->dev.recv_data_sz = type_size * req->dev.user_count; /* FIXME: It is likely that this cannot happen (never perform a get with a 0-sized item). In that case, change this to an MPIU_Assert (and do the same for accumulate and put) */ if (req->dev.recv_data_sz == 0) { MPIDI_CH3U_Request_complete( req ); *buflen = sizeof(MPIDI_CH3_Pkt_t); *rreqp = NULL; } else { *rreqp = req; mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete); MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_RESP"); if (complete) { MPIDI_CH3U_Request_complete(req); *rreqp = NULL; } /* return the number of bytes processed in this function */ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETRESP); return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_Accumulate#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_accum_t * accum_pkt = &pkt->accum; MPID_Request *req = NULL; MPI_Aint true_lb, true_extent, extent; void *tmp_buf = NULL; int predefined; int complete = 0; char *data_buf = NULL; MPIDI_msg_sz_t data_len; int mpi_errno = MPI_SUCCESS; int type_size; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received accumulate pkt"); data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); req = MPID_Request_create(); MPIU_Object_set_ref(req, 1); *rreqp = req; req->dev.user_count = accum_pkt->count; req->dev.op = accum_pkt->op; req->dev.real_user_buf = accum_pkt->addr; req->dev.target_win_handle = accum_pkt->target_win_handle; req->dev.source_win_handle = accum_pkt->source_win_handle; MPIDI_CH3I_DATATYPE_IS_PREDEFINED(accum_pkt->datatype, predefined); if (predefined) { MPIU_THREADPRIV_DECL; MPIU_THREADPRIV_GET; MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP); req->dev.datatype = accum_pkt->datatype; MPIR_Nest_incr(); mpi_errno = NMPI_Type_get_true_extent(accum_pkt->datatype, &true_lb, &true_extent); MPIR_Nest_decr(); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent); tmp_buf = MPIU_Malloc(accum_pkt->count * (MPIR_MAX(extent,true_extent))); if (!tmp_buf) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); } /* adjust for potential negative lower bound in datatype */ tmp_buf = (void *)((char*)tmp_buf - true_lb); req->dev.user_buf = tmp_buf; MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size); req->dev.recv_data_sz = type_size * accum_pkt->count; if (req->dev.recv_data_sz == 0) { MPIDI_CH3U_Request_complete(req); *buflen = sizeof(MPIDI_CH3_Pkt_t); *rreqp = NULL; } else { mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete); MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE"); /* FIXME: Only change the handling of completion if post_data_receive reset the handler. There should be a cleaner way to do this */ if (!req->dev.OnDataAvail) { req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutAccumRespComplete; } /* return the number of bytes processed in this function */ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t); if (complete) { mpi_errno = MPIDI_CH3_ReqHandler_PutAccumRespComplete(vc, req, &complete); if (mpi_errno) MPIU_ERR_POP(mpi_errno); if (complete) { *rreqp = NULL; goto fn_exit; } } } } else { MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT); req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete; req->dev.datatype = MPI_DATATYPE_NULL; req->dev.dtype_info = (MPIDI_RMA_dtype_info *) MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info)); if (! req->dev.dtype_info) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } req->dev.dataloop = MPIU_Malloc(accum_pkt->dataloop_size); if (! req->dev.dataloop) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->dataloop_size) { /* copy all of dtype_info and dataloop */ memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info)); memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info), accum_pkt->dataloop_size); *buflen = sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->dataloop_size; /* All dtype data has been received, call req handler */ mpi_errno = MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete(vc, req, &complete); MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE"); if (complete) { *rreqp = NULL; goto fn_exit; } } else { req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)req->dev.dtype_info; req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info); req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)req->dev.dataloop; req->dev.iov[1].MPID_IOV_LEN = accum_pkt->dataloop_size; req->dev.iov_count = 2; *buflen = sizeof(MPIDI_CH3_Pkt_t); } } if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER,"**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE"); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE); return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_Lock#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_Lock( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_lock_t * lock_pkt = &pkt->lock; MPID_Win *win_ptr = NULL; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCK); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received lock pkt"); *buflen = sizeof(MPIDI_CH3_Pkt_t); MPID_Win_get_ptr(lock_pkt->target_win_handle, win_ptr); if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_pkt->lock_type) == 1) { /* send lock granted packet. */ mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc, lock_pkt->source_win_handle); } else { /* queue the lock information */ MPIDI_Win_lock_queue *curr_ptr, *prev_ptr, *new_ptr; /* FIXME: MT: This may need to be done atomically. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; prev_ptr = curr_ptr; while (curr_ptr != NULL) { prev_ptr = curr_ptr; curr_ptr = curr_ptr->next; } new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue)); if (!new_ptr) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } if (prev_ptr != NULL) prev_ptr->next = new_ptr; else win_ptr->lock_queue = new_ptr; new_ptr->next = NULL; new_ptr->lock_type = lock_pkt->lock_type; new_ptr->source_win_handle = lock_pkt->source_win_handle; new_ptr->vc = vc; new_ptr->pt_single_op = NULL; } *rreqp = NULL; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCK); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_LockGranted#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_LockGranted( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_lock_granted_t * lock_granted_pkt = &pkt->lock_granted; MPID_Win *win_ptr = NULL; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGRANTED); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGRANTED); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received lock granted pkt"); *buflen = sizeof(MPIDI_CH3_Pkt_t); MPID_Win_get_ptr(lock_granted_pkt->source_win_handle, win_ptr); /* set the lock_granted flag in the window */ win_ptr->lock_granted = 1; *rreqp = NULL; MPIDI_CH3_Progress_signal_completion(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGRANTED); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_PtRMADone#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_PtRMADone( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_pt_rma_done_t * pt_rma_done_pkt = &pkt->pt_rma_done; MPID_Win *win_ptr = NULL; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_PTRMADONE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_PTRMADONE); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received shared lock ops done pkt"); *buflen = sizeof(MPIDI_CH3_Pkt_t); MPID_Win_get_ptr(pt_rma_done_pkt->source_win_handle, win_ptr); /* reset the lock_granted flag in the window */ win_ptr->lock_granted = 0; *rreqp = NULL; MPIDI_CH3_Progress_signal_completion(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_PTRMADONE); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_LockPutUnlock#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_LockPutUnlock( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_lock_put_unlock_t * lock_put_unlock_pkt = &pkt->lock_put_unlock; MPID_Win *win_ptr = NULL; MPID_Request *req = NULL; int type_size; int complete; char *data_buf = NULL; MPIDI_msg_sz_t data_len; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKPUTUNLOCK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKPUTUNLOCK); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received lock_put_unlock pkt"); data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); req = MPID_Request_create(); MPIU_Object_set_ref(req, 1); req->dev.datatype = lock_put_unlock_pkt->datatype; MPID_Datatype_get_size_macro(lock_put_unlock_pkt->datatype, type_size); req->dev.recv_data_sz = type_size * lock_put_unlock_pkt->count; req->dev.user_count = lock_put_unlock_pkt->count; req->dev.target_win_handle = lock_put_unlock_pkt->target_win_handle; MPID_Win_get_ptr(lock_put_unlock_pkt->target_win_handle, win_ptr); if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_put_unlock_pkt->lock_type) == 1) { /* do the put. for this optimization, only basic datatypes supported. */ MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP); req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutAccumRespComplete; req->dev.user_buf = lock_put_unlock_pkt->addr; req->dev.source_win_handle = lock_put_unlock_pkt->source_win_handle; req->dev.single_op_opt = 1; } else { /* queue the information */ MPIDI_Win_lock_queue *curr_ptr, *prev_ptr, *new_ptr; new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue)); if (!new_ptr) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } new_ptr->pt_single_op = (MPIDI_PT_single_op *) MPIU_Malloc(sizeof(MPIDI_PT_single_op)); if (new_ptr->pt_single_op == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } /* FIXME: MT: The queuing may need to be done atomically. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; prev_ptr = curr_ptr; while (curr_ptr != NULL) { prev_ptr = curr_ptr; curr_ptr = curr_ptr->next; } if (prev_ptr != NULL) prev_ptr->next = new_ptr; else win_ptr->lock_queue = new_ptr; new_ptr->next = NULL; new_ptr->lock_type = lock_put_unlock_pkt->lock_type; new_ptr->source_win_handle = lock_put_unlock_pkt->source_win_handle; new_ptr->vc = vc; new_ptr->pt_single_op->type = MPIDI_RMA_PUT; new_ptr->pt_single_op->addr = lock_put_unlock_pkt->addr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -