📄 tmnull.c
字号:
/*----------------------------------------------------------------------------*//* A NULL-message algorithm ala CMB. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 09Dec2003 *//*Based on experience from work reported in Park, Fujimoto & Perumalla(PADS04)*//* $Revision: 1.9 $ $Name: v26apr05 $ $Date: 2005/02/03 14:56:23 $ *//*----------------------------------------------------------------------------*/#include <stdlib.h>#include <string.h>#include <stdio.h>#include "fm.h"#include "tm.h"/*----------------------------------------------------------------------------*/typedef struct TMMNullMesgStruct{ long from_pe; /*Which processor sent this?*/ TM_Time lbts; /*LBTS from from_pe to this pe*/ TM_TimeQual qual; /*LBTS qual*/} TMMNullMesg;/*----------------------------------------------------------------------------*/#define hton_TMMNullMesg(/*TMMNullMesg **/m) \ do{ \ (m)->from_pe = htonl((m)->from_pe); \ hton_TM_Time(&(m)->lbts); \ hton_TM_TimeQual(&(m)->qual); \ }while(0)#define ntoh_TMMNullMesg(/*TMMNullMesg **/m) \ do{ \ (m)->from_pe = ntohl((m)->from_pe); \ ntoh_TM_Time(&(m)->lbts); \ ntoh_TM_TimeQual(&(m)->qual); \ }while(0)/*----------------------------------------------------------------------------*/typedef struct{ int pe; TM_Time lbts; TM_TimeQual qual;} ConnectionInfo;typedef struct{ int n; ConnectionInfo conn[MAX_PE];} ConnectionList;typedef ConnectionList Inputs, Outputs;/*----------------------------------------------------------------------------*/typedef struct{ long ID; /*Current (active) epoch identifier*/ TM_Time LBTS; /*Most recently known value*/ TM_TimeQual qual; /*Qualification of recent LBTS; see TM_TimeQual*/ TM_LBTSStartedProc sproc;/*Callback to get this PE's current guarantee*/ TM_LBTSDoneProc *dproc; /*List of callbacks waiting for new LBTS value*/ int max_dproc; /*Limit on #callbacks that can wait for new LBTS*/ int n_dproc; /*Actual #callbacks waiting for new LBTS*/} LBTSInfo;/*----------------------------------------------------------------------------*/typedef struct{ int updated; /*Has the status changed in some way?*/ int inprogress; /*Is a null-mesg activity already in progress?*/ TM_Time transient_ts; /*Timestamps of events received while inprogress*/} Computation;/*----------------------------------------------------------------------------*/typedef struct{ struct { int on; /*Piggyback LBTS onto events?*/ } piggyback_lbts; struct { int on; /*Send nulls more often than for deadlock-avoidance?*/ int count; /*How many idle ticks happened with no null msg exchange*/ int freq; /*Send null msgs every this many idle ticks*/ } aggressive_sends;} Options;/*----------------------------------------------------------------------------*/typedef struct{ TIMER_TYPE start; /*When did TMMNull_Init() end?*/ struct { long nsent; /*Total number of messages sent*/ long nrecd; /*Total number of messages received*/ } nulls, events;} Statistics;/*----------------------------------------------------------------------------*/typedef struct{ int debug; /*Debugging level*/ int myid; /*My processor ID*/ int N; /*Total number of processors*/ unsigned int fmh; /*FM handle ID*/ Inputs in; /*Incoming connections*/ Outputs out; /*Outgoing connections*/ LBTSInfo lbts; /*How LBTS values must be acquired/stored/reported*/ Computation comp; /*Status of distributed null-mesg computation*/ Options opt; /*Tunable parameters*/ Statistics stats; /*Self-explanatory*/} TMState;/*----------------------------------------------------------------------------*//* Global TM state, and cached pointers into the global TM state *//* In multi-threaded implementation, pass TM state as argument to API calls, *//* and move the cached pointers into the functions (as local variables). *//*----------------------------------------------------------------------------*/static TMState tm_state, *st;static LBTSInfo *lbts;static Computation *comp;static Options *opt;static Statistics *stats;/*----------------------------------------------------------------------------*/static int TMMNull_RecvNullMesg( FM_stream *strm, unsigned senderID );/*----------------------------------------------------------------------------*/static void TMMNull_ConfigTopology( void ){ int i = 0; for( i = 0, st->in.n = TM_TopoGetNumSenders(); i < st->in.n; i++ ) { ConnectionInfo *conn = &st->in.conn[i]; int pe = TM_TopoGetSender(i); conn->pe = pe; conn->lbts = TM_ZERO; conn->qual = TM_TIME_QUAL_INCL;if(st->debug>2){printf("SenderPE[%d]=%d\n",i,pe);fflush(stdout);} } for( i = 0, st->out.n = TM_TopoGetNumReceivers(); i < st->out.n; i++ ) { ConnectionInfo *conn = &st->out.conn[i]; int pe = TM_TopoGetReceiver(i); conn->pe = pe; conn->lbts = TM_ZERO; conn->qual = TM_TIME_QUAL_INCL;if(st->debug>2){printf("ReceiverPE[%d]=%d\n",i,pe);fflush(stdout);} }}/*----------------------------------------------------------------------------*/static void TMMNull_PrintNullMesg(FILE *fp, TMMNullMesg *m){ fprintf( fp, "{TMMNullMesg from_pe=%ld, lbts=%lf, qual=%s}", m->from_pe, TM_TS(m->lbts), TM_TIME_QUAL_STR(m->qual) );}/*----------------------------------------------------------------------------*/static void TMMNull_SendNullMesg( int to_pe, TM_Time ts, TM_TimeQual qual){ TMMNullMesg msg; FM_stream *strm = 0; msg.from_pe = st->myid; msg.lbts = ts; msg.qual = qual; strm = FM_begin_message( to_pe, sizeof(TMMNullMesg), st->fmh ); hton_TMMNullMesg( &msg ); FM_send_piece( strm, &msg, sizeof(msg) ); FM_end_message( strm ); ++stats->nulls.nsent;}/*----------------------------------------------------------------------------*/static void TMMNull_Init(TMM_Closure closure, TMM_LBTSStartedProc sproc){ char *dbgstr = getenv("TMNULL_DEBUG"); /* Integer [0,inf] */ MYASSERT( closure == &tm_state, ("Closures should match") ); st = &tm_state; lbts = &st->lbts; comp = &st->comp; opt = &st->opt; stats = &st->stats; st->debug = dbgstr?atoi(dbgstr):0; st->myid = (int) FM_nodeid; st->N = (int) FM_numnodes; st->fmh = -1; lbts->ID = 0; lbts->LBTS = TM_ZERO; lbts->qual = TM_TIME_QUAL_INCL; lbts->sproc = sproc; lbts->max_dproc = 1000; lbts->n_dproc = 0; lbts->dproc = (TM_LBTSDoneProc *)malloc( sizeof(TM_LBTSDoneProc)*lbts->max_dproc); comp->updated = TRUE; comp->inprogress = FALSE; comp->transient_ts = TM_IDENT; stats->nulls.nsent = 0; stats->nulls.nrecd = 0; stats->events.nsent = 0; stats->events.nrecd = 0; opt->piggyback_lbts.on = TRUE; opt->aggressive_sends.on = FALSE; opt->aggressive_sends.count = 0; opt->aggressive_sends.freq = FM_numnodes; /*One round trip around all PEs*/ TMMNull_ConfigTopology(); FML_RegisterHandler( &st->fmh, TMMNull_RecvNullMesg ); FML_Barrier();if(st->debug>=0){printf("%d: TMNull initialized.\n",st->myid);fflush(stdout);} TIMER_NOW(stats->start);}/*----------------------------------------------------------------------------*/static void TMMNull_SetLBTSStartProc( TMM_Closure closure, TM_LBTSStartedProc s) { lbts->sproc = s; }/*----------------------------------------------------------------------------*/static long TMMNull_CurrentEpoch( TMM_Closure closure ) { return lbts->ID; }/*----------------------------------------------------------------------------*/static void TMMNull_Recent_LBTS( TMM_Closure closure, TM_Time *pts ) { if( pts ) *pts = lbts->LBTS; }/*----------------------------------------------------------------------------*/static void TMMNull_PutTag( TMM_Closure closure, char *ptag, int *nbytes ){ TM_Time min_ext_la = TM_TopoGetMinExternalLA(); TM_Time piggy_lbts = TM_Add( lbts->LBTS, min_ext_la ); *((TM_Time*)ptag) = opt->piggyback_lbts.on ? piggy_lbts : TM_NEG1; *nbytes = sizeof(TM_Time);}/*----------------------------------------------------------------------------*/static void TMMNull_Out( TMM_Closure closure, TM_Time ts, long nevents ){ stats->events.nsent += nevents;}/*----------------------------------------------------------------------------*/static void TMMNull_In(TMM_Closure closure,TM_Time ts,char *ptag,int *nbytes){ TM_Time piggy_lbts = *((TM_Time*)ptag); *nbytes = sizeof(TM_Time); stats->events.nrecd++; if( comp->inprogress ) { TM_Time min_ext_la = TM_TopoGetMinExternalLA(); ts = TM_Sub( ts, min_ext_la ); /*Caller adds min LA to ts*/ comp->transient_ts = TM_Min( comp->transient_ts, ts ); comp->updated = TRUE; } /*XXX Not sure if the piggybacked lbts must be corrected for transients*/ /*But, better be sure than sorry*/ piggy_lbts = TM_Min(piggy_lbts, comp->transient_ts); if( TM_GE(piggy_lbts, TM_ZERO) && TM_GT(piggy_lbts, lbts->LBTS) ) { int i = 0;if(st->debug>0){printf("Piggyback advances LBTS from %lf to %lf\n",TM_TS(lbts->LBTS),TM_TS(piggy_lbts));} lbts->LBTS = piggy_lbts; lbts->qual = TM_TIME_QUAL_INCL; /*Update any lagging input connections lbts*/ for( i = 0; i < st->in.n; i++ ) { ConnectionInfo *conn = &st->in.conn[i]; if( conn->pe != st->myid && TM_GT(piggy_lbts, conn->lbts) ) {if(st->debug>0){printf("Piggyback updates lagging LBTS of input PE %d from %lf to %lf\n",conn->pe,TM_TS(conn->lbts),TM_TS(piggy_lbts));} conn->lbts = piggy_lbts; conn->qual = TM_TIME_QUAL_INCL; } } }}/*----------------------------------------------------------------------------*/static void TMMNull_AddDoneProc( TM_LBTSDoneProc dproc ){ if( dproc ) { MYASSERT( lbts->n_dproc < lbts->max_dproc, ("!") ); lbts->dproc[lbts->n_dproc++] = dproc; }}/*----------------------------------------------------------------------------*/static void TMMNull_DoNullSends( void ){ int i = 0, j = 0; if( !comp->updated ) {if(st->debug>4){printf("TMMNull_DoNullSends() nothing to do.\n");fflush(stdout);} return; } comp->updated = FALSE; for( j = 0; j < st->out.n; j++ ) /*For each receiver*/ { ConnectionInfo *outconn = &st->out.conn[j]; int rpe = outconn->pe; TM_Time external_la = TM_TopoGetReceiverLAByIndex( j ); TM_Time lbts_j = TM_IDENT, *old_lbts_j = &outconn->lbts; TM_TimeQual qual_j=TM_TIME_QUAL_INCL, *old_qual_j = &outconn->qual;if(st->debug>1){printf("TMMNull_DoNullSends() receiver PE %d has external_la %lf.\n",rpe,TM_TS(external_la));fflush(stdout);} for( i = 0; i < st->in.n; i++ ) /*For each sender*/ { TM_Time temp_min_ts = TM_ZERO; ConnectionInfo *inconn = &st->in.conn[i]; int spe = inconn->pe; TM_Time lbts_i = inconn->lbts; TM_TimeQual qual_i = inconn->qual; TM_Time pair_la = TM_IDENT;if(st->debug>1){printf("TMMNull_DoNullSends() to %d lbts[%d(spe%d)]=%lf.\n",rpe,i,inconn->pe,TM_TS(lbts_i));fflush(stdout);} if( !TM_TopoGetInternalLA( spe, rpe, &pair_la ) ) continue; temp_min_ts = TM_Min( lbts_j, TM_Add(lbts_i, pair_la) );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -