📄 netmpi.cpp
字号:
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include "GetOpt.h"
#include <string.h>
#define DEFPORT 5002
#define TRIALS 7
#define REPEAT 1000
int g_NSAMP = 150;
#define PERT 3
int g_LATENCYREPS = 1000;
#define LONGTIME 1e99
#define CHARSIZE 8
#define PATIENCE 50
#define RUNTM 0.25
double g_STOPTM = 0.1;
#define MAXINT 2147483647
#define ABS(x) (((x) < 0)?(-(x)):(x))
#define MIN(x, y) (((x) < (y))?(x):(y))
#define MAX(x, y) (((x) > (y))?(x):(y))
int g_nIproc = 0, g_nNproc = 0;
typedef struct protocolstruct ProtocolStruct;
struct protocolstruct
{
int nbor, iproc;
};
typedef struct argstruct ArgStruct;
struct argstruct
{
/* This is the common information that is needed for all tests */
char *host; /* Name of receiving host */
short port; /* Port used for connection */
char *buff; /* Transmitted buffer */
char *buff1; /* Transmitted buffer */
int bufflen, /* Length of transmitted buffer */
tr, /* Transmit flag */
nbuff; /* Number of buffers to transmit */
/* Now we work with a union of information for protocol dependent stuff */
ProtocolStruct prot; /* Structure holding necessary info for TCP */
};
typedef struct data Data;
struct data
{
double t;
double bps;
double variance;
int bits;
int repeat;
};
double When();
int Setup(ArgStruct *p);
void Sync(ArgStruct *p);
void SendData(ArgStruct *p);
void RecvData(ArgStruct *p);
void SendRecvData(ArgStruct *p);
void SendTime(ArgStruct *p, double *t, int *rpt);
void RecvTime(ArgStruct *p, double *t, int *rpt);
int Establish(ArgStruct *p);
int CleanUp(ArgStruct *p);
double TestLatency(ArgStruct *p);
double TestSyncTime(ArgStruct *p);
void PrintOptions(void);
int DetermineLatencyReps(ArgStruct *p);
void PrintOptions()
{
printf("\n");
printf("Usage: netpipe flags\n");
printf(" flags:\n");
printf(" -reps #iterations\n");
printf(" -time stop_time\n");
printf(" -start initial_msg_size\n");
printf(" -end final_msg_size\n");
printf(" -out outputfile\n");
printf(" -nocache\n");
printf(" -headtohead\n");
printf(" -pert\n");
printf(" -noprint\n");
printf("Requires exactly two processes\n");
printf("\n");
}
void main(int argc, char *argv[])
{
FILE *out; /* Output data file */
char s[255]; /* Generic string */
char *memtmp;
char *memtmp1;
int i, j, n, nq, /* Loop indices */
bufoffset = 0, /* Align buffer to this */
bufalign = 16*1024, /* Boundary to align buffer to */
nrepeat, /* Number of time to do the transmission */
nzero = 0,
len, /* Number of bytes to be transmitted */
inc = 1, /* Increment value */
detailflag = 0, /* Set to examine the signature curve detail*/
bufszflag = 0, /* Set to change the TCP socket buffer size */
pert, /* Perturbation value */
ipert,
start = 1, /* Starting value for signature curve */
end = MAXINT, /* Ending value for signature curve */
streamopt = 0, /* Streaming mode flag */
printopt = 0; /* Debug print statements flag */
ArgStruct args; /* Argumentsfor all the calls */
double t, t0, t1, t2, /* Time variables */
tlast, /* Time for the last transmission */
tzero = 0,
latency, /* Network message latency */
synctime; /* Network synchronization time */
Data *bwdata; /* Bandwidth curve data */
bwdata = new Data[g_NSAMP];
short port = DEFPORT; /* Port number for connection */
bool bNoCache = false;
bool bHeadToHead = false;
bool bSavePert = false;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &g_nNproc);
MPI_Comm_rank(MPI_COMM_WORLD, &g_nIproc);
if (g_nNproc != 2)
{
if (g_nIproc == 0)
PrintOptions();
MPI_Finalize();
exit(0);
}
GetOpt(argc, argv, "-time", &g_STOPTM);
GetOpt(argc, argv, "-reps", &g_NSAMP);
GetOpt(argc, argv, "-start", &start);
GetOpt(argc, argv, "-end", &end);
bNoCache = GetOpt(argc, argv, "-nocache");
bHeadToHead = GetOpt(argc, argv, "-headtohead");
if (GetOpt(argc, argv, "-noprint"))
printopt = 0;
bSavePert = GetOpt(argc, argv, "-pert");
if (g_nIproc == 0)
strcpy(s, "Netpipe.out");
GetOpt(argc, argv, "-out", s);
printopt = 1;
if (start > end)
{
fprintf(stdout, "Start MUST be LESS than end\n");
exit(420132);
}
args.nbuff = TRIALS;
args.port = port;
Setup(&args);
Establish(&args);
if (args.tr)
{
if ((out = fopen(s, "w")) == NULL)
{
fprintf(stdout,"Can't open %s for output\n", s);
exit(1);
}
}
latency = TestLatency(&args);
synctime = TestSyncTime(&args);
if (args.tr)
{
SendTime(&args, &latency, &nzero);
}
else
{
RecvTime(&args, &latency, &nzero);
}
if (args.tr && printopt)
{
fprintf(stdout,"Latency: %lf\n", latency);
fprintf(stdout,"Sync Time: %lf\n", synctime);
fprintf(stdout,"Now starting main loop\n");
fflush(stdout);
}
tlast = latency;
inc = (start > 1 && !detailflag) ? start/2: inc;
args.bufflen = start;
/* Main loop of benchmark */
for (nq = n = 0, len = start;
n < g_NSAMP && tlast < g_STOPTM && len <= end;
len = len + inc, nq++)
{
if (nq > 2 && !detailflag)
inc = ((nq % 2))? inc + inc: inc;
/* This is a perturbation loop to test nearby values */
for (ipert = 0, pert = (!detailflag && inc > PERT + 1)? -PERT: 0;
pert <= PERT;
ipert++, n++, pert += (!detailflag && inc > PERT + 1)? PERT: PERT + 1)
{
/* Calculate howmany times to repeat the experiment. */
if (args.tr)
{
nrepeat = (int)(MAX((RUNTM / ((double)args.bufflen /
(args.bufflen - inc + 1.0) * tlast)), TRIALS));
SendTime(&args, &tzero, &nrepeat);
}
else
{
nrepeat = 1; /* Just needs to be greater than zero */
RecvTime(&args, &tzero, &nrepeat);
}
/* Allocate the buffer */
args.bufflen = len + pert;
/* printf("allocating %d bytes\n", args.bufflen * nrepeat + bufalign); */
if (bNoCache)
{
if ((args.buff = (char *)malloc(args.bufflen * nrepeat + bufalign)) == (char *)NULL)
{
fprintf(stdout,"Couldn't allocate memory\n");
break;
}
}
else
{
if ((args.buff = (char *)malloc(args.bufflen + bufalign)) == (char *)NULL)
{
fprintf(stdout,"Couldn't allocate memory\n");
break;
}
}
/* if ((args.buff1 = (char *)malloc(args.bufflen * nrepeat + bufalign)) == (char *)NULL) */
if ((args.buff1 = (char *)malloc(args.bufflen + bufalign)) == (char *)NULL)
{
fprintf(stdout,"Couldn't allocate memory\n");
break;
}
/* Possibly align the data buffer */
memtmp = args.buff;
memtmp1 = args.buff1;
if (!bNoCache)
{
if (bufalign != 0)
{
args.buff += (bufalign - ((int)args.buff % bufalign) + bufoffset) % bufalign;
/* args.buff1 += (bufalign - ((int)args.buff1 % bufalign) + bufoffset) % bufalign; */
}
}
args.buff1 += (bufalign - ((int)args.buff1 % bufalign) + bufoffset) % bufalign;
if (args.tr && printopt)
{
fprintf(stdout,"%3d: %9d bytes %4d times --> ",
n, args.bufflen, nrepeat);
fflush(stdout);
}
/* Finally, we get to transmit or receive and time */
if (args.tr)
{
bwdata[n].t = LONGTIME;
t2 = t1 = 0;
for (i = 0; i < TRIALS; i++)
{
if (bNoCache)
{
if (bufalign != 0)
{
args.buff = memtmp + ((bufalign - ((int)args.buff % bufalign) + bufoffset) % bufalign);
/* args.buff1 = memtmp1 + ((bufalign - ((int)args.buff1 % bufalign) + bufoffset) % bufalign); */
}
else
{
args.buff = memtmp;
/* args.buff1 = memtmp1; */
}
}
Sync(&args);
t0 = When();
for (j = 0; j < nrepeat; j++)
{
if (bHeadToHead)
SendRecvData(&args);
else
{
SendData(&args);
if (!streamopt)
{
RecvData(&args);
}
}
if (bNoCache)
{
args.buff += args.bufflen;
/* args.buff1 += args.bufflen; */
}
}
t = (When() - t0)/((1 + !streamopt) * nrepeat);
if (!streamopt)
{
t2 += t*t;
t1 += t;
bwdata[n].t = MIN(bwdata[n].t, t);
}
}
if (!streamopt)
SendTime(&args, &bwdata[n].t, &nzero);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -