⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rmtest.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
字号:
/*----------------------------------------------------------------------------*//* Reduction management test program using FM for communication.              *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 07May2002   *//* $Revision: 1.8 $ $Name: v26apr05 $ $Date: 2005/02/03 14:56:23 $ *//*----------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <math.h>#include <ctype.h>#include "mycompat.h"#include "fm.h"#include "rm.h"/*---------------------------------------------------------------------------*/#define htond(x) (x)  /*XXX*/#define ntohd(x) (x)  /*XXX*//*---------------------------------------------------------------------------*/#ifndef FALSE  #define TRUE 1  #define FALSE 0#endif/*---------------------------------------------------------------------------*/#ifndef MAXDBL    #define MAXDBL 10000#endif/*----------------------------------------------------------------------------*/struct RVALUE_TYPE_STRUCT{    double val;	#if !PLATFORM_WIN	    long padding; /*Compatibility with size generated by Windows compiler*/	#endif};/*----------------------------------------------------------------------------*/static void hton_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->val = htond((v)->val); }static void ntoh_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->val = ntohd((v)->val); }/*----------------------------------------------------------------------------*/typedef struct rv_shell { RVALUE_TYPE rv; struct rv_shell *next; } RVALUE_SHELL;static RVALUE_SHELL *rv_shells = 0;static void alloc_rv_shells( void )    { int i = 0, n = 100;      MYASSERT( !rv_shells, ("!") );      rv_shells = (RVALUE_SHELL *)malloc(sizeof(RVALUE_SHELL)*n);      for(i=0;i<n-1;i++){rv_shells[i].next=&rv_shells[i+1];}      rv_shells[n-1].next = 0;    }static RVALUE_TYPE *rv_new( void )    { RVALUE_SHELL *s = rv_shells;      if(!s){alloc_rv_shells();} MYASSERT(rv_shells, ("!"));      s = rv_shells; rv_shells = s->next;      return &s->rv; }static void rv_delete( RVALUE_TYPE *v )    { RVALUE_SHELL *s = (RVALUE_SHELL *)v;      s->next = rv_shells; rv_shells = s; }static void rv_init(RVALUE_TYPE *a)    { if(a){ (a)->val = 0/*MAXDBL*/; } }static void rv_assign( RVALUE_TYPE *a, RVALUE_TYPE *b )    { if((a)&&(b)) *(a) = *(b); }static void rv_reduce( RVALUE_TYPE *a, RVALUE_TYPE *b)    { if((a)&&(b)) { (a)->val += (b)->val; } }    /*{ if((a)&&(b)) { if((b)->val < (a)->val) (a)->val = (b)->val; } }*/static void rv_set( RVALUE_TYPE *a, double v )    { if(a) { (a)->val = v; } }static void rv_print( FILE *fp, RVALUE_TYPE *a )    { if(a)fprintf( (fp), "(%f)", (a)->val ); }/*----------------------------------------------------------------------------*/enum { RM_START_MSG, RM_VALUE_MSG };#define RM_MSG_TYPE_STR(t) ((t)==RM_START_MSG?"RM_START_MSG":"RM_VALUE_MSG")typedef struct RMMesgStruct{    long trans;    long type;    long from_pe, to_pe;    RVALUE_TYPE val;    struct RMMesgStruct *next; /*Scratch for msg buffering*/} RMMesg;#define hton_RMMesg(/*RMMesg **/m) \    do{ \	(m)->trans = htonl((m)->trans); \	(m)->type = htonl((m)->type); \	(m)->from_pe = htonl((m)->from_pe); \	(m)->to_pe = htonl((m)->to_pe); \	hton_RVALUE_TYPE(&(m)->val); \    }while(0)#define ntoh_RMMesg(/*RMMesg **/m) \    do{ \	(m)->trans = ntohl((m)->trans); \	(m)->type = ntohl((m)->type); \	(m)->from_pe = ntohl((m)->from_pe); \	(m)->to_pe = ntohl((m)->to_pe); \	ntoh_RVALUE_TYPE(&(m)->val); \    }while(0)/*----------------------------------------------------------------------------*/typedef struct msg_shell { RMMesg msg; struct msg_shell *next; } MSG_SHELL;static MSG_SHELL *msg_shells = 0;static void alloc_msg_shells( void )    { int i = 0, n = 100;      MYASSERT( !msg_shells, ("!") );      msg_shells = (MSG_SHELL *)malloc(sizeof(MSG_SHELL)*n);      for(i=0;i<n-1;i++){msg_shells[i].next=&msg_shells[i+1];}      msg_shells[n-1].next = 0;    }static RMMesg *msg_new( void )    { MSG_SHELL *s = msg_shells;      if(!s){alloc_msg_shells();} MYASSERT(msg_shells, ("!"));      s = msg_shells; msg_shells = s->next;      return &s->msg; }static void msg_delete( RMMesg *v )    { MSG_SHELL *s = (MSG_SHELL *)v;      s->next = msg_shells; msg_shells = s; }    /*----------------------------------------------------------------------------*/#define MAXN 512static RMMesg *buffered_msgs = 0;/*----------------------------------------------------------------------------*/typedef struct{    RMUserHandle usr;    long trans_num;    long nmsgs;} Transaction;static Transaction action[2];/*---------------------------------------------------------------------------*/static int dbg = 0;static int myid = 0, N = 1;static unsigned int hid = 0;/*----------------------------------------------------------------------------*/void rm_send_msg( RMMesg *msg );/*----------------------------------------------------------------------------*/static void rm_send_start_msg( RMUserHandle usr, void *closure,    int from_pe, int to_pe ){    RMMesg msg;    msg.trans = (long)closure;    MYASSERT( usr == action[msg.trans%2].usr, ("!") );    msg.type = RM_START_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe;    rv_init( &msg.val );    rm_send_msg( &msg );}/*----------------------------------------------------------------------------*/static void rm_send_value_msg( RMUserHandle usr, void *closure,    int from_pe, int to_pe, RVALUE_TYPE *v ){    RMMesg msg;    msg.trans = (long)closure;    MYASSERT( usr == action[msg.trans%2].usr, ("!") );    msg.type = RM_VALUE_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe;    rv_assign( &msg.val, v );    rm_send_msg( &msg );}/*----------------------------------------------------------------------------*/static int rm_receive_msgs( int myid, int N, long trans ){    FM_extract(1000);    return 0;}/*----------------------------------------------------------------------------*/static int rm_deliver_msgs( int myid, int N, long trans ){    RMMesg **b = &buffered_msgs;    int ndelivered = 0;    while( *b )    {	if( FALSE /*XXX*/ && random() & 01 )	{	    /*Skip this mesg*/	    b = &((*b)->next);	}	else	{	    /*Deliver this mesg*/	    Transaction *act = 0;	    RMMesg *m = *b;	    *b = m->next;	    if( m->trans == trans )	    {		act = &action[trans%2];		MYASSERT( act->trans_num == trans, ("!") );	    }	    else if( m->trans == trans+1 )	    {		act = &action[m->trans%2];		MYASSERT( act->trans_num == m->trans, ("!") );if(dbg>2){printf("*** %d: Future msg! %ld,%ld\n", myid, trans, m->trans);}	    }	    else if( m->trans > trans+1 )	    {		MYASSERT(0, ("%d: TRANS ERROR! %ld, %ld\n",myid,trans,m->trans));	    }	    else	    {		/*This msg must be a START msg for prev trans.  Just drop it.*/		MYASSERT( m->type == RM_START_MSG && m->trans < trans, ("!") );	    }	    if( m->type == RM_START_MSG )	    {if(dbg>2){printf("%d START_MSG%ld<-%ld\n",myid,m->trans,m->from_pe);}		if( !act )		{if(dbg>2){printf("%d ignoring old START_MSG%ld\n", myid, m->trans);}		}		else		{	            rm_receive_start( act->usr, m->from_pe );	            act->nmsgs++; ndelivered++;		}	    }	    else /*m->type == RM_VALUE_MSG*/	    {if(dbg>2){printf("%d VALUE_MSG%ld<-%ld\n",myid,m->trans,m->from_pe);}		MYASSERT( act /*Action must exist!*/, ("!") );                rm_receive_value( act->usr, m->from_pe, &m->val );	        act->nmsgs++; ndelivered++;	    }	    msg_delete(m);	}    }    return ndelivered;}/*----------------------------------------------------------------------------*/void rm_send_msg( RMMesg *msg ){    RMMesg out_msg = *msg;    FM_stream *stream = 0;    int dest = msg->to_pe, maxlen = 100;    msg->next = 0;    hton_RMMesg( &out_msg );    stream = FM_begin_message( dest, maxlen, hid );    FM_send_piece( stream, &out_msg, sizeof(out_msg) );    FM_end_message( stream );if(dbg>2){printf("%ld sending %s#%ld to %ld\n",msg->from_pe,RM_MSG_TYPE_STR(msg->type),msg->trans,msg->to_pe);}}/*---------------------------------------------------------------------------*/static int fm_handler( FM_stream *stream, unsigned int sender ){    RMMesg *msg = (RMMesg *)msg_new();if(dbg>2){printf("fm_handler() from %d\n", sender);fflush(stdout);}    FM_receive( msg, stream, sizeof(*msg) );    ntoh_RMMesg( msg );    if( !(0<=msg->from_pe && msg->from_pe<N &&          0<=msg->to_pe && msg->to_pe<N &&          (msg->type==RM_START_MSG || msg->type==RM_VALUE_MSG)) )    {        printf( "Error on read: %ld, %ld\n",            msg->from_pe, msg->to_pe );    }    else    {        /*Buffer this message*/        msg->next = buffered_msgs;        buffered_msgs = msg;    }    return FM_CONTINUE;}/*---------------------------------------------------------------------------*/int main( int ac, char *av[] ){    int i = 0, trans = 0, maxtrans = 1;    TIMER_TYPE t1, t2;    {char *estr=getenv("RMTEST_DEBUG"); dbg = estr ? atoi(estr) : 1;}    {char *estr=getenv("RMTEST_MAXTRANS"); maxtrans= estr ? atoi(estr):100000;}if(dbg>0){printf("RMDEBUG=%d\n",dbg);fflush(stdout);}    FM_pre_init( &ac, &av ); /*XXX*/    MYASSERT( ac == 1, ("Usage: %s\nSet node config using NODEINFO", av[0]) );    {        NodeInfo nodeinfo;	FM_getenv_nodeinfo( &nodeinfo );	FM_setenv_nodenames( &nodeinfo );	myid = nodeinfo.my_index;	N = nodeinfo.nproc;    }if(dbg>0){printf("myid=%d N=%d\n", myid, N);fflush(stdout);}    FML_FMInit();    FML_RegisterHandler( &hid, fm_handler );    {    RVALUE_CLASS rv_class;    rv_class.rv_new = rv_new;    rv_class.rv_delete = rv_delete;    rv_class.rv_init = rv_init;    rv_class.rv_assign = rv_assign;    rv_class.rv_reduce = rv_reduce;    rv_class.rv_print = rv_print;    for(i=0;i<2;i++)    {	Transaction *act = &action[i];        act->usr = rm_register( N, myid, MAXN, &rv_class );        rm_init( act->usr, RM_SCHEDULE_GROUPED_BFLY );        act->trans_num = i;        act->nmsgs = 0;    }    }if(dbg>0){printf("%d: Started running %d transactions\n",myid,maxtrans);fflush(stdout);}  TIMER_NOW(t1);  for( trans = 0; trans < maxtrans; trans++ )  {    int started = FALSE;    Transaction *act = &action[trans%2];    MYASSERT( act->trans_num == trans, ("!") );    MYASSERT( rm_get_status( act->usr, 0 ) == RM_ACTIVE, ("!") );if(dbg>1){printf( "NODE %d TRANSACTION %d\n", myid, trans );}    if( myid == 0 ) act->nmsgs++; /*Start up first processor*/    /*----------------------------------------------------------------------*/if(dbg>1){printf("Entering loop...\n");fflush(stdout);}    while(TRUE)    {        if( !started )        /*if( !started && act->nmsgs > 0 )*/        {            RVALUE_TYPE rval;            rv_set( &rval, 10.0*(myid+1) );            rm_receive_value( act->usr, myid, &rval );if(dbg>1){printf("%d: myvalue: ", myid); rv_print(stdout, &rval);printf("\n");}            started = TRUE;        }if(dbg>3){static int nx=0;if(++nx%100000==0){printf("%d: nx=%d\n",myid,nx);fflush(stdout);}}	rm_receive_msgs( myid, N, trans );	rm_deliver_msgs( myid, N, trans );	/* Give cycles, and check for completion */	if( started )	{	    int done = FALSE;	    RVALUE_TYPE min_val;	    done = rm_resume( act->usr, &min_val,			      rm_send_start_msg,rm_send_value_msg,(void*)trans);	    if( done )	    {if(dbg>1){printf("%d: TRANSACTION %d  MIN= ",myid, trans); rv_print(stdout,&min_val);printf("\n"); fflush(stdout);}		{		    rm_init(act->usr, RM_SCHEDULE_GROUPED_BFLY);		    act->trans_num += 2;		    act->nmsgs = 0;		}	        break;	    }	}    }    /*----------------------------------------------------------------------*/  }  TIMER_NOW(t2);    FML_Barrier();{TIMER_TYPE t1,t2; double dt; TIMER_NOW(t1); do{FM_extract(~0);TIMER_NOW(t2);dt=TIMER_DIFF(t2,t1);}while(dt<1/*secs*/);}    FM_finalize();if(dbg>0){printf("%d: All done %d reductions %f us per reduction\n", myid, maxtrans, TIMER_DIFF(t2,t1)/maxtrans*1e6);fflush(stdout);}    return 0;}/*----------------------------------------------------------------------------*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -