📄 ch3u_rma_sync.c
字号:
nest_level_inc = TRUE; MPIR_Nest_incr(); mpi_errno = NMPI_Comm_group(win_ptr->comm, &win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } post_grp = group_ptr->handle; mpi_errno = NMPI_Group_translate_ranks(post_grp, post_grp_size, ranks_in_post_grp, win_grp, ranks_in_win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } NMPI_Comm_rank(win_ptr->comm, &rank); /* Send a 0-byte message to the source processes */ for (i=0; i<post_grp_size; i++) { dst = ranks_in_win_grp[i]; if (dst != rank) { mpi_errno = NMPI_Send(&i, 0, MPI_INT, dst, 100, win_ptr->comm); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } } } mpi_errno = NMPI_Group_free(&win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } } fn_exit: MPIU_CHKLMEM_FREEALL(); if (nest_level_inc) { MPIR_Nest_decr(); } MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_POST); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */}#undef FUNCNAME#define FUNCNAME MPIDI_Win_start#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_Win_start(MPID_Group *group_ptr, int assert, MPID_Win *win_ptr){ int mpi_errno=MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_START); MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_START); /* Reset the fence counter so that in case the user has switched from fence to start-complete synchronization, he cannot use the previous fence to mark the beginning of a fence epoch. */ win_ptr->fence_cnt = 0; /* In case this process was previously the target of passive target rma * operations, we need to take care of the following... * Since we allow MPI_Win_unlock to return without a done ack from * the target in the case of multiple rma ops and exclusive lock, * we need to check whether there is a lock on the window, and if * there is a lock, poke the progress engine until the operations * have completed and the lock is therefore released. */ if (win_ptr->current_lock_type != MPID_LOCK_NONE) { MPID_Progress_state progress_state; /* poke the progress engine */ MPID_Progress_start(&progress_state); while (win_ptr->current_lock_type != MPID_LOCK_NONE) { mpi_errno = MPID_Progress_wait(&progress_state); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPID_Progress_end(&progress_state); mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "making progress on the rma messages failed"); goto fn_exit; } /* --END ERROR HANDLING-- */ } MPID_Progress_end(&progress_state); } win_ptr->start_group_ptr = group_ptr; MPIR_Group_add_ref( group_ptr ); win_ptr->start_assert = assert; fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_START); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_Win_complete#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_Win_complete(MPID_Win *win_ptr){ int nest_level_inc = FALSE; int mpi_errno = MPI_SUCCESS; int comm_size, *nops_to_proc, src, new_total_op_count; int i, j, dst, done, total_op_count, *curr_ops_cnt; MPIDI_RMA_ops *curr_ptr, *next_ptr; MPID_Comm *comm_ptr; MPID_Request **requests; /* array of requests */ MPI_Win source_win_handle, target_win_handle; MPIDI_RMA_dtype_info *dtype_infos=NULL; void **dataloops=NULL; /* to store dataloops for each datatype */ MPI_Group win_grp, start_grp; int start_grp_size, *ranks_in_start_grp, *ranks_in_win_grp, rank; MPIU_CHKLMEM_DECL(7); MPIU_THREADPRIV_DECL; MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_COMPLETE); MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_COMPLETE); MPIU_THREADPRIV_GET; MPID_Comm_get_ptr( win_ptr->comm, comm_ptr ); comm_size = comm_ptr->local_size; /* Translate the ranks of the processes in start_group to ranks in win_ptr->comm */ start_grp_size = win_ptr->start_group_ptr->size; MPIU_CHKLMEM_MALLOC(ranks_in_start_grp, int *, start_grp_size*sizeof(int), mpi_errno, "ranks_in_start_grp"); MPIU_CHKLMEM_MALLOC(ranks_in_win_grp, int *, start_grp_size*sizeof(int), mpi_errno, "ranks_in_win_grp"); for (i=0; i<start_grp_size; i++) { ranks_in_start_grp[i] = i; } nest_level_inc = TRUE; MPIR_Nest_incr(); mpi_errno = NMPI_Comm_group(win_ptr->comm, &win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } start_grp = win_ptr->start_group_ptr->handle; mpi_errno = NMPI_Group_translate_ranks(start_grp, start_grp_size, ranks_in_start_grp, win_grp, ranks_in_win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } NMPI_Comm_rank(win_ptr->comm, &rank); /* If MPI_MODE_NOCHECK was not specified, we need to check if Win_post was called on the target processes. Wait for a 0-byte sync message from each target process */ if ((win_ptr->start_assert & MPI_MODE_NOCHECK) == 0) { for (i=0; i<start_grp_size; i++) { src = ranks_in_win_grp[i]; if (src != rank) { mpi_errno = NMPI_Recv(NULL, 0, MPI_INT, src, 100, win_ptr->comm, MPI_STATUS_IGNORE); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } } } } /* keep track of no. of ops to each proc. Needed for knowing whether or not to decrement the completion counter. The completion counter is decremented only on the last operation. */ MPIU_CHKLMEM_MALLOC(nops_to_proc, int *, comm_size*sizeof(int), mpi_errno, "nops_to_proc"); for (i=0; i<comm_size; i++) nops_to_proc[i] = 0; total_op_count = 0; curr_ptr = win_ptr->rma_ops_list; while (curr_ptr != NULL) { nops_to_proc[curr_ptr->target_rank]++; total_op_count++; curr_ptr = curr_ptr->next; } MPIU_CHKLMEM_MALLOC(requests, MPID_Request **, (total_op_count+start_grp_size) * sizeof(MPID_Request*), mpi_errno, "requests"); /* We allocate a few extra requests because if there are no RMA ops to a target process, we need to send a 0-byte message just to decrement the completion counter. */ MPIU_CHKLMEM_MALLOC(curr_ops_cnt, int *, comm_size*sizeof(int), mpi_errno, "curr_ops_cnt"); for (i=0; i<comm_size; i++) curr_ops_cnt[i] = 0; if (total_op_count != 0) { MPIU_CHKLMEM_MALLOC(dtype_infos, MPIDI_RMA_dtype_info *, total_op_count*sizeof(MPIDI_RMA_dtype_info), mpi_errno, "dtype_infos"); MPIU_CHKLMEM_MALLOC(dataloops, void **, total_op_count*sizeof(void*), mpi_errno, "dataloops"); for (i=0; i<total_op_count; i++) dataloops[i] = NULL; } i = 0; curr_ptr = win_ptr->rma_ops_list; while (curr_ptr != NULL) { /* The completion counter at the target is decremented only on the last RMA operation. We indicate the last operation by passing the source_win_handle only on the last operation. Otherwise, we pass NULL */ if (curr_ops_cnt[curr_ptr->target_rank] == nops_to_proc[curr_ptr->target_rank] - 1) source_win_handle = win_ptr->handle; else source_win_handle = MPI_WIN_NULL; target_win_handle = win_ptr->all_win_handles[curr_ptr->target_rank]; switch (curr_ptr->type) { case (MPIDI_RMA_PUT): case (MPIDI_RMA_ACCUMULATE): mpi_errno = MPIDI_CH3I_Send_rma_msg(curr_ptr, win_ptr, source_win_handle, target_win_handle, &dtype_infos[i], &dataloops[i], &requests[i]); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } break; case (MPIDI_RMA_GET): mpi_errno = MPIDI_CH3I_Recv_rma_msg(curr_ptr, win_ptr, source_win_handle, target_win_handle, &dtype_infos[i], &dataloops[i], &requests[i]); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } break; default: /* --BEGIN ERROR HANDLING-- */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "invalid RMA operation"); goto fn_exit; /* --END ERROR HANDLING-- */ } i++; curr_ops_cnt[curr_ptr->target_rank]++; curr_ptr = curr_ptr->next; } /* If the start_group included some processes that did not end up becoming targets of RMA operations from this process, we need to send a dummy message to those processes just to decrement the completion counter */ j = i; new_total_op_count = total_op_count; for (i=0; i<start_grp_size; i++) { dst = ranks_in_win_grp[i]; if (dst == rank) { /* FIXME: MT: this has to be done atomically */ win_ptr->my_counter -= 1; } else if (nops_to_proc[dst] == 0) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_put_t *put_pkt = &upkt.put; MPIDI_VC_t * vc; MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT); put_pkt->addr = NULL; put_pkt->count = 0; put_pkt->datatype = MPI_INT; put_pkt->target_win_handle = win_ptr->all_win_handles[dst]; put_pkt->source_win_handle = win_ptr->handle; MPIDI_Comm_get_vc(comm_ptr, dst, &vc); mpi_errno = MPIU_CALL(MPIDI_CH3,iStartMsg(vc, put_pkt, sizeof(*put_pkt), &requests[j])); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|rmamsg", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ j++; new_total_op_count++; } } if (new_total_op_count) { MPID_Progress_state progress_state; done = 1; MPID_Progress_start(&progress_state); while (new_total_op_count) { for (i=0; i<new_total_op_count; i++) { if (requests[i] != NULL) { if (*(requests[i]->cc_ptr) != 0) { done = 0; break; } else { mpi_errno = requests[i]->status.MPI_ERROR; /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPID_Progress_end(&progress_state); mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ MPID_Request_release(requests[i]); requests[i] = NULL; } } } if (done) { break; } mpi_errno = MPID_Progress_wait(&progress_state); done = 1; } MPID_Progress_end(&progress_state); } if (total_op_count != 0) { for (i=0; i<total_op_count; i++) { if (dataloops[i] != NULL) { MPIU_Free(dataloops[i]); } } } /* free MPIDI_RMA_ops_list */ curr_ptr = win_ptr->rma_ops_list; while (curr_ptr != NULL) { next_ptr = curr_ptr->next; MPIU_Free(curr_ptr); curr_ptr = next_ptr; } win_ptr->rma_ops_list = NULL; mpi_errno = NMPI_Group_free(&win_grp); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* free the group stored in window */ MPIR_Group_release(win_ptr->start_group_ptr); win_ptr->start_group_ptr = NULL; fn_exit: if (nest_level_inc) { MPIR_Nest_decr(); } MPIU_CHKLMEM_FREEALL(); MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_COMPLETE); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */}#undef FUNCNAME#define FUNCNAME MPIDI_Win_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_Win_wait(MPID_Win *win_ptr){ int mpi_errno=MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_WAIT);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -