⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpi.cpp

📁 * 算法描述: 分段 -> 段内排序 -> 归约结果。 * 1
💻 CPP
字号:
//=================================================================  
// Name : 基于分段的平行排序  
// Author : 袁冬(2107020046)  
// Copyright : 中国海洋大学  
// LastUpdate : 2007.10.03      
// Develop Evrionment : Windows Server 2003 SP2  
//                        + Eclipse 3.3.1 + CDT 4.0.1   
//                        + MinGW 3.81 + GDB 6.6   
//                        + mpich2-1.0.6-win32-ia32
//=================================================================

/*
* 算法描述: 分段 -> 段内排序 -> 归约结果。
* 1,根进程读取输入,将元素个数广播给各个进程。
* 2,然后各进程计算段长度和段偏移。
* 3,然后根进程选择第一个段,标记站位符。
* 4,跟进程将剩余元素发送给下一进程,下一进程选择段的同时,根进程排序。
* 5,下一进程继续此过程,直到最后一个进程,所有元素都进行排序。
* 6,进程将排序好的元素,按照段偏移归约给根进程。
* 7,根进程输入结果。
* 
* 时间复杂度计算:
* 设共有进程p,共有元素n
* 则段数为p,每段长度为m = n / p
* 最长时间为从根进程分段开始,至末进程规约为止,又注意到,分段时串行进行的,故
* 
*         t = 分段时间 * 段数 + 最后一段的排序时间
* 
* 用大欧米伽表示法表示
* 
*         O(分段时间) = m * n = n * n / p
*         O(最后一段的排序时间) = m * m
* 
* 所以
* 
*         O(t) = n * n / p * p + m * m
*              = n * n + m * m; 
*              = n * n
* 
* 所以,此算法排序时间复杂度为n的平方,和起泡排序的时间复杂度相同。
* 
* 分析与优化:
* 从时间复杂度的分析可以知道,算法的瓶颈在于分段算法,因为该算法从本质上讲,是串行进行的。
* 因个人水平有限,分段没有找到并行算法,导致整个算法为伪并行。
* 但是有一个办法可以将分算法的时间减半,
* 即从最大和最小两边开始,分别进行分组,可以使时间复杂度减低一半,但总体时间复杂度的O认为n*n。
* 
* 另外,在“取得当前段”的算法中,如果每次循环i时,段索引没有改变,再下一轮时,可以不再遍历。
* 相当于加入缓存,估计此缓存命中几率比较高,可以较大幅度的改善时间复杂度。
* 
*/

//#define DEBUG 1 //调试符号
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <time.h>
#include "mpi.h"

using namespace std;

#define TRUE 1
#define FALSE 0
#define Bool int

#define MAX_LENGTH 5000 //允许的最大元素个数
#define MAX_LENGTH_WITH_MARK 10000 //允许的最大元素数及其对应的标记的长度,此值应为MAX_LENGTH的两倍


int pID, pSize; //pID:当前进程ID,pSize:总的进程数
int input[MAX_LENGTH][2], length, packageSize; //input:输入,length:输入的元素个数,packageSize:段大小
int result[MAX_LENGTH], finalResult[MAX_LENGTH], packageIndex[MAX_LENGTH]; //result:当前结果,finalResult:最终结果,packageIndex:段值索引
int resultOffset; //resultOffset:短偏移

//读取输入
void read() {
	int i;
	//读取元素个数
	printf("Input the length of the array(100-5000):");
	fflush(stdout);
	scanf("%d", &length);

	//读取元素
	printf("Input the element of the array total %d:", length);
	fflush(stdout);
	//randomize(); 
	for (i = 0; i < length; i++) {
		//读取元素,经对应站位标记标记为0
		input[i][0] = rand();
		input[i][1] = 0;
	}
}
//输出
void output() {
	int i;
	printf("\n");
	for (i = 0; i < length; i++)
		printf("%d\t", finalResult[i]);
	fflush(stdout);
}

//准备:计算进程常量
int prepare() {
	packageSize = length / pSize; //段大小
	cout << endl << length << endl << pSize << endl;
	resultOffset = packageSize * pID; //结果起始偏移值,表示该段位于整体的哪个部分

	//对于最后一个进程,需要修正段大小,包含所有余下的元素
	if (pID == pSize - 1)
		packageSize += length % pSize;

#ifdef DEBUG
	//调试信息:段大小
	printf("resultOffset: %d, From %d.\n", resultOffset, pID);
#endif
	return 0;
}

//分段,取得当前进程负责的段
void findCurrentRange() {
	int i, j = 0, maxIndex, beginIndex = 0;

	//填充默认的值
	for (i = 0; i < packageSize; i++) {
		while (input[beginIndex][1])
			beginIndex++;
		packageIndex[i] = beginIndex;
		beginIndex++;
	}

#ifdef DEBUG    
	//调试信息:默认值
	for (i = 0; i < packageSize; i++)
		printf("%d", packageIndex[i]);
	printf(" From %d\n", pID);
#endif

	//查找所有元素,找到最小的packageSize个元素,取得其索引值
	for (i = beginIndex; i < length; i++) {
		//忽略被其他进程占用的元素
		if (input[i][1])
			continue;

		//查找比当前元素索引中最大的元素的索引
		maxIndex = 0;
		for (j = 1; j < packageSize; j++)
			if (input[packageIndex[j]][0] > input[packageIndex[maxIndex]][0])
				maxIndex = j;

		//如果元素索引中的最大的小于当前元素,则替换
		if (input[packageIndex[maxIndex]][0] > input[i][0])
			packageIndex[maxIndex] = i;
	}
#ifdef DEBUG    
	//调试信息:当前段索引,用于判断是否取得了正确的段索引
	for (i = 0; i < packageSize; i++)
		printf("%d", packageIndex[i]);
	printf(" From %d\n", pID);
#endif

	//将索引转化为值,存放在result中,并标记输入信息,表明已占用
	for (j = 0; j < packageSize; j++) {
		result[resultOffset + j] = input[packageIndex[j]][0];
		input[packageIndex[j]][1] = 1;
	}

#ifdef DEBUG    
	//调试信息:排序前的当前段,用于判断是否取得了正确的段
	for (i = 0; i < length; i++)
		printf("%d", result[i]);
	printf(" From %d\n", pID);
#endif
}
//排序一个段
void sort() {
	//段内起泡
	int i, j, temp;
	for (i = 0; i < packageSize; i++)
		for (j = 0; j < packageSize; j++) {
			if (result[resultOffset + i] < result[resultOffset + j]) {
				temp = result[resultOffset + i];
				result[resultOffset + i] = result[resultOffset + j];
				result[resultOffset + j] = temp;
			}
		}
#ifdef DEBUG
		//调试信息:排序后的当前段
		for (i = 0; i < length; i++)
			printf("%d", result[i]);
		printf(" From %d\n", pID);
#endif
}
//主处理进程
void process() {
	//取得该进程负责的段
	findCurrentRange();
	//如果此进程不是最后一个进程,则将剩余部分传递给下一个进程
	if (pID != pSize - 1)
		MPI_Send(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID + 1, 0, MPI_COMM_WORLD);

	//排序当前进程负责的段
	sort();
	//归约结果,因为最终结果初始皆为零,故采用求和的形式归约当前结果到最终结果
	MPI_Reduce(result, finalResult, MAX_LENGTH_WITH_MARK, MPI_INT, MPI_SUM, 0,
		MPI_COMM_WORLD);
}

//入口,主函数
int main(int argc, char* argv[]) {

	clock_t totalStart,totalEnd,sortStart,sortEnd;

	MPI_Status status; //无用

	MPI_Init(&argc, &argv); //初始化
	MPI_Comm_rank(MPI_COMM_WORLD, &pID); //取得当前进程的ID
	MPI_Comm_size(MPI_COMM_WORLD, &pSize); //取得总进程数

	//根进程负责输入
	if (!pID)
	{
		read();
		totalStart = clock();
	}

	//广播输入数数组的长度
	MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
	//准备:计算各进程常量
	prepare();

	sortStart = clock();

	//从根进程启动,处理第一段
	if (!pID)
		process();
	else {
		//其余进程等待上一进程传递的数据
		MPI_Recv(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID - 1, 0, MPI_COMM_WORLD,
			&status);
		//收到数据后进行处理
		process();
	}


	//根进程负责输出
	if (!pID)
	{
		sortEnd = clock();
		output();
		totalEnd = clock();

		printf("\n------------------------------\n");
		printf("Total time is %f seconds\n",(double)(totalEnd - totalStart) / CLOCKS_PER_SEC);
		printf("Sort time is %f seconds\n",(double)(sortEnd - sortStart) / CLOCKS_PER_SEC);
	}

	//结束
	MPI_Finalize();
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -