📄 tmnull.c
字号:
qual_j = TM_GT(temp_min_ts, lbts_j) ? qual_j : TM_LT(temp_min_ts,lbts_j) ? qual_i : (qual_i < qual_j ? qual_i : qual_j); lbts_j = temp_min_ts; } if( comp->inprogress && TM_LE(comp->transient_ts,lbts_j) ) { lbts_j = comp->transient_ts; qual_j = TM_TIME_QUAL_INCL; } lbts_j = TM_Add(lbts_j, external_la);#if 0 /*XXX*/ #define ABS(t) ((t)<0?(-(t)):(t)) #define TM_EQ2(t1,t2) (ABS((t1)-(t2))<=1e-10) MYASSERT( TM_GE(lbts_j,*old_lbts_j) || (TM_EQ2(lbts_j,*old_lbts_j) && qual_j>=*old_qual_j), ("Null ts %308.6lf,%s shouldn't decrease from %308.6lf,%s %s", lbts_j*1e+300,TM_TIME_QUAL_STR(qual_j), 1e+300**old_lbts_j,TM_TIME_QUAL_STR(*old_qual_j), ((lbts_j-*old_lbts_j)<0?"-ve":"+ve"));TM_PrintState());#endif /*XXX*/if(st->debug>1){printf("TMMNull_DoNullSends() to %d lbts_j %lf old_lbts_j %lf.\n",rpe,TM_TS(lbts_j),TM_TS(*old_lbts_j));fflush(stdout);} /*Send only if new lbts is better than previously sent*/ if( TM_GT(lbts_j, *old_lbts_j) || (TM_EQ(lbts_j, *old_lbts_j) && qual_j > *old_qual_j) ) { *old_lbts_j = lbts_j; *old_qual_j = qual_j; TMMNull_SendNullMesg( rpe, *old_lbts_j, *old_qual_j );if(st->debug>1){printf("TMMNull_DoNullSends() sent NULL(%lf,%s) to %d.\n",TM_TS(*old_lbts_j),TM_TIME_QUAL_STR(*old_qual_j), rpe);fflush(stdout);} } }if(st->debug>1){printf("TMMNull_DoNullSends() done.\n");fflush(stdout);}}/*----------------------------------------------------------------------------*/static int TMMNull_UpdateSender(int spe, TM_Time null_ts,TM_TimeQual null_qual){ int i = 0, updated = FALSE; ConnectionInfo *inconn = 0; for( i = 0; i < st->in.n; i++ ) if(st->in.conn[i].pe == spe) break; MYASSERT(i < st->in.n, ("Null sender %d should be found", spe)); inconn = &st->in.conn[i]; if( spe == st->myid || /*Special case: self lbts update can go up/down*/ TM_LT(inconn->lbts, null_ts) || /*For others, a better ts is required*/ (TM_EQ(inconn->lbts, null_ts) && inconn->qual < null_qual) ) { inconn->lbts = null_ts; inconn->qual = null_qual; comp->updated = TRUE; updated = TRUE; } return updated;}/*----------------------------------------------------------------------------*/static void TMMNull_RegisterLocalLBTS( TM_Time min_ts, TM_TimeQual qual, TM_LBTSDoneProc done_proc, long *ptrans ){ TM_Time min_ext_la = TM_TopoGetMinExternalLA(); int myindex = 0, mype = TM_TopoGetSender( myindex ); MYASSERT( mype == st->myid, ("%d %d",mype,st->myid) ); TMMNull_AddDoneProc( done_proc ); min_ts = TM_Sub(min_ts, min_ext_la);/*Caller already adds min-lookahead*/ TMMNull_UpdateSender( mype, min_ts, qual ); comp->inprogress = TRUE; if(ptrans) *ptrans = lbts->ID;}/*----------------------------------------------------------------------------*/static long TMMNull_StartLBTS( TMM_Closure closure, TM_Time min_ts, TM_TimeQual qual, TM_LBTSDoneProc done_proc, long *ptrans ){if(st->debug>1){printf("TMMNull_StartLBTS(%lf,%s).\n",TM_TS(min_ts),TM_TIME_QUAL_STR(qual));fflush(stdout);} TMMNull_RegisterLocalLBTS( min_ts, qual, done_proc, ptrans ); return TM_SUCCESS;}/*----------------------------------------------------------------------------*/static void TMMNull_EvaluateProgress( void ){ int i = 0; TM_Time new_min_ts = TM_IDENT; TM_TimeQual new_min_qual = TM_TIME_QUAL_INCL; TM_Time min_ext_la = TM_TopoGetMinExternalLA(); for( i = 0; i < st->in.n; i++ ) /*Find min lbts among all incoming*/ { ConnectionInfo *conn = &st->in.conn[i]; TM_Time this_lbts = conn->lbts; if(conn->pe == st->myid && st->N > 1) /*Self, multi-processor*/ { continue; /*Skip self*/ } if( st->N <= 1 )/*Special case for uni-processor*/ { this_lbts = TM_Add(this_lbts, min_ext_la); /*Can progress by minLA*/ } if( TM_GT(new_min_ts, this_lbts) || (TM_EQ(new_min_ts, this_lbts) && new_min_qual > conn->qual) ) { new_min_ts = this_lbts; new_min_qual = conn->qual; } } if( comp->inprogress && TM_LE(comp->transient_ts, new_min_ts) ) { new_min_ts = comp->transient_ts; new_min_qual = TM_TIME_QUAL_INCL; } MYASSERT( TM_LE(lbts->LBTS, new_min_ts), ("%lf %lf",TM_TS(lbts->LBTS),TM_TS(new_min_ts));TM_PrintState() ); /*Did the LBTS increase?*/ if( TM_LT(lbts->LBTS, new_min_ts) || (TM_EQ(lbts->LBTS, new_min_ts) && lbts->qual < new_min_qual ) ) { int c; /*Yes, got an advance!*/ lbts->LBTS = new_min_ts; lbts->qual = new_min_qual;if(st->debug>1){printf("TMMNull_EvaluateProgress() got advance to (%lf,%s).\n",TM_TS(lbts->LBTS),TM_TIME_QUAL_STR(lbts->qual));fflush(stdout);} /*Inform all the waiting dprocs*/ for( c = 0; c < lbts->n_dproc; c++ ) { lbts->dproc[c]( lbts->LBTS, lbts->qual, lbts->ID ); } lbts->n_dproc = 0; /*Mark that we're done with this round*/ comp->inprogress = FALSE; comp->updated = FALSE; comp->transient_ts = TM_IDENT; lbts->ID++; }}/*----------------------------------------------------------------------------*/static void TMMNull_DoAggressiveSends( void ){ if( opt->aggressive_sends.on && ++opt->aggressive_sends.count >= opt->aggressive_sends.freq ) { opt->aggressive_sends.count = 0; comp->updated = TRUE; }}/*----------------------------------------------------------------------------*/static void TMMNull_Tick( TMM_Closure closure ){ if(!comp->inprogress && !comp->updated) { TMMNull_DoAggressiveSends(); } else if(comp->inprogress && !comp->updated) { TMMNull_DoAggressiveSends(); } else { MYASSERT( comp->updated, ("!") ); if( !comp->inprogress ) { /*Query for local LBTS*/ TM_Time min_ts = TM_ZERO; TM_TimeQual qual = TM_TIME_QUAL_INCL; long sproc_flag; TM_LBTSDoneProc dproc = 0;if(st->debug>1){printf("** Starting remotely initiated LBTS\n");fflush(stdout);} MYASSERT( lbts->sproc, ("!") ); sproc_flag = lbts->sproc( lbts->ID, &min_ts, &qual, &dproc ); if( sproc_flag==TM_DEFER ) {min_ts = lbts->LBTS; qual = lbts->qual;} TMMNull_RegisterLocalLBTS( min_ts, qual, dproc, NULL ); } TMMNull_DoNullSends(); TMMNull_EvaluateProgress(); }}/*----------------------------------------------------------------------------*/static int TMMNull_RecvNullMesg( FM_stream *strm, unsigned senderID ){ TMMNullMesg msg; FM_receive( &msg, strm, sizeof(msg) ); ntoh_TMMNullMesg( &msg ); ++stats->nulls.nrecd;if(st->debug>1){printf("TMMNull_RecvNullMesg: "); TMMNull_PrintNullMesg(stdout,&msg);printf("\n");fflush(stdout);} TMMNull_UpdateSender( msg.from_pe, msg.lbts, msg.qual ); return FM_CONTINUE;}/*----------------------------------------------------------------------------*/static void TMMNull_NotifyNewLBTS(TMM_Closure closure, TM_Time ts,TM_TimeQual q){ int i = 0;if(st->debug>1){printf("TMMNull_NotifyNewLBTS(%lf, %s)\n",TM_TS(ts),TM_TIME_QUAL_STR(q));fflush(stdout);} for( i = 0; i < st->in.n; i++ ) { ConnectionInfo *conn = &st->in.conn[i]; /*Don't update self; otherwise, we might send out*/ /*null msgs before events with timestamps <= lbts are sent out*/ if( conn->pe == st->myid ) continue; if( TM_GT(ts, conn->lbts) || (TM_EQ(ts, conn->lbts) && q > conn->qual) ) { TMMNull_UpdateSender( conn->pe, ts, q ); } }}/*----------------------------------------------------------------------------*/static void TMMNull_PrintStats( TMM_Closure closure ){ TIMER_TYPE stop; double secs; long nlbts = lbts->ID; long totnulls = stats->nulls.nsent+stats->nulls.nrecd; long totevents = stats->events.nsent+stats->events.nrecd; TIMER_NOW(stop); secs = TIMER_DIFF(stop, stats->start); printf("TMMNULL-Stastics\n"); printf("-----------\n"); printf("NLBTS= %ld\n", nlbts); printf("Time-per-LBTS= %16.8f microsecs\n", secs/nlbts*1e6); printf("NLBTS-per-sec= %16.8f\n", nlbts/secs); printf("Null-mesgs-sent= %ld\n", stats->nulls.nsent); printf("Null-mesgs-recd= %ld\n", stats->nulls.nrecd); printf("Events-sent= %ld\n", stats->events.nsent); printf("Events-recd= %ld\n", stats->events.nrecd); printf("Events/Null= "); if(totnulls==0)printf("Inf\n");else printf("%lf\n", totevents*1.0/totnulls); printf("-----------\n");}/*----------------------------------------------------------------------------*/static void TMMNull_PrintState( TMM_Closure closure ){ int i = 0, j = 0; printf("------------ TMMNULL STATE START ------------\n"); printf("myid=%d, N=%d fmh=%d\n", st->myid, st->N, st->fmh); printf("LBTS={ID=%ld LBTS=%f (%s) " "sproc=%p, max_dproc=%d, n_dproc=%d, dproc=[", lbts->ID, TM_TS(lbts->LBTS), TM_TIME_QUAL_STR(lbts->qual), lbts->sproc, lbts->max_dproc, lbts->n_dproc); for(i=0; i<lbts->n_dproc; i++) printf("%s%p", (i>0?",":"!"),lbts->dproc[i]); printf("]}\n"); for(j=0;j<2;j++) { ConnectionList *cl = (j==0?&st->in:&st->out); printf("%s=[%d]{\n",(j==0?"in":"out"),cl->n); for(i=0;i<cl->n;i++) { ConnectionInfo *conn = &cl->conn[i]; printf("\t%d: {pe=%d, lbts=%lf, qual=%s}\n", i,conn->pe,TM_TS(conn->lbts),TM_TIME_QUAL_STR(conn->qual)); } } printf("Comp={inprogress=%d, updated=%d, transient_ts=%lf}\n", comp->inprogress,comp->updated,TM_TS(comp->transient_ts)); printf("Stats={Nulls-nsent=%ld,Nulls-nrecd=%ld," "Events-nsent=%ld,Events-nrecd=%ld}\n", stats->nulls.nsent, stats->nulls.nrecd, stats->events.nsent, stats->events.nrecd); printf("\n"); printf("------------ TMMNULL STATE END ------------\n");}/*----------------------------------------------------------------------------*/void TMMNull_AddModule( void ){ TMModule mod = { &tm_state, TMMNull_Init, TMMNull_CurrentEpoch, TMMNull_Recent_LBTS, TMMNull_StartLBTS, TMMNull_PutTag, TMMNull_Out, TMMNull_In, TMMNull_PrintStats, TMMNull_PrintState, TMMNull_Tick, TMMNull_NotifyNewLBTS }; TM_AddModule( &mod );}/*----------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -