📄 rmtest.c
字号:
/*----------------------------------------------------------------------------*//* Reduction management test program using FM for communication. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 07May2002 *//* $Revision: 1.8 $ $Name: v26apr05 $ $Date: 2005/02/03 14:56:23 $ *//*----------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <math.h>#include <ctype.h>#include "mycompat.h"#include "fm.h"#include "rm.h"/*---------------------------------------------------------------------------*/#define htond(x) (x) /*XXX*/#define ntohd(x) (x) /*XXX*//*---------------------------------------------------------------------------*/#ifndef FALSE #define TRUE 1 #define FALSE 0#endif/*---------------------------------------------------------------------------*/#ifndef MAXDBL #define MAXDBL 10000#endif/*----------------------------------------------------------------------------*/struct RVALUE_TYPE_STRUCT{ double val; #if !PLATFORM_WIN long padding; /*Compatibility with size generated by Windows compiler*/ #endif};/*----------------------------------------------------------------------------*/static void hton_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->val = htond((v)->val); }static void ntoh_RVALUE_TYPE( RVALUE_TYPE *v ) { (v)->val = ntohd((v)->val); }/*----------------------------------------------------------------------------*/typedef struct rv_shell { RVALUE_TYPE rv; struct rv_shell *next; } RVALUE_SHELL;static RVALUE_SHELL *rv_shells = 0;static void alloc_rv_shells( void ) { int i = 0, n = 100; MYASSERT( !rv_shells, ("!") ); rv_shells = (RVALUE_SHELL *)malloc(sizeof(RVALUE_SHELL)*n); for(i=0;i<n-1;i++){rv_shells[i].next=&rv_shells[i+1];} rv_shells[n-1].next = 0; }static RVALUE_TYPE *rv_new( void ) { RVALUE_SHELL *s = rv_shells; if(!s){alloc_rv_shells();} MYASSERT(rv_shells, ("!")); s = rv_shells; rv_shells = s->next; return &s->rv; }static void rv_delete( RVALUE_TYPE *v ) { RVALUE_SHELL *s = (RVALUE_SHELL *)v; s->next = rv_shells; rv_shells = s; }static void rv_init(RVALUE_TYPE *a) { if(a){ (a)->val = 0/*MAXDBL*/; } }static void rv_assign( RVALUE_TYPE *a, RVALUE_TYPE *b ) { if((a)&&(b)) *(a) = *(b); }static void rv_reduce( RVALUE_TYPE *a, RVALUE_TYPE *b) { if((a)&&(b)) { (a)->val += (b)->val; } } /*{ if((a)&&(b)) { if((b)->val < (a)->val) (a)->val = (b)->val; } }*/static void rv_set( RVALUE_TYPE *a, double v ) { if(a) { (a)->val = v; } }static void rv_print( FILE *fp, RVALUE_TYPE *a ) { if(a)fprintf( (fp), "(%f)", (a)->val ); }/*----------------------------------------------------------------------------*/enum { RM_START_MSG, RM_VALUE_MSG };#define RM_MSG_TYPE_STR(t) ((t)==RM_START_MSG?"RM_START_MSG":"RM_VALUE_MSG")typedef struct RMMesgStruct{ long trans; long type; long from_pe, to_pe; RVALUE_TYPE val; struct RMMesgStruct *next; /*Scratch for msg buffering*/} RMMesg;#define hton_RMMesg(/*RMMesg **/m) \ do{ \ (m)->trans = htonl((m)->trans); \ (m)->type = htonl((m)->type); \ (m)->from_pe = htonl((m)->from_pe); \ (m)->to_pe = htonl((m)->to_pe); \ hton_RVALUE_TYPE(&(m)->val); \ }while(0)#define ntoh_RMMesg(/*RMMesg **/m) \ do{ \ (m)->trans = ntohl((m)->trans); \ (m)->type = ntohl((m)->type); \ (m)->from_pe = ntohl((m)->from_pe); \ (m)->to_pe = ntohl((m)->to_pe); \ ntoh_RVALUE_TYPE(&(m)->val); \ }while(0)/*----------------------------------------------------------------------------*/typedef struct msg_shell { RMMesg msg; struct msg_shell *next; } MSG_SHELL;static MSG_SHELL *msg_shells = 0;static void alloc_msg_shells( void ) { int i = 0, n = 100; MYASSERT( !msg_shells, ("!") ); msg_shells = (MSG_SHELL *)malloc(sizeof(MSG_SHELL)*n); for(i=0;i<n-1;i++){msg_shells[i].next=&msg_shells[i+1];} msg_shells[n-1].next = 0; }static RMMesg *msg_new( void ) { MSG_SHELL *s = msg_shells; if(!s){alloc_msg_shells();} MYASSERT(msg_shells, ("!")); s = msg_shells; msg_shells = s->next; return &s->msg; }static void msg_delete( RMMesg *v ) { MSG_SHELL *s = (MSG_SHELL *)v; s->next = msg_shells; msg_shells = s; } /*----------------------------------------------------------------------------*/#define MAXN 512static RMMesg *buffered_msgs = 0;/*----------------------------------------------------------------------------*/typedef struct{ RMUserHandle usr; long trans_num; long nmsgs;} Transaction;static Transaction action[2];/*---------------------------------------------------------------------------*/static int dbg = 0;static int myid = 0, N = 1;static unsigned int hid = 0;/*----------------------------------------------------------------------------*/void rm_send_msg( RMMesg *msg );/*----------------------------------------------------------------------------*/static void rm_send_start_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe ){ RMMesg msg; msg.trans = (long)closure; MYASSERT( usr == action[msg.trans%2].usr, ("!") ); msg.type = RM_START_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe; rv_init( &msg.val ); rm_send_msg( &msg );}/*----------------------------------------------------------------------------*/static void rm_send_value_msg( RMUserHandle usr, void *closure, int from_pe, int to_pe, RVALUE_TYPE *v ){ RMMesg msg; msg.trans = (long)closure; MYASSERT( usr == action[msg.trans%2].usr, ("!") ); msg.type = RM_VALUE_MSG; msg.from_pe = from_pe; msg.to_pe = to_pe; rv_assign( &msg.val, v ); rm_send_msg( &msg );}/*----------------------------------------------------------------------------*/static int rm_receive_msgs( int myid, int N, long trans ){ FM_extract(1000); return 0;}/*----------------------------------------------------------------------------*/static int rm_deliver_msgs( int myid, int N, long trans ){ RMMesg **b = &buffered_msgs; int ndelivered = 0; while( *b ) { if( FALSE /*XXX*/ && random() & 01 ) { /*Skip this mesg*/ b = &((*b)->next); } else { /*Deliver this mesg*/ Transaction *act = 0; RMMesg *m = *b; *b = m->next; if( m->trans == trans ) { act = &action[trans%2]; MYASSERT( act->trans_num == trans, ("!") ); } else if( m->trans == trans+1 ) { act = &action[m->trans%2]; MYASSERT( act->trans_num == m->trans, ("!") );if(dbg>2){printf("*** %d: Future msg! %ld,%ld\n", myid, trans, m->trans);} } else if( m->trans > trans+1 ) { MYASSERT(0, ("%d: TRANS ERROR! %ld, %ld\n",myid,trans,m->trans)); } else { /*This msg must be a START msg for prev trans. Just drop it.*/ MYASSERT( m->type == RM_START_MSG && m->trans < trans, ("!") ); } if( m->type == RM_START_MSG ) {if(dbg>2){printf("%d START_MSG%ld<-%ld\n",myid,m->trans,m->from_pe);} if( !act ) {if(dbg>2){printf("%d ignoring old START_MSG%ld\n", myid, m->trans);} } else { rm_receive_start( act->usr, m->from_pe ); act->nmsgs++; ndelivered++; } } else /*m->type == RM_VALUE_MSG*/ {if(dbg>2){printf("%d VALUE_MSG%ld<-%ld\n",myid,m->trans,m->from_pe);} MYASSERT( act /*Action must exist!*/, ("!") ); rm_receive_value( act->usr, m->from_pe, &m->val ); act->nmsgs++; ndelivered++; } msg_delete(m); } } return ndelivered;}/*----------------------------------------------------------------------------*/void rm_send_msg( RMMesg *msg ){ RMMesg out_msg = *msg; FM_stream *stream = 0; int dest = msg->to_pe, maxlen = 100; msg->next = 0; hton_RMMesg( &out_msg ); stream = FM_begin_message( dest, maxlen, hid ); FM_send_piece( stream, &out_msg, sizeof(out_msg) ); FM_end_message( stream );if(dbg>2){printf("%ld sending %s#%ld to %ld\n",msg->from_pe,RM_MSG_TYPE_STR(msg->type),msg->trans,msg->to_pe);}}/*---------------------------------------------------------------------------*/static int fm_handler( FM_stream *stream, unsigned int sender ){ RMMesg *msg = (RMMesg *)msg_new();if(dbg>2){printf("fm_handler() from %d\n", sender);fflush(stdout);} FM_receive( msg, stream, sizeof(*msg) ); ntoh_RMMesg( msg ); if( !(0<=msg->from_pe && msg->from_pe<N && 0<=msg->to_pe && msg->to_pe<N && (msg->type==RM_START_MSG || msg->type==RM_VALUE_MSG)) ) { printf( "Error on read: %ld, %ld\n", msg->from_pe, msg->to_pe ); } else { /*Buffer this message*/ msg->next = buffered_msgs; buffered_msgs = msg; } return FM_CONTINUE;}/*---------------------------------------------------------------------------*/int main( int ac, char *av[] ){ int i = 0, trans = 0, maxtrans = 1; TIMER_TYPE t1, t2; {char *estr=getenv("RMTEST_DEBUG"); dbg = estr ? atoi(estr) : 1;} {char *estr=getenv("RMTEST_MAXTRANS"); maxtrans= estr ? atoi(estr):100000;}if(dbg>0){printf("RMDEBUG=%d\n",dbg);fflush(stdout);} FM_pre_init( &ac, &av ); /*XXX*/ MYASSERT( ac == 1, ("Usage: %s\nSet node config using NODEINFO", av[0]) ); { NodeInfo nodeinfo; FM_getenv_nodeinfo( &nodeinfo ); FM_setenv_nodenames( &nodeinfo ); myid = nodeinfo.my_index; N = nodeinfo.nproc; }if(dbg>0){printf("myid=%d N=%d\n", myid, N);fflush(stdout);} FML_FMInit(); FML_RegisterHandler( &hid, fm_handler ); { RVALUE_CLASS rv_class; rv_class.rv_new = rv_new; rv_class.rv_delete = rv_delete; rv_class.rv_init = rv_init; rv_class.rv_assign = rv_assign; rv_class.rv_reduce = rv_reduce; rv_class.rv_print = rv_print; for(i=0;i<2;i++) { Transaction *act = &action[i]; act->usr = rm_register( N, myid, MAXN, &rv_class ); rm_init( act->usr, RM_SCHEDULE_GROUPED_BFLY ); act->trans_num = i; act->nmsgs = 0; } }if(dbg>0){printf("%d: Started running %d transactions\n",myid,maxtrans);fflush(stdout);} TIMER_NOW(t1); for( trans = 0; trans < maxtrans; trans++ ) { int started = FALSE; Transaction *act = &action[trans%2]; MYASSERT( act->trans_num == trans, ("!") ); MYASSERT( rm_get_status( act->usr, 0 ) == RM_ACTIVE, ("!") );if(dbg>1){printf( "NODE %d TRANSACTION %d\n", myid, trans );} if( myid == 0 ) act->nmsgs++; /*Start up first processor*/ /*----------------------------------------------------------------------*/if(dbg>1){printf("Entering loop...\n");fflush(stdout);} while(TRUE) { if( !started ) /*if( !started && act->nmsgs > 0 )*/ { RVALUE_TYPE rval; rv_set( &rval, 10.0*(myid+1) ); rm_receive_value( act->usr, myid, &rval );if(dbg>1){printf("%d: myvalue: ", myid); rv_print(stdout, &rval);printf("\n");} started = TRUE; }if(dbg>3){static int nx=0;if(++nx%100000==0){printf("%d: nx=%d\n",myid,nx);fflush(stdout);}} rm_receive_msgs( myid, N, trans ); rm_deliver_msgs( myid, N, trans ); /* Give cycles, and check for completion */ if( started ) { int done = FALSE; RVALUE_TYPE min_val; done = rm_resume( act->usr, &min_val, rm_send_start_msg,rm_send_value_msg,(void*)trans); if( done ) {if(dbg>1){printf("%d: TRANSACTION %d MIN= ",myid, trans); rv_print(stdout,&min_val);printf("\n"); fflush(stdout);} { rm_init(act->usr, RM_SCHEDULE_GROUPED_BFLY); act->trans_num += 2; act->nmsgs = 0; } break; } } } /*----------------------------------------------------------------------*/ } TIMER_NOW(t2); FML_Barrier();{TIMER_TYPE t1,t2; double dt; TIMER_NOW(t1); do{FM_extract(~0);TIMER_NOW(t2);dt=TIMER_DIFF(t2,t1);}while(dt<1/*secs*/);} FM_finalize();if(dbg>0){printf("%d: All done %d reductions %f us per reduction\n", myid, maxtrans, TIMER_DIFF(t2,t1)/maxtrans*1e6);fflush(stdout);} return 0;}/*----------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -