⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rm.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 2 页
字号:
/*----------------------------------------------------------------------------*//* Reduction management functions.                                            *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 20July2000  *//* $Revision: 1.6 $ $Name: v26apr05 $ $Date: 2004/01/23 15:35:43 $ *//*----------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mycompat.h"#include "rm.h"#ifndef FALSE  #define TRUE 1  #define FALSE 0#endif/*----------------------------------------------------------------------------*/typedef struct{    int must_recv;    /* TRUE if this processor must receive; FALSE if it */                      /* must send -- at this point in the schedule */    /* Interpretation & use of the following fields depend on type of action */    int id;           /* processor ID (e.g. source or destination) */    int heard;        /* Do we know if this processor started its processing? */    int procd;        /* Recd(sent) from(to) this processor? */    RVALUE_TYPE *recd_rvalue; /* Recd(sent) value from(to) this processor */    int jumpstart;    /* If must_recv, should this processor be jumpstarted? */    int reduce_or_overwrite; /* How to use recd_rvalue: to reduce or overwrite*/} RMAction;/*----------------------------------------------------------------------------*/typedef struct{    int N;    int myid;    int maxn;    RVALUE_CLASS rv_class;} RMUserInfo;/*----------------------------------------------------------------------------*/typedef struct{    int dbg;          /*Debugging on/off?*/    RMStatus status;    int started;      /* Has a reduction started? */    int done;         /* Has this reduction completed? 
*/    RMScheduleType old_schedule;/*Currently used schedule type*/    RMScheduleType new_schedule;/*Change to this schedule type on next rm_init*/    RVALUE_TYPE *min_so_far;/* Minimum reduced value so far at this processor */    int do_jumpstarting; /*Should jumpstart messages be sent during reduction?*/    #define MAXACTIONS 1000 /*CUSTOMIZE*/    int index_of_blocked_action;    int nactions;    RMAction actions[MAXACTIONS];} RMState;/*----------------------------------------------------------------------------*/typedef struct{    RMUserInfo info;    RMState state;} RMUser;/*----------------------------------------------------------------------------*/static void rm_state_alloc( RMState *st, RVALUE_CLASS *rvc, int maxn ){    int k = 0;    st->min_so_far = rvc->rv_new();    for( k = 0; k < MAXACTIONS; k++ )    {        RMAction *act = &st->actions[k];	act->recd_rvalue = rvc->rv_new();    }}/*----------------------------------------------------------------------------*/static void rm_state_init( RMState *st, RVALUE_CLASS *rvc, int i, int N ){    st->started = FALSE;    st->done = FALSE;    st->index_of_blocked_action = -1;    st->do_jumpstarting = FALSE;    rvc->rv_init( st->min_so_far );    /* Compute/Update the communication pattern/schedule for this processor */    if( st->old_schedule == st->new_schedule )    {        int k = 0;        for( k = 0; k < st->nactions; k++ )        {            RMAction *act = &st->actions[k];            act->heard = FALSE;            act->procd = FALSE;	    rvc->rv_init( act->recd_rvalue );        }    }    else    {        int k = 0;	int ids[MAXACTIONS], rs[MAXACTIONS], js[MAXACTIONS], ro[MAXACTIONS];        st->nactions = 0;        compute_schedule( st->new_schedule,	    i, N, MAXACTIONS, &st->nactions, ids, rs, js, ro );if(st->dbg>=1){print_schedule( stdout, i, N, st->nactions, ids, rs, js, ro);fflush(stdout);}        for( k = 0; k < st->nactions; k++ )        {            RMAction *act = &st->actions[k];            act->id = 
ids[k];            act->must_recv = rs[k];            act->heard = FALSE;            act->procd = FALSE;	    rvc->rv_init( act->recd_rvalue );            act->jumpstart = js[k];            act->reduce_or_overwrite = ro[k];	    assert( !( rs[k] == FALSE && ro[k] == TRUE ) );        }	st->old_schedule = st->new_schedule;    }}/*----------------------------------------------------------------------------*/static int rm_find_proc( RMUser *usr, int pe, int rs, int first_action ){    int k = 0;    RMState *st = &usr->state;    for( k = first_action; k < st->nactions; k++ )    {        RMAction *a = &st->actions[k];        assert( 0 <= a->id && a->id < usr->info.N );	if( a->id == pe && a->must_recv == rs ) break;    }    if( k >= st->nactions ) k = -1;    return k;}/*----------------------------------------------------------------------------*//* A processor has explicitly or implicitly asked us to start.                *//*----------------------------------------------------------------------------*/static void rm_do_start(RMUser *usr, RM_SEND_START_MSG start_func,void *closure){    RMUserHandle uh = (RMUserHandle)usr;    RMState *st = &usr->state;    if( !st->started )    {	int k = 0;        st->started = TRUE;	/* Send out start messages to all from whom this */	/* processor must hear (later) in the schedule */	for( k = 0; st->do_jumpstarting && k < st->nactions; k++ )	{	    RMAction *a = &st->actions[k];	    if( a->must_recv &&		a->id != usr->info.myid /*No need to start self*/ )	    {		if( a->heard )		{		    /* Don't need to start a processor */		    /* from whom we have already heard */		    /* (it has already started!), so, do nothing */		}		else if( a->jumpstart )		{	            start_func( uh, closure, usr->info.myid, a->id );		    /* Send to another processor in the "other segment" of */		    /* the butterfly pattern, to rapidly propagate the start */		    {		        int another = (a->id + usr->info.N/4) % usr->info.N;			assert( 0 <= another && another < 
usr->info.N );		        if( another != usr->info.myid && another != a->id )	                    start_func( uh, closure, usr->info.myid, another );		    }		}		else		{		    /* No need to send a jumpstart message to this processor */		    /* Based on the schedule, the processor is either known  */		    /* to have started, or will start implicity when it      */		    /* receives a value message from us (no need to waste    */		    /* an extra start message)!                              */		}	    }	}    }}/*----------------------------------------------------------------------------*/static void rm_abort( RMUserHandle uh ){    RMUser *usr = (RMUser *)uh;    RMState *st = &usr->state;    /*Nothing special for now, since this function is not accessible by user*/    st->status = RM_DONE;}/*----------------------------------------------------------------------------*//* Register a user.                                                           *//*----------------------------------------------------------------------------*/RMUserHandle rm_register(    int N,    int myid,    int maxn,    RVALUE_CLASS *rv_class){    RMUser *usr = (RMUser *)malloc( sizeof( RMUser ) );    RMState *st = &usr->state;    usr->info.N = N; usr->info.myid = myid; usr->info.maxn = maxn;    usr->info.rv_class = *rv_class;    rm_state_alloc( st, rv_class, maxn );    st->status = RM_REGISTERED;    st->dbg = getenv("RM_DEBUG") ? 
atoi(getenv("RM_DEBUG")) : 0;    st->old_schedule = RM_SCHEDULE_UNDEFINED;    st->new_schedule = RM_SCHEDULE_GROUPED_BFLY;    {    char *sch=getenv("RM_SCHEDULE");    if(!sch){}    else if(!strcmp(sch,"A2A")) st->new_schedule = RM_SCHEDULE_ALL_TO_ALL;    else if(!strcmp(sch,"STAR")) st->new_schedule = RM_SCHEDULE_STAR;    else if(!strcmp(sch,"BFLY")) st->new_schedule = RM_SCHEDULE_BUTTERFLY;    else if(!strcmp(sch,"GBFLY")) st->new_schedule = RM_SCHEDULE_GROUPED_BFLY;    else if(!strcmp(sch,"PBFLY")) st->new_schedule = RM_SCHEDULE_PHASE_BFLY;    else                         {printf("Bad reduction schedule\n");exit(1);}    }    #define STRING_RM_SCHEDULE(_st) (_st==RM_SCHEDULE_ALL_TO_ALL ? "A2A":\				     _st==RM_SCHEDULE_STAR       ? "STAR":\				     _st==RM_SCHEDULE_BUTTERFLY  ? "BUTTERFLY":\				     _st==RM_SCHEDULE_GROUPED_BFLY ? "GBFLY":\				     _st==RM_SCHEDULE_PHASE_BFLY ? "PBFLY":\				     "UNDEFINED")if(st->dbg>=1){printf("\n** RM using schedule of type %s.\n\n", STRING_RM_SCHEDULE(st->new_schedule));fflush(stdout);}    return (RMUserHandle)usr;}/*----------------------------------------------------------------------------*//*  Init/Re-init user.                                                        *//*----------------------------------------------------------------------------*/void rm_init( RMUserHandle uh, RMScheduleType new_sched ){    RMUser *usr = (RMUser *)uh;    RMUserInfo *info = &usr->info;    RMState *st = &usr->state;    if( st->status == RM_ACTIVE ) rm_abort( uh );    assert( st->status == RM_REGISTERED || st->status == RM_DONE );    st->new_schedule = new_sched;    rm_state_init( st, &info->rv_class, info->myid, info->N );    st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*//* A processor has explicitly or implicitly asked us to start.                
*//*----------------------------------------------------------------------------*/void rm_receive_start( RMUserHandle uh, int from_pe ){    RMUser *usr = (RMUser *)uh;    RMState *st = &usr->state;    int k = 0;    assert( st->status == RM_ACTIVE );    assert( 0 <= from_pe && from_pe < usr->info.N );    k = rm_find_proc( usr, from_pe, TRUE, 0 );    if( k < 0 )    {	/* It is possible that we receive a start message from a processor */	/* which is not in the schedule, because that processor can send */	/* to processors that are not in its schedule; see the code in this */	/* function below */if(st->dbg>=2){printf("%d heard STARTX from %d\n",usr->info.myid,from_pe);fflush(stdout);}    }    else    {        RMAction *a = &st->actions[k];	a->heard = TRUE;if(st->dbg>=2){printf("%d heard START from %d\n",usr->info.myid,from_pe);fflush(stdout);}    }    st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*//* A processor has sent its value.                                            *//*----------------------------------------------------------------------------*/void rm_receive_value( RMUserHandle uh, int from_pe, RVALUE_TYPE *recd_value ){    RMUser *usr = (RMUser *)uh;    RMState *st = &usr->state;    RVALUE_CLASS *rvc = &usr->info.rv_class;    int k = -1;    assert( st->status == RM_ACTIVE );if(st->dbg>=2){printf("%d recd new value from %d: ",usr->info.myid,from_pe);rvc->rv_print(stdout, recd_value); printf("\n");fflush(stdout);}    do    {        k = rm_find_proc( usr, from_pe, TRUE, k+1 );        assert( 0 <= k && k < st->nactions );	{	  RMAction *a = &st->actions[k];	  if( !a->procd )	  {	    a->procd = TRUE;	    a->heard = TRUE;	    rvc->rv_assign( a->recd_rvalue, recd_value );	    break;	  }	}    }while(1);    st->status = RM_ACTIVE;}/*----------------------------------------------------------------------------*/int rm_resume(    RMUserHandle uh,    RVALUE_TYPE *result_so_far,    RM_SEND_START_MSG 
send_start_function,    RM_SEND_VALUE_MSG send_value_function,    void *closure){    RMUser *usr = (RMUser *)uh;    RMState *st = &usr->state;    RVALUE_CLASS *rvc = &usr->info.rv_class;    int k = 0;    int blocked = FALSE;    assert( st->status == RM_ACTIVE );    /*Performance optimization*/    if( st->index_of_blocked_action >= 0 )    {        RMAction *a = &st->actions[st->index_of_blocked_action];        if( a->must_recv && !a->procd )	    return 0;        st->index_of_blocked_action = -1;    }    st->status = RM_PROCESSING;    rm_do_start( usr, send_start_function, closure );    rvc->rv_init( st->min_so_far );    for( k = 0; k < st->nactions; k++ )    {	RMAction *a = &st->actions[k];	if( a->must_recv && !a->procd )	{	    blocked = TRUE;	    st->index_of_blocked_action = k;	    break;	}	else if( a->must_recv && a->procd )	{	    /*Already received the value; use it to reduce/overwrite min*/	    if( a->reduce_or_overwrite )	    {	        rvc->rv_reduce( st->min_so_far, a->recd_rvalue ); /*reduce*/	    }	    else	    {	        rvc->rv_assign( st->min_so_far, a->recd_rvalue ); /*overwrite*/	    }	}	else if( !a->must_recv && !a->procd )	{	    send_value_function( uh, closure,				 usr->info.myid, a->id, st->min_so_far);	    a->procd = TRUE;	}	else	{	    /*Sending already done; do nothing*/	}    }    st->done = !blocked;    if( st->done )    {if(st->dbg>=2){printf("%d DONE! Min_so_far: ", usr->info.myid);rvc->rv_print(stdout, st->min_so_far); printf("\n");fflush(stdout);}        st->status = RM_DONE;    }    else    {        st->status = RM_ACTIVE;    }    rvc->rv_assign( result_so_far, st->min_so_far );    return (st->done ? 
1 : 0);}/*----------------------------------------------------------------------------*/RMStatus rm_get_status( RMUserHandle uh, RVALUE_TYPE *rvalue ){    RMUser *usr = (RMUser *)uh;    RMState *st = &usr->state;    RVALUE_CLASS *rvc = &usr->info.rv_class;    rvc->rv_assign( rvalue, st->min_so_far );    return st->status;}/*----------------------------------------------------------------------------*//* Reduction management utilities.                                            *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 20July2000  *//*----------------------------------------------------------------------------*/static int dbg = 0;/*----------------------------------------------------------------------------*//* Returns the highest exponent of 2 which is contained in N                  *//*----------------------------------------------------------------------------*/static int highest_exponent_of_2_in( int N ){    int k = 0, M = N;    while( M > 1 )    {        k++;        M = M/2;    }if(dbg>=2){printf("highest_exponent_of_2_in(%d) = %d\n",N,k);fflush(stdout);}    return k;}/*----------------------------------------------------------------------------*//* Returns the number which is the highest power of 2 contained in N          *//*----------------------------------------------------------------------------*/static int highest_power_of_2_in( int N ){    int m = 1, M = N;    while( M > 1 )    {        m *= 2;        M = M/2;    }if(dbg>=2){printf("highest_power_of_2_in(%d) = %d\n",N,m);fflush(stdout);}    return m;}/*----------------------------------------------------------------------------*//* Returns the communication schedule based on star pattern.                  
// All-to-all: every processor exchanges its value with every other one. */
/* Keep the comment terminator at the end of the line above: it closes the    */
/* banner comment that is opened on the preceding line of this file.          */
/*----------------------------------------------------------------------------*/

/* Fallback so the flags used below are always defined. */
#ifndef FALSE
  #define TRUE 1
  #define FALSE 0
#endif

/*----------------------------------------------------------------------------*/
/* Build the all-to-all schedule of processor i out of N.                     */
/*                                                                            */
/* The schedule has 2*N-1 actions, written to the parallel arrays             */
/* ids (peer), rs (TRUE = receive), js (jumpstart) and ro (TRUE = reduce):    */
/*   1. receive the local value from self (reduce),                           */
/*   2. send to every other processor, in increasing id order,                */
/*   3. receive from every other processor, in increasing id order (reduce).  */
/* No action requests a jumpstart.                                            */
/*                                                                            */
/* mx is the capacity of each array; *ssize receives the number of actions.   */
/* If the schedule does not fit into mx entries, "Add more actions!" is       */
/* printed, the arrays are left untouched and *ssize is set to 0, so the      */
/* caller never sees a stale or partially built schedule.                     */
/*----------------------------------------------------------------------------*/
static void compute_all_to_all_schedule(
    int i,
    int N,
    int mx,
    int *ssize,
    int ids[],
    int rs[],
    int js[],
    int ro[])
{
    int ss = 0;  /* Size of the schedule returned */
    int j = 0;

    assert( 0 <= i && i < N );

    /* Capacity check for 2*N-1 entries, phrased so it cannot overflow */
    if( mx < 1 || N > ( mx - 1 ) / 2 + 1 )
    {
        printf( "Add more actions!\n" );
        *ssize = 0;
        return;
    }

    /* First receive local value from self */
    ids[ss] = i; rs[ss] = TRUE; js[ss] = FALSE; ro[ss] = TRUE;
    ss++;

    /* Then send the local value to everybody else */
    for( j = 0; j < N; j++ )
    {
        if( j == i ) continue;
        ids[ss] = j; rs[ss] = FALSE; js[ss] = FALSE; ro[ss] = FALSE;
        ss++;
    }

    /* Finally receive (and reduce) everybody else's value */
    for( j = 0; j < N; j++ )
    {
        if( j == i ) continue;
        ids[ss] = j; rs[ss] = TRUE; js[ss] = FALSE; ro[ss] = TRUE;
        ss++;
    }

    *ssize = ss;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -