📄 ch3u_port.c
                                  MPI_STATUS_IGNORE);
        if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }

#ifdef MPICH_DBG_OUTPUT
        MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation:\n", rank));
        for (i=0; i<remote_comm_size; i++) {
            MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
                i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
        }
#endif
    }
    else {
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );
    }

    /* Broadcast out the remote rank translation array */
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcasting remote translation");
    mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT,
                           root, comm_ptr);
    if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }

#ifdef MPICH_DBG_OUTPUT
    MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation after broadcast:\n", rank));
    for (i=0; i<remote_comm_size; i++) {
        MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
            i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
    }
#endif

    intercomm = *newcomm;
    intercomm->context_id   = context_id;
    intercomm->is_low_group = 1;

    mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
                                   remote_translation, remote_pg, intercomm );
    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

    /* synchronize with remote root */
    if (rank == root) {
        MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer");
        mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0, sendtag++,
                                  &j, 0, MPI_INT, 0, recvtag++,
                                  tmp_comm->handle, MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

        /* All communication with remote root done.  Release the communicator. */
        MPIR_Comm_release(tmp_comm,0);
    }

    /*printf("connect:barrier\n");fflush(stdout);*/
    mpi_errno = MPIR_Barrier(comm_ptr);
    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

    /* Free new_vc.  It was explicitly allocated in MPIDI_CH3_Connect_to_root. */
    if (rank == root) {
        FreeNewVC( new_vc );
    }

 fn_exit:
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Exiting ch3u_comm_connect");
    MPIU_CHKLMEM_FREEALL();
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_CONNECT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
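/*
 * A minimal sketch (not part of the original file): the MPIR_Bcast above
 * ships remote_translation as remote_comm_size * 2 MPI_INTs, which only
 * works if pg_translation is laid out as exactly two ints (pg_index,
 * pg_rank).  The typedef itself appears earlier in this file and is not
 * shown in this excerpt, so the layout is an assumption here; the line
 * below is a hypothetical compile-time guard for it.
 */
typedef char pg_translation_is_two_ints_assert[
    (sizeof(pg_translation) == 2 * sizeof(int)) ? 1 : -1 ];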
/*
 * Extract all of the process groups from the given communicator and
 * form a list (returned in pg_list) of those process groups.
 * Also returned is an array (local_translation) of tuples, indexed by
 * rank in the communicator, giving the process group index and the rank
 * within that process group for each communicator rank (local_translation
 * must be allocated before this routine is called).  The number of
 * distinct process groups is returned in n_local_pgs_p.
 *
 * This allows an intercomm_create to exchange the full description of
 * all of the process groups that have made up the communicator that
 * will define the "remote group".
 */
#undef FUNCNAME
#define FUNCNAME ExtractLocalPGInfo
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int ExtractLocalPGInfo( MPID_Comm *comm_p,
                               pg_translation local_translation[],
                               pg_node **pg_list_p,
                               int *n_local_pgs_p )
{
    pg_node *pg_list = 0, *pg_iter, *pg_trailer;
    int i, cur_index = 0, local_comm_size, mpi_errno = 0;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_EXTRACTLOCALPGINFO);

    MPIDI_FUNC_ENTER(MPID_STATE_EXTRACTLOCALPGINFO);

    /* If we are in the case of singleton-init, we may need to reset the
       id string for comm world.  We do this before doing anything else. */
    MPIDI_PG_CheckForSingleton();

    local_comm_size = comm_p->local_size;

    /* Make a list of the local communicator's process groups and encode
       them in strings to be sent to the other side.  The encoded string
       for each process group contains the process group id, size and
       all of its KVS values. */
    cur_index = 0;
    MPIU_CHKPMEM_MALLOC(pg_list,pg_node*,sizeof(pg_node),mpi_errno,"pg_list");

    pg_list->pg_id = MPIU_Strdup(comm_p->vcr[0]->pg->id);
    pg_list->index = cur_index++;
    pg_list->next  = NULL;
    MPIU_Assert( comm_p->vcr[0]->pg->ref_count);
    mpi_errno = MPIDI_PG_To_string(comm_p->vcr[0]->pg, &pg_list->str,
                                   &pg_list->lenStr );
    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
    MPIU_DBG_STMT(CH3_CONNECT,VERBOSE,
                  MPIDI_PrintConnStr(__FILE__,__LINE__,"PG as string is",
                                     pg_list->str ));
    local_translation[0].pg_index = 0;
    local_translation[0].pg_rank  = comm_p->vcr[0]->pg_rank;

    pg_iter = pg_list;
    for (i=1; i<local_comm_size; i++) {
        pg_iter    = pg_list;
        pg_trailer = pg_list;
        while (pg_iter != NULL) {
            /* Check to ensure pg is (probably) valid */
            MPIU_Assert(comm_p->vcr[i]->pg->ref_count != 0);
            if (MPIDI_PG_Id_compare(comm_p->vcr[i]->pg->id, pg_iter->pg_id)) {
                local_translation[i].pg_index = pg_iter->index;
                local_translation[i].pg_rank  = comm_p->vcr[i]->pg_rank;
                break;
            }
            if (pg_trailer != pg_iter)
                pg_trailer = pg_trailer->next;
            pg_iter = pg_iter->next;
        }
        if (pg_iter == NULL) {
            /* We use MPIU_Malloc directly because we do not know in
               advance how many nodes we may allocate. */
            pg_iter = (pg_node*)MPIU_Malloc(sizeof(pg_node));
            if (!pg_iter) { MPIU_ERR_POP(mpi_errno); }
            pg_iter->pg_id = MPIU_Strdup(comm_p->vcr[i]->pg->id);
            pg_iter->index = cur_index++;
            pg_iter->next  = NULL;
            mpi_errno = MPIDI_PG_To_string(comm_p->vcr[i]->pg, &pg_iter->str,
                                           &pg_iter->lenStr );
            if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
            local_translation[i].pg_index = pg_iter->index;
            local_translation[i].pg_rank  = comm_p->vcr[i]->pg_rank;
            pg_trailer->next = pg_iter;
        }
    }

    *n_local_pgs_p = cur_index;
    *pg_list_p     = pg_list;

#ifdef MPICH_DBG_OUTPUT
    pg_iter = pg_list;
    while (pg_iter != NULL) {
        MPIU_DBG_PRINTF(("connect:PG: '%s'\n<%s>\n", pg_iter->pg_id, pg_iter->str));
        pg_iter = pg_iter->next;
    }
#endif

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_EXTRACTLOCALPGINFO);
    return mpi_errno;
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}
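/*
 * A minimal debugging sketch (not part of the original file) showing how
 * the outputs of ExtractLocalPGInfo fit together: local_translation maps
 * each communicator rank to a (pg_index, pg_rank) pair, and pg_list holds
 * one node per distinct process group with its encoded description.  The
 * helper name is hypothetical; it assumes only the pg_translation and
 * pg_node fields already used above.
 */
#ifdef MPICH_DBG_OUTPUT
static void DumpLocalPGInfo( int local_comm_size,
                             const pg_translation local_translation[],
                             const pg_node *pg_list )
{
    int r;
    const pg_node *p;

    /* rank -> (process group index, rank within that group) */
    for (r = 0; r < local_comm_size; r++) {
        MPIU_DBG_PRINTF(("rank %d -> pg_index %d, pg_rank %d\n", r,
                         local_translation[r].pg_index,
                         local_translation[r].pg_rank));
    }
    /* one node per distinct process group, in discovery order */
    for (p = pg_list; p != NULL; p = p->next) {
        MPIU_DBG_PRINTF(("pg %d: id '%s', encoded length %d\n",
                         p->index, p->pg_id, p->lenStr));
    }
}
#endif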
/* The root process in comm_ptr receives strings describing the process
   groups and then distributes them to the other processes in comm_ptr.
   See SendPGtoPeerAndFree for the routine that sends the descriptions. */
#undef FUNCNAME
#define FUNCNAME ReceivePGAndDistribute
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int ReceivePGAndDistribute( MPID_Comm *tmp_comm, MPID_Comm *comm_ptr,
                                   int root, int *recvtag_p,
                                   int n_remote_pgs, MPIDI_PG_t *remote_pg[] )
{
    char *pg_str = 0;
    int  i, j, flag;
    int  rank = comm_ptr->rank;
    int  mpi_errno = 0;
    int  recvtag = *recvtag_p;
    MPIDI_STATE_DECL(MPID_STATE_RECEIVEPGANDDISTRIBUTE);

    MPIDI_FUNC_ENTER(MPID_STATE_RECEIVEPGANDDISTRIBUTE);

    for (i=0; i<n_remote_pgs; i++) {
        if (rank == root) {
            /* First, receive the pg description from the partner */
            mpi_errno = MPIC_Recv(&j, 1, MPI_INT, 0, recvtag++,
                                  tmp_comm->handle, MPI_STATUS_IGNORE);
            *recvtag_p = recvtag;
            if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
            pg_str = (char*)MPIU_Malloc(j);
            if (pg_str == NULL) { MPIU_ERR_POP(mpi_errno); }
            mpi_errno = MPIC_Recv(pg_str, j, MPI_CHAR, 0, recvtag++,
                                  tmp_comm->handle, MPI_STATUS_IGNORE);
            *recvtag_p = recvtag;
            if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
        }

        /* Broadcast the size and data to the local communicator */
        /*printf("accept:broadcasting 1 int\n");fflush(stdout);*/
        mpi_errno = MPIR_Bcast(&j, 1, MPI_INT, root, comm_ptr);
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

        if (rank != root) {
            /* The root has already allocated this string */
            pg_str = (char*)MPIU_Malloc(j);
            if (pg_str == NULL) { MPIU_ERR_POP(mpi_errno); }
        }
        /*printf("accept:broadcasting string of length %d\n", j);fflush(stdout);*/
        mpi_errno = MPIR_Bcast(pg_str, j, MPI_CHAR, root, comm_ptr);
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

        /* Then reconstruct the received process group.  This step also
           initializes the created process group. */
        MPIU_DBG_STMT(CH3_CONNECT,VERBOSE,
                      MPIDI_PrintConnStr(__FILE__,__LINE__,
                                         "Creating pg from string", pg_str ));
        mpi_errno = MPIDI_PG_Create_from_string(pg_str, &remote_pg[i], &flag);
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
        MPIU_Free(pg_str);
        if (flag) {
            /* FIXME: If this is really needed, make it a destroy callback
               on the process group rather than an SSHM-specific item */
#ifdef MPIDI_CH3_USES_SSHM
            /* extra pg ref needed for shared memory modules because the
             * shm_XXXXing_list's
             * need to be walked through in the later stages of finalize to
             * free queue_info's. */
            /* FIXME: Need to understand this and either remove or make
               common to all channels */
            MPIDI_PG_add_ref(remote_pg[i]);
#endif
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_RECEIVEPGANDDISTRIBUTE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
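/*
 * A minimal sketch (not part of the original file) of the distribution
 * pattern used inside the loop above: the root already holds a
 * length-prefixed description string, every other process in comm_ptr
 * first learns the length, allocates, and then receives the payload.
 * The helper name is hypothetical and allocation-failure handling is
 * elided; only MPIR_Bcast and MPIU_Malloc, as called above, are assumed.
 */
static int BcastLengthPrefixedString( char **str_p, int *len_p, int root,
                                      MPID_Comm *comm_ptr )
{
    int mpi_errno;

    /* Everyone learns the length first ... */
    mpi_errno = MPIR_Bcast( len_p, 1, MPI_INT, root, comm_ptr );
    if (mpi_errno != MPI_SUCCESS) return mpi_errno;

    /* ... non-root processes allocate a buffer of that size ... */
    if (comm_ptr->rank != root) {
        *str_p = (char *)MPIU_Malloc( *len_p );
    }

    /* ... and the payload follows. */
    return MPIR_Bcast( *str_p, *len_p, MPI_CHAR, root, comm_ptr );
}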
/* Used internally to broadcast process groups belonging to peercomm to all
   processes in comm.  The process with rank root in comm is the process in
   peercomm from which the process groups are taken.  This routine is
   collective over comm_p. */
#undef FUNCNAME
#define FUNCNAME MPID_PG_BCast
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_PG_BCast( MPID_Comm *peercomm_p, MPID_Comm *comm_p, int root )
{
    int n_local_pgs=0, mpi_errno = 0;
    pg_translation *local_translation = 0;
    pg_node *pg_list, *pg_next, *pg_head = 0;
    int rank, i, peer_comm_size;
    MPIU_CHKLMEM_DECL(1);

    peer_comm_size = comm_p->local_size;
    rank           = comm_p->rank;

    MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
                        peer_comm_size*sizeof(pg_translation),
                        mpi_errno,"local_translation");

    if (rank == root) {
        /* Get the process groups known to the *peercomm* */
        ExtractLocalPGInfo( peercomm_p, local_translation, &pg_head,
                            &n_local_pgs );
    }

    /* Now, broadcast the number of local pgs */
    NMPI_Bcast( &n_local_pgs, 1, MPI_INT, root, comm_p->handle );

    pg_list = pg_head;
    for (i=0; i<n_local_pgs; i++) {
        int len, flag;
        char *pg_str=0;
        MPIDI_PG_t *pgptr;

        if (rank == root) {
            if (!pg_list) {
                /* FIXME: Error, the pg_list is broken */
                printf( "Unexpected end of pg_list\n" ); fflush(stdout);
                break;
            }
            pg_str  = pg_list->str;
            len     = pg_list->lenStr + 1;
            pg_list = pg_list->next;
        }
        NMPI_Bcast( &len, 1, MPI_INT, root, comm_p->handle );
        if (rank != root) {
            pg_str = (char *)MPIU_Malloc(len);
        }
        NMPI_Bcast( pg_str, len, MPI_CHAR, root, comm_p->handle );

        if (rank != root) {
            /* flag is true if the pg was created, false if it already
               existed.  This step also initializes the created process
               group. */
            MPIDI_PG_Create_from_string( pg_str, &pgptr, &flag );
            if (flag) {
                /*printf( "[%d]Added pg named %s to list\n", rank,
                          (char *)pgptr->id ); fflush(stdout); */
            }
            MPIU_Free( pg_str );
        }
    }

    /* Free pg_list */
    pg_list = pg_head;

    /* FIXME: We should use the PG destroy function for this, and ensure
       that the PG fields are valid for that function. */
    while (pg_list) {
        pg_next = pg_list->next;
        MPIU_Free( pg_list->str );
        if (pg_list->pg_id) {
            MPIU_Free( pg_list->pg_id );
        }
        MPIU_Free( pg_list );
        pg_list = pg_next;
    }

 fn_exit:
    MPIU_CHKLMEM_FREEALL();
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* Sends the process group information to the peer and frees the pg_list */
#undef FUNCNAME
#define FUNCNAME SendPGtoPeerAndFree
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int SendPGtoPeerAndFree( MPID_Comm *tmp_comm, int *sendtag_p,
                                pg_node *pg_list )
{
    int mpi_errno = 0;
    int sendtag = *sendtag_p, i;
    pg_node *pg_iter;
    MPIDI_STATE_DECL(MPID_STATE_SENDPGTOPEERANDFREE);

    MPIDI_FUNC_ENTER(MPID_STATE_SENDPGTOPEERANDFREE);

    while (pg_list != NULL) {
        pg_iter = pg_list;
        i = pg_iter->lenStr + 1;
        /*printf("connect:sending 1 int: %d\n", i);fflush(stdout);*/
        mpi_errno = MPIC_Send(&i, 1, MPI_INT, 0, sendtag++, tmp_comm->handle);
        *sendtag_p = sendtag;
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

        /* printf("connect:sending string length %d\n", i);fflush(stdout); */
        mpi_errno = MPIC_Send(pg_iter->str, i, MPI_CHAR, 0, sendtag++,
                              tmp_comm->handle);
        *sendtag_p = sendtag;
        if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }

        pg_list = pg_list->next;

        MPIU_Free(pg_iter->str);
        MPIU_Free(pg_iter->pg_id);
        MPIU_Free(pg_iter);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_SENDPGTOPEERANDFREE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
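/*
 * A minimal sketch (not part of the original file) of the framing that
 * SendPGtoPeerAndFree and ReceivePGAndDistribute must agree on: for each
 * process-group description, one MPI_INT carrying lenStr + 1 is sent
 * first, followed by that many MPI_CHARs of the encoded string, so each
 * description consumes exactly two tags on both the connect and accept
 * roots.  The enum and helper names are hypothetical bookkeeping only.
 */
enum { PG_MSGS_PER_DESCRIPTION = 2 };

static int PGDescriptionTagsUsed( int n_pgs )
{
    /* Tags consumed by either root when n_pgs descriptions are exchanged */
    return n_pgs * PG_MSGS_PER_DESCRIPTION;
}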
/* ---------------------------------------------------------------------- */
/*
 * MPIDI_Comm_accept()
 *
 * Algorithm: First dequeue the vc from the accept queue (it was enqueued
 * by the progress engine in response to a connect request from the root
 * process that is attempting the connection on the connect side).  Use
 * this vc to create an intercommunicator between this root and the root
 * on the connect side.  Use this intercomm. to communicate the other
 * information needed to create the real intercommunicator between the
 * processes on the two sides.  Then free the intercommunicator between
 * the roots.  Most of the complexity is because there can be multiple
 * process groups on each side.
 */