📄 rm.c
字号:
/*----------------------------------------------------------------------------*//* Reduction management functions. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 20July2000 *//* $Revision: 1.6 $ $Name: v26apr05 $ $Date: 2004/01/23 15:35:43 $ *//*----------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mycompat.h"#include "rm.h"#ifndef FALSE #define TRUE 1 #define FALSE 0#endif/*----------------------------------------------------------------------------*/typedef struct{ int must_recv; /* TRUE if this processor must receive; FALSE if it */ /* must send -- at this point in the schedule */ /* Interpretation & use of the following fields depend on type of action */ int id; /* processor ID (e.g. source or destination) */ int heard; /* Do we know if this processor started its processing? */ int procd; /* Recd(sent) from(to) this processor? */ RVALUE_TYPE *recd_rvalue; /* Recd(sent) value from(to) this processor */ int jumpstart; /* If must_recv, should this processor be jumpstarted? */ int reduce_or_overwrite; /* How to use recd_rvalue: to reduce or overwrite*/} RMAction;/*----------------------------------------------------------------------------*/typedef struct{ int N; int myid; int maxn; RVALUE_CLASS rv_class;} RMUserInfo;/*----------------------------------------------------------------------------*/typedef struct{ int dbg; /*Debugging on/off?*/ RMStatus status; int started; /* Has a reduction started? */ int done; /* Has this reduction completed? */ RMScheduleType old_schedule;/*Currently used schedule type*/ RMScheduleType new_schedule;/*Change to this schedule type on next rm_init*/ RVALUE_TYPE *min_so_far;/* Minimum reduced value so far at this processor */ int do_jumpstarting; /*Should jumpstart messages be sent during reduction?*/ #define MAXACTIONS 1000 /*CUSTOMIZE*/ int index_of_blocked_action; int nactions; RMAction actions[MAXACTIONS];} RMState;/*----------------------------------------------------------------------------*/typedef struct{ RMUserInfo info; RMState state;} RMUser;/*----------------------------------------------------------------------------*/static void rm_state_alloc( RMState *st, RVALUE_CLASS *rvc, int maxn ){ int k = 0; st->min_so_far = rvc->rv_new(); for( k = 0; k < MAXACTIONS; k++ ) { RMAction *act = &st->actions[k]; act->recd_rvalue = rvc->rv_new(); }}/*----------------------------------------------------------------------------*/static void rm_state_init( RMState *st, RVALUE_CLASS *rvc, int i, int N ){ st->started = FALSE; st->done = FALSE; st->index_of_blocked_action = -1; st->do_jumpstarting = FALSE; rvc->rv_init( st->min_so_far ); /* Compute/Update the communication pattern/schedule for this processor */ if( st->old_schedule == st->new_schedule ) { int k = 0; for( k = 0; k < st->nactions; k++ ) { RMAction *act = &st->actions[k]; act->heard = FALSE; act->procd = FALSE; rvc->rv_init( act->recd_rvalue ); } } else { int k = 0; int ids[MAXACTIONS], rs[MAXACTIONS], js[MAXACTIONS], ro[MAXACTIONS]; st->nactions = 0; compute_schedule( st->new_schedule, i, N, MAXACTIONS, &st->nactions, ids, rs, js, ro );if(st->dbg>=1){print_schedule( stdout, i, N, st->nactions, ids, rs, js, ro);fflush(stdout);} for( k = 0; k < st->nactions; k++ ) { RMAction *act = &st->actions[k]; act->id = ids[k]; act->must_recv = rs[k]; act->heard = FALSE; act->procd = FALSE; rvc->rv_init( act->recd_rvalue ); act->jumpstart = js[k]; act->reduce_or_overwrite = ro[k]; assert( !( rs[k] == FALSE && ro[k] == TRUE ) ); } st->old_schedule = st->new_schedule; }}/*----------------------------------------------------------------------------*/static int rm_find_proc( RMUser *usr, int pe, int rs, int first_action ){ int k = 0; RMState *st = &usr->state; for( k = first_action; k < st->nactions; k++ ) { RMAction *a = &st->actions[k]; assert( 0 <= a->id && a->id < usr->info.N ); if( a->id == pe && a->must_recv == rs ) break; } if( k >= st->nactions ) k = -1; return k;}/*----------------------------------------------------------------------------*//* A processor has explicitly or implicitly asked us to start. *//*----------------------------------------------------------------------------*/static void rm_do_start(RMUser *usr, RM_SEND_START_MSG start_func,void *closure){ RMUserHandle uh = (RMUserHandle)usr; RMState *st = &usr->state; if( !st->started ) { int k = 0; st->started = TRUE; /* Send out start messages to all from whom this */ /* processor must hear (later) in the schedule */ for( k = 0; st->do_jumpstarting && k < st->nactions; k++ ) { RMAction *a = &st->actions[k]; if( a->must_recv && a->id != usr->info.myid /*No need to start self*/ ) { if( a->heard ) { /* Don't need to start a processor */ /* from whom we have already heard */ /* (it has already started!), so, do nothing */ } else if( a->jumpstart ) { start_func( uh, closure, usr->info.myid, a->id ); /* Send to another processor in the "other segment" of */ /* the butterfly pattern, to rapidly propagate the start */ { int another = (a->id + usr->info.N/4) % usr->info.N; assert( 0 <= another && another < usr->info.N ); if( another != usr->info.myid && another != a->id ) start_func( uh, closure, usr->info.myid, another ); } } else { /* No need to send a jumpstart message to this processor */ /* Based on the schedule, the processor is either known */ /* to have started, or will start implicity when it */ /* receives a value message from us (no need to waste */ /* an extra start message)! */ } } } }}/*----------------------------------------------------------------------------*/static void rm_abort( RMUserHandle uh ){ RMUser *usr = (RMUser *)uh; RMState *st = &usr->state; /*Nothing special for now, since this function is not accessible by user*/ st->status = RM_DONE;}/*----------------------------------------------------------------------------*//* Register a user. *//*----------------------------------------------------------------------------*/RMUserHandle rm_register( int N, int myid, int maxn, RVALUE_CLASS *rv_class){ RMUser *usr = (RMUser *)malloc( sizeof( RMUser ) ); RMState *st = &usr->state; usr->info.N = N; usr->info.myid = myid; usr->info.maxn = maxn; usr->info.rv_class = *rv_class; rm_state_alloc( st, rv_class, maxn ); st->status = RM_REGISTERED; st->dbg = getenv("RM_DEBUG") ? atoi(getenv("RM_DEBUG")) : 0; st->old_schedule = RM_SCHEDULE_UNDEFINED; st->new_schedule = RM_SCHEDULE_GROUPED_BFLY; { char *sch=getenv("RM_SCHEDULE"); if(!sch){} else if(!strcmp(sch,"A2A")) st->new_schedule = RM_SCHEDULE_ALL_TO_ALL; else if(!strcmp(sch,"STAR")) st->new_schedule = RM_SCHEDULE_STAR; else if(!strcmp(sch,"BFLY")) st->new_schedule = RM_SCHEDULE_BUTTERFLY; else if(!strcmp(sch,"GBFLY")) st->new_schedule = RM_SCHEDULE_GROUPED_BFLY; else if(!strcmp(sch,"PBFLY")) st->new_schedule = RM_SCHEDULE_PHASE_BFLY; else {printf("Bad reduction schedule\n");exit(1);} } #define STRING_RM_SCHEDULE(_st) (_st==RM_SCHEDULE_ALL_TO_ALL ? "A2A":\ _st==RM_SCHEDULE_STAR ? "STAR":\ _st==RM_SCHEDULE_BUTTERFLY ? "BUTTERFLY":\ _st==RM_SCHEDULE_GROUPED_BFLY ? "GBFLY":\ _st==RM_SCHEDULE_PHASE_BFLY ? "PBFLY":\ "UNDEFINED")if(st->dbg>=1){printf("\n** RM using schedule of type %s.\n\n", STRING_RM_SCHEDULE(st->new_schedule));fflush(stdout);} return (RMUserHandle)usr;}/*----------------------------------------------------------------------------*//* Init/Re-init user. *//*----------------------------------------------------------------------------*/void rm_init( RMUserHandle uh, RMScheduleType new_sched ){ RMUser *usr = (RMUser *)uh; RMUserInfo *info = &usr->info; RMState *st = &usr->state; if( st->status == RM_ACTIVE ) rm_abort( uh ); assert( st->status == RM_REGISTERED || st->status == RM_DONE ); st->new_schedule = new_sched; rm_state_init( st, &info->rv_class, info->myid, info->N ); st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*//* A processor has explicitly or implicitly asked us to start. *//*----------------------------------------------------------------------------*/void rm_receive_start( RMUserHandle uh, int from_pe ){ RMUser *usr = (RMUser *)uh; RMState *st = &usr->state; int k = 0; assert( st->status == RM_ACTIVE ); assert( 0 <= from_pe && from_pe < usr->info.N ); k = rm_find_proc( usr, from_pe, TRUE, 0 ); if( k < 0 ) { /* It is possible that we receive a start message from a processor */ /* which is not in the schedule, because that processor can send */ /* to processors that are not in its schedule; see the code in this */ /* function below */if(st->dbg>=2){printf("%d heard STARTX from %d\n",usr->info.myid,from_pe);fflush(stdout);} } else { RMAction *a = &st->actions[k]; a->heard = TRUE;if(st->dbg>=2){printf("%d heard START from %d\n",usr->info.myid,from_pe);fflush(stdout);} } st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*//* A processor has sent its value. *//*----------------------------------------------------------------------------*/void rm_receive_value( RMUserHandle uh, int from_pe, RVALUE_TYPE *recd_value ){ RMUser *usr = (RMUser *)uh; RMState *st = &usr->state; RVALUE_CLASS *rvc = &usr->info.rv_class; int k = -1; assert( st->status == RM_ACTIVE );if(st->dbg>=2){printf("%d recd new value from %d: ",usr->info.myid,from_pe);rvc->rv_print(stdout, recd_value); printf("\n");fflush(stdout);} do { k = rm_find_proc( usr, from_pe, TRUE, k+1 ); assert( 0 <= k && k < st->nactions ); { RMAction *a = &st->actions[k]; if( !a->procd ) { a->procd = TRUE; a->heard = TRUE; rvc->rv_assign( a->recd_rvalue, recd_value ); break; } } }while(1); st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*/int rm_resume( RMUserHandle uh, RVALUE_TYPE *result_so_far, RM_SEND_START_MSG send_start_function, RM_SEND_VALUE_MSG send_value_function, void *closure){ RMUser *usr = (RMUser *)uh; RMState *st = &usr->state; RVALUE_CLASS *rvc = &usr->info.rv_class; int k = 0; int blocked = FALSE; assert( st->status == RM_ACTIVE ); /*Performance optimization*/ if( st->index_of_blocked_action >= 0 ) { RMAction *a = &st->actions[st->index_of_blocked_action]; if( a->must_recv && !a->procd ) return 0; st->index_of_blocked_action = -1; } st->status = RM_PROCESSING; rm_do_start( usr, send_start_function, closure ); rvc->rv_init( st->min_so_far ); for( k = 0; k < st->nactions; k++ ) { RMAction *a = &st->actions[k]; if( a->must_recv && !a->procd ) { blocked = TRUE; st->index_of_blocked_action = k; break; } else if( a->must_recv && a->procd ) { /*Already received the value; use it to reduce/overwrite min*/ if( a->reduce_or_overwrite ) { rvc->rv_reduce( st->min_so_far, a->recd_rvalue ); /*reduce*/ } else { rvc->rv_assign( st->min_so_far, a->recd_rvalue ); /*overwrite*/ } } else if( !a->must_recv && !a->procd ) { send_value_function( uh, closure, usr->info.myid, a->id, st->min_so_far); a->procd = TRUE; } else { /*Sending already done; do nothing*/ } } st->done = !blocked; if( st->done ) {if(st->dbg>=2){printf("%d DONE! Min_so_far: ", usr->info.myid);rvc->rv_print(stdout, st->min_so_far); printf("\n");fflush(stdout);} st->status = RM_DONE; } else { st->status = RM_ACTIVE; } rvc->rv_assign( result_so_far, st->min_so_far ); return (st->done ? 1 : 0);}/*----------------------------------------------------------------------------*/RMStatus rm_get_status( RMUserHandle uh, RVALUE_TYPE *rvalue ){ RMUser *usr = (RMUser *)uh; RMState *st = &usr->state; RVALUE_CLASS *rvc = &usr->info.rv_class; rvc->rv_assign( rvalue, st->min_so_far ); return st->status;}/*----------------------------------------------------------------------------*//* Reduction management utilities. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 20July2000 *//*----------------------------------------------------------------------------*/static int dbg = 0;/*----------------------------------------------------------------------------*//* Returns the highest exponent of 2 which is contained in N *//*----------------------------------------------------------------------------*/static int highest_exponent_of_2_in( int N ){ int k = 0, M = N; while( M > 1 ) { k++; M = M/2; }if(dbg>=2){printf("highest_exponent_of_2_in(%d) = %d\n",N,k);fflush(stdout);} return k;}/*----------------------------------------------------------------------------*//* Returns the number which is the highest power of 2 contained in N *//*----------------------------------------------------------------------------*/static int highest_power_of_2_in( int N ){ int m = 1, M = N; while( M > 1 ) { m *= 2; M = M/2; }if(dbg>=2){printf("highest_power_of_2_in(%d) = %d\n",N,m);fflush(stdout);} return m;}/*----------------------------------------------------------------------------*//* Returns the communication schedule based on star pattern. *//*----------------------------------------------------------------------------*/static void compute_all_to_all_schedule( int i, int N, int mx, int *ssize, int ids[], int rs[], int js[], int ro[]){ int ss = 0; /* Size of the schedule returned */ assert( 0 <= i && i < N ); #define CHK() do{if(ss>=mx){printf("Add more actions!\n");return;}}while(0) /* First receive local value from self */ {CHK(); ids[ss]=i; rs[ss]=TRUE; js[ss]=FALSE; ro[ss]=TRUE; ss++;} { int j = 0; for( j = 0; j < N; j++ ) { if( j != i ) {CHK(); ids[ss]=j; rs[ss]=FALSE; js[ss]=FALSE; ro[ss]=FALSE; ss++;}/*send*/ } for( j = 0; j < N; j++ ) { if( j != i ) {CHK(); ids[ss]=j; rs[ss]=TRUE; js[ss]=FALSE; ro[ss]=TRUE; ss++;} /*recv*/ } } #undef CHK *ssize = ss;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -