📄 ch3u_port.c
   process groups on each side. */
#undef FUNCNAME
#define FUNCNAME MPIDI_Comm_accept
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Comm_accept(const char *port_name, MPID_Info *info, int root,
                      MPID_Comm *comm_ptr, MPID_Comm **newcomm)
{
    int mpi_errno = MPI_SUCCESS;
    int i, j, rank, recv_ints[3], send_ints[3], context_id;
    int remote_comm_size = 0;
    MPID_Comm *tmp_comm = NULL, *intercomm;
    MPIDI_VC_t *new_vc = NULL;
    int sendtag = 100, recvtag = 100, local_comm_size;
    int n_local_pgs = 1, n_remote_pgs;
    pg_translation *local_translation = NULL, *remote_translation = NULL;
    pg_node *pg_list = NULL;
    MPIDI_PG_t **remote_pg = NULL;
    MPIU_CHKLMEM_DECL(3);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_ACCEPT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_ACCEPT);

    /* Create the new intercommunicator here. We need to send the
       context id to the other side. */
    mpi_errno = MPIR_Comm_create(newcomm);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }
    (*newcomm)->recvcontext_id = MPIR_Get_contextid( comm_ptr );
    if ((*newcomm)->recvcontext_id == 0) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**toomanycomm" );
    }
    /* (*newcomm)->context_id = (*newcomm)->recvcontext_id; */

    rank            = comm_ptr->rank;
    local_comm_size = comm_ptr->local_size;

    if (rank == root)
    {
        /* Establish a communicator to communicate with the root on the
           other side. */
        mpi_errno = MPIDI_Create_inter_root_communicator_accept(port_name,
                                                        &tmp_comm, &new_vc);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* Make an array to translate local ranks to process group index
           and rank */
        MPIU_CHKLMEM_MALLOC(local_translation, pg_translation*,
                            local_comm_size*sizeof(pg_translation),
                            mpi_errno, "local_translation");

        /* Make a list of the local communicator's process groups and encode
           them in strings to be sent to the other side.  The encoded string
           for each process group contains the process group id, size and
           all its KVS values */
        mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation,
                                        &pg_list, &n_local_pgs );

        /* Send the remote root: n_local_pgs, local_comm_size,
           context_id for newcomm.
           Recv from the remote root: n_remote_pgs, remote_comm_size */
        send_ints[0] = n_local_pgs;
        send_ints[1] = local_comm_size;
        send_ints[2] = (*newcomm)->recvcontext_id;

        /*printf("accept:sending 3 ints, %d, %d, %d, and receiving 2 ints\n", send_ints[0], send_ints[1], send_ints[2]);fflush(stdout);*/
        mpi_errno = MPIC_Sendrecv(send_ints, 3, MPI_INT, 0,
                                  sendtag++, recv_ints, 3, MPI_INT,
                                  0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

    /* broadcast the received info to local processes */
    /*printf("accept:broadcasting 2 ints - %d and %d\n", recv_ints[0], recv_ints[1]);fflush(stdout);*/
    mpi_errno = MPIR_Bcast(recv_ints, 3, MPI_INT, root, comm_ptr);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

    n_remote_pgs     = recv_ints[0];
    remote_comm_size = recv_ints[1];
    context_id       = recv_ints[2];

    MPIU_CHKLMEM_MALLOC(remote_pg, MPIDI_PG_t**,
                        n_remote_pgs * sizeof(MPIDI_PG_t*),
                        mpi_errno, "remote_pg");
    MPIU_CHKLMEM_MALLOC(remote_translation, pg_translation*,
                        remote_comm_size * sizeof(pg_translation),
                        mpi_errno, "remote_translation");
    MPIU_DBG_PRINTF(("[%d]accept:remote process groups: %d\nremote comm size: %d\n",
                     rank, n_remote_pgs, remote_comm_size));

    /* Exchange the process groups and their corresponding KVSes */
    if (rank == root)
    {
        /* The root receives the PG from the peer (in tmp_comm) and
           distributes them to the processes in comm_ptr */
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );
        mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list );

        /* Receive the translations from remote process rank to process
           group index */
        /*printf("accept:sending %d ints and receiving %d ints\n", local_comm_size * 2, remote_comm_size * 2);fflush(stdout);*/
        mpi_errno = MPIC_Sendrecv(local_translation, local_comm_size * 2,
                                  MPI_INT, 0, sendtag++,
                                  remote_translation, remote_comm_size * 2,
                                  MPI_INT, 0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
#ifdef MPICH_DBG_OUTPUT
        MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation:\n", rank));
        for (i=0; i<remote_comm_size; i++)
        {
            MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
                             i, remote_translation[i].pg_index,
                             i, remote_translation[i].pg_rank));
        }
#endif
    }
    else
    {
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );
    }

    /* Broadcast out the remote rank translation array */
    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcast remote_translation");
    mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT,
                           root, comm_ptr);
#ifdef MPICH_DBG_OUTPUT
    MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation after broadcast:\n", rank));
    for (i=0; i<remote_comm_size; i++)
    {
        MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
                         i, remote_translation[i].pg_index,
                         i, remote_translation[i].pg_rank));
    }
#endif

    /* Now fill in newcomm */
    intercomm               = *newcomm;
    intercomm->context_id   = context_id;
    intercomm->is_low_group = 0;

    mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
                                   remote_translation, remote_pg, intercomm );
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* synchronize with remote root */
    if (rank == root)
    {
        MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer");
        mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0,
                                  sendtag++, &j, 0, MPI_INT,
                                  0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* All communication with the remote root is done.  Release the
           communicator. */
        MPIR_Comm_release(tmp_comm, 0);
    }

    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier");
    mpi_errno = MPIR_Barrier(comm_ptr);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* Free new_vc once the connection is completed.  It was explicitly
       allocated in ch3_progress.c and returned by
       MPIDI_CH3I_Acceptq_dequeue. */
    if (rank == root) {
        FreeNewVC( new_vc );
    }

 fn_exit:
    MPIU_CHKLMEM_FREEALL();
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_ACCEPT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
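/* Illustration, not part of the original source: the translation tables
   exchanged above travel on the wire as 'size * 2' MPI_INTs, which relies
   on each pg_translation entry (defined earlier in this file, outside this
   excerpt) being exactly two ints: the index of a process group plus the
   rank within that group.  The helper below is a hypothetical sketch that
   only makes this layout explicit; the name PrintTranslationSketch does
   not exist in MPICH. */
static void PrintTranslationSketch( const pg_translation t[], int size )
{
    int i;
    for (i = 0; i < size; i++) {
        /* e.g. an entry (1,0) reads as "rank 0 within the process group
           stored at index 1 of the corresponding remote_pg array" */
        MPIU_DBG_PRINTF(("translation[%d] = (pg_index %d, pg_rank %d)\n",
                         i, t[i].pg_index, t[i].pg_rank));
    }
}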
/* ------------------------------------------------------------------------- */

/* This routine initializes the new intercomm, setting up the VCRT and other
   common structures.  The is_low_group and context_id fields are NOT set
   because they differ in the use of this routine in Comm_accept and
   Comm_connect.

   The virtual connections are initialized from a collection of process
   groups.

   Input parameters:
+  comm_ptr - communicator that gives the group for the "local" group on
   the new intercommunicator
.  remote_comm_size - size of the remote group
.  remote_translation - array that specifies the process group and rank in
   that group for each of the processes to include in the remote group of
   the new intercommunicator
-  remote_pg - array of remote process groups

   Input/Output Parameter:
.  intercomm - new intercommunicator.  The intercommunicator must already
   have been allocated; this routine initializes many of its fields

   Note:
   This routine performs a barrier over 'comm_ptr'.  Why?
*/
#undef FUNCNAME
#define FUNCNAME SetupNewIntercomm
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int SetupNewIntercomm( MPID_Comm *comm_ptr, int remote_comm_size,
                              pg_translation remote_translation[],
                              MPIDI_PG_t **remote_pg,
                              MPID_Comm *intercomm )
{
    int mpi_errno = MPI_SUCCESS, i;

    /* FIXME: How much of this could/should be common with the upper level
       (src/mpi/comm/ *.c) code?  For best robustness, this should use the
       same routine (not copy/paste code) as in the upper level code. */
    intercomm->attributes   = NULL;
    intercomm->remote_size  = remote_comm_size;
    intercomm->local_size   = comm_ptr->local_size;
    intercomm->rank         = comm_ptr->rank;
    intercomm->local_group  = NULL;
    intercomm->remote_group = NULL;
    intercomm->comm_kind    = MPID_INTERCOMM;
    intercomm->local_comm   = NULL;
    intercomm->coll_fns     = NULL;

    /* Point local vcr, vcrt at those of the incoming intracommunicator */
    intercomm->local_vcrt = comm_ptr->vcrt;
    MPID_VCRT_Add_ref(comm_ptr->vcrt);
    intercomm->local_vcr  = comm_ptr->vcr;

    /* Set up the VC reference table */
    mpi_errno = MPID_VCRT_Create(intercomm->remote_size, &intercomm->vcrt);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**init_vcrt");
    }
    mpi_errno = MPID_VCRT_Get_ptr(intercomm->vcrt, &intercomm->vcr);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**init_getptr");
    }
    for (i=0; i < intercomm->remote_size; i++)
    {
        MPIDI_PG_Dup_vcr(remote_pg[remote_translation[i].pg_index],
                         remote_translation[i].pg_rank, &intercomm->vcr[i]);
    }

    MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier");
    mpi_errno = MPIR_Barrier(comm_ptr);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
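/* A sketch, not shown in this excerpt: the connect side presumably mirrors
   the accept-side caller in MPIDI_Comm_accept above, filling in the context
   id it received from the accepting root and the opposite is_low_group
   value before invoking the same routine.  The fragment below is an
   assumption about that caller, not code from this file. */
#if 0
    intercomm               = *newcomm;
    intercomm->context_id   = context_id;  /* context id received from the accept side */
    intercomm->is_low_group = 1;           /* the accept side above uses 0 */
    mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
                                   remote_translation, remote_pg, intercomm );
#endif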
/* Free new_vc.  It was explicitly allocated in MPIDI_CH3_Connect_to_root. */
/* FIXME: The free and the create routines should be in the same file */
#undef FUNCNAME
#define FUNCNAME FreeNewVC
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int FreeNewVC( MPIDI_VC_t *new_vc )
{
    MPID_Progress_state progress_state;
    int mpi_errno = MPI_SUCCESS;

    if (new_vc->state != MPIDI_VC_STATE_INACTIVE) {
        /* If the new_vc isn't done, run the progress engine until the
           state of the new vc is complete */
        MPID_Progress_start(&progress_state);
        while (new_vc->state != MPIDI_VC_STATE_INACTIVE)
        {
            mpi_errno = MPID_Progress_wait(&progress_state);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS)
            {
                MPID_Progress_end(&progress_state);
                MPIU_ERR_POP(mpi_errno);
            }
            /* --END ERROR HANDLING-- */
        }
        MPID_Progress_end(&progress_state);
    }

    /* FIXME: remove this ifdef - method on connection? */
#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK
    mpi_errno = MPIDI_CH3_Cleanup_after_connection( new_vc );
#endif

    MPIU_Free(new_vc);

 fn_fail:
    return mpi_errno;
}

/* ------------------------------------------------------------------------- */
/* FIXME: What is an Accept queue and who uses it?  Is this part of the
   connect/accept support?  These routines appear to be called by channel
   progress routines; perhaps this belongs in util/sock (note the use of a
   port_name_tag in the dequeue code, though this could be any string).

   Are the locks required?  If this is only called within the progress
   engine, then the progress engine locks should be sufficient.  If a finer
   grain lock model is used, it needs to be very carefully designed and
   documented. */

typedef struct MPIDI_CH3I_Acceptq_s
{
    struct MPIDI_VC             *vc;
    int                          port_name_tag;
    struct MPIDI_CH3I_Acceptq_s *next;
}
MPIDI_CH3I_Acceptq_t;

static MPIDI_CH3I_Acceptq_t *acceptq_head       = 0;
static int                   maxAcceptQueueSize = 0;
static int                   AcceptQueueSize    = 0;

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Acceptq_enqueue
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Acceptq_enqueue(MPIDI_VC_t * vc, int port_name_tag )
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3I_Acceptq_t *q_item;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);

    /* FIXME: Use CHKPMEM */
    q_item = (MPIDI_CH3I_Acceptq_t *)
        MPIU_Malloc(sizeof(MPIDI_CH3I_Acceptq_t));
    /* --BEGIN ERROR HANDLING-- */
    if (q_item == NULL)
    {
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                         FCNAME, __LINE__, MPI_ERR_OTHER,
                                         "**nomem", 0);
        goto fn_exit;
    }
    /* --END ERROR HANDLING-- */

    q_item->vc            = vc;
    q_item->port_name_tag = port_name_tag;

    /* Keep some statistics on the accept queue */
    AcceptQueueSize++;
    if (AcceptQueueSize > maxAcceptQueueSize)
        maxAcceptQueueSize = AcceptQueueSize;

    /* FIXME: Stack or queue? */
    MPIU_DBG_MSG_P(CH3_CONNECT,TYPICAL,"vc=%p:Enqueuing accept connection",vc);
    q_item->next = acceptq_head;
    acceptq_head = q_item;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);
    return mpi_errno;
}
/* Attempt to dequeue a vc from the accept queue.  If the queue is empty or
   the port_name_tag doesn't match, return a NULL vc. */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Acceptq_dequeue
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Acceptq_dequeue(MPIDI_VC_t ** vc, int port_name_tag)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3I_Acceptq_t *q_item, *prev;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);

    *vc = NULL;
    q_item = acceptq_head;
    prev = q_item;

    while (q_item != NULL)
    {
        if (q_item->port_name_tag == port_name_tag)
        {
            *vc = q_item->vc;

            if ( q_item == acceptq_head )
                acceptq_head = q_item->next;
            else
                prev->next = q_item->next;

            MPIU_Free(q_item);
            AcceptQueueSize--;
            break;
        }
        else
        {
            prev = q_item;
            q_item = q_item->next;
        }
    }

    mpi_errno = MPIDI_CH3_Complete_Acceptq_dequeue(*vc);

    MPIU_DBG_MSG_FMT(CH3_CONNECT,TYPICAL,
                     (MPIU_DBG_FDEST,
                      "vc=%p:Dequeuing accept connection with tag %d",
                      *vc, port_name_tag));

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);
    return mpi_errno;
}

#else  /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS is defined */
#endif /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS */
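/* A minimal sketch, not part of the original file, of how the two queue
   routines above are expected to pair up: a channel's progress code, upon
   accepting an incoming connection that carries a port tag, calls
   MPIDI_CH3I_Acceptq_enqueue(vc, tag); the accept path then polls the queue
   for that same tag, driving the progress engine until the connection shows
   up.  The helper name WaitForAcceptedVC below is hypothetical and does not
   exist in MPICH. */
static int WaitForAcceptedVC( int port_name_tag, MPIDI_VC_t **new_vc )
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Progress_state progress_state;

    *new_vc = NULL;
    MPID_Progress_start(&progress_state);
    while (1) {
        /* Returns a NULL vc when no queued connection matches this tag */
        mpi_errno = MPIDI_CH3I_Acceptq_dequeue(new_vc, port_name_tag);
        if (mpi_errno != MPI_SUCCESS || *new_vc != NULL)
            break;
        /* Let the progress engine accept further incoming connections,
           which are added with MPIDI_CH3I_Acceptq_enqueue() */
        mpi_errno = MPID_Progress_wait(&progress_state);
        if (mpi_errno != MPI_SUCCESS)
            break;
    }
    MPID_Progress_end(&progress_state);
    return mpi_errno;
}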