⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tmred.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 3 页
字号:
    MYASSERT( sshot->ID+1 == epoch->ID, ("!") );    MYASSERT( TM_LE( lbts->LBTS, min_ts ),	          ("%s %s", TM_STR(tmstr1,lbts->LBTS), TM_STR(tmstr1,min_ts)) );    if( sshot->active )    {if(st->debug>2){printf("TMMRed_StartLBTSSnapShot(min_ts=%f,sshot-epoch=%ld,sshot-trial=%ld,curr-epoch=%ld)\n",TM_TS(min_ts),sshot->ID,sshot->trial,epoch->ID);}        MYASSERT( sshot->ID+1 == epoch->ID,      ("!");printstate() );        MYASSERT( sshot->ID+2 == epoch->next_id, ("!");printstate() );    }    else    {        /*Need to take a new snapshot and start reduction*/        MYASSERT( expected_sshot_id == epoch->ID, ("!") );        /*Transfer info from current epoch to snapshot, and prime reduction*/        {            sshot->active = TRUE;            sshot->ID = epoch->ID;            sshot->trial = trial_num-1;            rv_init( &sshot->value );            rv_set(&sshot->transients, min_ts,qual, epoch->nsent, epoch->nrecd);            move_to_next_sshot_trial();        }        /*Move to new epoch*/        {            MYASSERT( epoch->next_id == epoch->ID+1, ("!");printstate() );            ++epoch->ID;            epoch->nsent = 0;            epoch->nrecd = epoch->next_nrecd;            ++epoch->next_id;            epoch->next_nrecd = 0;        }if(st->debug>1){printf("TMMRed_StartLBTSSnapShot(min_ts=%f,new-epoch=%ld)\n",TM_TS(min_ts),epoch->ID);}        MYASSERT( lbts->n_dproc <= 0, ("!");printstate(); printstats() );        lbts->n_dproc = 0; /*Clear current callbacks*/    }    MYASSERT( sshot->active, ("!") );if(0)/*XXX*/    MYASSERT( TM_LE( sshot->value.val, min_ts ),              ("%lf %lf",TM_TS(sshot->value.val),TM_TS(min_ts)) );    /*Add given procedure to the list of callbacks for the active computation*/    if( done_proc )    {        MYASSERT( lbts->n_dproc < lbts->max_dproc, ("!") );        lbts->dproc[lbts->n_dproc++] = done_proc;    }    if(ptrans) *ptrans = sshot->ID;    return TM_SUCCESS;}/*----------------------------------------------------------------------------*/static long TMMRed_StartLBTS( TMM_Closure closure,                TM_Time min_ts, TM_TimeQual qual,                TM_LBTSDoneProc done_proc, long *ptrans ){    return TMMRed_StartLBTSSnapShot(min_ts,qual,done_proc,ptrans,epoch->ID,0);}/*----------------------------------------------------------------------------*//*Used to start LBTS when it is inferred that a new epoch/LBTS must be started*//*----------------------------------------------------------------------------*/static void remote_start_lbts( long sshot_id, long trial_num ){    TM_Time min_ts = TM_ZERO;    TM_TimeQual qual = TM_TIME_QUAL_INCL;    long sproc_flag;    TM_LBTSDoneProc dproc;if(st->debug>1)printf("**** Starting remotely initiated LBTS\n");if(st->debug>3)printstate();    MYASSERT( !sshot->active, ("!") );    MYASSERT( lbts->sproc, ("!") );    sproc_flag = lbts->sproc( epoch->ID, &min_ts, &qual, &dproc );    if( sproc_flag == TM_DEFER ) { min_ts = lbts->LBTS; qual = lbts->qual; }    TMMRed_StartLBTSSnapShot( min_ts, qual, dproc, NULL, sshot_id, trial_num );if(st->debug>3)printf("**** Done starting remotely initiated LBTS.\n");if(st->debug>3)printstate();}/*----------------------------------------------------------------------------*/static void TMMRed_PutTag( TMM_Closure closure, char *ptag, int *nbytes ){    *((long *)ptag) = epoch->ID;    *nbytes = sizeof(epoch->ID);if(st->debug>1){printf("TMMRed_PutTag(epoch=%ld)\n",epoch->ID);}}/*----------------------------------------------------------------------------*/static void TMMRed_Out( TMM_Closure closure, TM_Time ts, long nevents ){    epoch->nsent += nevents;if(st->debug>1){printf("TMMRed_Out(ts=%f,nevents=%ld,epoch=%ld);epoch->nsent=%ld\n",TM_TS(ts),nevents,epoch->ID,epoch->nsent);}}/*----------------------------------------------------------------------------*/static void TMMRed_In( TMM_Closure closure, TM_Time ts, char *ptag, int *nbytes){    long epoch_id = *((long*)ptag);    *nbytes = sizeof(epoch_id);if(st->debug>1){printf("TMMRed_In(ts=%f,epoch=%ld,curr-epoch=%ld)\n",TM_TS(ts),epoch_id,epoch->ID);}    if( epoch_id == epoch->ID )    {if(st->debug>2){printf("IncomingRegularMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);}        /*Belongs to current epoch*/        ++epoch->nrecd;    }    else if( !sshot->active && epoch_id < epoch->ID )    {if(st->debug>1){printf("IncomingOldMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);}        /* Ignore this message -- this is possible in the case in which  */        /* TSO messages can be lost; TM will try to advance LBTS even if */        /* some transient messages are still not received, after waiting */        /* for a while for those messages. */        /* The RTI should drop this message on the floor, because this */        /* message was presumed lost in the earlier LBTS computation */    }    else if( sshot->active && epoch_id == sshot->ID )    {        RVALUE_TYPE transient_msg;        rv_set( &transient_msg, ts, TM_TIME_QUAL_INCL, 0, 1 );        rv_reduce( &sshot->transients, &transient_msg );if(st->debug>2){printf("IncomingTransientMsg(ts=%f,epoch=%ld)\n",TM_TS(ts),epoch_id);}    }    else if( epoch_id == epoch->ID+1 )    {        if( sshot->active )        {if(st->debug>1){printf("Future epoch; must buffer info.\n");printstate();}            MYASSERT( epoch_id == epoch->next_id, ("!");printstate() );            MYASSERT( epoch_id == sshot->ID+2,    ("!");printstate() );            ++epoch->next_nrecd;        }        else        {if(st->debug>2){printf("Future epoch; must start new LBTS.\n");printstate();}            /*Start a new epoch+snapshot at this processor*/            remote_start_lbts( epoch->ID, 0 );            MYASSERT( sshot->active, ("!");printstate() );            MYASSERT( epoch_id == epoch->ID, ("!");printstate() );            /*NOW, we have made sure this mesg belongs to current epoch!*/            ++epoch->nrecd;        }    }    else    {        MYASSERT(FALSE,("TMMRed_In:Unexpected epoch %ld\n",epoch_id);printstate());    }}/*----------------------------------------------------------------------------*/static int tot_tm_msgs, nlost_tm_msgs;/*----------------------------------------------------------------------------*/static void printstats( void ){    TIMER_TYPE stop;    double secs;    TIMER_NOW(stop);    secs = TIMER_DIFF(stop, stats->start);    printf("TMMRED-Stastics\n");    printf("-----------\n");    printf("NEpochs=             %ld\n", epoch->ID-1);    printf("NLBTS=               %ld\n", stats->nlbts);    printf("Tot-trials=          %ld\n", stats->trial.tot-1);    printf("Max-trials-per-LBTS= %ld\n", stats->trial.max);    printf("Avg-trials-per-LBTS= %g\n",((double)stats->trial.tot-1)/stats->nlbts);    printf("Time-per-trial=      %16.8f microsecs\n",secs/stats->trial.tot*1e6);    printf("Time-per-LBTS=       %16.8f microsecs\n", secs/stats->nlbts*1e6);    printf("NLBTS-per-sec=       %16.8f\n", stats->nlbts/secs);    printf("-----------\n");if(nlost_tm_msgs>0)printf( "Total TM messages = %d, lost = %d, P(loss) = %g\n", tot_tm_msgs, nlost_tm_msgs, nlost_tm_msgs/(double)tot_tm_msgs );}/*----------------------------------------------------------------------------*/static void TMMRed_PrintStats( TMM_Closure closure ) { printstats(); }/*----------------------------------------------------------------------------*/static void print_tm_mesg(FILE *fp, TMMesg *m){    fprintf(fp, "{ssn=%ld, trial=%ld, type=%ld, from_pe=%ld, to_pe=%ld, val=",            m->ssn, m->trial, m->type, m->from_pe, m->to_pe );    rv_print( fp, &m->val );    fprintf( fp, ", old_lbts=%f, next=%p}", TM_TS(m->old_lbts), m->next );}/*----------------------------------------------------------------------------*/static void printstate( void ){    int i = 0;    printf("------------  TMMRED STATE START ------------\n");    printf("myid=%d, N=%d\n", st->myid, st->N);    printf("Epoch={ID=%ld, nsent=%ld, nrecd=%ld}\n",           epoch->ID, epoch->nsent, epoch->nrecd);    printf("Snapshot={active=%d, ID=%ld, trial=%ld, rh=%p, ",           sshot->active, sshot->ID, sshot->trial, sshot->rh);    printf("value="); rv_print(stdout, &sshot->value); printf(", ");    printf("transients="); rv_print(stdout, &sshot->transients);    printf("timeout{do_timeout=%s, counter=%d, max_count=%d, period=%f secs} ",           (sshot->timeout.do_timeout ? "TRUE" : "FALSE"),           sshot->timeout.counter, sshot->timeout.max_count,           sshot->timeout.period);    printf("}\n");    printf("LBTS={LBTS=%f (%s) sproc=%p, max_dproc=%d, n_dproc=%d, dproc=[",           TM_TS(lbts->LBTS), TM_TIME_QUAL_STR(lbts->qual),           lbts->sproc, lbts->max_dproc, lbts->n_dproc);    for(i=0; i<lbts->n_dproc; i++) printf("%s%p", (i>0?",":"!"),lbts->dproc[i]);    printf("]}\n");    printf("Comm={fmh=%d, use_udp=%s, buffered_msgs=%p, free_msgs=%p"           ", loss_prob=%f, msg_delay=%f secs}\n",           comm->fmh, comm->use_udp?"TRUE":"FALSE",           comm->buffered_msgs, comm->free_msgs,           comm->loss_prob, comm->msg_delay);    {      TMMesg *m=comm->buffered_msgs;      while(m){printf("\t");print_tm_mesg(stdout,m);printf("\n");m=m->next;}    }    printf("Stats={nlbts=%ld, trial{max=%ld,tot=%ld}}\n",           stats->nlbts, stats->trial.max, stats->trial.tot);    printf("\n");    printf("------------  TMMRED STATE END ------------\n");}/*----------------------------------------------------------------------------*/static void TMMRed_PrintState( TMM_Closure closure ) { printstate(); }/*----------------------------------------------------------------------------*/static void TMMRed_Tick( TMM_Closure closure ){    int ndelivered = 0;    do    {        continue_active_reduction();        ndelivered = deliver_buffered_msgs();    }while(ndelivered > 0);}/*----------------------------------------------------------------------------*/static void TMMRed_NotifyNewLBTS(TMM_Closure closure, TM_Time ts,TM_TimeQual q){    /*XXX TBC*/}/*----------------------------------------------------------------------------*/static void move_to_next_sshot_trial( void ){    MYASSERT( sshot->active, ("!") );    ++sshot->trial;    rv_reduce( &sshot->value, &sshot->transients ); /*Update value*/    rv_init( &sshot->transients ); /*Reset*/    if( sshot->timeout.do_timeout )    {        sshot->timeout.counter = 0;        TIMER_NOW( sshot->timeout.start );    }    ++stats->trial.tot;    if(sshot->trial > stats->trial.max) {stats->trial.max = sshot->trial;}    rm_init( sshot->rh, RM_SCHEDULE_GROUPED_BFLY );    rm_receive_value( sshot->rh, st->myid, &sshot->value );}/*----------------------------------------------------------------------------*/static void terminate_active_sshot( TM_Time new_lbts, TM_TimeQual new_qual ){    MYASSERT( sshot->active, ("!") );    MYASSERT( TM_LE( lbts->LBTS, new_lbts ),              ("LBTS %lf must not decrease to %lf.",			  TM_TS(lbts->LBTS),TM_TS(new_lbts)) );    sshot->active = FALSE;    lbts->LBTS = new_lbts;    lbts->qual = new_qual;    /*Report to waiting callbacks*/    {        int c;        for( c = 0; c < lbts->n_dproc; c++ )        {            lbts->dproc[c]( lbts->LBTS, lbts->qual, sshot->ID );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -