📄 tmred.c
字号:
/*----------------------------------------------------------------------------*//* The "all-new" TM implementation! *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 01Jun2000 *//* Revised to use the new hybrid/combination TM. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 26Nov2003 *//* $Revision: 1.12 $ $Name: v26apr05 $ $Date: 2004/11/04 21:45:12 $ *//*----------------------------------------------------------------------------*/#include <stdlib.h>#include <string.h>#include <stdio.h>#include "mycompat.h"#include "fm.h"#include "tm.h"#include "rm.h"/*----------------------------------------------------------------------------*/struct RVALUE_TYPE_STRUCT{ TM_Time val; /*Timestamp being reduced*/ TM_TimeQual qual; /*Timestamp qualification: see TM_TimeQual definition*/ long nsent, nrecd; /*Number of timestamped messages sent/received*/ #if !PLATFORM_WIN long padding; /*Compatibility with size generated by Windows compiler*/ #endif};/*----------------------------------------------------------------------------*/static void hton_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->nsent = htonl((v)->nsent); (v)->nrecd = htonl((v)->nrecd); }static void ntoh_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->nsent = ntohl((v)->nsent); (v)->nrecd = ntohl((v)->nrecd); }/*----------------------------------------------------------------------------*/static RVALUE_TYPE *rv_new( void ) { return malloc( sizeof( RVALUE_TYPE ) ); }static void rv_delete( RVALUE_TYPE *v ) { free( v ); }static void rv_init(RVALUE_TYPE *a) { if(a){ (a)->val = TM_IDENT; (a)->qual = TM_TIME_QUAL_INCL; (a)->nsent = (a)->nrecd = 0; } }static void rv_assign( RVALUE_TYPE *a, RVALUE_TYPE *b ) { if((a)&&(b)) *(a) = *(b); }static void rv_reduce( RVALUE_TYPE *a, RVALUE_TYPE *b) { if((a)&&(b)) { if(TM_LT((b)->val, (a)->val) || (TM_EQ((b)->val, (a)->val) && ((b)->qual < (a)->qual)) ) { (a)->val = (b)->val; (a)->qual = (b)->qual; } (a)->nsent += (b)->nsent; (a)->nrecd += (b)->nrecd; } }static void rv_set( RVALUE_TYPE *a, TM_Time v, TM_TimeQual qual, int ns, int nr) { if(a) { (a)->val = v; (a)->qual = qual; (a)->nsent = ns; (a)->nrecd = nr; } }static void rv_print( FILE *fp, RVALUE_TYPE *a ) { if(!a) return; fprintf((fp), "<"); if(TM_GE((a)->val,TM_IDENT)) fprintf((fp), "Infinity"); else fprintf((fp),"%f (%s)", TM_TS((a)->val),TM_TIME_QUAL_STR((a)->qual)); fprintf( (fp), ", %ld, %ld>", (a)->nsent, (a)->nrecd ); }/*----------------------------------------------------------------------------*/static RVALUE_CLASS rv_class ={ rv_new, rv_delete, rv_init, rv_assign, rv_reduce, rv_print};/*----------------------------------------------------------------------------*/enum { RM_START_MSG, RM_VALUE_MSG };/*----------------------------------------------------------------------------*/typedef struct TMMesgStruct{ long ssn; /*ID# of snapshot to which this mesg belongs*/ long trial; /*Trial number within this snapshot*/ long type; /*Is it Start/Value message?*/ long from_pe, to_pe; /*From & To which processor?*/ #if !PLATFORM_WIN long padding; /*Compatibility with size generated by Windows compiler*/ #endif RVALUE_TYPE val; /*If Value message, contains the value*/ TM_Time old_lbts; /*LBTS computed in previous snapshot/epoch*/ TM_TimeQual old_qual; /*LBTS qualification in previous snapshot/epoch*/ struct TMMesgStruct *next; /*Scratch for linking msg buffers*/} TMMesg;/*----------------------------------------------------------------------------*/#define hton_TMMesg(/*TMMesg **/m) \ do{ \ (m)->ssn = htonl((m)->ssn); \ (m)->trial = htonl((m)->trial); \ (m)->type = htonl((m)->type); \ (m)->from_pe = htonl((m)->from_pe); \ (m)->to_pe = htonl((m)->to_pe); \ hton_RVALUE_TYPE(&(m)->val); \ hton_TM_Time(&(m)->old_lbts); \ hton_TM_TimeQual(&(m)->old_qual); \ }while(0)#define ntoh_TMMesg(/*TMMesg **/m) \ do{ \ (m)->ssn = ntohl((m)->ssn); \ (m)->trial = ntohl((m)->trial); \ (m)->type = ntohl((m)->type); \ (m)->from_pe = ntohl((m)->from_pe); \ (m)->to_pe = ntohl((m)->to_pe); \ ntoh_RVALUE_TYPE(&(m)->val); \ ntoh_TM_Time(&(m)->old_lbts); \ ntoh_TM_TimeQual(&(m)->old_qual); \ }while(0)/*----------------------------------------------------------------------------*//*----------------------------------------------------------------------------*/typedef struct{ long ID; /*Current epoch number (same as next snapshot ID)*/ long nsent, nrecd; /*Event counters for next (immediate) snapshot*/ /*Special case data; see TMMRed_In()*/ long next_id; /*ID of next epoch*/ long next_nrecd; /*Number of incoming messages in the next epoch*/} Epoch;/*----------------------------------------------------------------------------*/typedef struct{ int active; /*Is this snapshot computation in progress now? */ long ID; /*Active (or most recently completed) snapshot number*/ long trial; /*Trial number within active snapshot computation*/ RMUserHandle rh; /*Handle for the reduction service*/ RVALUE_TYPE value; /*Reduction value reported in currently active trial*/ RVALUE_TYPE transients; /*Transient msgs of this snapshot accumulated here*/ struct { int do_timeout; /*Should timeouts be turned on or off?*/ int counter; /*#ticks so far, before starting to check timeout*/ int max_count; /*Start checking for timeout time after this many ticks*/ TIMER_TYPE start;/*When did this snapshot computation start?*/ double period; /*Timeout this snapshot after this many seconds*/ } timeout;} Snapshot;/*----------------------------------------------------------------------------*/typedef struct{ TM_Time LBTS; /*Most recently known value; updated via snapshots*/ TM_TimeQual qual; /*Qualification of recent LBTS; see TM_TimeQual*/ TM_LBTSStartedProc sproc;/*Callback for getting this PE's snap shot value*/ TM_LBTSDoneProc *dproc; /*List of callbacks waiting for new LBTS value*/ int max_dproc; /*Limit on #callbacks that can wait for new LBTS*/ int n_dproc; /*Actual #callbacks waiting for new LBTS*/} LBTSInfo;/*----------------------------------------------------------------------------*/typedef struct{ unsigned int fmh; /*FM handle ID*/ int use_udp; /*Should UDP used for TM messages?*/ TMMesg *buffered_msgs; /*Msgs for next epoch/trial, but arrived early*/ TMMesg *free_msgs; /*Free pool of message buffers*/ double loss_prob; /*Probability of losing any incoming TM message*/ double msg_delay; /*Average network latency for any TM message*/} Communication;/*----------------------------------------------------------------------------*/typedef struct{ TIMER_TYPE start; /*When did TMMRed_Init() end?*/ long nlbts; /*Total number of LBTS computations so far*/ struct { long max; /*Max #trials per LBTS completion*/ long tot; /*Sum total of #trials across all LBTS computations*/ } trial;} Statistics;/*----------------------------------------------------------------------------*/typedef struct{ int debug; /*Debugging level*/ int myid; /*My processor ID*/ int N; /*Total number of processors*/ Epoch epoch; /*Current (active) epoch information*/ Snapshot sshot; /*Active (or most recently completed) snapshot*/ LBTSInfo lbts; /*How LBTS values must be acquired/stored/reported*/ Communication comm; /*Message send/receive/buffer information*/ Statistics stats; /*Self-explanatory*/} TMState;/*----------------------------------------------------------------------------*//* Global TM state, and cached pointers into the global TM state *//* In multi-threaded implementation, pass TM state as argument to API calls, *//* and move the cached pointers into the functions (as local variables). *//*----------------------------------------------------------------------------*/static TMState tm_state;static TMState *st;static Epoch *epoch;static Snapshot *sshot;static LBTSInfo *lbts;static Communication *comm;static Statistics *stats;/*----------------------------------------------------------------------------*//*----------------------------------------------------------------------------*/static int recv_msg( FM_stream *strm, unsigned senderID );static void send_start_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe );static void send_value_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe, RVALUE_TYPE *v );static void continue_active_reduction( void );static int deliver_buffered_msgs( void );static void move_to_next_sshot_trial( void );static void printstats( void );static void printstate( void );/*----------------------------------------------------------------------------*/static void TMMRed_Init(TMM_Closure closure, TMM_LBTSStartedProc sproc){ char *dbgstr = getenv("TMRED_DEBUG"); /* Integer [1,inf] */ char *lossstr= getenv("TMRED_LPROB"); /* Double [0.0,1.0] */ char *delaystr= getenv("TMRED_MSGDELAY"); /* Double [0.0,inf] secs */ char *useudpstr= getenv("TMRED_USEUDP"); /* String TRUE or FALSE */ MYASSERT( closure == &tm_state, ("Closures should match") ); st = &tm_state; epoch = &st->epoch; sshot = &st->sshot; lbts = &st->lbts; comm = &st->comm; stats = &st->stats; st->debug = dbgstr?atoi(dbgstr):0; st->myid = (int) FM_nodeid; st->N = (int) FM_numnodes; epoch->ID = 0; epoch->nsent = 0; epoch->nrecd = 0; epoch->next_id = 1; epoch->next_nrecd = 0; sshot->active = FALSE; sshot->ID = -1; sshot->trial = -1; sshot->rh = rm_register( st->N, st->myid, MAX_PE, &rv_class ); sshot->timeout.do_timeout= TRUE; sshot->timeout.counter = 0; sshot->timeout.max_count = 10; sshot->timeout.period = 2.0; /*Updated later*/ lbts->LBTS = TM_ZERO; lbts->qual = TM_TIME_QUAL_INCL; lbts->sproc = sproc; lbts->max_dproc = 1000; lbts->n_dproc = 0; lbts->dproc = (TM_LBTSDoneProc *)malloc( sizeof(TM_LBTSDoneProc)*lbts->max_dproc); comm->fmh = -1; comm->use_udp = (useudpstr&&!strcmp(useudpstr,"TRUE")); comm->buffered_msgs = 0; comm->free_msgs = 0; comm->loss_prob = lossstr?atof(lossstr):0.0; comm->msg_delay = delaystr?atof(delaystr):0.5;if( comm->loss_prob <= 0 ) sshot->timeout.do_timeout = FALSE; stats->nlbts = 0; stats->trial.max = 1; stats->trial.tot = 0; rv_init( &sshot->value ); rv_init( &sshot->transients ); FML_RegisterHandler( &comm->fmh, recv_msg ); sshot->timeout.period = 2*(st->N-1)*comm->msg_delay;/*Worst case per trial*/ FML_Barrier(); TIMER_NOW(stats->start);if(getenv("TMRED_UDPALWAYS")){FM_SetTransport(FM_TRANSPORT_UNRELIABLE);comm->use_udp=1;printf("--TMRED_UDPALWAYS---\n");fflush(stdout);}if(st->debug)printf("-- PE %d: TMRED_LPROB=%g, TMRED_MSGDELAY=%g\n", st->myid, comm->loss_prob, comm->msg_delay);if(st->debug>=0){printf("%d: TMRed initialized.\n",st->myid);fflush(stdout);}}/*----------------------------------------------------------------------------*/static void TMMRed_SetLBTSStartProc( TMM_Closure closure, TM_LBTSStartedProc started_proc ){ lbts->sproc = started_proc;}/*----------------------------------------------------------------------------*/static long TMMRed_CurrentEpoch( TMM_Closure closure ){ return epoch->ID;}/*----------------------------------------------------------------------------*/static void TMMRed_Recent_LBTS( TMM_Closure closure, TM_Time *pts ){ if( pts ) *pts = lbts->LBTS;}/*----------------------------------------------------------------------------*/static long TMMRed_StartLBTSSnapShot( TM_Time min_ts, TM_TimeQual qual, TM_LBTSDoneProc done_proc, long *ptrans, long expected_sshot_id, long trial_num ){ static char tmstr1[1000],tmstr2[1000];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -