📄 tmred.c
字号:
} lbts->n_dproc = 0; } ++stats->nlbts;}/*----------------------------------------------------------------------------*/static void continue_active_reduction( void ){ if( sshot->active ) { RVALUE_TYPE dval; int timedout = FALSE; int done=rm_resume(sshot->rh, &dval, send_start_msg, send_value_msg, 0); /*Check if trial's taking too long (i.e.must be timedout?)*/ if( !done && sshot->timeout.do_timeout ) { /*Performance opt: Poll the timer only after several initial ticks*/ if( sshot->timeout.counter < sshot->timeout.max_count ) { ++sshot->timeout.counter; } else { TIMER_TYPE now; TIMER_NOW(now); if( TIMER_DIFF( now, sshot->timeout.start ) > sshot->timeout.period ) { /*This trial is timedout*/ timedout = TRUE; } } } if( done || timedout ) /*Current trial done or timedout*/ {if(st->debug>0){printf("TMMRed: %s snapshot %ld trial %ld: ",(done?"Done":"Timedout"),sshot->ID,sshot->trial);rv_print(stdout,&dval); printf("\n"); fflush(stdout);}if(st->debug>2) printstate(); if( done && (dval.nsent == dval.nrecd) ) /*Current snapshot done!*/ { terminate_active_sshot( dval.val, dval.qual ); } else /*Snapshot incomplete or timedout; start next trial*/ { move_to_next_sshot_trial(); } } }}/*----------------------------------------------------------------------------*/static int deliver_buffered_msgs( void ){ TMMesg **b = &comm->buffered_msgs; int ndelivered = 0; while( *b ) { TMMesg *msg = *b; int deliver = FALSE, consume = TRUE; if( sshot->active ) { if( msg->ssn == sshot->ID ) { if( msg->trial == sshot->trial ) { deliver = TRUE; } else { if( msg->trial < sshot->trial ) { /*Ignore outdated msg*/ } else { /*Future message; retain in queue*/ consume = FALSE; } } } else { if( msg->ssn < sshot->ID ) { /*Ignore outdated msg*/ } else /*This must be a Start/Value msg from next epoch*/ { MYASSERT( msg->ssn==sshot->ID+1, ("Must be of next epoch")); /*No point continuing currently active snapshot*/ /*since we have the LBTS as sent by the neighbor PE!*/if(st->debug>1){printf("!!------------ Skipping currently active snapshot!\n");printf("Recd msg:");print_tm_mesg(stdout,msg);printf("\n");printstate();} if( sshot->timeout.do_timeout ) terminate_active_sshot( msg->old_lbts, msg->old_qual ); /*Future message; retain in queue*/ consume = FALSE; } } } else { if( msg->ssn == epoch->ID ) { /*Take new snapshot and start LBTS computation*/ remote_start_lbts( msg->ssn, msg->trial ); MYASSERT( sshot->active && msg->ssn == sshot->ID && msg->trial == sshot->trial, ("!");print_tm_mesg(stderr,msg);printstate() ); deliver = TRUE; /*And, deliver this msg to new reduction*/ } else { if( msg->ssn < epoch->ID ) { /*Ignore outdated msg*/ } else { /*Future message; retain in queue*/ consume = FALSE; } } } if( deliver ) { MYASSERT(msg->ssn == sshot->ID && msg->trial == sshot->trial,("!")); switch( msg->type ) { case RM_START_MSG: { rm_receive_start( sshot->rh, msg->from_pe ); break; } case RM_VALUE_MSG: { rm_receive_value( sshot->rh, msg->from_pe, &msg->val ); break; } default: { MYASSERT( FALSE, ("!") ); /*Unknown type*/ break; } } } if( consume ) {if(st->debug>3){printf("Consuming msg:");print_tm_mesg(stdout,msg);printf("\n");} { *b = msg->next; } { msg->next = comm->free_msgs; comm->free_msgs = msg; } } else /*retain msg in queue*/ {if(st->debug>3){printf("Retaining msg:");print_tm_mesg(stdout,msg);printf("\n");} { b = &((*b)->next); } } }if(st->debug>4) if(ndelivered>0) {printf("+++ Delivered %d msgs\n",ndelivered);printstate();} return ndelivered;}/*----------------------------------------------------------------------------*/static void send_msg( TMMesg *msg ){ FM_stream *strm = 0; FM_Transport prev_transport; MYASSERT( sshot->active, ("!") ); msg->ssn = sshot->ID; msg->trial = sshot->trial; msg->old_lbts = lbts->LBTS; msg->old_qual = lbts->qual; prev_transport = FM_GetTransport(); FM_SetTransport( comm->use_udp ? FM_TRANSPORT_UNRELIABLE : FM_TRANSPORT_RELIABLE ); strm = FM_begin_message( msg->to_pe, sizeof(TMMesg), comm->fmh ); hton_TMMesg( msg ); FM_send_piece( strm, msg, sizeof(*msg) ); FM_end_message( strm ); FM_SetTransport( prev_transport );}/*----------------------------------------------------------------------------*/static void send_start_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe ){ TMMesg msg; msg.type = RM_START_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe; rv_init( &msg.val ); send_msg( &msg );}/*----------------------------------------------------------------------------*/static void send_value_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe, RVALUE_TYPE *v ){ TMMesg msg; msg.type = RM_VALUE_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe; rv_assign( &msg.val, v ); send_msg( &msg );}/*----------------------------------------------------------------------------*/static TMMesg *allocate_free_msgs( int nmsgs ){ int i = 0; TMMesg *first = 0, **b = &first; for( i = 0; i < nmsgs; i++ ) { *b = (TMMesg *)malloc( sizeof(TMMesg) ); if( *b ) { b = &((*b)->next); *b = 0; } } if(st->debug)printf("Allocated %d free TM buffers.\n",nmsgs); return first;}/*----------------------------------------------------------------------------*/static int recv_msg( FM_stream *strm, unsigned senderID ){ TMMesg msg; FM_receive( &msg, strm, sizeof(msg) ); ntoh_TMMesg( &msg );if(st->debug>2){printf("recv_msg: "); print_tm_mesg(stdout,&msg); printf("\n");} if( sshot->active && msg.ssn != sshot->ID ) /*Debugging/Testing*/ {if(st->debug>0){printf("***Type %ld ID mismatch: %ld %s %ld\n", msg.type, msg.ssn, ((msg.ssn < sshot->ID)?"<":">"), sshot->ID);} }++tot_tm_msgs; if( !(0<=msg.from_pe && msg.from_pe<st->N && 0<=msg.to_pe && msg.to_pe<st->N && msg.to_pe == st->myid && (msg.type==RM_START_MSG || msg.type==RM_VALUE_MSG)) ) { printf( "***TM error on read: %ld, %ld\n", msg.from_pe, msg.to_pe ); } elseif(FALSE&&(comm->loss_prob>0.0)&&((rand()%((int)(1/comm->loss_prob)))==0)){if(st->debug>0){printf("Dropped TM mesg:");print_tm_mesg(stdout,&msg);printf("\n");}nlost_tm_msgs++;}else { /*Buffer this message*/ TMMesg *mbuf = 0; /*Get a free buffer and copy the contents to it*/ { if( !comm->free_msgs ) comm->free_msgs = allocate_free_msgs(100); MYASSERT( comm->free_msgs, ("!") ); mbuf = comm->free_msgs; comm->free_msgs = mbuf->next; *mbuf = msg; } /*Append to queue*/ { TMMesg **b = &comm->buffered_msgs; while(*b) b = &((*b)->next); *b = mbuf; mbuf->next = 0; } } return FM_CONTINUE;}/*----------------------------------------------------------------------------*/void TMMRed_AddModule( void ){ TMModule mod = { &tm_state, TMMRed_Init, TMMRed_CurrentEpoch, TMMRed_Recent_LBTS, TMMRed_StartLBTS, TMMRed_PutTag, TMMRed_Out, TMMRed_In, TMMRed_PrintStats, TMMRed_PrintState, TMMRed_Tick, TMMRed_NotifyNewLBTS }; TM_AddModule( &mod );}/*----------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -