📄 clog_commset.c
字号:
PMPI_Bcast( intercommIDs->global_ID, CLOG_UUID_SIZE, MPI_CHAR, 0, orig_intracommIDs->comm );#if defined( CLOG_COMMSET_PRINT ) char uuid_str[CLOG_UUID_STR_SIZE] = {0}; CLOG_Uuid_sprint( intercommIDs->global_ID, uuid_str ); fprintf( stdout, "comm_rank=%d, comm_global_ID=%s\n", intercommIDs->comm_rank, uuid_str );#endif /* Set the next available table entry with the LOCAL intracomm's info */ local_intracommIDs = intercommIDs + 1; local_intracommIDs->kind = CLOG_COMM_KIND_LOCAL; local_intracommIDs->local_ID = orig_intracommIDs->local_ID; CLOG_Uuid_copy( orig_intracommIDs->global_ID, local_intracommIDs->global_ID ); local_intracommIDs->comm = orig_intracommIDs->comm; local_intracommIDs->comm_rank = orig_intracommIDs->comm_rank; /* NOTE: LOCAL intracommIDs->comm_rank == intercommIDs->comm_rank */ /* Set the next available table entry with the REMOTE intracomm's info */ remote_intracommIDs = intercommIDs + 2; remote_intracommIDs->kind = CLOG_COMM_KIND_REMOTE; /* Broadcast local_intracommIDs's GID to everyone in remote_intracomm, i.e. Send local_intracommIDs' GID from the root of local intracomms to the root of the remote intracomms and save it as remote_intracommIDs' GID. Then broadcast the received GID to the rest of intracomm. */ if ( intercommIDs->comm_rank == 0 ) { PMPI_Irecv( remote_intracommIDs->global_ID, CLOG_UUID_SIZE, MPI_CHAR, 0, 9999, intercomm, &request ); PMPI_Send( local_intracommIDs->global_ID, CLOG_UUID_SIZE, MPI_CHAR, 0, 9999, intercomm ); PMPI_Wait( &request, &status ); } PMPI_Bcast( remote_intracommIDs->global_ID, CLOG_UUID_SIZE, MPI_CHAR, 0, orig_intracommIDs->comm ); /* Since REMOTE intracomm is NOT known or undefinable in LOCAL intracomm, (that is why we have intercomm). Set comm and comm_rank to NULL */ remote_intracommIDs->comm = MPI_COMM_NULL; remote_intracommIDs->comm_rank = -1; /* Set the related CLOG_CommIDs_t* to the local and remote intracommIDs */ intercommIDs->next = local_intracommIDs; local_intracommIDs->next = remote_intracommIDs; return intercommIDs;}/* The input argument MPI_Comm is assumed not to be MPI_COMM_NULL*/CLOG_CommLID_t CLOG_CommSet_get_LID( CLOG_CommSet_t *commset, MPI_Comm comm ){ MPI_Aint ptrlen_value; int istatus; PMPI_Comm_get_attr( comm, commset->LID_key, &ptrlen_value, &istatus ); if ( !istatus ) { fprintf( stderr, __FILE__":CLOG_CommSet_get_LID() - \n" "\t""PMPI_Comm_get_attr() fails!\n" ); fflush( stderr ); CLOG_Util_abort( 1 ); } return (CLOG_CommLID_t) ptrlen_value;}const CLOG_CommIDs_t* CLOG_CommSet_get_IDs( CLOG_CommSet_t *commset, MPI_Comm comm ){ MPI_Aint ptrlen_value; int istatus; PMPI_Comm_get_attr( comm, commset->LID_key, &ptrlen_value, &istatus ); if ( !istatus ) { fprintf( stderr, __FILE__":CLOG_CommSet_get_IDs() - \n" "\t""PMPI_Comm_get_attr() fails!\n" ); fflush( stderr ); CLOG_Util_abort( 1 ); } return &( commset->table[(CLOG_CommLID_t) ptrlen_value] );}void CLOG_CommSet_merge( CLOG_CommSet_t *commset ){ int comm_world_size, comm_world_rank; int rank_sep, rank_quot, rank_rem; int rank_src, rank_dst; int recv_table_count, recv_table_size; CLOG_CommIDs_t *recv_table; MPI_Status status; comm_world_rank = commset->world_rank; comm_world_size = commset->world_size; /* Collect CLOG_CommIDs_t table through Recursive Doubling algorithm */ rank_sep = 1; rank_quot = comm_world_rank >> 1; /* rank_quot = comm_world_rank / 2; */ rank_rem = comm_world_rank & 1; /* rank_rem = comm_world_rank % 2; */ while ( rank_sep < comm_world_size ) { if ( rank_rem == 0 ) { rank_src = comm_world_rank + rank_sep; if ( rank_src < comm_world_size ) { /* Recv from rank_src */ PMPI_Recv( &recv_table_count, 1, MPI_INT, rank_src, CLOG_COMM_TAG_START, MPI_COMM_WORLD, &status ); recv_table_size = recv_table_count * sizeof(CLOG_CommIDs_t); recv_table = (CLOG_CommIDs_t *) MALLOC( recv_table_size ); if ( recv_table == NULL ) { fprintf( stderr, __FILE__":CLOG_CommSet_merge() - \n" "\t""MALLOC(%d) fails!\n", recv_table_size ); fflush( stderr ); CLOG_Util_abort( 1 ); } /* For simplicity, receive commset's whole table and uses only CLOG_CommGID_t column from the table. The other columns are relevant only to the sending process. */ PMPI_Recv( recv_table, recv_table_size, MPI_CHAR, rank_src, CLOG_COMM_TAG_START+1, MPI_COMM_WORLD, &status ); /* Append all global_IDs in the recv_table to commset's */ CLOG_CommSet_append_GIDs( commset, recv_table_count, recv_table ); if ( recv_table != NULL ) { FREE( recv_table ); recv_table = NULL; } } } else /* if ( rank_rem != 0 ) */ { /* After sending CLOG_CommIDs_t table, the process does a barrier */ rank_dst = comm_world_rank - rank_sep; if ( rank_dst >= 0 ) { recv_table_count = commset->count; /* Send from rank_dst */ PMPI_Send( &recv_table_count, 1, MPI_INT, rank_dst, CLOG_COMM_TAG_START, MPI_COMM_WORLD ); /* For simplicity, send commset's whole table including useless things even though only CLOG_CommGID_t column will be used from the table in the receiving process. */ recv_table_size = recv_table_count * sizeof(CLOG_CommIDs_t); PMPI_Send( commset->table, recv_table_size, MPI_CHAR, rank_dst, CLOG_COMM_TAG_START+1, MPI_COMM_WORLD ); break; /* get out of the while loop */ } } rank_rem = rank_quot & 1; /* rank_rem = rank_quot % 2; */ rank_quot >>= 1; /* rank_quot /= 2; */ rank_sep <<= 1; /* rank_sep *= 2; */ } /* endof while ( rank_sep < comm_world_size ) */ /* Synchronize everybody in MPI_COMM_WORLD before broadcasting result back to everybody. */ PMPI_Barrier( MPI_COMM_WORLD ); if ( comm_world_rank == 0 ) recv_table_count = commset->count; else recv_table_count = 0; PMPI_Bcast( &recv_table_count, 1, MPI_INT, 0, MPI_COMM_WORLD ); /* Allocate a buffer for root process's commset->table */ recv_table_size = recv_table_count * sizeof(CLOG_CommIDs_t); recv_table = (CLOG_CommIDs_t *) MALLOC( recv_table_size ); if ( recv_table == NULL ) { fprintf( stderr, __FILE__":CLOG_CommSet_merge() - \n" "\t""MALLOC(%d) fails!\n", recv_table_size ); fflush( stderr ); CLOG_Util_abort( 1 ); } /* memset() to set all alignment space in commset->table. This is done to keep valgrind happy on 64bit machine. memset( recv_table, 0, recv_table_size ); */ if ( comm_world_rank == 0 ) memcpy( recv_table, commset->table, recv_table_size ); PMPI_Bcast( recv_table, recv_table_size, MPI_CHAR, 0, MPI_COMM_WORLD ); /* Make local_IDs in CLOG_CommSet_t's table[] to globally unique integers */ if ( CLOG_CommSet_sync_IDs( commset, recv_table_count, recv_table ) != CLOG_BOOL_TRUE ) { fprintf( stderr, __FILE__":CLOG_CommSet_merge() - \n" "\t""CLOG_CommSet_sync_IDs() fails!\n" ); fflush( stderr ); CLOG_Util_abort( 1 ); } if ( recv_table != NULL ) { FREE( recv_table ); recv_table = NULL; } PMPI_Barrier( MPI_COMM_WORLD );}static void CLOG_CommRec_swap_bytes( char *commrec ){ char *ptr; ptr = commrec; CLOG_Uuid_swap_bytes( ptr ); ptr += CLOG_UUID_SIZE; CLOG_Util_swap_bytes( ptr, sizeof(CLOG_CommLID_t), 1 ); ptr += sizeof(CLOG_CommLID_t); CLOG_Util_swap_bytes( ptr, sizeof(CLOG_int32_t), 1 ); ptr += sizeof(CLOG_int32_t);}static void CLOG_CommRec_print_kind( CLOG_int32_t comm_kind, FILE *stream ){ switch (comm_kind) { case CLOG_COMM_KIND_INTER: fprintf( stream, "InterComm " ); break; case CLOG_COMM_KIND_INTRA: fprintf( stream, "IntraComm " ); break; case CLOG_COMM_KIND_LOCAL: fprintf( stream, "LocalComm " ); break; case CLOG_COMM_KIND_REMOTE: fprintf( stream, "RemoteComm" ); break; default: fprintf( stream, "Unknown("i32fmt")", comm_kind ); }}/* If succeeds, returns the number of CLOG_CommIDs_t in CLOG_CommSet_t, ie >= 0. If fails, returns -1;*/int CLOG_CommSet_write( const CLOG_CommSet_t *commset, int fd, CLOG_BOOL_T do_byte_swap ){ char *local_buffer, *ptr; CLOG_int32_t count_buffer; CLOG_CommIDs_t *commIDs; int commrec_size, buffer_size; int ierr, idx; count_buffer = commset->count; /* Swap bytes of the "count" buffer BEFORE writting to disk */ if ( do_byte_swap == CLOG_BOOL_TRUE ) CLOG_Util_swap_bytes( &count_buffer, sizeof(CLOG_int32_t), 1 ); ierr = write( fd, &count_buffer, sizeof(CLOG_int32_t) ); if ( ierr != sizeof(CLOG_int32_t) ) return -1; commrec_size = CLOG_UUID_SIZE + sizeof(CLOG_CommLID_t) + sizeof(CLOG_int32_t); buffer_size = commrec_size * commset->count; local_buffer = (char *) MALLOC( buffer_size ); ptr = local_buffer; /* Save the CLOG_CommIDs_t[] in the order the entries were created */ for ( idx = 0; idx < commset->count; idx++ ) { commIDs = &( commset->table[idx] ); memcpy( ptr, commIDs->global_ID, CLOG_UUID_SIZE ); ptr += CLOG_UUID_SIZE; memcpy( ptr, &(commIDs->local_ID), sizeof(CLOG_CommLID_t) ); ptr += sizeof(CLOG_CommLID_t); memcpy( ptr, &(commIDs->kind), sizeof(CLOG_int32_t) ); ptr += sizeof(CLOG_int32_t); } /* Swap bytes of the content buffer BEFORE writting to disk */ if ( do_byte_swap == CLOG_BOOL_TRUE ) { ptr = local_buffer; for ( idx = 0; idx < commset->count; idx++ ) { CLOG_CommRec_swap_bytes( ptr ); ptr += commrec_size; } } ierr = write( fd, local_buffer, buffer_size ); if ( ierr != buffer_size ) return -1; if ( local_buffer != NULL ) FREE( local_buffer ); return (int) commset->count;}/* If succeeds, returns the number of CLOG_CommIDs_t in CLOG_CommSet_t, ie >= 0. If fails, returns -1;*/int CLOG_CommSet_read( CLOG_CommSet_t *commset, int fd, CLOG_BOOL_T do_byte_swap ){ char *local_buffer, *ptr; CLOG_int32_t count_buffer; CLOG_CommIDs_t *commIDs; int commrec_size, buffer_size; int ierr, idx; ierr = read( fd, &count_buffer, sizeof(CLOG_int32_t) ); if ( ierr != sizeof(CLOG_int32_t) ) return -1; /* Swap bytes of the "count" buffer AFTER reading from disk */ if ( do_byte_swap == CLOG_BOOL_TRUE ) CLOG_Util_swap_bytes( &count_buffer, sizeof(CLOG_int32_t), 1 ); commrec_size = CLOG_UUID_SIZE + sizeof(CLOG_CommLID_t) + sizeof(CLOG_int32_t); buffer_size = count_buffer * commrec_size; local_buffer = (char *) MALLOC( buffer_size ); ierr = read( fd, local_buffer, buffer_size ); if ( ierr != buffer_size ) return -1; /* Swap bytes before reading into memory */ if ( do_byte_swap == CLOG_BOOL_TRUE ) { ptr = local_buffer; for ( idx = 0; idx < count_buffer; idx++ ) { CLOG_CommRec_swap_bytes( ptr ); ptr += commrec_size; } } ptr = local_buffer; /* Resurrect the CLOG_CommIDs_t[] in the order the entries were created */ for ( idx = 0; idx < count_buffer; idx++ ) { commIDs = CLOG_CommSet_add_new_GID( commset, ptr ); ptr += CLOG_UUID_SIZE; commIDs->local_ID = *( (CLOG_CommLID_t *) ptr ); ptr += sizeof(CLOG_CommLID_t); commIDs->kind = *( (CLOG_int32_t *) ptr ); ptr += sizeof(CLOG_int32_t); } if ( local_buffer != NULL ) FREE( local_buffer ); return count_buffer;}void CLOG_CommSet_print( CLOG_CommSet_t *commset, FILE *stream ){ CLOG_CommIDs_t *commIDs; char uuid_str[CLOG_UUID_STR_SIZE] = {0}; int idx; for ( idx = 0; idx < commset->count; idx++ ) { commIDs = &( commset->table[idx] ); CLOG_Uuid_sprint( commIDs->global_ID, uuid_str ); fprintf( stream, "GID=%s ", uuid_str ); fprintf( stream, "LID="i32fmt" ", commIDs->local_ID ); fprintf( stream, "kind=" ); CLOG_CommRec_print_kind( commIDs->kind, stream ); fprintf( stream, "\n" ); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -