📄 ch3u_rma_sync.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops * rma_op, MPID_Win * win_ptr, MPI_Win source_win_handle, MPI_Win target_win_handle, MPIDI_RMA_dtype_info * dtype_info, void ** dataloop, MPID_Request ** request);
static int MPIDI_CH3I_Recv_rma_msg(MPIDI_RMA_ops * rma_op, MPID_Win * win_ptr, MPI_Win source_win_handle, MPI_Win target_win_handle, MPIDI_RMA_dtype_info * dtype_info, void ** dataloop, MPID_Request ** request);
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr, int *wait_for_rma_done_pkt);
static int MPIDI_CH3I_Send_lock_put_or_acc(MPID_Win *win_ptr);
static int MPIDI_CH3I_Send_lock_get(MPID_Win *win_ptr);

#undef FUNCNAME
#define FUNCNAME MPIDI_Win_fence
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_Win_fence - fence synchronization on an RMA window.
 *
 * Parameters:
 *   assert  - bit mask of assertions; only MPI_MODE_NOPRECEDE and
 *             MPI_MODE_NOSUCCEED are examined here.
 *   win_ptr - window whose queued RMA operations (win_ptr->rma_ops_list)
 *             are to be issued and completed.
 *
 * Behaviour, in order:
 *   1. If the window currently holds a lock (current_lock_type is not
 *      MPID_LOCK_NONE), poke the progress engine until it is released.
 *   2. If MPI_MODE_NOPRECEDE is asserted, only fence_cnt is updated.
 *   3. If fence_cnt is 0, no RMA operations are processed; fence_cnt becomes 1
 *      unless MPI_MODE_NOSUCCEED is asserted.
 *   4. Otherwise every queued operation is sent, the function waits for the
 *      resulting requests to complete, frees the operation list, and then
 *      waits until win_ptr->my_counter drops to zero.
 *
 * Returns MPI_SUCCESS, or an MPI error code if any step fails.
 */
int MPIDI_Win_fence(int assert, MPID_Win *win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int comm_size, done, *recvcnts;
    int *rma_target_proc, *nops_to_proc, i, total_op_count, *curr_ops_cnt;
    MPIDI_RMA_ops *curr_ptr, *next_ptr;
    MPID_Comm *comm_ptr;
    MPID_Request **requests=NULL; /* array of requests */
    MPI_Win source_win_handle, target_win_handle;
    MPIDI_RMA_dtype_info *dtype_infos=NULL;
    void **dataloops=NULL;    /* to store dataloops for each datatype */
    MPID_Progress_state progress_state;
    MPIU_CHKLMEM_DECL(7);
    MPIU_THREADPRIV_DECL;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_FENCE);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_FENCE);

    MPIU_THREADPRIV_GET;

    /* In case this process was previously the target of passive target rma
     * operations, we need to take care of the following...
     * Since we allow MPI_Win_unlock to return without a done ack from
     * the target in the case of multiple rma ops and exclusive lock,
     * we need to check whether there is a lock on the window, and if
     * there is a lock, poke the progress engine until the operations
     * have completed and the lock is released.
     */
    if (win_ptr->current_lock_type != MPID_LOCK_NONE)
    {
        MPID_Progress_start(&progress_state);
        while (win_ptr->current_lock_type != MPID_LOCK_NONE)
        {
            /* poke the progress engine */
            mpi_errno = MPID_Progress_wait(&progress_state);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS)
            {
                MPID_Progress_end(&progress_state);
                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                 "**fail", "**fail %s", "making progress on the rma messages failed");
                goto fn_exit;
            }
            /* --END ERROR HANDLING-- */
        }
        MPID_Progress_end(&progress_state);
    }

    if (assert & MPI_MODE_NOPRECEDE)
    {
        /* Nothing precedes this fence, so only record whether an epoch
           follows it. */
        win_ptr->fence_cnt = (assert & MPI_MODE_NOSUCCEED) ? 0 : 1;
        goto fn_exit;
    }

    if (win_ptr->fence_cnt == 0)
    {
        /* win_ptr->fence_cnt == 0 means either this is the very first
           call to fence or the preceding fence had the
           MPI_MODE_NOSUCCEED assert.

           If this fence has MPI_MODE_NOSUCCEED as well, leave the count
           at 0; otherwise just set the count to 1.

           The assertion is tested as a bit mask: the masked value is
           either 0 or the flag itself, so it must be tested for
           non-zero rather than compared with the literal 1. */
        if (!(assert & MPI_MODE_NOSUCCEED))
        {
            win_ptr->fence_cnt = 1;
        }
    }
    else
    {
        /* This is the second or later fence. Do all the preceding RMA ops. */

        MPID_Comm_get_ptr( win_ptr->comm, comm_ptr );

        /* First inform every process whether it is a target of RMA
           ops from this process */
        comm_size = comm_ptr->local_size;

        MPIU_CHKLMEM_MALLOC(rma_target_proc, int *, comm_size*sizeof(int), mpi_errno, "rma_target_proc");
        for (i=0; i<comm_size; i++) rma_target_proc[i] = 0;

        /* keep track of no. of ops to each proc. Needed for knowing
           whether or not to decrement the completion counter. The
           completion counter is decremented only on the last
           operation. */
        MPIU_CHKLMEM_MALLOC(nops_to_proc, int *, comm_size*sizeof(int), mpi_errno, "nops_to_proc");
        for (i=0; i<comm_size; i++) nops_to_proc[i] = 0;

        /* set rma_target_proc[i] to 1 if rank i is a target of RMA
           ops from this process */
        total_op_count = 0;
        curr_ptr = win_ptr->rma_ops_list;
        while (curr_ptr != NULL)
        {
            total_op_count++;
            rma_target_proc[curr_ptr->target_rank] = 1;
            nops_to_proc[curr_ptr->target_rank]++;
            curr_ptr = curr_ptr->next;
        }

        /* curr_ops_cnt[r] counts the ops already issued to rank r */
        MPIU_CHKLMEM_MALLOC(curr_ops_cnt, int *, comm_size*sizeof(int), mpi_errno, "curr_ops_cnt");
        for (i=0; i<comm_size; i++) curr_ops_cnt[i] = 0;

        if (total_op_count != 0)
        {
            MPIU_CHKLMEM_MALLOC(requests, MPID_Request **, total_op_count*sizeof(MPID_Request*), mpi_errno, "requests");
            MPIU_CHKLMEM_MALLOC(dtype_infos, MPIDI_RMA_dtype_info *, total_op_count*sizeof(MPIDI_RMA_dtype_info), mpi_errno, "dtype_infos");
            MPIU_CHKLMEM_MALLOC(dataloops, void **, total_op_count*sizeof(void*), mpi_errno, "dataloops");
            for (i=0; i<total_op_count; i++) dataloops[i] = NULL;
        }

        /* do a reduce_scatter (with MPI_SUM) on rma_target_proc. As a result,
           each process knows how many other processes will be doing
           RMA ops on its window */

        /* first initialize the completion counter. */
        win_ptr->my_counter = comm_size;

        /* set up the recvcnts array for reduce scatter */
        MPIU_CHKLMEM_MALLOC(recvcnts, int *, comm_size*sizeof(int), mpi_errno, "recvcnts");
        for (i=0; i<comm_size; i++) recvcnts[i] = 1;

        MPIR_Nest_incr();
        mpi_errno = NMPI_Reduce_scatter(MPI_IN_PLACE, rma_target_proc, recvcnts,
                                        MPI_INT, MPI_SUM, win_ptr->comm);
        /* result is stored in rma_target_proc[0] */
        MPIR_Nest_decr();
        if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }

        /* Set the completion counter */
        /* FIXME: MT: this needs to be done atomically because other
           procs have the address and could decrement it. */
        win_ptr->my_counter = win_ptr->my_counter - comm_size + rma_target_proc[0];

        i = 0;
        curr_ptr = win_ptr->rma_ops_list;
        while (curr_ptr != NULL)
        {
            /* The completion counter at the target is decremented only on
               the last RMA operation. We indicate the last operation by
               passing the source_win_handle only on the last operation.
               Otherwise, we pass MPI_WIN_NULL */
            if (curr_ops_cnt[curr_ptr->target_rank] ==
                nops_to_proc[curr_ptr->target_rank] - 1)
                source_win_handle = win_ptr->handle;
            else
                source_win_handle = MPI_WIN_NULL;

            target_win_handle = win_ptr->all_win_handles[curr_ptr->target_rank];

            switch (curr_ptr->type)
            {
            case (MPIDI_RMA_PUT):
            case (MPIDI_RMA_ACCUMULATE):
                mpi_errno = MPIDI_CH3I_Send_rma_msg(curr_ptr, win_ptr,
                                                    source_win_handle, target_win_handle,
                                                    &dtype_infos[i], &dataloops[i], &requests[i]);
                if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
                break;
            case (MPIDI_RMA_GET):
                mpi_errno = MPIDI_CH3I_Recv_rma_msg(curr_ptr, win_ptr,
                                                    source_win_handle, target_win_handle,
                                                    &dtype_infos[i], &dataloops[i], &requests[i]);
                if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
                break;
            default:
                /* --BEGIN ERROR HANDLING-- */
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                 "**fail", "**fail %s", "invalid RMA operation");
                goto fn_exit;
                /* --END ERROR HANDLING-- */
            }
            i++;
            curr_ops_cnt[curr_ptr->target_rank]++;
            curr_ptr = curr_ptr->next;
        }

        if (total_op_count)
        {
            /* Repeatedly scan the request array; 'done' stays 1 only if a
               full scan finds no request that is still pending. */
            done = 1;
            MPID_Progress_start(&progress_state);
            while (total_op_count)
            {
                for (i=0; i<total_op_count; i++)
                {
                    if (requests[i] != NULL)
                    {
                        if (*(requests[i]->cc_ptr) != 0)
                        {
                            /* still pending: stop scanning and make progress */
                            done = 0;
                            break;
                        }
                        else
                        {
                            mpi_errno = requests[i]->status.MPI_ERROR;
                            /* --BEGIN ERROR HANDLING-- */
                            if (mpi_errno != MPI_SUCCESS)
                            {
                                MPID_Progress_end(&progress_state);
                                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                                 "**fail", "**fail %s", "rma message operation failed");
                                goto fn_exit;
                            }
                            /* --END ERROR HANDLING-- */
                            /* if origin datatype was a derived
                               datatype, it will get freed when the
                               request gets freed. */
                            MPID_Request_release(requests[i]);
                            requests[i] = NULL;
                        }
                    }
                }

                if (done)
                {
                    break;
                }

                mpi_errno = MPID_Progress_wait(&progress_state);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno != MPI_SUCCESS)
                {
                    MPID_Progress_end(&progress_state);
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                     "**fail", "**fail %s", "making progress on the rma messages failed");
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */

                done = 1;
            }
            MPID_Progress_end(&progress_state);
        }

        if (total_op_count != 0)
        {
            for (i=0; i<total_op_count; i++)
            {
                if (dataloops[i] != NULL)
                {
                    MPIU_Free(dataloops[i]); /* allocated in send_rma_msg or
                                                recv_rma_msg */
                }
            }
        }

        /* free MPIDI_RMA_ops_list */
        curr_ptr = win_ptr->rma_ops_list;
        while (curr_ptr != NULL)
        {
            next_ptr = curr_ptr->next;
            MPIU_Free(curr_ptr);
            curr_ptr = next_ptr;
        }
        win_ptr->rma_ops_list = NULL;

        /* wait for all operations from other processes to finish */
        if (win_ptr->my_counter)
        {
            MPID_Progress_start(&progress_state);
            while (win_ptr->my_counter)
            {
                mpi_errno = MPID_Progress_wait(&progress_state);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno != MPI_SUCCESS)
                {
                    MPID_Progress_end(&progress_state);
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                     "**fail", "**fail %s", "making progress on the rma messages failed");
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */
            }
            MPID_Progress_end(&progress_state);
        }

        if (assert & MPI_MODE_NOSUCCEED)
        {
            win_ptr->fence_cnt = 0;
        }
    }

 fn_exit:
    MPIU_CHKLMEM_FREEALL();
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FENCE);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
 fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_rma_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops *rma_op, MPID_Win *win_ptr, MPI_Win source_win_handle, MPI_Win target_win_handle, MPIDI_RMA_dtype_info *dtype_info, void **dataloop, MPID_Request **request)
{
    MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_put_t *put_pkt = &upkt.put; MPIDI_CH3_Pkt_accum_t *accum_pkt = &upkt.accum; MPID_IOV iov[MPID_IOV_LIMIT]; int mpi_errno=MPI_SUCCESS, predefined; int origin_dt_derived, target_dt_derived, origin_type_size, iovcnt, iov_n; MPIDI_VC_t * vc; MPID_Comm *comm_ptr; MPID_Datatype *target_dtp=NULL, *origin_dtp=NULL; MPIU_CHKPMEM_DECL(1); MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_RMA_MSG); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_RMA_MSG); if (rma_op->type == MPIDI_RMA_PUT) { MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT); put_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] + win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp; put_pkt->count = rma_op->target_count; put_pkt->datatype = rma_op->target_datatype; put_pkt->dataloop_size = 0; put_pkt->target_win_handle = target_win_handle; put_pkt->source_win_handle = source_win_handle; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) put_pkt; iov[0].MPID_IOV_LEN = sizeof(*put_pkt); } else { MPIDI_Pkt_init(accum_pkt, MPIDI_CH3_PKT_ACCUMULATE); accum_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] + win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp; accum_pkt->count = rma_op->target_count; accum_pkt->datatype = rma_op->target_datatype; accum_pkt->dataloop_size = 0; accum_pkt->op = rma_op->op; accum_pkt->target_win_handle = target_win_handle; accum_pkt->source_win_handle = source_win_handle; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) accum_pkt; iov[0].MPID_IOV_LEN = sizeof(*accum_pkt); }/* printf("send pkt: type %d, addr %d, count %d, base %d\n", rma_pkt->type, rma_pkt->addr, rma_pkt->count, win_ptr->base_addrs[rma_op->target_rank]); fflush(stdout);*/ MPID_Comm_get_ptr(win_ptr->comm, comm_ptr); MPIDI_Comm_get_vc(comm_ptr, rma_op->target_rank, &vc); MPIDI_CH3I_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype, predefined); if (!predefined) { origin_dt_derived = 1; MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp); } else { 
origin_dt_derived = 0; } MPIDI_CH3I_DATATYPE_IS_PREDEFINED(rma_op->target_datatype, predefined); if (!predefined)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -