📄 rticore.c
字号:
/*----------------------------------------------------------------------------*//* tick *//*----------------------------------------------------------------------------*/fdkErrorCode Core_tick(void){ fdkErrorCode status; /* KALYAN: In largescale, fine-grained federates, per call overhead for */ /* each NER/TAG can be too high if not optimized; hence this optimization */ if( CRTI.optimize.ner_fastpath && CRTI.PendingRequest == NER && TM_LT( CRTI.PendingTime, RTI_TSOMin() ) && TM_LT( CRTI.PendingTime, CRTI.LBTS ) ) { {static int x=0; if(x++>1000/*CUSTOMIZE*/){x=0;FM_extract(~0);}} CRTI.PendingRequest = NO_REQ; CRTI.CurrentTime = CRTI.LocalMin = CRTI.PendingTime; RTI_TimeAdvanceGrant(CRTI.CurrentTime); return fdkSUCCEEDED; } /* KALYAN: In federations with ill-balanced loads among federates, the */ /* faster federate(s) can easily bog down the slower ones with too many */ /* LBTSs; hence this optimization of ticking TM only when locally required*/ if( !CRTI.optimize.min_nlbts || CRTI.PendingRequest == NO_REQ || TM_GE( CRTI.PendingTime, CRTI.LBTS ) ) { RTIKIT_Tick(); MCAST_Tick(); } else { FM_extract(~0); } status = core_DoEventDelivery(); return status;}/*----------------------------------------------------------------------------*//* PrintRTIState *//*----------------------------------------------------------------------------*/fdkErrorCode Core_PrintRTIState (FILE *out){ if (out == NULL) return(fdkBADFD); fprintf (out,"*** RTI State PE %d ***\n", (int) Core_federateID()); fprintf (out,"CurrentTime = %f\n", (double) TM_TS(CRTI.CurrentTime)); fprintf (out,"CurrentQual = %d\n", CRTI.CurrentQual); fprintf (out,"LookAhead = %f\n", (double) TM_TS(CRTI.LookAhead)); fprintf (out,"TargetLA = %f\n", (double) TM_TS(CRTI.TargetLA)); fprintf (out,"PendingRequest = %d\n", (int) CRTI.PendingRequest); fprintf (out,"PendingTime = %f\n", (double) TM_TS(CRTI.PendingTime)); fprintf (out,"PendingQual = %d\n", CRTI.PendingQual); fprintf (out,"NPendingLBTS = %d\n", (int) CRTI.NPendingLBTS); fprintf (out,"LBTS = %f\n", (double) TM_TS(CRTI.LBTS)); fprintf (out,"LBTSQual = %d\n", CRTI.LBTSQual); fprintf (out,"Number of Sender retraction table resizes = %d\n", Stats.SRTableResizes); fprintf (out,"Final Size of Sender retraction table = %d\n", Stats.SRTableFinalSize); fprintf (out,"Initial Size of sender retraction table = %d\n", Stats.SRTableInitialSize); fprintf (out,"Number of recver retraction table resizes = %d\n", Stats.RRTableResizes); fprintf (out,"Final Size of recver retraction table = %d\n", Stats.RRTableFinalSize); fprintf (out,"Initial size of recver retraction table = %d\n", Stats.RRTableInitialSize); fflush (out); return(fdkSUCCEEDED);}/*----------------------------------------------------------------------------*//* Init Debug *//*----------------------------------------------------------------------------*/fdkErrorCode Core_InitDebug(FILE *fout){ char *dbgstr = getenv("RTIC_DEBUG"); /* Integer [1,inf] */ CRTI.debug = dbgstr?atoi(dbgstr):0; if (fout == NULL) CRTI.dout = stderr; else CRTI.dout = fout; return(fdkSUCCEEDED);}/*----------------------------------------------------------------------------*//* INTERNAL Core Procedures *//*----------------------------------------------------------------------------*/static fdkErrorCode core_DoEventDelivery(){ TM_Time ReleaseTime; TM_TimeQual ReleaseQual; TM_Time AllowableReduction; fdkErrorCode status = fdkSUCCEEDED; BOOLEAN canGrant = FALSE; RTI_DeliverROEvents(); core_Compute_Local_Min(); /* This helps validate timestamps on outgoing messages */ switch (CRTI.PendingRequest) { case FQR: { /* Always flush messages when a FQR is pending */ if ((status = core_FlushQueueRequestDelivery()) != fdkSUCCEEDED) return (status); /* * We need to issue a grant as soon as is possible with an FQR * however, we cannot grant to a time which would violate an * LBTS guarantee for an LBTS computation in progress. * * Therefore, if LBTS is far enough to permit a grant MinDelivered, * we go ahead. else, we wait for LBTS to complete. * * Since we just did a flush, there should be NO messages in * either the TSO queue or the retract heap. For now we will * verify this explicitly. */ if (TM_LT(core_MinRetract(),TM_IDENT) ) { fprintf(cd,"core_DoEventDelivery: "); fprintf(cd,"retraction heap did not empty in FQR delivery! \n"); fflush(cd); exit(1); } if (TM_LT(RTI_TSOMin(),TM_IDENT) ) { fprintf(cd,"core_DoEventDelivery: "); fprintf(cd,"TSO Queue did not empty in FQR delivery! \n"); fflush(cd); exit(1); } if (TM_GT(CRTI.LBTS, CRTI.MinDelivered) || (TM_EQ(CRTI.LBTS, CRTI.MinDelivered) && CRTI.LBTSQual == TM_TIME_QUAL_EXCL ) ) { /* * LBTS is beyond the release time, * We can grant appropriately */ AllowableReduction = TM_Sub(CRTI.MinDelivered, CRTI.CurrentTime); CRTI.CurrentQual = TM_TIME_QUAL_EXCL; CRTI.CurrentTime = CRTI.MinDelivered; canGrant = TRUE; } else if (CRTI.NPendingLBTS == 0) { /* * No Pending LBTS Computation, * We can grant up to LBTS if LBTSQual = EXCL */ if (CRTI.LBTSQual == TM_TIME_QUAL_EXCL) { AllowableReduction = TM_Sub(CRTI.LBTS, CRTI.CurrentTime); CRTI.CurrentTime = CRTI.LBTS; CRTI.CurrentQual = TM_TIME_QUAL_EXCL ; canGrant = TRUE; } } else { /* else we just have to wait */ if (CRTI.debug > 1) { fprintf(cd,"core_DoEventDelivery: "); fprintf(cd,"could not grant during tick.\n"); fflush(cd); } } } break; case NER: /* if an NER is pending, try to release it */ { if ( core_TryToRelease(&ReleaseTime, &ReleaseQual) ) { /* if we delivered messages, we know we can TAG */ AllowableReduction = TM_Sub(ReleaseTime, CRTI.CurrentTime); CRTI.CurrentTime = ReleaseTime; CRTI.CurrentQual = ReleaseQual; canGrant = TRUE; } else if ( TM_GT(CRTI.LBTS, CRTI.PendingTime) || ( TM_EQ(CRTI.LBTS, CRTI.PendingTime) && ( CRTI.LBTSQual >= CRTI.PendingQual ) ) ) { /* No messages, but we can grant all the way */ AllowableReduction = TM_Sub(CRTI.PendingTime, CRTI.CurrentTime); CRTI.CurrentTime = CRTI.PendingTime; CRTI.CurrentQual = CRTI.PendingQual; canGrant = TRUE; } else { /* still can't release; need to wait for new LBTS value */ if ((status = core_StartLBTS()) != fdkSUCCEEDED) return(status); } } break; case NERA: /* if an NERA is pending, try to release it */ { if ( core_TryToRelease(&ReleaseTime, &ReleaseQual) ) { /* if we delivered messages, we know we can TAG */ AllowableReduction = TM_Sub(ReleaseTime, CRTI.CurrentTime); CRTI.CurrentTime = ReleaseTime; CRTI.CurrentQual = ReleaseQual; canGrant = TRUE; } else if ( TM_GT(CRTI.LBTS, CRTI.PendingTime) || ( TM_EQ(CRTI.LBTS, CRTI.PendingTime) && ( CRTI.LBTSQual >= CRTI.PendingQual ) ) ) { /* No messages, but we can grant all the way */ AllowableReduction = TM_Sub(CRTI.PendingTime, CRTI.CurrentTime); CRTI.CurrentTime = CRTI.PendingTime; CRTI.CurrentQual = CRTI.PendingQual; canGrant = TRUE; } else { /* still can't release; need to wait for new LBTS value */ if ((status = core_StartLBTS()) != fdkSUCCEEDED) return(status); } } break; case TAR: { if ( TM_GT(CRTI.LBTS, CRTI.PendingTime) || ( TM_EQ(CRTI.LBTS, CRTI.PendingTime) && ( CRTI.LBTSQual >= CRTI.PendingQual ) ) ) { AllowableReduction = TM_Sub(CRTI.PendingTime,CRTI.CurrentTime); CRTI.CurrentTime = CRTI.PendingTime; CRTI.CurrentQual = CRTI.PendingQual; RTI_DeliverTSOEvents(CRTI.CurrentTime); canGrant = TRUE; } else /* still can't release; need to wait for new LBTS value */ { RTI_DeliverTSOEvents (CRTI.LBTS); if ((status = core_StartLBTS()) != fdkSUCCEEDED) return(status); } } break; case TARA: { if ( TM_GE(CRTI.LBTS, CRTI.PendingTime) ) { AllowableReduction = TM_Sub(CRTI.PendingTime,CRTI.CurrentTime); CRTI.CurrentTime = CRTI.PendingTime; CRTI.CurrentQual = CRTI.PendingQual; RTI_DeliverTSOEvents(CRTI.CurrentTime); canGrant = TRUE; } else /* still can't release; need to wait for new LBTS value */ { RTI_DeliverTSOEvents (CRTI.LBTS); if ((status = core_StartLBTS()) != fdkSUCCEEDED) return(status); } } break; default: { /* No request pending */ } break; } if (canGrant) { if (TM_GT(AllowableReduction, TM_Sub(CRTI.LookAhead, CRTI.TargetLA))) { CRTI.LookAhead = CRTI.TargetLA; } else { CRTI.LookAhead = TM_Sub(CRTI.LookAhead, AllowableReduction ); } CRTI.PendingRequest = NO_REQ; if (CRTI.debug > 1) { fprintf(cd,"core_DoEventDelivery: Granting to %16.10g \n", CRTI.CurrentTime); fprintf(cd,"PendingTime is %16.10f \n",CRTI.PendingTime); fprintf(cd,"LookAhead is %16.10f \n",CRTI.LookAhead); fprintf(cd,"TargetLA is %16.10f \n",CRTI.TargetLA); fprintf(cd,"MinDelivered is %16.10f\n",CRTI.MinDelivered); fprintf(cd,"LBTS is %16.10f\n",CRTI.LBTS); fprintf(cd,"LBTSQual is %d\n",CRTI.LBTSQual); fflush(cd); if (TM_LT(TM_Add(CRTI.CurrentTime, CRTI.LookAhead),CRTI.LBTS)) { fprintf(cd,"core_DoEventDelivery: Warning TAG violates LBTS \n"); fflush(cd); } if ( TM_isZero(CRTI.LookAhead) && TM_EQ(CRTI.CurrentTime,CRTI.LBTS) && (CRTI.CurrentQual < CRTI.LBTSQual)) { fprintf(cd,"core_DoEventDelivery: Warning! TAG violates LBTSQual \n"); fflush(cd); } } RTI_TimeAdvanceGrant(CRTI.CurrentTime); } /* If I made it this far, I must be ok */ return(fdkSUCCEEDED);}/*----------------------------------------------------------------------------*//* * core_FlushQueueRequestDelivery() * * This function is called to flush the event queue. It may be called multiple * times if a grant cannot be issued immediately. Since we need to hold on to * the value of the minimum event delivered, we have a special way to do this * for FQR. MinDelivered. We could as easily just keep current time up to date, * but this has been useful in debugging FQR behavior. * * notes: * a better way to work with the Retraction manager would be to note messages * that were delivered "unsafe". These cannot be guaranteed to have been anialated * withing the RTI. Instead, we are stuck just sending all the remaining retracts. * we can, if we want to, compare minRetract with minDelivered, but that doesn't * but us much * */ fdkErrorCode core_FlushQueueRequestDelivery(){ fdkErrorCode status = fdkSUCCEEDED; void *Data; TM_Time minTS; long MsgSize, MsgType; /* * First we see what the least message to be delivered from * the TSO Queue will be. * This may become the grant time. */ minTS = RTI_TSOMin(); /* * Deliver all TSO messages. */ while ((Data = RTI_TSOPop(&minTS,&MsgSize,&MsgType)) != NULL) { RTI_DeliverOneMessage(minTS, Data, MsgSize, MsgType); } /* * We also check the remaining retractions to ensure we allow * a federate to issue subsequent retractions for the same logical * time. * * Since delivery of messages, above happens after anialation, we * have to check the retraction queue AFTER we do the delivery * */ minTS = TM_Min(minTS, core_MinRetract()); CRTI.MinDelivered = TM_Min(minTS, CRTI.MinDelivered) ; status = core_SendRemainingRetracts(); if (status != fdkSUCCEEDED) { if (1 || CRTI.debug > 0) { fprintf(cd,"core_FlushQueueRequestDelivery: Error! "); fprintf(cd,"core_SendRemainingRetracts failed.\n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -