📄 tmred.c
字号:
MYASSERT( sshot->ID+1 == epoch->ID, ("!") ); MYASSERT( TM_LE( lbts->LBTS, min_ts ), ("%s %s", TM_STR(tmstr1,lbts->LBTS), TM_STR(tmstr1,min_ts)) ); if( sshot->active ) {if(st->debug>2){printf("TMMRed_StartLBTSSnapShot(min_ts=%f,sshot-epoch=%ld,sshot-trial=%ld,curr-epoch=%ld)\n",TM_TS(min_ts),sshot->ID,sshot->trial,epoch->ID);} MYASSERT( sshot->ID+1 == epoch->ID, ("!");printstate() ); MYASSERT( sshot->ID+2 == epoch->next_id, ("!");printstate() ); } else { /*Need to take a new snapshot and start reduction*/ MYASSERT( expected_sshot_id == epoch->ID, ("!") ); /*Transfer info from current epoch to snapshot, and prime reduction*/ { sshot->active = TRUE; sshot->ID = epoch->ID; sshot->trial = trial_num-1; rv_init( &sshot->value ); rv_set(&sshot->transients, min_ts,qual, epoch->nsent, epoch->nrecd); move_to_next_sshot_trial(); } /*Move to new epoch*/ { MYASSERT( epoch->next_id == epoch->ID+1, ("!");printstate() ); ++epoch->ID; epoch->nsent = 0; epoch->nrecd = epoch->next_nrecd; ++epoch->next_id; epoch->next_nrecd = 0; }if(st->debug>1){printf("TMMRed_StartLBTSSnapShot(min_ts=%f,new-epoch=%ld)\n",TM_TS(min_ts),epoch->ID);} MYASSERT( lbts->n_dproc <= 0, ("!");printstate(); printstats() ); lbts->n_dproc = 0; /*Clear current callbacks*/ } MYASSERT( sshot->active, ("!") );if(0)/*XXX*/ MYASSERT( TM_LE( sshot->value.val, min_ts ), ("%lf %lf",TM_TS(sshot->value.val),TM_TS(min_ts)) ); /*Add given procedure to the list of callbacks for the active computation*/ if( done_proc ) { MYASSERT( lbts->n_dproc < lbts->max_dproc, ("!") ); lbts->dproc[lbts->n_dproc++] = done_proc; } if(ptrans) *ptrans = sshot->ID; return TM_SUCCESS;}/*----------------------------------------------------------------------------*/static long TMMRed_StartLBTS( TMM_Closure closure, TM_Time min_ts, TM_TimeQual qual, TM_LBTSDoneProc done_proc, long *ptrans ){ return TMMRed_StartLBTSSnapShot(min_ts,qual,done_proc,ptrans,epoch->ID,0);}/*----------------------------------------------------------------------------*//*Used to start LBTS when it is inferred that a new epoch/LBTS must be started*//*----------------------------------------------------------------------------*/static void remote_start_lbts( long sshot_id, long trial_num ){ TM_Time min_ts = TM_ZERO; TM_TimeQual qual = TM_TIME_QUAL_INCL; long sproc_flag; TM_LBTSDoneProc dproc;if(st->debug>1)printf("**** Starting remotely initiated LBTS\n");if(st->debug>3)printstate(); MYASSERT( !sshot->active, ("!") ); MYASSERT( lbts->sproc, ("!") ); sproc_flag = lbts->sproc( epoch->ID, &min_ts, &qual, &dproc ); if( sproc_flag == TM_DEFER ) { min_ts = lbts->LBTS; qual = lbts->qual; } TMMRed_StartLBTSSnapShot( min_ts, qual, dproc, NULL, sshot_id, trial_num );if(st->debug>3)printf("**** Done starting remotely initiated LBTS.\n");if(st->debug>3)printstate();}/*----------------------------------------------------------------------------*/static void TMMRed_PutTag( TMM_Closure closure, char *ptag, int *nbytes ){ *((long *)ptag) = epoch->ID; *nbytes = sizeof(epoch->ID);if(st->debug>1){printf("TMMRed_PutTag(epoch=%ld)\n",epoch->ID);}}/*----------------------------------------------------------------------------*/static void TMMRed_Out( TMM_Closure closure, TM_Time ts, long nevents ){ epoch->nsent += nevents;if(st->debug>1){printf("TMMRed_Out(ts=%f,nevents=%ld,epoch=%ld);epoch->nsent=%ld\n",TM_TS(ts),nevents,epoch->ID,epoch->nsent);}}/*----------------------------------------------------------------------------*/static void TMMRed_In( TMM_Closure closure, TM_Time ts, char *ptag, int *nbytes){ long epoch_id = *((long*)ptag); *nbytes = sizeof(epoch_id);if(st->debug>1){printf("TMMRed_In(ts=%f,epoch=%ld,curr-epoch=%ld)\n",TM_TS(ts),epoch_id,epoch->ID);} if( epoch_id == epoch->ID ) {if(st->debug>2){printf("IncomingRegularMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);} /*Belongs to current epoch*/ ++epoch->nrecd; } else if( !sshot->active && epoch_id < epoch->ID ) {if(st->debug>1){printf("IncomingOldMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);} /* Ignore this message -- this is possible in the case in which */ /* TSO messages can be lost; TM will try to advance LBTS even if */ /* some transient messages are still not received, after waiting */ /* for a while for those messages. */ /* The RTI should drop this message on the floor, because this */ /* message was presumed lost in the earlier LBTS computation */ } else if( sshot->active && epoch_id == sshot->ID ) { RVALUE_TYPE transient_msg; rv_set( &transient_msg, ts, TM_TIME_QUAL_INCL, 0, 1 ); rv_reduce( &sshot->transients, &transient_msg );if(st->debug>2){printf("IncomingTransientMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);} } else if( epoch_id == epoch->ID+1 ) { if( sshot->active ) {if(st->debug>1){printf("Future epoch; must buffer info.\n");printstate();} MYASSERT( epoch_id == epoch->next_id, ("!");printstate() ); MYASSERT( epoch_id == sshot->ID+2, ("!");printstate() ); ++epoch->next_nrecd; } else {if(st->debug>2){printf("Future epoch; must start new LBTS.\n");printstate();} /*Start a new epoch+snapshot at this processor*/ remote_start_lbts( epoch->ID, 0 ); MYASSERT( sshot->active, ("!");printstate() ); MYASSERT( epoch_id == epoch->ID, ("!");printstate() ); /*NOW, we have made sure this mesg belongs to current epoch!*/ ++epoch->nrecd; } } else { MYASSERT(FALSE,("TMMRed_In:Unexpected epoch %ld\n",epoch_id);printstate()); }}/*----------------------------------------------------------------------------*/static int tot_tm_msgs, nlost_tm_msgs;/*----------------------------------------------------------------------------*/static void printstats( void ){ TIMER_TYPE stop; double secs; TIMER_NOW(stop); secs = TIMER_DIFF(stop, stats->start); printf("TMMRED-Stastics\n"); printf("-----------\n"); printf("NEpochs= %ld\n", epoch->ID-1); printf("NLBTS= %ld\n", stats->nlbts); printf("Tot-trials= %ld\n", stats->trial.tot-1); printf("Max-trials-per-LBTS= %ld\n", stats->trial.max); printf("Avg-trials-per-LBTS= %g\n",((double)stats->trial.tot-1)/stats->nlbts); printf("Time-per-trial= %16.8f microsecs\n",secs/stats->trial.tot*1e6); printf("Time-per-LBTS= %16.8f microsecs\n", secs/stats->nlbts*1e6); printf("NLBTS-per-sec= %16.8f\n", stats->nlbts/secs); printf("-----------\n");if(nlost_tm_msgs>0)printf( "Total TM messages = %d, lost = %d, P(loss) = %g\n", tot_tm_msgs, nlost_tm_msgs, nlost_tm_msgs/(double)tot_tm_msgs );}/*----------------------------------------------------------------------------*/static void TMMRed_PrintStats( TMM_Closure closure ) { printstats(); }/*----------------------------------------------------------------------------*/static void print_tm_mesg(FILE *fp, TMMesg *m){ fprintf(fp, "{ssn=%ld, trial=%ld, type=%ld, from_pe=%ld, to_pe=%ld, val=", m->ssn, m->trial, m->type, m->from_pe, m->to_pe ); rv_print( fp, &m->val ); fprintf( fp, ", old_lbts=%f, next=%p}", TM_TS(m->old_lbts), m->next );}/*----------------------------------------------------------------------------*/static void printstate( void ){ int i = 0; printf("------------ TMMRED STATE START ------------\n"); printf("myid=%d, N=%d\n", st->myid, st->N); printf("Epoch={ID=%ld, nsent=%ld, nrecd=%ld}\n", epoch->ID, epoch->nsent, epoch->nrecd); printf("Snapshot={active=%d, ID=%ld, trial=%ld, rh=%p, ", sshot->active, sshot->ID, sshot->trial, sshot->rh); printf("value="); rv_print(stdout, &sshot->value); printf(", "); printf("transients="); rv_print(stdout, &sshot->transients); printf("timeout{do_timeout=%s, counter=%d, max_count=%d, period=%f secs} ", (sshot->timeout.do_timeout ? "TRUE" : "FALSE"), sshot->timeout.counter, sshot->timeout.max_count, sshot->timeout.period); printf("}\n"); printf("LBTS={LBTS=%f (%s) sproc=%p, max_dproc=%d, n_dproc=%d, dproc=[", TM_TS(lbts->LBTS), TM_TIME_QUAL_STR(lbts->qual), lbts->sproc, lbts->max_dproc, lbts->n_dproc); for(i=0; i<lbts->n_dproc; i++) printf("%s%p", (i>0?",":"!"),lbts->dproc[i]); printf("]}\n"); printf("Comm={fmh=%d, use_udp=%s, buffered_msgs=%p, free_msgs=%p" ", loss_prob=%f, msg_delay=%f secs}\n", comm->fmh, comm->use_udp?"TRUE":"FALSE", comm->buffered_msgs, comm->free_msgs, comm->loss_prob, comm->msg_delay); { TMMesg *m=comm->buffered_msgs; while(m){printf("\t");print_tm_mesg(stdout,m);printf("\n");m=m->next;} } printf("Stats={nlbts=%ld, trial{max=%ld,tot=%ld}}\n", stats->nlbts, stats->trial.max, stats->trial.tot); printf("\n"); printf("------------ TMMRED STATE END ------------\n");}/*----------------------------------------------------------------------------*/static void TMMRed_PrintState( TMM_Closure closure ) { printstate(); }/*----------------------------------------------------------------------------*/static void TMMRed_Tick( TMM_Closure closure ){ int ndelivered = 0; do { continue_active_reduction(); ndelivered = deliver_buffered_msgs(); }while(ndelivered > 0);}/*----------------------------------------------------------------------------*/static void TMMRed_NotifyNewLBTS(TMM_Closure closure, TM_Time ts,TM_TimeQual q){ /*XXX TBC*/}/*----------------------------------------------------------------------------*/static void move_to_next_sshot_trial( void ){ MYASSERT( sshot->active, ("!") ); ++sshot->trial; rv_reduce( &sshot->value, &sshot->transients ); /*Update value*/ rv_init( &sshot->transients ); /*Reset*/ if( sshot->timeout.do_timeout ) { sshot->timeout.counter = 0; TIMER_NOW( sshot->timeout.start ); } ++stats->trial.tot; if(sshot->trial > stats->trial.max) {stats->trial.max = sshot->trial;} rm_init( sshot->rh, RM_SCHEDULE_GROUPED_BFLY ); rm_receive_value( sshot->rh, st->myid, &sshot->value );}/*----------------------------------------------------------------------------*/static void terminate_active_sshot( TM_Time new_lbts, TM_TimeQual new_qual ){ MYASSERT( sshot->active, ("!") ); MYASSERT( TM_LE( lbts->LBTS, new_lbts ), ("LBTS %lf must not decrease to %lf.", TM_TS(lbts->LBTS),TM_TS(new_lbts)) ); sshot->active = FALSE; lbts->LBTS = new_lbts; lbts->qual = new_qual; /*Report to waiting callbacks*/ { int c; for( c = 0; c < lbts->n_dproc; c++ ) { lbts->dproc[c]( lbts->LBTS, lbts->qual, sshot->ID );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -