📄 ch3u_port.c
Font size:
char *pg_str = 0; int i, j, flag, p; int rank = comm_ptr->rank; int mpi_errno = 0; int recvtag = *recvtag_p; MPIDI_STATE_DECL(MPID_STATE_RECEIVEPGANDDISTRIBUTE); MPIDI_FUNC_ENTER(MPID_STATE_RECEIVEPGANDDISTRIBUTE); for (i=0; i<n_remote_pgs; i++) { if (rank == root) { /* First, receive the pg description from the partner */ mpi_errno = MPIC_Recv(&j, 1, MPI_INT, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE); *recvtag_p = recvtag; if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } pg_str = (char*)MPIU_Malloc(j); if (pg_str == NULL) { MPIU_ERR_POP(mpi_errno); } mpi_errno = MPIC_Recv(pg_str, j, MPI_CHAR, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE); *recvtag_p = recvtag; if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } /* Broadcast the size and data to the local communicator */ /*printf("accept:broadcasting 1 int\n");fflush(stdout);*/ mpi_errno = MPIR_Bcast(&j, 1, MPI_INT, root, comm_ptr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (rank != root) { /* The root has already allocated this string */ pg_str = (char*)MPIU_Malloc(j); if (pg_str == NULL) { MPIU_ERR_POP(mpi_errno); } } /*printf("accept:broadcasting string of length %d\n", j);fflush(stdout);*/ mpi_errno = MPIR_Bcast(pg_str, j, MPI_CHAR, root, comm_ptr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* Then reconstruct the received process group */ mpi_errno = MPIDI_PG_Create_from_string(pg_str, &remote_pg[i], &flag); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } MPIU_Free(pg_str); if (flag) {#ifdef MPIDI_CH3_USES_SSHM /* extra pg ref needed for shared memory modules because the * shm_XXXXing_list's * need to be walked though in the later stages of finalize to * free queue_info's. 
*/ MPIDI_PG_Add_ref(remote_pg[i]);#endif for (p=0; p<remote_pg[i]->size; p++) { MPIDI_VC_t *vc; MPIDI_PG_Get_vcr(remote_pg[i], p, &vc); MPIDI_CH3_VC_Init( vc ); } } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_RECEIVEPGANDDISTRIBUTE); return mpi_errno; fn_fail: goto fn_exit;}/* Used internally to broadcast process groups belonging to peercomm to all processes in comm. The process with rank root in comm is the process in peercomm from which the process groups are taken. */#undef FUNCNAME#define FUNCNAME MPID_PG_BCast#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_PG_BCast( MPID_Comm *peercomm_p, MPID_Comm *comm_p, int root ){ int n_local_pgs=0, mpi_errno = 0; pg_translation *local_translation = 0; pg_node *pg_list, *pg_next, *pg_head = 0; int rank, i, peer_comm_size; MPIU_CHKLMEM_DECL(1); peer_comm_size = comm_p->local_size; rank = comm_p->rank; MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*, peer_comm_size*sizeof(pg_translation), mpi_errno,"local_translation"); if (rank == root) { /* Get the process groups known to the *peercomm* */ ExtractLocalPGInfo( peercomm_p, local_translation, &pg_head, &n_local_pgs ); } /* Now, broadcast the number of local pgs */ NMPI_Bcast( &n_local_pgs, 1, MPI_INT, root, comm_p->handle ); /* printf( "Number of pgs = %d\n", n_local_pgs ); fflush(stdout); */ pg_list = pg_head; for (i=0; i<n_local_pgs; i++) { int len, flag; char *pg_str=0; MPIDI_PG_t *pgptr; if (rank == root) { if (!pg_list) { /* FIXME: Error, the pg_list is broken */ printf( "Unexpected end of pg_list\n" ); fflush(stdout); break; } pg_str = pg_list->str; pg_list = pg_list->next; len = (int)strlen(pg_str) + 1; } NMPI_Bcast( &len, 1, MPI_INT, root, comm_p->handle ); if (rank != root) { pg_str = (char *)MPIU_Malloc(len); } NMPI_Bcast( pg_str, len, MPI_CHAR, root, comm_p->handle ); if (rank != root) { /* flag is true if the pg was created, false if it already existed */ MPIDI_PG_Create_from_string( pg_str, &pgptr, &flag ); if (flag) { int p; /*printf( "[%d]Added pg 
named %s to list\n", rank, (char *)pgptr->id ); fflush(stdout); */ /* FIXME: This initalization should be done when the pg is created ? */ for (p=0; p<pgptr->size; p++) { MPIDI_VC_t *vc; MPIDI_PG_Get_vcr(pgptr, p, &vc); MPIDI_CH3_VC_Init( vc ); } } MPIU_Free( pg_str ); } } /* Free pg_list */ pg_list = pg_head; /* FIXME: We should use the PG destroy function for this, and ensure that the PG fields are valid for that function */ while (pg_list) { pg_next = pg_list->next; MPIU_Free( pg_list->str ); if (pg_list->pg_id ) { MPIU_Free( pg_list->pg_id ); } MPIU_Free( pg_list ); pg_list = pg_next; } fn_exit: MPIU_CHKLMEM_FREEALL(); return mpi_errno; fn_fail: goto fn_exit;}/* Sends the process group information to the peer and frees the pg_list */#undef FUNCNAME#define FUNCNAME SendPGtoPeerAndFree#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int SendPGtoPeerAndFree( MPID_Comm *tmp_comm, int *sendtag_p, pg_node *pg_list ){ int mpi_errno = 0; int sendtag = *sendtag_p, i; pg_node *pg_iter; MPIDI_STATE_DECL(MPID_STATE_SENDPGTOPEERANDFREE); MPIDI_FUNC_ENTER(MPID_STATE_SENDPGTOPEERANDFREE); while (pg_list != NULL) { pg_iter = pg_list; i = (int)(strlen(pg_iter->str) + 1); /*printf("connect:sending 1 int: %d\n", i);fflush(stdout);*/ mpi_errno = MPIC_Send(&i, 1, MPI_INT, 0, sendtag++, tmp_comm->handle); *sendtag_p = sendtag; if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /*printf("connect:sending string length %d\n", i);fflush(stdout);*/ mpi_errno = MPIC_Send(pg_iter->str, i, MPI_CHAR, 0, sendtag++, tmp_comm->handle); *sendtag_p = sendtag; if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } pg_list = pg_list->next; MPIU_Free(pg_iter->str); MPIU_Free(pg_iter->pg_id); MPIU_Free(pg_iter); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_SENDPGTOPEERANDFREE); return mpi_errno; fn_fail: goto fn_exit;}/* ---------------------------------------------------------------------- */#undef FUNCNAME#define FUNCNAME MPIDI_Create_inter_root_communicator_accept#undef FCNAME#define 
FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_Create_inter_root_communicator_accept(const char *port_name, MPID_Comm **comm_pptr, MPIDI_VC_t **vc_pptr){ int mpi_errno = MPI_SUCCESS; MPID_Comm *tmp_comm; MPIDI_VC_t *new_vc = NULL; MPID_Progress_state progress_state; int port_name_tag; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT); /* FIXME: This code should parallel the MPIDI_CH3_Connect_to_root code used in the MPIDI_Create_inter_root_communicator_connect */ /* extract the tag from the port_name */ mpi_errno = MPIDI_GetTagFromPort( port_name, &port_name_tag); if (mpi_errno != MPIU_STR_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* dequeue the accept queue to see if a connection with the root on the connect side has been formed in the progress engine (the connection is returned in the form of a vc). If not, poke the progress engine. */ MPID_Progress_start(&progress_state); for(;;) { MPIDI_CH3I_Acceptq_dequeue(&new_vc, port_name_tag); if (new_vc != NULL) { break; } mpi_errno = MPID_Progress_wait(&progress_state); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { MPID_Progress_end(&progress_state); MPIU_ERR_POP(mpi_errno); } /* --END ERROR HANDLING-- */ } MPID_Progress_end(&progress_state); mpi_errno = MPIDI_CH3I_Initialize_tmp_comm(&tmp_comm, new_vc, 0); /* If the VC creates non-duplex connections then the acceptor will * need to connect back to form the other half of the connection. */#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK mpi_errno = MPIDI_CH3_Complete_unidirectional_connection2( new_vc );#endif *comm_pptr = tmp_comm; *vc_pptr = new_vc;fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT); return mpi_errno;fn_fail: goto fn_exit;}/* * MPIDI_Comm_accept() Algorithm: First dequeue the vc from the accept queue (it was enqueued by the progress engine in response to a connect request from the root on the connect side). 
Use this vc to create an intercommunicator between this root and the root on the connect side. Use this intercomm. to communicate the other information needed to create the real intercommunicator between the processes on the two sides. Then free the intercommunicator between the roots. Most of the complexity is because there can be multiple process groups on each side. */#undef FUNCNAME#define FUNCNAME MPIDI_Comm_accept#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_Comm_accept(const char *port_name, MPID_Info *info, int root, MPID_Comm *comm_ptr, MPID_Comm **newcomm){ int mpi_errno=MPI_SUCCESS; int i, j, rank, recv_ints[2], send_ints[3]; int remote_comm_size=0; MPID_Comm *tmp_comm = NULL, *intercomm; MPIDI_VC_t *new_vc = NULL; int n_local_pgs=1, n_remote_pgs; int sendtag=100, recvtag=100, local_comm_size; pg_translation *local_translation = NULL, *remote_translation = NULL; pg_node *pg_list = NULL; MPIDI_PG_t **remote_pg = NULL; MPIU_CHKLMEM_DECL(3); MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_ACCEPT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_ACCEPT); /* Create the new intercommunicator here. We need to send the context id to the other side. */ /* FIXME: There is a danger that the context id won't be unique on the other side of this connection */ mpi_errno = MPIR_Comm_create(comm_ptr, newcomm); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } rank = comm_ptr->rank; local_comm_size = comm_ptr->local_size; if (rank == root) { /* Establish a communicator to communicate with the root on the other side. 
*/ mpi_errno = MPIDI_Create_inter_root_communicator_accept(port_name, &tmp_comm, &new_vc); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* Make an array to translate local ranks to process group index and rank */ MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*, local_comm_size*sizeof(pg_translation), mpi_errno,"local_translation"); /* Make a list of the local communicator's process groups and encode them in strings to be sent to the other side. The encoded string for each process group contains the process group id, size and all its KVS values */ mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation, &pg_list, &n_local_pgs ); /* Send the remote root: n_local_pgs, local_comm_size, context_id for newcomm. Recv from the remote root: n_remote_pgs, remote_comm_size */ send_ints[0] = n_local_pgs; send_ints[1] = local_comm_size; send_ints[2] = (*newcomm)->context_id; /*printf("accept:sending 3 ints, %d, %d, %d, and receiving 2 ints\n", send_ints[0], send_ints[1], send_ints[2]);fflush(stdout);*/ mpi_errno = MPIC_Sendrecv(send_ints, 3, MPI_INT, 0, sendtag++, recv_ints, 2, MPI_INT, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } /* broadcast the received info to local processes */ /*printf("accept:broadcasting 2 ints - %d and %d\n", recv_ints[0], recv_ints[1]);fflush(stdout);*/ mpi_errno = MPIR_Bcast(recv_ints, 2, MPI_INT, root, comm_ptr); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } n_remote_pgs = recv_ints[0]; remote_comm_size = recv_ints[1]; MPIU_CHKLMEM_MALLOC(remote_pg,MPIDI_PG_t**, n_remote_pgs * sizeof(MPIDI_PG_t*), mpi_errno,"remote_pg"); MPIU_CHKLMEM_MALLOC(remote_translation,pg_translation*, remote_comm_size * sizeof(pg_translation), mpi_errno, "remote_translation"); MPIU_DBG_PRINTF(("[%d]accept:remote process groups: %d\nremote comm size: %d\n", rank, n_remote_pgs, remote_comm_size)); /* Exchange the process groups and their corresponding KVSes */ if (rank == root) { /* The 
root receives the PG from the peer (in tmp_comm) and distributes them to the processes in comm_ptr */ mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag, n_remote_pgs, remote_pg ); mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list ); /* Receive the translations from remote process rank to process group index */ /*printf("accept:sending %d ints and receiving %d ints\n", local_comm_size * 2, remote_comm_size * 2);fflush(stdout);*/
⌨️ Keyboard shortcuts
Copy code
Ctrl + C
Search code
Ctrl + F
Full-screen mode
F11
Toggle theme
Ctrl + Shift + D
Show shortcuts
?
Increase font size
Ctrl + =
Decrease font size
Ctrl + -