ch3u_port.c
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/* FIXME: This could go into util/port as a general utility routine */
/* FIXME: This is needed/used only if dynamic processes are supported (e.g.,
   another reason to place it into util/port) */

#include "mpidi_ch3_impl.h"

/*
 * This file replaces ch3u_comm_connect.c and ch3u_comm_accept.c .  These
 * routines need to be used together, particularly since they must exchange
 * information.  In addition, many of the steps that they take are identical,
 * such as building the new process group information after a connection.
 * By having these routines in the same file, it is easier for them
 * to share internal routines and it is easier to ensure that communication
 * between the two root processes (the connector and acceptor) is
 * consistent.
 */

/* FIXME: If dynamic processes are not supported, this file will contain no
   code and some compilers may warn about an "empty translation unit" */
#ifndef MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS

typedef struct pg_translation {
    int pg_index;
    int pg_rank;
} pg_translation;

typedef struct pg_node {
    int   index;
    char *pg_id;
    char *str;
    struct pg_node *next;
} pg_node;

/* These functions help implement the connect/accept algorithm */
static int ExtractLocalPGInfo( MPID_Comm *, pg_translation [],
                               pg_node **, int * );
static int ReceivePGAndDistribute( MPID_Comm *, MPID_Comm *, int, int *,
                                   int, MPIDI_PG_t *[] );
static int SendPGtoPeerAndFree( MPID_Comm *, int *, pg_node * );
static int FreeNewVC( MPIDI_VC_t *new_vc );
static int SetupNewIntercomm( MPID_Comm *comm_ptr, int remote_comm_size,
                              pg_translation remote_translation[],
                              MPIDI_PG_t **remote_pg,
                              MPID_Comm *intercomm );

#undef FUNCNAME
#define FUNCNAME MPIDI_Create_inter_root_communicator_connect
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_Create_inter_root_communicator_connect(const char *port_name,
                                                        MPID_Comm **comm_pptr,
                                                        MPIDI_VC_t **vc_pptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *tmp_comm;
    MPIDI_VC_t *connect_vc = NULL;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);

    /* Connect to the root on the other side.  Create a temporary
       intercommunicator between the two roots so that we can use MPI
       functions to communicate data between them. */

    mpi_errno = MPIDI_CH3_Connect_to_root(port_name, &connect_vc);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    mpi_errno = MPIDI_CH3I_Initialize_tmp_comm(&tmp_comm, connect_vc, 1);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK
    /* If the VC creates non-duplex connections then the acceptor will
     * need to connect back to form the other half of the connection. */
    mpi_errno = MPIDI_CH3_Complete_unidirectional_connection( connect_vc );
#endif

    *comm_pptr = tmp_comm;
    *vc_pptr   = connect_vc;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* MPIDI_Comm_connect()

   Algorithm: First create a connection (vc) between this root and the
   root on the accept side.  Using this vc, create a temporary
   intercomm between the two roots.  Use MPI functions to communicate
   the other information needed to create the real intercommunicator
   between the processes on the two sides.  Then free the
   intercommunicator between the roots.  Most of the complexity is
   because there can be multiple process groups on each side.
*/
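/* For orientation, a minimal user-level sketch of the connect/accept
   exchange that ultimately reaches MPIDI_Comm_connect below.  This is
   illustrative only; it assumes the port name is passed out of band
   (here, hypothetically, on the connector's command line):

       #include <stdio.h>
       #include <string.h>
       #include "mpi.h"

       int main(int argc, char *argv[])
       {
           char     port_name[MPI_MAX_PORT_NAME];
           MPI_Comm intercomm;

           MPI_Init(&argc, &argv);
           if (argc > 1) {
               // Connector side: MPI_Comm_connect is implemented by
               // MPIDI_Comm_connect in this file.
               strncpy(port_name, argv[1], MPI_MAX_PORT_NAME - 1);
               port_name[MPI_MAX_PORT_NAME - 1] = '\0';
               MPI_Comm_connect(port_name, MPI_INFO_NULL, 0,
                                MPI_COMM_WORLD, &intercomm);
           }
           else {
               // Acceptor side: open a port and wait for the peer.
               MPI_Open_port(MPI_INFO_NULL, port_name);
               printf("port: %s\n", port_name);
               MPI_Comm_accept(port_name, MPI_INFO_NULL, 0,
                               MPI_COMM_WORLD, &intercomm);
               MPI_Close_port(port_name);
           }
           MPI_Comm_disconnect(&intercomm);
           MPI_Finalize();
           return 0;
       }
*/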
#undef FUNCNAME
#define FUNCNAME MPIDI_Comm_connect
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Comm_connect(const char *port_name, MPID_Info *info, int root,
                       MPID_Comm *comm_ptr, MPID_Comm **newcomm)
{
    int mpi_errno = MPI_SUCCESS;
    int j, i, rank, recv_ints[3], send_ints[2], context_id;
    int remote_comm_size = 0;
    MPID_Comm *tmp_comm = NULL, *intercomm;
    MPIDI_VC_t *new_vc;
    int sendtag = 100, recvtag = 100, n_remote_pgs;
    int n_local_pgs = 1, local_comm_size;
    pg_translation *local_translation = NULL, *remote_translation = NULL;
    pg_node *pg_list = NULL;
    MPIDI_PG_t **remote_pg = NULL;
    MPIU_CHKLMEM_DECL(3);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_CONNECT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_CONNECT);

    rank = comm_ptr->rank;
    local_comm_size = comm_ptr->local_size;

    if (rank == root) {
        /* Establish a communicator to communicate with the root on the
           other side. */
        mpi_errno = MPIDI_Create_inter_root_communicator_connect(
            port_name, &tmp_comm, &new_vc);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* Make an array to translate local ranks to process group index
           and rank */
        MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
                            local_comm_size*sizeof(pg_translation),
                            mpi_errno,"local_translation");

        /* Make a list of the local communicator's process groups and
           encode them in strings to be sent to the other side.  The
           encoded string for each process group contains the process
           group id, size and all its KVS values */
        mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation,
                                        &pg_list, &n_local_pgs );

        /* Send the remote root: n_local_pgs, local_comm_size,
           Recv from the remote root: n_remote_pgs, remote_comm_size,
           context_id for newcomm */
        send_ints[0] = n_local_pgs;
        send_ints[1] = local_comm_size;

        MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST,
                  "sending two ints, %d and %d, and receiving 3 ints",
                  send_ints[0], send_ints[1]));
        mpi_errno = MPIC_Sendrecv(send_ints, 2, MPI_INT, 0,
                                  sendtag++, recv_ints, 3, MPI_INT,
                                  0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

    /* broadcast the received info to local processes */
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"broadcasting the received 3 ints");
    mpi_errno = MPIR_Bcast(recv_ints, 3, MPI_INT, root, comm_ptr);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

    n_remote_pgs     = recv_ints[0];
    remote_comm_size = recv_ints[1];
    context_id       = recv_ints[2];

    MPIU_CHKLMEM_MALLOC(remote_pg,MPIDI_PG_t**,
                        n_remote_pgs * sizeof(MPIDI_PG_t*),
                        mpi_errno,"remote_pg");
    MPIU_CHKLMEM_MALLOC(remote_translation,pg_translation*,
                        remote_comm_size * sizeof(pg_translation),
                        mpi_errno,"remote_translation");
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"allocated remote process groups");

    /* Exchange the process groups and their corresponding KVSes */
    if (rank == root) {
        mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list );
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );

        /* Exchange the translations from remote process rank to
           process group index and rank */
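        /* Each pg_translation holds a pair of ints (pg_index, pg_rank),
           so the translation tables are exchanged here, and broadcast
           below, as flat buffers of 2*size MPI_INTs. */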
        MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST,
                  "sending %d ints, receiving %d ints",
                  local_comm_size * 2, remote_comm_size * 2));
        mpi_errno = MPIC_Sendrecv(local_translation, local_comm_size * 2,
                                  MPI_INT, 0, sendtag++,
                                  remote_translation, remote_comm_size * 2,
                                  MPI_INT, 0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }

#ifdef MPICH_DBG_OUTPUT
        MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation:\n", rank));
        for (i=0; i<remote_comm_size; i++) {
            MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n"
                             " remote_translation[%d].pg_rank = %d\n",
                             i, remote_translation[i].pg_index,
                             i, remote_translation[i].pg_rank));
        }
#endif
    }
    else {
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );
    }

    /* Broadcast out the remote rank translation array */
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcasting remote translation");
    mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT,
                           root, comm_ptr);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

#ifdef MPICH_DBG_OUTPUT
    MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation after broadcast:\n",
                     rank));
    for (i=0; i<remote_comm_size; i++) {
        MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n"
                         " remote_translation[%d].pg_rank = %d\n",
                         i, remote_translation[i].pg_index,
                         i, remote_translation[i].pg_rank));
    }
#endif

    /* create and fill in the new intercommunicator */
    mpi_errno = MPIR_Comm_create(comm_ptr, newcomm);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

    intercomm = *newcomm;
    intercomm->context_id   = context_id;
    intercomm->is_low_group = 1;

    mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
                                   remote_translation, remote_pg, intercomm );
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* synchronize with remote root */
    if (rank == root) {
        MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer");
        mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0,
                                  sendtag++, &j, 0, MPI_INT,
                                  0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* All communication with remote root done.  Release the
           communicator. */
        MPIR_Comm_release(tmp_comm);
    }

    /*printf("connect:barrier\n");fflush(stdout);*/
    mpi_errno = MPIR_Barrier(comm_ptr);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* Free new_vc.  It was explicitly allocated in
       MPIDI_CH3_Connect_to_root. */
    if (rank == root) {
        FreeNewVC( new_vc );
    }

 fn_exit:
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Exiting ch3u_comm_connect");
    MPIU_CHKLMEM_FREEALL();
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_CONNECT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
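/* Worked example (hypothetical values): if comm_ptr spans four processes
   where ranks 0-1 come from process group "A" (with ranks 3 and 7 inside
   "A") and ranks 2-3 come from a spawned process group "B" (with ranks 0
   and 1 inside "B"), ExtractLocalPGInfo below produces

       n_local_pgs = 2
       pg_list: "A" (index 0) -> "B" (index 1)
       local_translation[0] = { 0, 3 }
       local_translation[1] = { 0, 7 }
       local_translation[2] = { 1, 0 }
       local_translation[3] = { 1, 1 }

   i.e. one (pg_index, pg_rank) pair per communicator rank. */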
/*
 * Extract all of the process groups from the given communicator and
 * form a list (returned in pg_list) that contains that information.
 * Also returned is an array (local_translation) of tuples mapping each
 * rank in the communicator to its (process group index, rank within
 * that process group); local_translation must be allocated before this
 * routine is called.  The number of distinct process groups is returned
 * in n_local_pgs_p .
 */
#undef FUNCNAME
#define FUNCNAME ExtractLocalPGInfo
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int ExtractLocalPGInfo( MPID_Comm *comm_p,
                               pg_translation local_translation[],
                               pg_node **pg_list_p,
                               int *n_local_pgs_p )
{
    pg_node *pg_list = 0, *pg_iter, *pg_trailer;
    int i, cur_index = 0, local_comm_size, mpi_errno = 0;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_EXTRACTLOCALPGINFO);

    MPIDI_FUNC_ENTER(MPID_STATE_EXTRACTLOCALPGINFO);

    local_comm_size = comm_p->local_size;

    /* Make a list of the local communicator's process groups and encode
       them in strings to be sent to the other side.  The encoded string
       for each process group contains the process group id, size and
       all its KVS values */

    cur_index = 0;
    MPIU_CHKPMEM_MALLOC(pg_list,pg_node*,sizeof(pg_node),mpi_errno,
                        "pg_list");

    pg_list->pg_id = MPIU_Strdup(comm_p->vcr[0]->pg->id);
    pg_list->index = cur_index++;
    pg_list->next  = NULL;
    mpi_errno = MPIDI_PG_To_string(comm_p->vcr[0]->pg, &pg_list->str);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }
    local_translation[0].pg_index = 0;
    local_translation[0].pg_rank  = comm_p->vcr[0]->pg_rank;
    pg_iter = pg_list;
    for (i=1; i<local_comm_size; i++) {
        pg_iter = pg_list;
        pg_trailer = pg_list;
        while (pg_iter != NULL) {
            if (MPIDI_PG_Id_compare(comm_p->vcr[i]->pg->id,
                                    pg_iter->pg_id)) {
                local_translation[i].pg_index = pg_iter->index;
                local_translation[i].pg_rank  = comm_p->vcr[i]->pg_rank;
                break;
            }
            if (pg_trailer != pg_iter)
                pg_trailer = pg_trailer->next;
            pg_iter = pg_iter->next;
        }
        if (pg_iter == NULL) {
            /* We use MPIU_Malloc directly because we do not know in
               advance how many nodes we may allocate */
            pg_iter = (pg_node*)MPIU_Malloc(sizeof(pg_node));
            if (!pg_iter) {
                MPIU_ERR_POP(mpi_errno);
            }
            pg_iter->pg_id = MPIU_Strdup(comm_p->vcr[i]->pg->id);
            pg_iter->index = cur_index++;
            pg_iter->next  = NULL;
            mpi_errno = MPIDI_PG_To_string(comm_p->vcr[i]->pg, &pg_iter->str);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_POP(mpi_errno);
            }
            local_translation[i].pg_index = pg_iter->index;
            local_translation[i].pg_rank  = comm_p->vcr[i]->pg_rank;
            pg_trailer->next = pg_iter;
        }
    }

    *n_local_pgs_p = cur_index;
    *pg_list_p     = pg_list;

#ifdef MPICH_DBG_OUTPUT
    pg_iter = pg_list;
    while (pg_iter != NULL) {
        MPIU_DBG_PRINTF(("connect:PG: '%s'\n<%s>\n", pg_iter->pg_id,
                         pg_iter->str));
        pg_iter = pg_iter->next;
    }
#endif

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_EXTRACTLOCALPGINFO);
    return mpi_errno;
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}

/* The root process in comm_ptr receives strings describing the process
   groups and then distributes them to the other processes in comm_ptr.
   See SendPGtoPeerAndFree for the routine that sends the descriptions */
#undef FUNCNAME
#define FUNCNAME ReceivePGAndDistribute
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int ReceivePGAndDistribute( MPID_Comm *tmp_comm, MPID_Comm *comm_ptr,
                                   int root, int *recvtag_p,
                                   int n_remote_pgs, MPIDI_PG_t *remote_pg[] )
{