⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tmred.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 3 页
字号:
        }        lbts->n_dproc = 0;    }    ++stats->nlbts;}/*----------------------------------------------------------------------------*/static void continue_active_reduction( void ){    if( sshot->active )    {        RVALUE_TYPE dval;        int timedout = FALSE;        int done=rm_resume(sshot->rh, &dval, send_start_msg, send_value_msg, 0);        /*Check if trial's taking too long (i.e.must be timedout?)*/        if( !done && sshot->timeout.do_timeout )        {            /*Performance opt: Poll the timer only after several initial ticks*/            if( sshot->timeout.counter < sshot->timeout.max_count )            {                ++sshot->timeout.counter;            }            else            {                TIMER_TYPE now;                TIMER_NOW(now);                if( TIMER_DIFF( now, sshot->timeout.start ) >                    sshot->timeout.period )                {                    /*This trial is timedout*/                    timedout = TRUE;                }            }        }        if( done || timedout ) /*Current trial done or timedout*/        {if(st->debug>0){printf("TMMRed: %s snapshot %ld trial %ld: ",(done?"Done":"Timedout"),sshot->ID,sshot->trial);rv_print(stdout,&dval); printf("\n"); fflush(stdout);}if(st->debug>2) printstate();            if( done && (dval.nsent == dval.nrecd) ) /*Current snapshot done!*/            {                terminate_active_sshot( dval.val, dval.qual );            }            else /*Snapshot incomplete or timedout; start next trial*/            {                move_to_next_sshot_trial();            }        }    }}/*----------------------------------------------------------------------------*/static int deliver_buffered_msgs( void ){    TMMesg **b = &comm->buffered_msgs;    int ndelivered = 0;    while( *b )    {        TMMesg *msg = *b;        int deliver = FALSE, consume = TRUE;        if( sshot->active )        {            if( msg->ssn == sshot->ID )            {                if( msg->trial == sshot->trial )                {                    deliver = TRUE;                }                else                {                    if( msg->trial < sshot->trial )                    {                        /*Ignore outdated msg*/                    }                    else                    {                        /*Future message; retain in queue*/                        consume = FALSE;                    }                }            }            else            {                if( msg->ssn < sshot->ID )                {                    /*Ignore outdated msg*/                }                else /*This must be a Start/Value msg from next epoch*/                {                    MYASSERT( msg->ssn==sshot->ID+1, ("Must be of next epoch"));                    /*No point continuing currently active snapshot*/                    /*since we have the LBTS as sent by the neighbor PE!*/if(st->debug>1){printf("!!------------ Skipping currently active snapshot!\n");printf("Recd msg:");print_tm_mesg(stdout,msg);printf("\n");printstate();}                    if( sshot->timeout.do_timeout )                    terminate_active_sshot( msg->old_lbts, msg->old_qual );                    /*Future message; retain in queue*/                    consume = FALSE;                }            }        }        else        {            if( msg->ssn == epoch->ID )            {                /*Take new snapshot and start LBTS computation*/                remote_start_lbts( msg->ssn, msg->trial );                MYASSERT( sshot->active &&                        msg->ssn == sshot->ID &&                        msg->trial == sshot->trial,                        ("!");print_tm_mesg(stderr,msg);printstate() );                deliver = TRUE; /*And, deliver this msg to new reduction*/            }            else            {                if( msg->ssn < epoch->ID )                {                    /*Ignore outdated msg*/                }                else                {                    /*Future message; retain in queue*/                    consume = FALSE;                }            }        }        if( deliver )        {            MYASSERT(msg->ssn == sshot->ID && msg->trial == sshot->trial,("!"));            switch( msg->type )            {                case RM_START_MSG:                {                    rm_receive_start( sshot->rh, msg->from_pe );                    break;                }                case RM_VALUE_MSG:                {                    rm_receive_value( sshot->rh, msg->from_pe, &msg->val );                    break;                }                default:                {                    MYASSERT( FALSE, ("!") ); /*Unknown type*/                    break;                }            }        }        if( consume )        {if(st->debug>3){printf("Consuming msg:");print_tm_mesg(stdout,msg);printf("\n");}            { *b = msg->next; }            { msg->next = comm->free_msgs; comm->free_msgs = msg; }        }        else /*retain msg in queue*/        {if(st->debug>3){printf("Retaining msg:");print_tm_mesg(stdout,msg);printf("\n");}            { b = &((*b)->next); }        }    }if(st->debug>4) if(ndelivered>0) {printf("+++ Delivered %d msgs\n",ndelivered);printstate();}    return ndelivered;}/*----------------------------------------------------------------------------*/static void send_msg( TMMesg *msg ){    FM_stream *strm = 0;    FM_Transport prev_transport;    MYASSERT( sshot->active, ("!") );    msg->ssn = sshot->ID; msg->trial = sshot->trial;    msg->old_lbts = lbts->LBTS;    msg->old_qual = lbts->qual;    prev_transport = FM_GetTransport();    FM_SetTransport( comm->use_udp ?                     FM_TRANSPORT_UNRELIABLE : FM_TRANSPORT_RELIABLE );    strm = FM_begin_message( msg->to_pe, sizeof(TMMesg), comm->fmh );    hton_TMMesg( msg );    FM_send_piece( strm, msg, sizeof(*msg) );    FM_end_message( strm );    FM_SetTransport( prev_transport );}/*----------------------------------------------------------------------------*/static void send_start_msg( RMUserHandle usr, void *closure,    int from_pe, int to_pe ){    TMMesg msg;    msg.type = RM_START_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe;    rv_init( &msg.val );    send_msg( &msg );}/*----------------------------------------------------------------------------*/static void send_value_msg( RMUserHandle usr, void *closure,    int from_pe, int to_pe, RVALUE_TYPE *v ){    TMMesg msg;    msg.type = RM_VALUE_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe;    rv_assign( &msg.val, v );    send_msg( &msg );}/*----------------------------------------------------------------------------*/static TMMesg *allocate_free_msgs( int nmsgs ){    int i = 0;    TMMesg *first = 0, **b = &first;    for( i = 0; i < nmsgs; i++ )    {        *b = (TMMesg *)malloc( sizeof(TMMesg) );        if( *b ) { b = &((*b)->next); *b = 0; }    }    if(st->debug)printf("Allocated %d free TM buffers.\n",nmsgs);    return first;}/*----------------------------------------------------------------------------*/static int recv_msg( FM_stream *strm, unsigned senderID ){    TMMesg msg;    FM_receive( &msg, strm, sizeof(msg) );    ntoh_TMMesg( &msg );if(st->debug>2){printf("recv_msg: "); print_tm_mesg(stdout,&msg); printf("\n");}    if( sshot->active && msg.ssn != sshot->ID ) /*Debugging/Testing*/    {if(st->debug>0){printf("***Type %ld ID mismatch: %ld %s %ld\n", msg.type, msg.ssn, ((msg.ssn < sshot->ID)?"<":">"), sshot->ID);}    }++tot_tm_msgs;    if( !(0<=msg.from_pe && msg.from_pe<st->N &&          0<=msg.to_pe && msg.to_pe<st->N &&          msg.to_pe == st->myid &&          (msg.type==RM_START_MSG || msg.type==RM_VALUE_MSG)) )    {        printf( "***TM error on read: %ld, %ld\n", msg.from_pe, msg.to_pe );    }    elseif(FALSE&&(comm->loss_prob>0.0)&&((rand()%((int)(1/comm->loss_prob)))==0)){if(st->debug>0){printf("Dropped TM mesg:");print_tm_mesg(stdout,&msg);printf("\n");}nlost_tm_msgs++;}else    {        /*Buffer this message*/        TMMesg *mbuf = 0;        /*Get a free buffer and copy the contents to it*/        {            if( !comm->free_msgs )                comm->free_msgs = allocate_free_msgs(100);            MYASSERT( comm->free_msgs, ("!") );            mbuf = comm->free_msgs;            comm->free_msgs = mbuf->next;            *mbuf = msg;        }        /*Append to queue*/        {            TMMesg **b = &comm->buffered_msgs;            while(*b) b = &((*b)->next);            *b = mbuf;            mbuf->next = 0;        }    }    return FM_CONTINUE;}/*----------------------------------------------------------------------------*/void TMMRed_AddModule( void ){    TMModule mod = {        &tm_state,        TMMRed_Init,        TMMRed_CurrentEpoch,        TMMRed_Recent_LBTS,        TMMRed_StartLBTS,        TMMRed_PutTag,        TMMRed_Out,        TMMRed_In,        TMMRed_PrintStats,        TMMRed_PrintState,        TMMRed_Tick,        TMMRed_NotifyNewLBTS    };    TM_AddModule( &mod );}/*----------------------------------------------------------------------------*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -