📄 clog_joiner.c
字号:
/*
   (C) 2001 by Argonne National Laboratory.
       See COPYRIGHT in top-level directory.
*/
#include "mpe_logging_conf.h"

#if defined( STDC_HEADERS ) || defined( HAVE_STDIO_H )
#include <stdio.h>
#endif
#if defined( STDC_HEADERS ) || defined( HAVE_STDLIB_H )
#include <stdlib.h>
#endif
#if defined( STDC_HEADERS ) || defined( HAVE_STRING_H )
#include <string.h>
#endif
#if defined( HAVE_FCNTL_H )
#include <fcntl.h>
#endif
#if defined( HAVE_UNISTD_H )
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_IO_H
#include <io.h>
#endif

#include "clog_const.h"
#include "clog_mem.h"
#include "clog_cache.h"
#include "clog_joiner.h"

/*
    Error-path helper for CLOG_Joiner_create():
    frees the first `count` entries of in_caches[] (last created first)
    and then the in_caches[] array itself.

    in_caches must be non-NULL; entries [0 .. count-1] must have been
    returned by CLOG_Cache_create().
*/
static void CLOG_Joiner_release_in_caches( CLOG_Cache_t **in_caches,
                                           int            count )
{
    int  ii;

    for ( ii = count - 1; ii >= 0; ii-- ) {
        CLOG_Cache_free( &(in_caches[ ii ]) );
    }
    FREE( in_caches );
}

/*
    Creates a CLOG_Joiner_t for num_files input files.

    One input cache is created per entry of filenames[] and handed to
    CLOG_Cache_open4read() together with that filename; one additional
    output cache is created.  The sorted cache list pointers start as NULL.

    Returns the new joiner, or NULL (after printing a message to stderr)
    if any allocation fails.  On failure everything allocated so far
    is released before returning.
*/
CLOG_Joiner_t* CLOG_Joiner_create( int num_files, char *filenames[] )
{
    CLOG_Joiner_t  *joiner;
    CLOG_Cache_t  **in_caches, *out_cache;
    int             idx;

    in_caches = (CLOG_Cache_t **) MALLOC( num_files * sizeof(CLOG_Cache_t *) );
    if ( in_caches == NULL ) {
        fprintf( stderr, __FILE__":CLOG_Joiner_create() - \n"
                         "\t""MALLOC(CLOG_Joiner_t's in_caches[]) fails!\n" );
        fflush( stderr );
        return NULL;
    }

    for ( idx = 0; idx < num_files; idx++ ) {
        in_caches[ idx ] = CLOG_Cache_create();
        if ( in_caches[ idx ] == NULL ) {
            fprintf( stderr, __FILE__":CLOG_Joiner_create() - \n"
                             "\t""CLOG_Cache_create(in_caches[%d]) fails!\n",
                             idx );
            fflush( stderr );
            /* Only entries [0 .. idx-1] exist at this point. */
            CLOG_Joiner_release_in_caches( in_caches, idx );
            return NULL;
        }
        CLOG_Cache_open4read( in_caches[ idx ], filenames[ idx ] );
    }

    out_cache = CLOG_Cache_create();
    if ( out_cache == NULL ) {
        fprintf( stderr, __FILE__":CLOG_Joiner_create() - \n"
                         "\t""CLOG_Cache_create(out_cache) fails!\n" );
        fflush( stderr );
        CLOG_Joiner_release_in_caches( in_caches, num_files );
        return NULL;
    }

    joiner = (CLOG_Joiner_t *) MALLOC( sizeof(CLOG_Joiner_t) );
    if ( joiner == NULL ) {
        fprintf( stderr, __FILE__":CLOG_Joiner_create() - \n"
                         "\t""MALLOC() of CLOG_Joiner_t fails!\n" );
        fflush( stderr );
        /* Nothing owns the caches yet, so release them here. */
        CLOG_Cache_free( &out_cache );
        CLOG_Joiner_release_in_caches( in_caches, num_files );
        return NULL;
    }

    joiner->num_caches         = num_files;
    joiner->in_caches          = in_caches;
    joiner->out_cache          = out_cache;
    joiner->sorted_caches_head = NULL;
    joiner->sorted_caches_tail = NULL;

    return joiner;
}

/*
    Releases a joiner created by CLOG_Joiner_create() and sets the caller's
    pointer, *joiner_handle, to NULL.

    Frees every input cache, the in_caches[] array, the output cache and
    finally the joiner itself.  A NULL joiner_handle or a NULL *joiner_handle
    is accepted and nothing is freed in that case.
*/
void CLOG_Joiner_free( CLOG_Joiner_t **joiner_handle )
{
    CLOG_Joiner_t  *joiner;
    int             idx;

    if ( joiner_handle == NULL )
        return;

    joiner = *joiner_handle;
    if ( joiner != NULL ) {
        if ( joiner->in_caches != NULL ) {
            for ( idx = 0; idx < joiner->num_caches; idx++ ) {
                CLOG_Cache_free( &(joiner->in_caches[idx]) );
            }
            /* The array itself was MALLOC()ed in CLOG_Joiner_create(). */
            FREE( joiner->in_caches );
            joiner->in_caches = NULL;
        }
        joiner->num_caches = 0;
        CLOG_Cache_free( &(joiner->out_cache) );
        FREE( joiner );
    }
    *joiner_handle = NULL;
}

/*
    Synchronizes all communicator local_IDs of the input/output caches[].
    The function does not require the output cache be initialized except
    memory allocation for the commset of the output cache.

    Calls exit( 1 ) after printing to stderr if CLOG_CommSet_sync_IDs()
    does not return CLOG_BOOL_TRUE for any input cache.
*/
static void CLOG_Joiner_sync_commIDs( CLOG_Joiner_t *joiner )
{
    int idx;

    /*
       Update the output cache's commset with that of input caches[].
       So that joiner->out_cache->commset contains all the CLOG_Uuid_t(GID)
       in all the joiner->in_caches[]->commset.
    */
    for ( idx = 0; idx < joiner->num_caches; idx++ ) {
        CLOG_CommSet_append_GIDs( joiner->out_cache->commset,
                                  joiner->in_caches[idx]->commset->count,
                                  joiner->in_caches[idx]->commset->table );
    }

    /*
       Synchronize the local_ID in each of in_caches[]->commset->table[]
       with the local_ID in the out_cache->commset->table[] for the same GID.
       Essentially, the local_IDs in all the in_caches[]'s are made to be
       globally unique integers (GIDs are also globally unique but they are
       much bigger than integers)
    */
    for ( idx = 0; idx < joiner->num_caches; idx++ ) {
#if defined( CLOG_JOINER_PRINT )
        fprintf( stdout, "BEFORE SYNC: in_caches[%d]:", idx );
        CLOG_CommSet_print( joiner->in_caches[idx]->commset, stdout );
#endif
        if ( CLOG_CommSet_sync_IDs( joiner->in_caches[idx]->commset,
                                    joiner->out_cache->commset->count,
                                    joiner->out_cache->commset->table )
             != CLOG_BOOL_TRUE ) {
            fprintf( stderr, __FILE__":CLOG_Joiner_init() - \n"
                             "\t""CLOG_CommSet_sync_IDs(%s,out) fails!\n",
                             joiner->in_caches[idx]->local_filename );
            fflush( stderr );
            exit( 1 );
        }
#if defined( CLOG_JOINER_PRINT )
        fprintf( stdout, "AFTER SYNC: in_caches[%d]:", idx );
        CLOG_CommSet_print( joiner->in_caches[idx]->commset, stdout );
        fprintf( stdout, "\n" );
#endif
    }
}

/*
    Builds the output cache's preamble from the input caches' preambles.

    Only input caches for which CLOG_Cache_has_rec() returns CLOG_BOOL_TRUE
    take part.  The first such cache's preamble is passed to
    CLOG_Preamble_copy( in, out ); each later one is passed to
    CLOG_Preamble_sync( out, in ).
*/
static void CLOG_Joiner_sync_preambles( CLOG_Joiner_t *joiner )
{
    CLOG_Preamble_t  *in_preamble, *out_preamble;
    int               num_links;
    int               idx;

    out_preamble = joiner->out_cache->preamble;
    num_links    = 0;  /* number of input caches seen so far that have records */
    for ( idx = 0; idx < joiner->num_caches; idx++ ) {
        if ( CLOG_Cache_has_rec( joiner->in_caches[idx] ) == CLOG_BOOL_TRUE ) {
            in_preamble = joiner->in_caches[idx]->preamble;
            if ( num_links == 0 )
                CLOG_Preamble_copy( in_preamble, out_preamble );
            else
                CLOG_Preamble_sync( out_preamble, in_preamble );
            num_links++;
        }
    }
}

void CLOG_Joiner_init( CLOG_Joiner_t *joiner, const char *out_filename )
{
    CLOG_CacheLink_t  *head_link, *tail_link;
    CLOG_CacheLink_t  *curr_link, *earliest_link;
    CLOG_Time_t        curr_time, earliest_time;
    int                num_links;
    int                idx;

    /*
       Sync all local commIDs to be globally unique integers
       in both input/output caches.
    */
    CLOG_Joiner_sync_commIDs( joiner );

    for ( idx = 0; idx < joiner->num_caches; idx++ ) {
        CLOG_Cache_init4read( joiner->in_caches[idx] );
    }

    /* Setup a list of CLOG_Joiner_t->in_caches[] pointed by head_link. */
    head_link  = NULL;
    tail_link  = NULL;
    num_links  = 0;
    for ( idx = 0; idx < joiner->num_caches; idx++ ) {
        if ( CLOG_Cache_has_rec( joiner->in_caches[idx] ) == CLOG_BOOL_TRUE ) {
            /* Setup initialize binary linked list of CLOG_CacheLink_t */
            curr_link = CLOG_CacheLink_create( joiner->in_caches[idx] );
            CLOG_CacheLink_insert( &head_link, &tail_link, NULL, curr_link );
            num_links++;
        }
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -