📄 ch3u_handle_recv_req.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpidimpl.h"static int create_derived_datatype(MPID_Request * rreq, MPID_Datatype ** dtp);static int do_accumulate_op(MPID_Request * rreq);static int do_simple_accumulate(MPIDI_PT_single_op *single_op);static int do_simple_get(MPID_Win *win_ptr, MPIDI_Win_lock_queue *lock_queue);#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Handle_recv_req#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3U_Handle_recv_req(MPIDI_VC_t * vc, MPID_Request * rreq, int * complete){ static int in_routine = FALSE; MPID_Win *win_ptr; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ); MPIDI_STATE_DECL(MPID_STATE_CH3_CA_COMPLETE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ); MPIU_Assert(in_routine == FALSE); in_routine = TRUE; switch(rreq->dev.ca) { case MPIDI_CH3_CA_COMPLETE: { MPIDI_FUNC_ENTER(MPID_STATE_CH3_CA_COMPLETE) /* FIXME: put ONC operations into their own completion action */ if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_RECV) { /* mark data transfer as complete and decrement CC */ MPIDI_CH3U_Request_complete(rreq); *complete = TRUE; } else if ((MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_PUT_RESP) || (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RESP)) { if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RESP) { /* accumulate data from tmp_buf into user_buf */ mpi_errno = do_accumulate_op(rreq); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } } MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr); /* if passive target RMA, increment counter */ if (win_ptr->current_lock_type != MPID_LOCK_NONE) win_ptr->my_pt_rma_puts_accs++; if (rreq->dev.source_win_handle != MPI_WIN_NULL) { /* Last RMA operation from source. If active target RMA, decrement window counter. If passive target RMA, release lock on window and grant next lock in the lock queue if there is any. If it's a shared lock or a lock-put-unlock type of optimization, we also need to send an ack to the source. */ if (win_ptr->current_lock_type == MPID_LOCK_NONE) { /* FIXME: MT: this has to be done atomically */ win_ptr->my_counter -= 1; } else { if ((win_ptr->current_lock_type == MPI_LOCK_SHARED) || (rreq->dev.single_op_opt == 1)) { mpi_errno = MPIDI_CH3I_Send_pt_rma_done_pkt(vc, rreq->dev.source_win_handle); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } } mpi_errno = MPIDI_CH3I_Release_lock(win_ptr); } } /* mark data transfer as complete and decrement CC */ MPIDI_CH3U_Request_complete(rreq); *complete = TRUE; } else if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_PUT_RESP_DERIVED_DT) { MPID_Datatype *new_dtp; /* create derived datatype */ create_derived_datatype(rreq, &new_dtp); /* update request to get the data */ MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_PUT_RESP); rreq->dev.datatype = new_dtp->handle; rreq->dev.recv_data_sz = new_dtp->size * rreq->dev.user_count; rreq->dev.datatype_ptr = new_dtp; /* this will cause the datatype to be freed when the request is freed. free dtype_info here. */ MPIU_Free(rreq->dev.dtype_info); MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, &rreq->dev.segment, 0); rreq->dev.segment_first = 0; rreq->dev.segment_size = rreq->dev.recv_data_sz; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } *complete = FALSE; } else if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT) { MPID_Datatype *new_dtp; MPI_Aint true_lb, true_extent, extent; void *tmp_buf; /* create derived datatype */ create_derived_datatype(rreq, &new_dtp); /* update new request to get the data */ MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_ACCUM_RESP); /* first need to allocate tmp_buf to recv the data into */ MPIR_Nest_incr(); mpi_errno = NMPI_Type_get_true_extent(new_dtp->handle, &true_lb, &true_extent); MPIR_Nest_decr(); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } MPID_Datatype_get_extent_macro(new_dtp->handle, extent); tmp_buf = MPIU_Malloc(rreq->dev.user_count * (MPIR_MAX(extent,true_extent))); /* --BEGIN ERROR HANDLING-- */ if (!tmp_buf) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0 ); goto fn_fail; } /* --END ERROR HANDLING-- */ /* adjust for potential negative lower bound in datatype */ tmp_buf = (void *)((char*)tmp_buf - true_lb); rreq->dev.user_buf = tmp_buf; rreq->dev.datatype = new_dtp->handle; rreq->dev.recv_data_sz = new_dtp->size * rreq->dev.user_count; rreq->dev.datatype_ptr = new_dtp; /* this will cause the datatype to be freed when the request is freed. free dtype_info here. */ MPIU_Free(rreq->dev.dtype_info); MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, &rreq->dev.segment, 0); rreq->dev.segment_first = 0; rreq->dev.segment_size = rreq->dev.recv_data_sz; mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|loadrecviov"); } *complete = FALSE; } else if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_RESP_DERIVED_DT) { MPID_Datatype *new_dtp; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_get_resp_t * get_resp_pkt = &upkt.get_resp; MPID_IOV iov[MPID_IOV_LIMIT]; MPID_Request * sreq; int iov_n; /* create derived datatype */ create_derived_datatype(rreq, &new_dtp); MPIU_Free(rreq->dev.dtype_info); /* create request for sending data */ sreq = MPID_Request_create(); if (sreq == NULL) { /* --BEGIN ERROR HANDLING-- */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); goto fn_exit; /* --END ERROR HANDLING-- */ } sreq->kind = MPID_REQUEST_SEND; MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_RESP); sreq->dev.user_buf = rreq->dev.user_buf; sreq->dev.user_count = rreq->dev.user_count; sreq->dev.datatype = new_dtp->handle; sreq->dev.datatype_ptr = new_dtp; sreq->dev.target_win_handle = rreq->dev.target_win_handle; sreq->dev.source_win_handle = rreq->dev.source_win_handle; MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP); get_resp_pkt->request_handle = rreq->dev.request_handle; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt; iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt); MPID_Segment_init(sreq->dev.user_buf, sreq->dev.user_count, sreq->dev.datatype, &sreq->dev.segment, 0); sreq->dev.segment_first = 0; sreq->dev.segment_size = new_dtp->size * sreq->dev.user_count; iov_n = MPID_IOV_LIMIT - 1; mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n); if (mpi_errno == MPI_SUCCESS) { iov_n += 1; mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iov_n); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Object_set_ref(sreq, 0); MPIDI_CH3_Request_destroy(sreq); sreq = NULL; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|rmamsg", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ } /* mark receive data transfer as complete and decrement CC in receive request */ MPIDI_CH3U_Request_complete(rreq); *complete = TRUE; } else if ((MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_PT_SINGLE_PUT) || (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_PT_SINGLE_ACCUM)) { /* received all the data for single lock-put(accum)-unlock optimization where the lock was not acquired in ch3u_handle_recv_pkt. Try to acquire the lock and do the operation. */ MPID_Win *win_ptr; MPIDI_Win_lock_queue *lock_queue_entry, *curr_ptr, **curr_ptr_ptr; MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr); lock_queue_entry = rreq->dev.lock_queue_entry; if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_queue_entry->lock_type) == 1) { if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_PT_SINGLE_PUT) { /* copy the data over */ mpi_errno = MPIR_Localcopy(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, lock_queue_entry->pt_single_op->addr, lock_queue_entry->pt_single_op->count, lock_queue_entry->pt_single_op->datatype); } else { mpi_errno = do_simple_accumulate(lock_queue_entry->pt_single_op); } if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* increment counter */ win_ptr->my_pt_rma_puts_accs++; /* send done packet */ mpi_errno = MPIDI_CH3I_Send_pt_rma_done_pkt(vc, lock_queue_entry->source_win_handle); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* free lock_queue_entry including data buffer and remove it from the queue. */ curr_ptr = (MPIDI_Win_lock_queue *) win_ptr->lock_queue; curr_ptr_ptr = (MPIDI_Win_lock_queue **) &(win_ptr->lock_queue); while (curr_ptr != lock_queue_entry) { curr_ptr_ptr = &(curr_ptr->next); curr_ptr = curr_ptr->next; } *curr_ptr_ptr = curr_ptr->next; MPIU_Free(lock_queue_entry->pt_single_op->data); MPIU_Free(lock_queue_entry->pt_single_op);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -