⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tmred.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 3 页
字号:
/*----------------------------------------------------------------------------*//* The "all-new" TM implementation!                                           *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 01Jun2000   *//* Revised to use the new hybrid/combination TM.                              *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 26Nov2003   *//* $Revision: 1.12 $ $Name: v26apr05 $ $Date: 2004/11/04 21:45:12 $ *//*----------------------------------------------------------------------------*/#include <stdlib.h>#include <string.h>#include <stdio.h>#include "mycompat.h"#include "fm.h"#include "tm.h"#include "rm.h"/*----------------------------------------------------------------------------*/struct RVALUE_TYPE_STRUCT{    TM_Time val;        /*Timestamp being reduced*/    TM_TimeQual qual;   /*Timestamp qualification: see TM_TimeQual definition*/    long nsent, nrecd;  /*Number of timestamped messages sent/received*/        #if !PLATFORM_WIN            long padding; /*Compatibility with size generated by Windows compiler*/        #endif};/*----------------------------------------------------------------------------*/static void hton_RVALUE_TYPE( RVALUE_TYPE *v )    { (v)->nsent = htonl((v)->nsent); (v)->nrecd = htonl((v)->nrecd); }static void ntoh_RVALUE_TYPE( RVALUE_TYPE *v )    { (v)->nsent = ntohl((v)->nsent); (v)->nrecd = ntohl((v)->nrecd); }/*----------------------------------------------------------------------------*/static RVALUE_TYPE *rv_new( void )    { return malloc( sizeof( RVALUE_TYPE ) ); }static void rv_delete( RVALUE_TYPE *v )    { free( v ); }static void rv_init(RVALUE_TYPE *a)    { if(a){ (a)->val = TM_IDENT; (a)->qual = TM_TIME_QUAL_INCL;             (a)->nsent = (a)->nrecd = 0; } }static void rv_assign( RVALUE_TYPE *a, RVALUE_TYPE *b )    { if((a)&&(b)) *(a) = *(b); }static void rv_reduce( RVALUE_TYPE *a, RVALUE_TYPE *b)    { if((a)&&(b))        { if(TM_LT((b)->val, (a)->val) ||             (TM_EQ((b)->val, (a)->val) && ((b)->qual < (a)->qual)) )                 { (a)->val = (b)->val; (a)->qual = (b)->qual; }          (a)->nsent += (b)->nsent; (a)->nrecd += (b)->nrecd; } }static void rv_set( RVALUE_TYPE *a, TM_Time v, TM_TimeQual qual, int ns, int nr)    { if(a) { (a)->val = v; (a)->qual = qual;              (a)->nsent = ns; (a)->nrecd = nr; } }static void rv_print( FILE *fp, RVALUE_TYPE *a )    { if(!a) return;      fprintf((fp), "<");      if(TM_GE((a)->val,TM_IDENT)) fprintf((fp), "Infinity");      else fprintf((fp),"%f (%s)", TM_TS((a)->val),TM_TIME_QUAL_STR((a)->qual));      fprintf( (fp), ", %ld, %ld>", (a)->nsent, (a)->nrecd );    }/*----------------------------------------------------------------------------*/static RVALUE_CLASS rv_class ={    rv_new, rv_delete, rv_init, rv_assign, rv_reduce, rv_print};/*----------------------------------------------------------------------------*/enum { RM_START_MSG, RM_VALUE_MSG };/*----------------------------------------------------------------------------*/typedef struct TMMesgStruct{    long ssn;                  /*ID# of snapshot to which this mesg belongs*/    long trial;                /*Trial number within this snapshot*/    long type;                 /*Is it Start/Value message?*/    long from_pe, to_pe;       /*From & To which processor?*/        #if !PLATFORM_WIN            long padding; /*Compatibility with size generated by Windows compiler*/        #endif    RVALUE_TYPE val;           /*If Value message, contains the value*/    TM_Time old_lbts;          /*LBTS computed in previous snapshot/epoch*/    TM_TimeQual old_qual;      /*LBTS qualification in previous snapshot/epoch*/    struct TMMesgStruct *next; /*Scratch for linking msg buffers*/} TMMesg;/*----------------------------------------------------------------------------*/#define hton_TMMesg(/*TMMesg **/m) \    do{ \        (m)->ssn = htonl((m)->ssn); \        (m)->trial = htonl((m)->trial); \        (m)->type = htonl((m)->type); \        (m)->from_pe = htonl((m)->from_pe); \        (m)->to_pe = htonl((m)->to_pe); \        hton_RVALUE_TYPE(&(m)->val); \        hton_TM_Time(&(m)->old_lbts); \        hton_TM_TimeQual(&(m)->old_qual); \    }while(0)#define ntoh_TMMesg(/*TMMesg **/m) \    do{ \        (m)->ssn = ntohl((m)->ssn); \        (m)->trial = ntohl((m)->trial); \        (m)->type = ntohl((m)->type); \        (m)->from_pe = ntohl((m)->from_pe); \        (m)->to_pe = ntohl((m)->to_pe); \        ntoh_RVALUE_TYPE(&(m)->val); \        ntoh_TM_Time(&(m)->old_lbts); \        ntoh_TM_TimeQual(&(m)->old_qual); \    }while(0)/*----------------------------------------------------------------------------*//*----------------------------------------------------------------------------*/typedef struct{    long ID;            /*Current epoch number (same as next snapshot ID)*/    long nsent, nrecd;  /*Event counters for next (immediate) snapshot*/    /*Special case data; see TMMRed_In()*/    long next_id;       /*ID of next epoch*/    long next_nrecd;    /*Number of incoming messages in the next epoch*/} Epoch;/*----------------------------------------------------------------------------*/typedef struct{    int active;        /*Is this snapshot computation in progress now? */    long ID;           /*Active (or most recently completed) snapshot number*/    long trial;        /*Trial number within active snapshot computation*/    RMUserHandle rh;   /*Handle for the reduction service*/    RVALUE_TYPE value; /*Reduction value reported in currently active trial*/    RVALUE_TYPE transients; /*Transient msgs of this snapshot accumulated here*/    struct    {      int do_timeout;  /*Should timeouts be turned on or off?*/      int counter;     /*#ticks so far, before starting to check timeout*/      int max_count;   /*Start checking for timeout time after this many ticks*/      TIMER_TYPE start;/*When did this snapshot computation start?*/      double period;   /*Timeout this snapshot after this many seconds*/    } timeout;} Snapshot;/*----------------------------------------------------------------------------*/typedef struct{    TM_Time LBTS;           /*Most recently known value; updated via snapshots*/    TM_TimeQual qual;       /*Qualification of recent LBTS; see TM_TimeQual*/    TM_LBTSStartedProc sproc;/*Callback for getting this PE's snap shot value*/    TM_LBTSDoneProc *dproc; /*List of callbacks waiting for new LBTS value*/    int max_dproc;          /*Limit on #callbacks that can wait for new LBTS*/    int n_dproc;            /*Actual #callbacks waiting for new LBTS*/} LBTSInfo;/*----------------------------------------------------------------------------*/typedef struct{    unsigned int fmh;       /*FM handle ID*/    int use_udp;            /*Should UDP used for TM messages?*/    TMMesg *buffered_msgs;  /*Msgs for next epoch/trial, but arrived early*/    TMMesg *free_msgs;      /*Free pool of message buffers*/    double loss_prob;       /*Probability of losing any incoming TM message*/    double msg_delay;       /*Average network latency for any TM message*/} Communication;/*----------------------------------------------------------------------------*/typedef struct{    TIMER_TYPE start; /*When did TMMRed_Init() end?*/    long nlbts;        /*Total number of LBTS computations so far*/    struct    {        long max;      /*Max #trials per LBTS completion*/        long tot;      /*Sum total of #trials across all LBTS computations*/    } trial;} Statistics;/*----------------------------------------------------------------------------*/typedef struct{    int debug;              /*Debugging level*/    int myid;               /*My processor ID*/    int N;                  /*Total number of processors*/    Epoch epoch;            /*Current (active) epoch information*/    Snapshot sshot;         /*Active (or most recently completed) snapshot*/    LBTSInfo lbts;          /*How LBTS values must be acquired/stored/reported*/    Communication comm;     /*Message send/receive/buffer information*/    Statistics stats;       /*Self-explanatory*/} TMState;/*----------------------------------------------------------------------------*//* Global TM state, and cached pointers into the global TM state              *//* In multi-threaded implementation, pass TM state as argument to API calls,  *//* and move the cached pointers into the functions (as local variables).      *//*----------------------------------------------------------------------------*/static TMState tm_state;static TMState *st;static Epoch *epoch;static Snapshot *sshot;static LBTSInfo *lbts;static Communication *comm;static Statistics *stats;/*----------------------------------------------------------------------------*//*----------------------------------------------------------------------------*/static int recv_msg( FM_stream *strm, unsigned senderID );static void send_start_msg( RMUserHandle usr, void *closure,                            int from_pe, int to_pe );static void send_value_msg( RMUserHandle usr, void *closure,                            int from_pe, int to_pe, RVALUE_TYPE *v );static void continue_active_reduction( void );static int deliver_buffered_msgs( void );static void move_to_next_sshot_trial( void );static void printstats( void );static void printstate( void );/*----------------------------------------------------------------------------*/static void TMMRed_Init(TMM_Closure closure, TMM_LBTSStartedProc sproc){    char *dbgstr = getenv("TMRED_DEBUG");     /* Integer [1,inf] */    char *lossstr= getenv("TMRED_LPROB");     /* Double  [0.0,1.0] */    char *delaystr= getenv("TMRED_MSGDELAY"); /* Double  [0.0,inf] secs */    char *useudpstr= getenv("TMRED_USEUDP");  /* String TRUE or FALSE */    MYASSERT( closure == &tm_state, ("Closures should match") );    st = &tm_state;    epoch = &st->epoch;    sshot = &st->sshot;    lbts = &st->lbts;    comm = &st->comm;    stats = &st->stats;    st->debug            = dbgstr?atoi(dbgstr):0;    st->myid             = (int) FM_nodeid;    st->N                = (int) FM_numnodes;    epoch->ID            = 0;    epoch->nsent         = 0;    epoch->nrecd         = 0;    epoch->next_id       = 1;    epoch->next_nrecd    = 0;    sshot->active        = FALSE;    sshot->ID            = -1;    sshot->trial         = -1;    sshot->rh            = rm_register( st->N, st->myid, MAX_PE, &rv_class );    sshot->timeout.do_timeout= TRUE;    sshot->timeout.counter   = 0;    sshot->timeout.max_count = 10;    sshot->timeout.period    = 2.0; /*Updated later*/    lbts->LBTS           = TM_ZERO;    lbts->qual           = TM_TIME_QUAL_INCL;    lbts->sproc          = sproc;    lbts->max_dproc      = 1000;    lbts->n_dproc        = 0;    lbts->dproc          = (TM_LBTSDoneProc *)malloc(                           sizeof(TM_LBTSDoneProc)*lbts->max_dproc);    comm->fmh            = -1;    comm->use_udp        = (useudpstr&&!strcmp(useudpstr,"TRUE"));    comm->buffered_msgs  = 0;    comm->free_msgs      = 0;    comm->loss_prob      = lossstr?atof(lossstr):0.0;    comm->msg_delay      = delaystr?atof(delaystr):0.5;if( comm->loss_prob <= 0 ) sshot->timeout.do_timeout = FALSE;    stats->nlbts         = 0;    stats->trial.max     = 1;    stats->trial.tot     = 0;    rv_init( &sshot->value );    rv_init( &sshot->transients );    FML_RegisterHandler( &comm->fmh, recv_msg );    sshot->timeout.period = 2*(st->N-1)*comm->msg_delay;/*Worst case per trial*/    FML_Barrier();    TIMER_NOW(stats->start);if(getenv("TMRED_UDPALWAYS")){FM_SetTransport(FM_TRANSPORT_UNRELIABLE);comm->use_udp=1;printf("--TMRED_UDPALWAYS---\n");fflush(stdout);}if(st->debug)printf("-- PE %d: TMRED_LPROB=%g, TMRED_MSGDELAY=%g\n", st->myid, comm->loss_prob, comm->msg_delay);if(st->debug>=0){printf("%d: TMRed initialized.\n",st->myid);fflush(stdout);}}/*----------------------------------------------------------------------------*/static void TMMRed_SetLBTSStartProc( TMM_Closure closure,                TM_LBTSStartedProc started_proc ){    lbts->sproc = started_proc;}/*----------------------------------------------------------------------------*/static long TMMRed_CurrentEpoch( TMM_Closure closure ){    return epoch->ID;}/*----------------------------------------------------------------------------*/static void TMMRed_Recent_LBTS( TMM_Closure closure, TM_Time *pts ){    if( pts ) *pts = lbts->LBTS;}/*----------------------------------------------------------------------------*/static long TMMRed_StartLBTSSnapShot( TM_Time min_ts, TM_TimeQual qual,    TM_LBTSDoneProc done_proc, long *ptrans,    long expected_sshot_id, long trial_num ){	static char tmstr1[1000],tmstr2[1000];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -