📄 clog_merger.c
字号:
reinitialize the sorted_blk. */ if ( sorted_blk->ptr + CLOG_Merger_reserved_block_size( hdr->rectype ) >= sorted_blk->tail ) { sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr; sorted_hdr->time = hdr->time; /* use prev record's time */ sorted_hdr->icomm = 0; sorted_hdr->rank = merger->local_world_rank; sorted_hdr->thread = 0; sorted_hdr->rectype = CLOG_REC_ENDBLOCK; if ( merger->parent_world_rank != CLOG_RANK_NULL ) {#if !defined( CLOG_NOMPI ) PMPI_Send( sorted_blk->head, merger->block_size, CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank, CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );#endif } else { /* if parent_world_rank does not exist, must be the root */ CLOG_Merger_flush( merger ); } sorted_blk->ptr = sorted_blk->head; } /* CLOG_Rec_print( hdr, stdout ); */ /* Save the CLOG record into the sorted buffer */ reclen = CLOG_Rec_size( hdr->rectype ); memcpy( sorted_blk->ptr, hdr, reclen ); sorted_blk->ptr += reclen;}/* Used internally by CLOG_Merger_sort() *//* Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t*/void CLOG_Merger_refill_sideblock( CLOG_BlockData_t *blockdata, int block_world_rank, int block_size ){#if !defined( CLOG_NOMPI ) MPI_Status status; PMPI_Recv( blockdata->head, block_size, CLOG_DATAUNIT_MPI_TYPE, block_world_rank, CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD, &status ); CLOG_BlockData_reset( blockdata );#endif}/* Used internally by CLOG_Merger_sort() */void CLOG_Merger_refill_localblock( CLOG_BlockData_t *blockdata, CLOG_Buffer_t *buffer, CLOG_Time_t *timediff_handle ){ /* refill the whole CLOG_Buffer_t from disk */ if ( buffer->curr_block == NULL || buffer->num_used_blocks == 0 ) CLOG_Buffer_localIO_read( buffer ); if ( buffer->num_used_blocks > 0 ) { blockdata->head = buffer->curr_block->data->head;#if !defined( CLOG_NOMPI ) CLOG_BlockData_patch( blockdata, timediff_handle, buffer->commset->table );#else CLOG_BlockData_patch( blockdata, timediff_handle, NULL );#endif CLOG_BlockData_reset( blockdata ); buffer->curr_block = buffer->curr_block->next; buffer->num_used_blocks--; } else /* if ( buffer->num_used_blocks == 0 ) */ blockdata->ptr += CLOG_Rec_size( CLOG_REC_ENDBLOCK );}/* Used internally by CLOG_Merger_sort() *//* Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t*/CLOG_Rec_Header_t *CLOG_Merger_next_sideblock_hdr( CLOG_BlockData_t *blockdata, CLOG_Rec_Header_t *hdr, CLOG_Merger_t *merger, int block_world_rank, int block_size ){ if ( hdr->rectype == CLOG_REC_ENDLOG ) { hdr->time = CLOG_MAXTIME; (merger->num_active_blks)--; } else { CLOG_Merger_save_rec( merger, hdr ); blockdata->ptr += CLOG_Rec_size( hdr->rectype ); hdr = ( CLOG_Rec_Header_t *) blockdata->ptr; if ( hdr->rectype == CLOG_REC_ENDBLOCK ) { CLOG_Merger_refill_sideblock( blockdata, block_world_rank, block_size ); hdr = (CLOG_Rec_Header_t *) blockdata->ptr; } } return hdr;}/* Used internally by CLOG_Merger_sort() */CLOG_Rec_Header_t *CLOG_Merger_next_localblock_hdr( CLOG_BlockData_t *blockdata, CLOG_Rec_Header_t *hdr, CLOG_Merger_t *merger, CLOG_Buffer_t *buffer, CLOG_Time_t *timediff_handle ) { if ( hdr->rectype == CLOG_REC_ENDLOG ) { hdr->time = CLOG_MAXTIME; (merger->num_active_blks)--; } else { CLOG_Merger_save_rec( merger, hdr ); blockdata->ptr += CLOG_Rec_size( hdr->rectype ); hdr = ( CLOG_Rec_Header_t *) blockdata->ptr; if ( hdr->rectype == CLOG_REC_ENDBLOCK ) { CLOG_Merger_refill_localblock( blockdata, buffer, timediff_handle ); hdr = (CLOG_Rec_Header_t *) blockdata->ptr; } } return hdr;}/**/void CLOG_Merger_sort( CLOG_Merger_t *merger, CLOG_Buffer_t *buffer ){ CLOG_Rec_Header_t *left_hdr, *right_hdr, *local_hdr; CLOG_BlockData_t *left_blk, *right_blk, *local_blk; CLOG_BlockData_t local_shadow_blockdata ; CLOG_Time_t local_timediff; int left_world_rank, right_world_rank; int block_size; /* May want to move this to CLOG_Merger_init() or CLOG_Converge_init() */ /* Merge/Synchronize all the CLOG_CommSet_t at all the processes */#if !defined( CLOG_NOMPI ) CLOG_CommSet_merge( buffer->commset );#endif /* reinitialization of CLOG_Buffer_t so that buffer->curr_block == buffer->head->block */ CLOG_Buffer_localIO_reinit4read( buffer ); merger->num_active_blks = 0; block_size = merger->block_size; /* local_timediff's init. value can be modified by CLOG_BlockData_patch() */ local_timediff = 0.0; left_world_rank = merger->left_world_rank; right_world_rank = merger->right_world_rank; left_blk = merger->left_blk; right_blk = merger->right_blk; local_blk = &local_shadow_blockdata; /* Initialize the local_blk from CLOG_Buffer_t.curr_block */ if ( buffer->curr_block != NULL && buffer->num_used_blocks > 0 ) { merger->num_active_blks++; CLOG_Merger_refill_localblock( local_blk, buffer, &local_timediff ); } /* Initialize CLOG_Merger_t.left_blk from its left neighbor */ if ( left_world_rank != CLOG_RANK_NULL ) { merger->num_active_blks++; CLOG_Merger_refill_sideblock( left_blk, left_world_rank, block_size ); } else { left_hdr = (CLOG_Rec_Header_t *) left_blk->head; left_hdr->time = CLOG_MAXTIME; } /* Initialize CLOG_Merger_t.right_blk its right neighbor */ if ( right_world_rank != CLOG_RANK_NULL ) { merger->num_active_blks++; CLOG_Merger_refill_sideblock( right_blk, right_world_rank, block_size ); } else { right_hdr = (CLOG_Rec_Header_t *) right_blk->head; right_hdr->time = CLOG_MAXTIME; } left_hdr = (CLOG_Rec_Header_t *) left_blk->ptr; right_hdr = (CLOG_Rec_Header_t *) right_blk->ptr; local_hdr = (CLOG_Rec_Header_t *) local_blk->ptr; while ( merger->num_active_blks > 0 ) { if ( left_hdr->time <= right_hdr->time ) { if ( left_hdr->time <= local_hdr->time ) left_hdr = CLOG_Merger_next_sideblock_hdr( left_blk, left_hdr, merger, left_world_rank, block_size ); else local_hdr = CLOG_Merger_next_localblock_hdr( local_blk, local_hdr, merger, buffer, &local_timediff ); } else { if ( right_hdr->time <= local_hdr->time ) right_hdr = CLOG_Merger_next_sideblock_hdr( right_blk, right_hdr, merger, right_world_rank, block_size ); else local_hdr = CLOG_Merger_next_localblock_hdr( local_blk, local_hdr, merger, buffer, &local_timediff ); } } /* Endof while ( merger->num_active_blks > 0 ) */}void CLOG_Merger_last_flush( CLOG_Merger_t *merger ){ CLOG_BlockData_t *sorted_blk; CLOG_Rec_Header_t *sorted_hdr; sorted_blk = merger->sorted_blk; /* Write the CLOG_REC_ENDLOG as the very last record in the sorted buffer */ sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr; sorted_hdr->time = CLOG_MAXTIME; /* use prev record's time */ sorted_hdr->icomm = 0; sorted_hdr->rank = merger->local_world_rank; sorted_hdr->thread = 0; sorted_hdr->rectype = CLOG_REC_ENDLOG; if ( merger->parent_world_rank != CLOG_RANK_NULL ) {#if !defined( CLOG_NOMPI ) PMPI_Send( sorted_blk->head, merger->block_size, CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank, CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );#endif } else { /* if parent_world_rank does not exist, must be the root */ CLOG_Merger_flush( merger ); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -