📄 ch3u_handle_recv_pkt.c
字号:
new_ptr->pt_single_op->count = lock_put_unlock_pkt->count; new_ptr->pt_single_op->datatype = lock_put_unlock_pkt->datatype; /* allocate memory to receive the data */ new_ptr->pt_single_op->data = MPIU_Malloc(req->dev.recv_data_sz); if (new_ptr->pt_single_op->data == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } new_ptr->pt_single_op->data_recd = 0; MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PT_SINGLE_PUT); req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_SinglePutAccumComplete; req->dev.user_buf = new_ptr->pt_single_op->data; req->dev.lock_queue_entry = new_ptr; } if (req->dev.recv_data_sz == 0) { *buflen = sizeof(MPIDI_CH3_Pkt_t); MPIDI_CH3U_Request_complete(req); *rreqp = NULL; } else { int (*fcn)( MPIDI_VC_t *, struct MPID_Request *, int * ); fcn = req->dev.OnDataAvail; mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETFATALANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_LOCK_PUT_UNLOCK"); } req->dev.OnDataAvail = fcn; *rreqp = req; if (complete) { mpi_errno = fcn(vc, req, &complete); if (complete) { *rreqp = NULL; } } /* return the number of bytes processed in this function */ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t); } if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETFATALANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_LOCK_PUT_UNLOCK"); } fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKPUTUNLOCK); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_LockGetUnlock#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_LockGetUnlock( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_lock_get_unlock_t * lock_get_unlock_pkt = &pkt->lock_get_unlock; MPID_Win *win_ptr = NULL; int type_size; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGETUNLOCK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGETUNLOCK); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received lock_get_unlock pkt"); *buflen = sizeof(MPIDI_CH3_Pkt_t); MPID_Win_get_ptr(lock_get_unlock_pkt->target_win_handle, win_ptr); if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_get_unlock_pkt->lock_type) == 1) { /* do the get. for this optimization, only basic datatypes supported. */ MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_get_resp_t * get_resp_pkt = &upkt.get_resp; MPID_Request *req; MPID_IOV iov[MPID_IOV_LIMIT]; req = MPID_Request_create(); req->dev.target_win_handle = lock_get_unlock_pkt->target_win_handle; req->dev.source_win_handle = lock_get_unlock_pkt->source_win_handle; req->dev.single_op_opt = 1; MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RESP); req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendRespComplete; req->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendRespComplete; req->kind = MPID_REQUEST_SEND; MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP); get_resp_pkt->request_handle = lock_get_unlock_pkt->request_handle; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt; iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt); iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)lock_get_unlock_pkt->addr; MPID_Datatype_get_size_macro(lock_get_unlock_pkt->datatype, type_size); iov[1].MPID_IOV_LEN = lock_get_unlock_pkt->count * type_size; mpi_errno = MPIU_CALL(MPIDI_CH3,iSendv(vc, req, iov, 2)); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Object_set_ref(req, 0); MPIDI_CH3_Request_destroy(req); MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_OTHER,"**ch3|rmamsg"); } /* --END ERROR HANDLING-- */ } else { /* queue the information */ MPIDI_Win_lock_queue *curr_ptr, *prev_ptr, *new_ptr; /* FIXME: MT: This may need to be done atomically. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; prev_ptr = curr_ptr; while (curr_ptr != NULL) { prev_ptr = curr_ptr; curr_ptr = curr_ptr->next; } new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue)); if (!new_ptr) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } new_ptr->pt_single_op = (MPIDI_PT_single_op *) MPIU_Malloc(sizeof(MPIDI_PT_single_op)); if (new_ptr->pt_single_op == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } if (prev_ptr != NULL) prev_ptr->next = new_ptr; else win_ptr->lock_queue = new_ptr; new_ptr->next = NULL; new_ptr->lock_type = lock_get_unlock_pkt->lock_type; new_ptr->source_win_handle = lock_get_unlock_pkt->source_win_handle; new_ptr->vc = vc; new_ptr->pt_single_op->type = MPIDI_RMA_GET; new_ptr->pt_single_op->addr = lock_get_unlock_pkt->addr; new_ptr->pt_single_op->count = lock_get_unlock_pkt->count; new_ptr->pt_single_op->datatype = lock_get_unlock_pkt->datatype; new_ptr->pt_single_op->data = NULL; new_ptr->pt_single_op->request_handle = lock_get_unlock_pkt->request_handle; new_ptr->pt_single_op->data_recd = 1; } *rreqp = NULL; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKGETUNLOCK); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_LockAccumUnlock#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_LockAccumUnlock( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp ){ MPIDI_CH3_Pkt_lock_accum_unlock_t * lock_accum_unlock_pkt = &pkt->lock_accum_unlock; MPID_Request *req = NULL; MPID_Win *win_ptr = NULL; MPIDI_Win_lock_queue *curr_ptr = NULL, *prev_ptr = NULL, *new_ptr = NULL; int type_size; int complete; char *data_buf = NULL; MPIDI_msg_sz_t data_len; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKACCUMUNLOCK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKACCUMUNLOCK); MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received lock_accum_unlock pkt"); /* no need to acquire the lock here because we need to receive the data into a temporary buffer first */ data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t); req = MPID_Request_create(); MPIU_Object_set_ref(req, 1); req->dev.datatype = lock_accum_unlock_pkt->datatype; MPID_Datatype_get_size_macro(lock_accum_unlock_pkt->datatype, type_size); req->dev.recv_data_sz = type_size * lock_accum_unlock_pkt->count; req->dev.user_count = lock_accum_unlock_pkt->count; req->dev.target_win_handle = lock_accum_unlock_pkt->target_win_handle; /* queue the information */ new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue)); if (!new_ptr) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } new_ptr->pt_single_op = (MPIDI_PT_single_op *) MPIU_Malloc(sizeof(MPIDI_PT_single_op)); if (new_ptr->pt_single_op == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } MPID_Win_get_ptr(lock_accum_unlock_pkt->target_win_handle, win_ptr); /* FIXME: MT: The queuing may need to be done atomically. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; prev_ptr = curr_ptr; while (curr_ptr != NULL) { prev_ptr = curr_ptr; curr_ptr = curr_ptr->next; } if (prev_ptr != NULL) prev_ptr->next = new_ptr; else win_ptr->lock_queue = new_ptr; new_ptr->next = NULL; new_ptr->lock_type = lock_accum_unlock_pkt->lock_type; new_ptr->source_win_handle = lock_accum_unlock_pkt->source_win_handle; new_ptr->vc = vc; new_ptr->pt_single_op->type = MPIDI_RMA_ACCUMULATE; new_ptr->pt_single_op->addr = lock_accum_unlock_pkt->addr; new_ptr->pt_single_op->count = lock_accum_unlock_pkt->count; new_ptr->pt_single_op->datatype = lock_accum_unlock_pkt->datatype; new_ptr->pt_single_op->op = lock_accum_unlock_pkt->op; /* allocate memory to receive the data */ new_ptr->pt_single_op->data = MPIU_Malloc(req->dev.recv_data_sz); if (new_ptr->pt_single_op->data == NULL) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem" ); } new_ptr->pt_single_op->data_recd = 0; MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PT_SINGLE_ACCUM); req->dev.user_buf = new_ptr->pt_single_op->data; req->dev.lock_queue_entry = new_ptr; *rreqp = req; if (req->dev.recv_data_sz == 0) { *buflen = sizeof(MPIDI_CH3_Pkt_t); MPIDI_CH3U_Request_complete(req); *rreqp = NULL; } else { mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete); /* FIXME: Only change the handling of completion if post_data_receive reset the handler. There should be a cleaner way to do this */ if (!req->dev.OnDataAvail) { req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_SinglePutAccumComplete; } if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SET1(mpi_errno,MPI_ERR_OTHER,"**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK"); } /* return the number of bytes processed in this function */ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t); if (complete) { mpi_errno = MPIDI_CH3_ReqHandler_SinglePutAccumComplete(vc, req, &complete); if (complete) { *rreqp = NULL; } } } fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_LOCKACCUMUNLOCK); return mpi_errno;}/* FIXME: we still need to implement flow control */int MPIDI_CH3_PktHandler_FlowCntlUpdate( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp){ *buflen = sizeof(MPIDI_CH3_Pkt_t); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_EndCH3#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_EndCH3( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ENDCH3); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ENDCH3); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_ENDCH3); return MPI_SUCCESS;}/* ------------------------------------------------------------------------- *//* This routine may be called within a channel to initialize an array of packet handler functions, indexed by packet type. This function initializes an array so that the array may be private to the file that contains the progress function, if this is appropriate (this allows the compiler to reduce the cost in accessing the elements of the array in some cases).*/#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PktHandler_Init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[], int arraySize ){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_INIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_INIT); /* Check that the array is large enough */ if (arraySize < MPIDI_CH3_PKT_END_CH3) { MPIU_ERR_SETFATALANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|pktarraytoosmall"); } pktArray[MPIDI_CH3_PKT_EAGER_SEND] = MPIDI_CH3_PktHandler_EagerSend;#ifdef USE_EAGER_SHORT pktArray[MPIDI_CH3_PKT_EAGERSHORT_SEND] = MPIDI_CH3_PktHandler_EagerShortSend;#endif pktArray[MPIDI_CH3_PKT_READY_SEND] = MPIDI_CH3_PktHandler_ReadySend; pktArray[MPIDI_CH3_PKT_EAGER_SYNC_SEND] = MPIDI_CH3_PktHandler_EagerSyncSend; pktArray[MPIDI_CH3_PKT_EAGER_SYNC_ACK] = MPIDI_CH3_PktHandler_EagerSyncAck; pktArray[MPIDI_CH3_PKT_RNDV_REQ_TO_SEND] = MPIDI_CH3_PktHandler_RndvReqToSend; pktArray[MPIDI_CH3_PKT_RNDV_CLR_TO_SEND] = MPIDI_CH3_PktHandler_RndvClrToSend; pktArray[MPIDI_CH3_PKT_RNDV_SEND] = MPIDI_CH3_PktHandler_RndvSend; pktArray[MPIDI_CH3_PKT_CANCEL_SEND_REQ] = MPIDI_CH3_PktHandler_CancelSendReq; pktArray[MPIDI_CH3_PKT_CANCEL_SEND_RESP] = MPIDI_CH3_PktHandler_CancelSendResp; pktArray[MPIDI_CH3_PKT_PUT] = MPIDI_CH3_PktHandler_Put; pktArray[MPIDI_CH3_PKT_ACCUMULATE] = MPIDI_CH3_PktHandler_Accumulate; pktArray[MPIDI_CH3_PKT_GET] = MPIDI_CH3_PktHandler_Get; pktArray[MPIDI_CH3_PKT_GET_RESP] = MPIDI_CH3_PktHandler_GetResp; pktArray[MPIDI_CH3_PKT_LOCK] = MPIDI_CH3_PktHandler_Lock; pktArray[MPIDI_CH3_PKT_LOCK_GRANTED] = MPIDI_CH3_PktHandler_LockGranted; pktArray[MPIDI_CH3_PKT_PT_RMA_DONE] = MPIDI_CH3_PktHandler_PtRMADone; pktArray[MPIDI_CH3_PKT_LOCK_PUT_UNLOCK] = MPIDI_CH3_PktHandler_LockPutUnlock; pktArray[MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK] = MPIDI_CH3_PktHandler_LockAccumUnlock; pktArray[MPIDI_CH3_PKT_LOCK_GET_UNLOCK] = MPIDI_CH3_PktHandler_LockGetUnlock; pktArray[MPIDI_CH3_PKT_CLOSE] = MPIDI_CH3_PktHandler_Close; pktArray[MPIDI_CH3_PKT_FLOW_CNTL_UPDATE] = 0; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_INIT); return mpi_errno;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -