📄 ch3u_port.c
字号:
mpi_errno = MPIC_Sendrecv(local_translation, local_comm_size * 2, MPI_INT, 0, sendtag++, remote_translation, remote_comm_size * 2, MPI_INT, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE);#ifdef MPICH_DBG_OUTPUT MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation:\n", rank)); for (i=0; i<remote_comm_size; i++) { MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n", i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank)); }#endif } else { mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag, n_remote_pgs, remote_pg ); } /* Broadcast out the remote rank translation array */ MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcast remote_translation"); mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT, root, comm_ptr);#ifdef MPICH_DBG_OUTPUT MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation after broadcast:\n", rank)); for (i=0; i<remote_comm_size; i++) { MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n", i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank)); }#endif /* Now fill in newcomm */ intercomm = *newcomm; intercomm->is_low_group = 0; mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size, remote_translation, remote_pg, intercomm ); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* synchronize with remote root */ if (rank == root) { MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer"); mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0, sendtag++, &j, 0, MPI_INT, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* All communication with remote root done. Release the communicator. */ MPIR_Comm_release(tmp_comm); } MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier"); mpi_errno = MPIR_Barrier(comm_ptr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* Free new_vc once the connection is completed. It was explicitly allocated in ch3_progress.c and returned by MPIDI_CH3I_Acceptq_dequeue. */ if (rank == root) { FreeNewVC( new_vc ); }fn_exit: MPIU_CHKLMEM_FREEALL(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_ACCEPT); return mpi_errno;fn_fail: goto fn_exit;}/* This is a utility routine used to initialize temporary communicators used in connect/accept operations */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Initialize_tmp_comm#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Initialize_tmp_comm(MPID_Comm **comm_pptr, MPIDI_VC_t *vc_ptr, int is_low_group){ int mpi_errno = MPI_SUCCESS; MPID_Comm *tmp_comm, *commself_ptr; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM); MPID_Comm_get_ptr( MPI_COMM_SELF, commself_ptr ); mpi_errno = MPIR_Comm_create(commself_ptr, &tmp_comm); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* fill in all the fields of tmp_comm. */ tmp_comm->context_id = 4095; /* FIXME - we probably need a unique context_id. */ tmp_comm->remote_size = 1; /* Fill in new intercomm */ /* FIXME: This should share a routine with the communicator code to ensure that the initialization is consistent */ tmp_comm->attributes = NULL; tmp_comm->local_size = 1; tmp_comm->rank = 0; tmp_comm->local_group = NULL; tmp_comm->remote_group = NULL; tmp_comm->comm_kind = MPID_INTERCOMM; tmp_comm->local_comm = NULL; tmp_comm->is_low_group = is_low_group; tmp_comm->coll_fns = NULL; /* No pg structure needed since vc has already been set up (connection has been established). */ /* Point local vcr, vcrt at those of commself_ptr */ tmp_comm->local_vcrt = commself_ptr->vcrt; MPID_VCRT_Add_ref(commself_ptr->vcrt); tmp_comm->local_vcr = commself_ptr->vcr; /* No pg needed since connection has already been formed. FIXME - ensure that the comm_release code does not try to free an unallocated pg */ /* Set up VC reference table */ mpi_errno = MPID_VCRT_Create(tmp_comm->remote_size, &tmp_comm->vcrt); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_vcrt"); } mpi_errno = MPID_VCRT_Get_ptr(tmp_comm->vcrt, &tmp_comm->vcr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_getptr"); } MPID_VCR_Dup(vc_ptr, tmp_comm->vcr); *comm_pptr = tmp_comm;fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM); return mpi_errno;fn_fail: goto fn_exit;}/* This routine initializes the new intercomm, setting up the VCRT and other common structures. The is_low_group and context_id fields are NOT set because they differ in the use of this routine in Comm_accept and Comm_connect */#undef FUNCNAME#define FUNCNAME SetupNewIntercomm#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int SetupNewIntercomm( MPID_Comm *comm_ptr, int remote_comm_size, pg_translation remote_translation[], MPIDI_PG_t **remote_pg, MPID_Comm *intercomm ){ int mpi_errno = MPI_SUCCESS, i; intercomm->attributes = NULL; intercomm->remote_size = remote_comm_size; intercomm->local_size = comm_ptr->local_size; intercomm->rank = comm_ptr->rank; intercomm->local_group = NULL; intercomm->remote_group = NULL; intercomm->comm_kind = MPID_INTERCOMM; intercomm->local_comm = NULL; intercomm->coll_fns = NULL; /* Point local vcr, vcrt at those of incoming intracommunicator */ intercomm->local_vcrt = comm_ptr->vcrt; MPID_VCRT_Add_ref(comm_ptr->vcrt); intercomm->local_vcr = comm_ptr->vcr; /* Set up VC reference table */ mpi_errno = MPID_VCRT_Create(intercomm->remote_size, &intercomm->vcrt); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_vcrt"); } mpi_errno = MPID_VCRT_Get_ptr(intercomm->vcrt, &intercomm->vcr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_getptr"); } for (i=0; i < intercomm->remote_size; i++) { MPIDI_VC_t *vc; MPIDI_PG_Get_vcr(remote_pg[remote_translation[i].pg_index], remote_translation[i].pg_rank, &vc); MPID_VCR_Dup(vc, &intercomm->vcr[i]); } MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier"); mpi_errno = MPIR_Barrier(comm_ptr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } fn_exit: return mpi_errno; fn_fail: goto fn_exit;}#if 0 /* synchronize with remote root */ if (rank == root) { MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer"); mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0, sendtag++, &j, 0, MPI_INT, 0, recvtag++, tmp_comm->handle, MPI_STATUS_IGNORE); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* All communication with remote root done. Release the communicator. */ MPIR_Comm_release(tmp_comm); } /*printf("connect:barrier\n");fflush(stdout);*/ mpi_errno = MPIR_Barrier(comm_ptr); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }}#endif/* Free new_vc. It was explicitly allocated in MPIDI_CH3_Connect_to_root. */#undef FUNCNAME#define FUNCNAME FreeNewVC#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int FreeNewVC( MPIDI_VC_t *new_vc ){ MPID_Progress_state progress_state; int mpi_errno = MPI_SUCCESS; if (new_vc->state != MPIDI_VC_STATE_INACTIVE) { MPID_Progress_start(&progress_state); while (new_vc->state != MPIDI_VC_STATE_INACTIVE) { mpi_errno = MPID_Progress_wait(&progress_state); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPID_Progress_end(&progress_state); MPIU_ERR_POP(mpi_errno); } /* --END ERROR HANDLING-- */ } MPID_Progress_end(&progress_state); }#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK mpi_errno = MPIDI_CH3_Cleanup_after_connection( new_vc );#endif MPIU_Free(new_vc); fn_fail: return mpi_errno;}/* FIXME: What is an Accept queue and who uses it? Is this part of the connect/accept support? These routines appear to be called by channel progress routines; perhaps this belongs in util/sock (note the use of a port_name_tag in the dequeue code, though this could be any string). Are the locks required? If this is only called within the progress engine, then the progress engine locks should be sufficient. If a finer grain lock model is used, it needs to be very carefully designed and documented.*/typedef struct MPIDI_CH3I_Acceptq_s{ struct MPIDI_VC *vc; struct MPIDI_CH3I_Acceptq_s *next;}MPIDI_CH3I_Acceptq_t;static MPIDI_CH3I_Acceptq_t * acceptq_head=0;#if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)static MPID_Thread_lock_t acceptq_mutex;#define MPIDI_Acceptq_lock() MPID_Thread_lock(&acceptq_mutex)#define MPIDI_Acceptq_unlock() MPID_Thread_unlock(&acceptq_mutex)#else#define MPIDI_Acceptq_lock()#define MPIDI_Acceptq_unlock()#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Acceptq_enqueue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Acceptq_enqueue(MPIDI_VC_t * vc){ int mpi_errno=MPI_SUCCESS; MPIDI_CH3I_Acceptq_t *q_item; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE); q_item = (MPIDI_CH3I_Acceptq_t *) MPIU_Malloc(sizeof(MPIDI_CH3I_Acceptq_t)); /* --BEGIN ERROR HANDLING-- */ if (q_item == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ q_item->vc = vc; MPIDI_Acceptq_lock(); q_item->next = acceptq_head; acceptq_head = q_item; MPIDI_Acceptq_unlock(); fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE); return mpi_errno;}/* Attempt to dequeue a vc from the accept queue. If the queue is empty or the port_name_tag doesn't match, return a NULL vc. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Acceptq_dequeue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Acceptq_dequeue(MPIDI_VC_t ** vc, int port_name_tag){ int mpi_errno=MPI_SUCCESS; MPIDI_CH3I_Acceptq_t *q_item, *prev; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE); MPIDI_Acceptq_lock(); *vc = NULL; q_item = acceptq_head; prev = q_item; while (q_item != NULL) { if (q_item->vc->ch.port_name_tag == port_name_tag) { *vc = q_item->vc; if ( q_item == acceptq_head ) acceptq_head = q_item->next; else prev->next = q_item->next; MPIU_Free(q_item); break;; } else { prev = q_item; q_item = q_item->next; } } MPIDI_Acceptq_unlock(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Acceptq_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Acceptq_init(void){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_INIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_INIT);# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED) { MPID_Thread_lock_init(&acceptq_mutex); }# endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_INIT); return MPI_SUCCESS;}#else /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS is defined */#endif /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -