
📄 ch3u_port.c

📁 C/C++ source for MPI parallel computing; compiles with VC or gcc and can be used to set up a parallel-computing test environment (a minimal build-and-run sketch follows below).
💻 C
📖 Page 1 of 3
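The description above says the package builds with VC or gcc and can serve as a parallel-computing test environment. As a quick sanity check of such an environment, a minimal, hypothetical smoke-test program might look like the following. It is not part of ch3u_port.c; the file name hello_mpi.c and the mpicc/mpiexec wrapper names are assumptions about a typical MPI installation.

/* hello_mpi.c - hypothetical smoke test for an MPI installation.
 * Build:  mpicc hello_mpi.c -o hello_mpi   (or the gcc/cl command your MPI provides)
 * Run:    mpiexec -n 4 ./hello_mpi
 */
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, size;

    MPI_Init(&argc, &argv);                 /* start the MPI runtime         */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);   /* rank of this process          */
    MPI_Comm_size(MPI_COMM_WORLD, &size);   /* total number of processes     */

    printf("Hello from rank %d of %d\n", rank, size);

    MPI_Finalize();                         /* shut the runtime down cleanly */
    return 0;
}

If every rank prints its line, the compiler wrapper and launcher are wired up correctly, and the connect/accept machinery in the listing below can then be exercised through the normal MPI dynamic-process calls.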
    char *pg_str = 0;
    int  i, j, flag, p;
    int  rank = comm_ptr->rank;
    int  mpi_errno = 0;
    int  recvtag = *recvtag_p;
    MPIDI_STATE_DECL(MPID_STATE_RECEIVEPGANDDISTRIBUTE);

    MPIDI_FUNC_ENTER(MPID_STATE_RECEIVEPGANDDISTRIBUTE);

    for (i=0; i<n_remote_pgs; i++) {
        if (rank == root) {
            /* First, receive the pg description from the partner */
            mpi_errno = MPIC_Recv(&j, 1, MPI_INT, 0, recvtag++,
                                  tmp_comm->handle, MPI_STATUS_IGNORE);
            *recvtag_p = recvtag;
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_POP(mpi_errno);
            }
            pg_str = (char*)MPIU_Malloc(j);
            if (pg_str == NULL) {
                MPIU_ERR_POP(mpi_errno);
            }
            mpi_errno = MPIC_Recv(pg_str, j, MPI_CHAR, 0, recvtag++,
                                  tmp_comm->handle, MPI_STATUS_IGNORE);
            *recvtag_p = recvtag;
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_POP(mpi_errno);
            }
        }

        /* Broadcast the size and data to the local communicator */
        /*printf("accept:broadcasting 1 int\n");fflush(stdout);*/
        mpi_errno = MPIR_Bcast(&j, 1, MPI_INT, root, comm_ptr);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }
        if (rank != root) {
            /* The root has already allocated this string */
            pg_str = (char*)MPIU_Malloc(j);
            if (pg_str == NULL) {
                MPIU_ERR_POP(mpi_errno);
            }
        }
        /*printf("accept:broadcasting string of length %d\n", j);fflush(stdout);*/
        mpi_errno = MPIR_Bcast(pg_str, j, MPI_CHAR, root, comm_ptr);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* Then reconstruct the received process group */
        mpi_errno = MPIDI_PG_Create_from_string(pg_str, &remote_pg[i], &flag);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        MPIU_Free(pg_str);
        if (flag) {
#ifdef MPIDI_CH3_USES_SSHM
            /* extra pg ref needed for shared memory modules because the
             * shm_XXXXing_list's
             * need to be walked through in the later stages of finalize to
             * free queue_info's.
             */
            MPIDI_PG_Add_ref(remote_pg[i]);
#endif
            for (p=0; p<remote_pg[i]->size; p++) {
                MPIDI_VC_t *vc;
                MPIDI_PG_Get_vcr(remote_pg[i], p, &vc);
                MPIDI_CH3_VC_Init( vc );
            }
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_RECEIVEPGANDDISTRIBUTE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* Used internally to broadcast process groups belonging to peercomm to
   all processes in comm.  The process with rank root in comm is the
   process in peercomm from which the process groups are taken.
*/
#undef FUNCNAME
#define FUNCNAME MPID_PG_BCast
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_PG_BCast( MPID_Comm *peercomm_p, MPID_Comm *comm_p, int root )
{
    int n_local_pgs=0, mpi_errno = 0;
    pg_translation *local_translation = 0;
    pg_node *pg_list, *pg_next, *pg_head = 0;
    int rank, i, peer_comm_size;
    MPIU_CHKLMEM_DECL(1);

    peer_comm_size = comm_p->local_size;
    rank           = comm_p->rank;

    MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
                        peer_comm_size*sizeof(pg_translation),
                        mpi_errno,"local_translation");

    if (rank == root) {
        /* Get the process groups known to the *peercomm* */
        ExtractLocalPGInfo( peercomm_p, local_translation, &pg_head,
                            &n_local_pgs );
    }

    /* Now, broadcast the number of local pgs */
    NMPI_Bcast( &n_local_pgs, 1, MPI_INT, root, comm_p->handle );
    /* printf( "Number of pgs = %d\n", n_local_pgs ); fflush(stdout); */

    pg_list = pg_head;
    for (i=0; i<n_local_pgs; i++) {
        int len, flag;
        char *pg_str=0;
        MPIDI_PG_t *pgptr;

        if (rank == root) {
            if (!pg_list) {
                /* FIXME: Error, the pg_list is broken */
                printf( "Unexpected end of pg_list\n" ); fflush(stdout);
                break;
            }
            pg_str  = pg_list->str;
            pg_list = pg_list->next;
            len     = (int)strlen(pg_str) + 1;
        }
        NMPI_Bcast( &len, 1, MPI_INT, root, comm_p->handle );
        if (rank != root) {
            pg_str = (char *)MPIU_Malloc(len);
        }
        NMPI_Bcast( pg_str, len, MPI_CHAR, root, comm_p->handle );
        if (rank != root) {
            /* flag is true if the pg was created, false if it
               already existed */
            MPIDI_PG_Create_from_string( pg_str, &pgptr, &flag );
            if (flag) {
                int p;
                /*printf( "[%d]Added pg named %s to list\n", rank,
                        (char *)pgptr->id );
                        fflush(stdout); */
                /* FIXME: This initialization should be done
                   when the pg is created ?
                */
                for (p=0; p<pgptr->size; p++) {
                    MPIDI_VC_t *vc;
                    MPIDI_PG_Get_vcr(pgptr, p, &vc);
                    MPIDI_CH3_VC_Init( vc );
                }
            }
            MPIU_Free( pg_str );
        }
    }

    /* Free pg_list */
    pg_list = pg_head;

    /* FIXME: We should use the PG destroy function for this, and ensure that
       the PG fields are valid for that function */
    while (pg_list) {
        pg_next = pg_list->next;
        MPIU_Free( pg_list->str );
        if (pg_list->pg_id ) {
            MPIU_Free( pg_list->pg_id );
        }
        MPIU_Free( pg_list );
        pg_list = pg_next;
    }

 fn_exit:
    MPIU_CHKLMEM_FREEALL();
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* Sends the process group information to the peer and frees the
   pg_list */
#undef FUNCNAME
#define FUNCNAME SendPGtoPeerAndFree
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int SendPGtoPeerAndFree( MPID_Comm *tmp_comm, int *sendtag_p,
                                pg_node *pg_list )
{
    int mpi_errno = 0;
    int sendtag = *sendtag_p, i;
    pg_node *pg_iter;
    MPIDI_STATE_DECL(MPID_STATE_SENDPGTOPEERANDFREE);

    MPIDI_FUNC_ENTER(MPID_STATE_SENDPGTOPEERANDFREE);

    while (pg_list != NULL) {
        pg_iter = pg_list;
        i = (int)(strlen(pg_iter->str) + 1);
        /*printf("connect:sending 1 int: %d\n", i);fflush(stdout);*/
        mpi_errno = MPIC_Send(&i, 1, MPI_INT, 0, sendtag++, tmp_comm->handle);
        *sendtag_p = sendtag;
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /*printf("connect:sending string length %d\n", i);fflush(stdout);*/
        mpi_errno = MPIC_Send(pg_iter->str, i, MPI_CHAR, 0, sendtag++,
                              tmp_comm->handle);
        *sendtag_p = sendtag;
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        pg_list = pg_list->next;
        MPIU_Free(pg_iter->str);
        MPIU_Free(pg_iter->pg_id);
        MPIU_Free(pg_iter);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_SENDPGTOPEERANDFREE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* ---------------------------------------------------------------------- */

#undef FUNCNAME
#define FUNCNAME MPIDI_Create_inter_root_communicator_accept
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_Create_inter_root_communicator_accept(const char *port_name,
                                                MPID_Comm **comm_pptr,
                                                MPIDI_VC_t **vc_pptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *tmp_comm;
    MPIDI_VC_t *new_vc = NULL;
    MPID_Progress_state progress_state;
    int port_name_tag;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);

    /* FIXME: This code should parallel the MPIDI_CH3_Connect_to_root
       code used in the MPIDI_Create_inter_root_communicator_connect */

    /* extract the tag from the port_name */
    mpi_errno = MPIDI_GetTagFromPort( port_name, &port_name_tag);
    if (mpi_errno != MPIU_STR_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* dequeue the accept queue to see if a connection with the
       root on the connect side has been formed in the progress
       engine (the connection is returned in the form of a vc). If
       not, poke the progress engine.
    */
    MPID_Progress_start(&progress_state);
    for(;;)
    {
        MPIDI_CH3I_Acceptq_dequeue(&new_vc, port_name_tag);
        if (new_vc != NULL)
        {
            break;
        }

        mpi_errno = MPID_Progress_wait(&progress_state);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno)
        {
            MPID_Progress_end(&progress_state);
            MPIU_ERR_POP(mpi_errno);
        }
        /* --END ERROR HANDLING-- */
    }
    MPID_Progress_end(&progress_state);

    mpi_errno = MPIDI_CH3I_Initialize_tmp_comm(&tmp_comm, new_vc, 0);

    /* If the VC creates non-duplex connections then the acceptor will
     * need to connect back to form the other half of the connection. */
#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK
    mpi_errno = MPIDI_CH3_Complete_unidirectional_connection2( new_vc );
#endif

    *comm_pptr = tmp_comm;
    *vc_pptr = new_vc;

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

/*
 * MPIDI_Comm_accept()

   Algorithm: First dequeue the vc from the accept queue (it was
   enqueued by the progress engine in response to a connect request
   from the root on the connect side). Use this vc to create an
   intercommunicator between this root and the root on the connect
   side. Use this intercomm. to communicate the other information
   needed to create the real intercommunicator between the processes
   on the two sides. Then free the intercommunicator between the
   roots. Most of the complexity is because there can be multiple
   process groups on each side.
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_Comm_accept
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Comm_accept(const char *port_name, MPID_Info *info, int root,
                      MPID_Comm *comm_ptr, MPID_Comm **newcomm)
{
    int mpi_errno=MPI_SUCCESS;
    int i, j, rank, recv_ints[2], send_ints[3];
    int remote_comm_size=0;
    MPID_Comm *tmp_comm = NULL, *intercomm;
    MPIDI_VC_t *new_vc = NULL;
    int n_local_pgs=1, n_remote_pgs;
    int sendtag=100, recvtag=100, local_comm_size;
    pg_translation *local_translation = NULL, *remote_translation = NULL;
    pg_node *pg_list = NULL;
    MPIDI_PG_t **remote_pg = NULL;
    MPIU_CHKLMEM_DECL(3);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_ACCEPT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_ACCEPT);

    /* Create the new intercommunicator here. We need to send the
       context id to the other side. */
    /* FIXME: There is a danger that the context id won't be unique
       on the other side of this connection */
    mpi_errno = MPIR_Comm_create(comm_ptr, newcomm);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    rank = comm_ptr->rank;
    local_comm_size = comm_ptr->local_size;

    if (rank == root)
    {
        /* Establish a communicator to communicate with the root on the
           other side. */
        mpi_errno = MPIDI_Create_inter_root_communicator_accept(port_name,
                                                &tmp_comm, &new_vc);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* Make an array to translate local ranks to process group index and
           rank */
        MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
                            local_comm_size*sizeof(pg_translation),
                            mpi_errno,"local_translation");

        /* Make a list of the local communicator's process groups and encode
           them in strings to be sent to the other side.
           The encoded string for each process group contains the process
           group id, size and all its KVS values */
        mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation,
                                        &pg_list, &n_local_pgs );

        /* Send the remote root: n_local_pgs, local_comm_size, context_id for
           newcomm.
           Recv from the remote root: n_remote_pgs, remote_comm_size */
        send_ints[0] = n_local_pgs;
        send_ints[1] = local_comm_size;
        send_ints[2] = (*newcomm)->context_id;

        /*printf("accept:sending 3 ints, %d, %d, %d, and receiving 2 ints\n", send_ints[0], send_ints[1], send_ints[2]);fflush(stdout);*/
        mpi_errno = MPIC_Sendrecv(send_ints, 3, MPI_INT, 0,
                                  sendtag++, recv_ints, 2, MPI_INT,
                                  0, recvtag++, tmp_comm->handle,
                                  MPI_STATUS_IGNORE);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

    /* broadcast the received info to local processes */
    /*printf("accept:broadcasting 2 ints - %d and %d\n", recv_ints[0], recv_ints[1]);fflush(stdout);*/
    mpi_errno = MPIR_Bcast(recv_ints, 2, MPI_INT, root, comm_ptr);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

    n_remote_pgs = recv_ints[0];
    remote_comm_size = recv_ints[1];
    MPIU_CHKLMEM_MALLOC(remote_pg,MPIDI_PG_t**,
                        n_remote_pgs * sizeof(MPIDI_PG_t*),
                        mpi_errno,"remote_pg");
    MPIU_CHKLMEM_MALLOC(remote_translation,pg_translation*,
                        remote_comm_size * sizeof(pg_translation),
                        mpi_errno, "remote_translation");
    MPIU_DBG_PRINTF(("[%d]accept:remote process groups: %d\nremote comm size: %d\n", rank, n_remote_pgs, remote_comm_size));

    /* Exchange the process groups and their corresponding KVSes */
    if (rank == root)
    {
        /* The root receives the PG from the peer (in tmp_comm) and
           distributes them to the processes in comm_ptr */
        mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
                                            n_remote_pgs, remote_pg );

        mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list );

        /* Receive the translations from remote process rank to process group
           index */
        /*printf("accept:sending %d ints and receiving %d ints\n", local_comm_size * 2, remote_comm_size * 2);fflush(stdout);*/
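The listing above is page 1 of 3 and breaks off inside MPIDI_Comm_accept; the remaining pages continue the rank-translation exchange and intercommunicator setup. These device-level routines (ReceivePGAndDistribute, MPID_PG_BCast, SendPGtoPeerAndFree, MPIDI_Create_inter_root_communicator_accept, MPIDI_Comm_accept) sit underneath the standard MPI-2 connect/accept interface. A rough, hypothetical user-level sketch of that interface follows; how the port name travels from server to client (command line, file, or MPI_Publish_name/MPI_Lookup_name) is assumed to be out of band.

/* Hypothetical user-level sketch of the connect/accept pattern that the
 * device code above implements.  The port name is assumed to be handed
 * from server to client out of band (here: printed by the server and
 * passed on the client's command line). */
#include <stdio.h>
#include <mpi.h>

/* Server side: open a port and accept one connection on it. */
static void run_server(void)
{
    char port_name[MPI_MAX_PORT_NAME];
    MPI_Comm client;

    MPI_Open_port(MPI_INFO_NULL, port_name);       /* system-chosen port name   */
    printf("server port: %s\n", port_name);        /* hand the port to a client */
    MPI_Comm_accept(port_name, MPI_INFO_NULL, 0,   /* blocks until a client     */
                    MPI_COMM_WORLD, &client);      /* connects to the port      */
    /* ... exchange data over the new intercommunicator 'client' ... */
    MPI_Comm_disconnect(&client);
    MPI_Close_port(port_name);
}

/* Client side: connect to a previously opened port. */
static void run_client(char *port_name)
{
    MPI_Comm server;

    MPI_Comm_connect(port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &server);
    /* ... exchange data over the new intercommunicator 'server' ... */
    MPI_Comm_disconnect(&server);
}

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    if (argc > 1)            /* a port-name argument means "run as client" */
        run_client(argv[1]);
    else
        run_server();
    MPI_Finalize();
    return 0;
}

On the accepting side, MPI_Comm_accept is what eventually reaches MPIDI_Comm_accept in the listing: the root dequeues the connecting VC from the accept queue, builds a temporary intercommunicator to the connecting root, and exchanges process-group and rank-translation data, exactly as the algorithm comment above describes.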
