📄 mpi.cpp
字号:
//=================================================================
// Name : 基于分段的平行排序
// Author : 袁冬(2107020046)
// Copyright : 中国海洋大学
// LastUpdate : 2007.10.03
// Develop Evrionment : Windows Server 2003 SP2
// + Eclipse 3.3.1 + CDT 4.0.1
// + MinGW 3.81 + GDB 6.6
// + mpich2-1.0.6-win32-ia32
//=================================================================
/*
* 算法描述: 分段 -> 段内排序 -> 归约结果。
* 1,根进程读取输入,将元素个数广播给各个进程。
* 2,然后各进程计算段长度和段偏移。
* 3,然后根进程选择第一个段,标记站位符。
* 4,跟进程将剩余元素发送给下一进程,下一进程选择段的同时,根进程排序。
* 5,下一进程继续此过程,直到最后一个进程,所有元素都进行排序。
* 6,进程将排序好的元素,按照段偏移归约给根进程。
* 7,根进程输入结果。
*
* 时间复杂度计算:
* 设共有进程p,共有元素n
* 则段数为p,每段长度为m = n / p
* 最长时间为从根进程分段开始,至末进程规约为止,又注意到,分段时串行进行的,故
*
* t = 分段时间 * 段数 + 最后一段的排序时间
*
* 用大欧米伽表示法表示
*
* O(分段时间) = m * n = n * n / p
* O(最后一段的排序时间) = m * m
*
* 所以
*
* O(t) = n * n / p * p + m * m
* = n * n + m * m;
* = n * n
*
* 所以,此算法排序时间复杂度为n的平方,和起泡排序的时间复杂度相同。
*
* 分析与优化:
* 从时间复杂度的分析可以知道,算法的瓶颈在于分段算法,因为该算法从本质上讲,是串行进行的。
* 因个人水平有限,分段没有找到并行算法,导致整个算法为伪并行。
* 但是有一个办法可以将分算法的时间减半,
* 即从最大和最小两边开始,分别进行分组,可以使时间复杂度减低一半,但总体时间复杂度的O认为n*n。
*
* 另外,在“取得当前段”的算法中,如果每次循环i时,段索引没有改变,再下一轮时,可以不再遍历。
* 相当于加入缓存,估计此缓存命中几率比较高,可以较大幅度的改善时间复杂度。
*
*/
//#define DEBUG 1 //调试符号
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <time.h>
#include "mpi.h"
using namespace std;
#define TRUE 1
#define FALSE 0
#define Bool int
#define MAX_LENGTH 5000 //允许的最大元素个数
#define MAX_LENGTH_WITH_MARK 10000 //允许的最大元素数及其对应的标记的长度,此值应为MAX_LENGTH的两倍
int pID, pSize; //pID:当前进程ID,pSize:总的进程数
int input[MAX_LENGTH][2], length, packageSize; //input:输入,length:输入的元素个数,packageSize:段大小
int result[MAX_LENGTH], finalResult[MAX_LENGTH], packageIndex[MAX_LENGTH]; //result:当前结果,finalResult:最终结果,packageIndex:段值索引
int resultOffset; //resultOffset:短偏移
//读取输入
void read() {
int i;
//读取元素个数
printf("Input the length of the array(100-5000):");
fflush(stdout);
scanf("%d", &length);
//读取元素
printf("Input the element of the array total %d:", length);
fflush(stdout);
//randomize();
for (i = 0; i < length; i++) {
//读取元素,经对应站位标记标记为0
input[i][0] = rand();
input[i][1] = 0;
}
}
//输出
void output() {
int i;
printf("\n");
for (i = 0; i < length; i++)
printf("%d\t", finalResult[i]);
fflush(stdout);
}
//准备:计算进程常量
int prepare() {
packageSize = length / pSize; //段大小
cout << endl << length << endl << pSize << endl;
resultOffset = packageSize * pID; //结果起始偏移值,表示该段位于整体的哪个部分
//对于最后一个进程,需要修正段大小,包含所有余下的元素
if (pID == pSize - 1)
packageSize += length % pSize;
#ifdef DEBUG
//调试信息:段大小
printf("resultOffset: %d, From %d.\n", resultOffset, pID);
#endif
return 0;
}
//分段,取得当前进程负责的段
void findCurrentRange() {
int i, j = 0, maxIndex, beginIndex = 0;
//填充默认的值
for (i = 0; i < packageSize; i++) {
while (input[beginIndex][1])
beginIndex++;
packageIndex[i] = beginIndex;
beginIndex++;
}
#ifdef DEBUG
//调试信息:默认值
for (i = 0; i < packageSize; i++)
printf("%d", packageIndex[i]);
printf(" From %d\n", pID);
#endif
//查找所有元素,找到最小的packageSize个元素,取得其索引值
for (i = beginIndex; i < length; i++) {
//忽略被其他进程占用的元素
if (input[i][1])
continue;
//查找比当前元素索引中最大的元素的索引
maxIndex = 0;
for (j = 1; j < packageSize; j++)
if (input[packageIndex[j]][0] > input[packageIndex[maxIndex]][0])
maxIndex = j;
//如果元素索引中的最大的小于当前元素,则替换
if (input[packageIndex[maxIndex]][0] > input[i][0])
packageIndex[maxIndex] = i;
}
#ifdef DEBUG
//调试信息:当前段索引,用于判断是否取得了正确的段索引
for (i = 0; i < packageSize; i++)
printf("%d", packageIndex[i]);
printf(" From %d\n", pID);
#endif
//将索引转化为值,存放在result中,并标记输入信息,表明已占用
for (j = 0; j < packageSize; j++) {
result[resultOffset + j] = input[packageIndex[j]][0];
input[packageIndex[j]][1] = 1;
}
#ifdef DEBUG
//调试信息:排序前的当前段,用于判断是否取得了正确的段
for (i = 0; i < length; i++)
printf("%d", result[i]);
printf(" From %d\n", pID);
#endif
}
//排序一个段
void sort() {
//段内起泡
int i, j, temp;
for (i = 0; i < packageSize; i++)
for (j = 0; j < packageSize; j++) {
if (result[resultOffset + i] < result[resultOffset + j]) {
temp = result[resultOffset + i];
result[resultOffset + i] = result[resultOffset + j];
result[resultOffset + j] = temp;
}
}
#ifdef DEBUG
//调试信息:排序后的当前段
for (i = 0; i < length; i++)
printf("%d", result[i]);
printf(" From %d\n", pID);
#endif
}
//主处理进程
void process() {
//取得该进程负责的段
findCurrentRange();
//如果此进程不是最后一个进程,则将剩余部分传递给下一个进程
if (pID != pSize - 1)
MPI_Send(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID + 1, 0, MPI_COMM_WORLD);
//排序当前进程负责的段
sort();
//归约结果,因为最终结果初始皆为零,故采用求和的形式归约当前结果到最终结果
MPI_Reduce(result, finalResult, MAX_LENGTH_WITH_MARK, MPI_INT, MPI_SUM, 0,
MPI_COMM_WORLD);
}
//入口,主函数
int main(int argc, char* argv[]) {
clock_t totalStart,totalEnd,sortStart,sortEnd;
MPI_Status status; //无用
MPI_Init(&argc, &argv); //初始化
MPI_Comm_rank(MPI_COMM_WORLD, &pID); //取得当前进程的ID
MPI_Comm_size(MPI_COMM_WORLD, &pSize); //取得总进程数
//根进程负责输入
if (!pID)
{
read();
totalStart = clock();
}
//广播输入数数组的长度
MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
//准备:计算各进程常量
prepare();
sortStart = clock();
//从根进程启动,处理第一段
if (!pID)
process();
else {
//其余进程等待上一进程传递的数据
MPI_Recv(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID - 1, 0, MPI_COMM_WORLD,
&status);
//收到数据后进行处理
process();
}
//根进程负责输出
if (!pID)
{
sortEnd = clock();
output();
totalEnd = clock();
printf("\n------------------------------\n");
printf("Total time is %f seconds\n",(double)(totalEnd - totalStart) / CLOCKS_PER_SEC);
printf("Sort time is %f seconds\n",(double)(sortEnd - sortStart) / CLOCKS_PER_SEC);
}
//结束
MPI_Finalize();
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -