📄 mpe_log_merge.c
字号:
MPE_Log_HEADER *h; MPI_Status mesgStatus;#if DEBUG fprintf( debug_file, "[%d] reloading from %s child\n", MPE_Log_procid, msgtype==100?"left":"right"); fflush( debug_file );#endif MPI_Recv( destBuffer->buf, MPE_Log_MBUF_SIZE * sizeof( int ), MPI_BYTE, MPI_ANY_SOURCE, msgtype, MPI_COMM_WORLD, &mesgStatus); MPI_Get_count( &mesgStatus, MPI_BYTE, &ln );#if DEBUG fprintf( debug_file, "[%d] Received %d bytes from %s child\n", MPE_Log_procid, ln, msgtype==100?"left":"right" ); fflush( debug_file );#endif if (ln == 0) {#if DEBUG fprintf( debug_file, "[%d] End of data from %s child\n", MPE_Log_procid, msgtype==100?"left":"right" ); fflush( debug_file );#endif destBuffer->t = TIME_INF; *srcs = *srcs - 1; return 0; } else { destBuffer->p = destBuffer->buf; destBuffer->plast = destBuffer->p + (ln / sizeof(int));#if DEBUG PrintMbuf( debug_file, destBuffer ); fprintf( debug_file, "[%d] %d ints should be available\n", MPE_Log_procid, destBuffer->plast-destBuffer->p); fflush( debug_file );#endif h = (MPE_Log_HEADER *)(destBuffer->p); MOVEDBL( &destBuffer->t, &h->time ); } return 1;}static int MPE_Log_ReloadFromChildL( b, srcs )MPE_Log_MBuf *b;int *srcs;{ return MPE_Log_ReloadFromChild( b, 100, srcs );}static int MPE_Log_ReloadFromChildR( b, srcs )MPE_Log_MBuf *b;int *srcs;{ return MPE_Log_ReloadFromChild( b, 101, srcs );}/* move all the negative event records above the positive ones */static MPE_Log_BLOCK *MPE_Log_Sort( readBlock )MPE_Log_BLOCK *readBlock;{ MPE_Log_BLOCK *newLogHeadBlk, *newLogBlk, *readBlk; MPE_Log_HEADER *readRecHdr, *newRecHdr; int i; /* # of ints read from the input block */ int n; /* # of ints in this block */ newLogHeadBlk=newLogBlk=0; readBlk = readBlock; /* start from first block */ MPE_Log_TRAVERSE_LOG((readRecHdr->event <= MAX_HEADER_EVT) && (readRecHdr->event >= MIN_HEADER_EVT)); readBlk = readBlock; /* start from first block */ MPE_Log_TRAVERSE_LOG(readRecHdr->event > MAX_HEADER_EVT || readRecHdr->event < MIN_HEADER_EVT); MPE_Log_FreeLogMem (readBlock); /* free memory */ return newLogHeadBlk;}static void MPE_Log_SetTreeNodes( procid, np, lchild, rchild, parent, am_left )int procid, np, *lchild, *rchild, *parent, *am_left;{ *parent = (procid) ? ((procid - 1) >> 1) : -1; *lchild = (procid << 1) + 1; if (*lchild >= np) { *lchild = -1; *rchild = -1; } else { *rchild = (procid << 1) + 2; if (*rchild >= np) *rchild = -1; } *am_left = procid % 2;}static int MPE_Log_ParallelMerge( filename ) char *filename;{ int srcs, lchild, rchild, parent, np, mtype, am_left; MPE_Log_MBuf *ba, *bb, *bc, *bout; FILE *fp;/* void MPE_Log_FlushOutput(); */ MPI_Comm_size(MPI_COMM_WORLD,&np); MPE_Log_SetTreeNodes( MPE_Log_procid, np, &lchild, &rchild, &parent, &am_left ); if (MPE_Log_procid==0) { fp = fopen (filename,"w"); if (!fp) { fprintf(stderr,"Could not open logfile: %s.\n", filename); return MPE_Log_FILE_PROB; } }#if DEBUG fprintf( debug_file, "[%d] Generating header\n", MPE_Log_procid ); fflush( debug_file );#endif MPE_Log_GenerateHeader( fp ); /* On to the business of generating the logfile */ readBlock = MPE_Log_firstBlock;#if DEBUG PrintBlockChain( debug_file, MPE_Log_firstBlock ); fflush( debug_file );#endif#if DEBUG fprintf( debug_file, "[%d] Sorting logfile\n", MPE_Log_procid ); fflush( debug_file );#endif readBlock = MPE_Log_firstBlock = MPE_Log_Sort( MPE_Log_firstBlock ); /* filter negative events to the top */#if DEBUG fprintf( debug_file, "[%d] Finished sorting logfile\n", MPE_Log_procid ); fflush( debug_file );#endif ba = (MPE_Log_MBuf *)MALLOC( sizeof(MPE_Log_MBuf)); bb = (MPE_Log_MBuf *)MALLOC( sizeof(MPE_Log_MBuf)); bc = (MPE_Log_MBuf *)MALLOC( sizeof(MPE_Log_MBuf)); bout = (MPE_Log_MBuf *)MALLOC( sizeof(MPE_Log_MBuf)); bout->p = bout->buf; bout->plast = bout->buf + MPE_Log_MBUF_SIZE; srcs = 1; ba->reload = MPE_Log_ReloadFromData; (*ba->reload)( ba, &srcs ); mtype = (am_left) ? 100 : 101; if (lchild >= 0) { srcs++; bb->reload = MPE_Log_ReloadFromChildL; (*bb->reload)( bb, &srcs ); } else bb->t = TIME_INF; if (rchild >= 0) { srcs++; bc->reload = MPE_Log_ReloadFromChildR; (*bc->reload)( bc, &srcs ); } else bc->t = TIME_INF;/*fprintf( stderr, "[%d] ba->t = %lf, bb->t = %lf, bc->t = %lf\n", MPE_Log_procid, ba->t, bb->t, bc->t );*/ while (srcs > 0) {#if DEBUG>2 fprintf( debug_file, "[%d] comparing %10lf %10lf %10lf\n", MPE_Log_procid, (ba->t<10e200)?ba->t:-1, (bb->t<10e200)?bb->t:-1, (bc->t<10e200)?bc->t:-1 );#endif if (ba->t <= bb->t) { if (ba->t <= bc->t) MPE_Log_Output( ba, bout, mtype, &srcs, fp, parent ); else MPE_Log_Output( bc, bout, mtype, &srcs, fp, parent ); } else { if (bb->t <= bc->t) MPE_Log_Output( bb, bout, mtype, &srcs, fp, parent ); else MPE_Log_Output( bc, bout, mtype, &srcs, fp, parent ); } } if (parent >= 0) { /* if this is a child */ if ((int)(bout->p - bout->buf) > 0) { /* if buffer has data in it */ MPI_Send( bout->buf, (bout->p - bout->buf) * sizeof( int ), MPI_BYTE, parent, mtype, MPI_COMM_WORLD ); /* send as raw BYTEs, don't want MPI changing the data */#if DEBUG fprintf( debug_file, "[%d] send data to parent\n", MPE_Log_procid ); fflush( debug_file );#endif } MPI_Send( bout->buf, 0 , MPI_BYTE, parent, mtype, MPI_COMM_WORLD ); /* tell the parent that's the last of it */#if DEBUG fprintf( debug_file, "[%d] tell parent I'm all out\n", MPE_Log_procid ); fflush( debug_file );#endif } else { fclose (fp); /* if process 0, just close output file */ }#if DEBUG fprintf( debug_file, "About to free bout\n" ); fflush(debug_file);#endif FREE(bout);#if DEBUG fprintf( debug_file, "About to free ba\n" ); fflush(debug_file);#endif FREE(ba);#if DEBUG fprintf( debug_file, "About to free bb\n" ); fflush(debug_file);#endif FREE(bb);#if DEBUG fprintf( debug_file, "About to free bc\n" ); fflush(debug_file);#endif FREE(bc);#if DEBUG fprintf( debug_file, "About to free LogMem\n" ); fflush(debug_file);#endif MPE_Log_FreeLogMem( MPE_Log_firstBlock ); /* Make sure that everyone has finished before exiting */#if DEBUG fprintf( debug_file, "About to do barrier\n" ); fflush(debug_file);#endif MPI_Barrier( MPI_COMM_WORLD );#if DEBUG fprintf( debug_file, "Done with barrier\n" ); fflush(debug_file);#endifreturn 0;}static void MPE_Log_GetStatistics( nevents, ne_types, startTime, endTime )int *nevents, *ne_types;double *startTime, *endTime;{ MPE_Log_BLOCK *bl; int xx_i, n, *bp; MPE_Log_HEADER *ap; int ne, net; double ttest; ne = 0; net = 0; MOVEDBL( startTime, &((MPE_Log_HEADER *)(MPE_Log_firstBlock + 1))->time ); *endTime = *startTime; bl = MPE_Log_firstBlock; while (bl) { n = bl->size; bp = (int *)(bl + 1); xx_i = 0; while (xx_i < n) { ap = (MPE_Log_HEADER *)bp; if (ap->event > MAX_HEADER_EVT || ap->event < MIN_HEADER_EVT) { ne++; MOVEDBL( &ttest, &ap->time ); if (ttest < *startTime) { *startTime = ttest; } if (ttest > *endTime) { *endTime = ttest; } } xx_i += ap->len; bp += ap->len; } bl = bl->next; } *nevents = ne; *ne_types = net;}#if DEBUGPrintMbufRecord( outf, recHdr )FILE *outf;MPE_Log_HEADER *recHdr;{ int recIntsRead, fldIntsRead, i; MPE_Log_VFIELD *fldPtr; double temp_time;/* fprintf( outf, "Raw record: " ); PrintSomeInts( outf, (int *)recHdr, recHdr->len ); putc( '\n', outf );*/ MOVEDBL( &temp_time, &recHdr->time ); fprintf( outf, "Header: pid %d ln %d evt %d %10.5lf Field lengths: ", *((int*)(recHdr+1)), recHdr->len, recHdr->event, temp_time ); recIntsRead = MPE_Log_HEADERSIZE + 1; fldPtr = (MPE_Log_VFIELD *)((int *)recHdr + MPE_Log_HEADERSIZE + 1) ; while (recIntsRead < recHdr->len) { fprintf( outf, "%d ", fldPtr->len ); recIntsRead += fldPtr->len; fldPtr = (MPE_Log_VFIELD *) ((int *)fldPtr + fldPtr->len ); } /* per field in a record */ putc( '\n', outf );}PrintMbuf( outf, thisBlock )FILE *outf;MPE_Log_MBuf *thisBlock;{ int blkIntsRead=0, recIntsRead, fldIntsRead, i; MPE_Log_HEADER *recHdr; MPE_Log_VFIELD *fldPtr; double temp_time; recHdr = (MPE_Log_HEADER *)(thisBlock->buf); blkIntsRead = 0; fprintf( outf, "\n[%d] start mbuf, %d read, %d full\n\n", MPE_Log_procid, thisBlock->p-thisBlock->buf, thisBlock->plast- thisBlock->buf ); while ((int*)recHdr < thisBlock->p) { fprintf( outf, "(%d of %d) ", (int*)recHdr-thisBlock->buf, thisBlock->p - thisBlock->buf ); PrintMbufRecord( outf, recHdr ); recHdr = (MPE_Log_HEADER *) ((int *)recHdr + recHdr->len); } /* per record in a block */ fprintf( outf, "\n[%d] end of mbuf\n\n", MPE_Log_procid );}#endif /* if DEBUG */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -