⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallelapriori.cpp

📁 关联规则并行算法Count Distribution的C与MPI实现
💻 CPP
📖 第 1 页 / 共 3 页
字号:

#include "stdio.h"
#include <fstream.h>
#include <iostream.h>
#include "stdlib.h"
#include "string.h"
#include <iomanip.h>
#include <time.h>
#include "mpi.h"

struct tItem
{
	int tag;
	char itemset[15];
	int count;
	struct tItem *next;
};

#define LEN sizeof(struct tItem);

int GetColNum(char fileName1[]);  //读取源数据的列数
int GetRecordNum(char fileName2[]);  //读取源数据的行数
int Index(char *List, char c, int ListLength);
void GetSourceFile(char fileName[],int recordNum,int colNum);  //获取源数据
void FindFrequence1itemsets(char **fullSet1,double sup);
void Gen_association_rules(tItem *kf, tItem *p, double minCon);  //生成关联规则
void SetminSup();  //设置最小值支持度
void SetminCon();  //设置最小置信度
struct tItem *Find_candidate_1itemsets(char **fullSet2, double sup2,int recordNum,int colNum);  //生成频繁1项集
struct tItem *apriori_gen(tItem *freq);  //生成候选项集
struct tItem *Gen_candidate_itemset(tItem *cp,tItem *cp1,tItem *cp2);  //生成后选项
struct tItem *Gen_frequence(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet);  //生成频繁项集
struct tItem *Gen_frequence1(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet);
struct tItem *Compute_candidateItem_Num(tItem *c,int recordNum, int colNum, char **fullSet);  //计算候选项集中每一项的最小支持数
struct tItem *Gen_kFrequence_subsets(tItem *f);  //生成频繁k项集每项的子集的集合
struct tItem *SubSet(char *List, int m, char *Buffer, int flag, tItem *fis, int ListLength);  //生成频繁集中每一项的子集
struct tItem *AddToSubset(char *Buffer, int flag, tItem *fis);  //将一个子集加入链表
struct tItem *Combine_candidate_itemsets(tItem *lc, char rItemset, int rCount);  //合并局部候选1项集成为总的候选1项集
struct tItem *Combine_candidate_k_itemsets(tItem *lc,tItem *r);
bool Has_infrequence_subset(tItem *pp, tItem *f);  //判断候选k项集的子集是否在频繁k-1项集中
bool Is_before_equal(tItem *pp1, tItem *pp2);  //判断两个项目集的前k-1项是否相同
static void masterprocess(int rankNum);
static void slaveprocess(int rankNum);

char **fullSet;
int colNum=0;
int recordNum=0;
double minSup=0.37;
double minCon=0.50;

int main(int argc,char *argv[])
{
	int rank,size;

	MPI_Init(&argc,&argv);
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);
	MPI_Comm_size(MPI_COMM_WORLD,&size);

	if(rank==0)
	{
		//printf("#%d#\n",size);
		masterprocess(size);
	}
	else
	{
		slaveprocess(size);
	}

	MPI_Finalize();
	return(0);
}

void SetminSup()
{
	cout<<"Please input the minSup: ";
	cin>>minSup;
}

void SetminCon()
{
	cout<<"Please input the minCon: ";
	cin>>minCon;
}

int GetColNum(char fileName1[])  //读取源数据的列数
{
	ifstream file(fileName1);

	char ch;
	//int colNum=0;
	while(ch!='\n')
	{
		if(ch==',')
		{
			colNum++;
			ch=file.get();
		}
		ch=file.get();
	}
	//colNum++;
	file.close();
	return colNum;
}


int GetRecordNum(char fileName2[])  //读取源数据的行数
{
	ifstream file(fileName2);

	char ch=NULL;
	//int recordNum=0;

	while(!file.eof())
	{
		if(ch=='\n')
		{
			recordNum++;
			ch=file.get();
		}
		ch=file.get();
	}

	file.close();
	return recordNum;
}

void GetSourceFile(char fileName[],int rNum,int cNum)  //获取源数据读入数组fullSet[][]中
{
	ifstream file(fileName);

	char ch;
	//char **fullSet;
	int item=0;
	int i=0,j=0;
	int temp1='A';
	int temp2='a';

	/*b=(int**)malloc(row*sizeof(int*));
	for(i=0;i<row;i++)
	{
		b[i]=(int*)malloc(col*sizeof(int));
	}*/

	fullSet=(char**)malloc(rNum*sizeof(char*));
	for(int m=0;m<rNum;m++)
	{
		fullSet[m]=(char*)malloc(cNum*sizeof(char));
	}
	
	while(i<rNum)
	{
		while(j<colNum)
		{
			if(ch==',')
			{
				item++;
				ch=file.get();
				if(ch=='T')
				{
					fullSet[i][j]=(char)(item+temp1);
					//printf("%c",fullSet[i][j]);
				}
				if(ch=='F')
				{
					fullSet[i][j]=(char)(item+temp2);
					//printf("%c",fullSet[i][j]);
				}
				j++;
			}
			ch=file.get();
		}
		//printf("\n");
		j=0;
		ch=file.get();
		item=0;
		i++;
	}
	file.close();
}

static void masterprocess(int rankNum)
{

	char inputFileName[]="c:\\shujuku.txt";
	char *buffer;
	int colNum,recordNum,blockNum,bound,i,j,m,k;
	int rank,destRank,tag;
    double starttime,endtime;

	MPI_Status status;
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    starttime=MPI_Wtime();

	//SetminSup();
	//SetminCon();

	//printf("#%d#\n",rankNum);

	colNum=GetColNum(inputFileName);
	recordNum=GetRecordNum(inputFileName);

	//system("pause");
	printf("recordNum= %d", recordNum);
	printf("\n");
	printf("colNum= %d",colNum);
	printf("\n");

	GetSourceFile(inputFileName,recordNum,colNum);
	
	//system("pause");

	/*for(i=0;i<recordNum;i++)
	{
		for(j=0;j<colNum;j++)
		{
			printf(" %c ",fullSet[i][j]);
		}
		printf("\n");
	}*/

	blockNum=recordNum/(rankNum-1);
	bound=blockNum*(rankNum-1);
	//printf("%d\n",blockNum);

	m=0;
	k=1;
	destRank=1;
	tag=1;
	buffer=(char*)malloc(colNum*sizeof(char*));
	while(m!=recordNum)
	{
		if(m==(k*blockNum-1)&&(m!=bound-1))
		{
			MPI_Send(&colNum,1,MPI_INT,destRank,100,MPI_COMM_WORLD);
			MPI_Send(&recordNum,1,MPI_INT,destRank,101,MPI_COMM_WORLD);
			MPI_Send(&blockNum,1,MPI_INT,destRank,tag,MPI_COMM_WORLD);
			//printf("%d\n",blockNum);
			for(i=((k-1)*blockNum);i<k*blockNum;i++)
			{
				for(j=0;j<colNum;j++)
				{
					buffer[j]=fullSet[i][j];
					printf("%c ",buffer[j]);
				}
				MPI_Send(buffer,colNum,MPI_CHAR,destRank,tag,MPI_COMM_WORLD);
				printf("\n");
			}
			printf("\n");
			destRank++;
			tag++;
			k++;
		}
		else if(m==bound-1)
		{
			blockNum=recordNum-m-1+blockNum;
			MPI_Send(&colNum,1,MPI_INT,destRank,100,MPI_COMM_WORLD);
			MPI_Send(&recordNum,1,MPI_INT,destRank,101,MPI_COMM_WORLD);
			MPI_Send(&blockNum,1,MPI_INT,destRank,tag,MPI_COMM_WORLD);
			//MPI_Send(&flag,1,MPI_INT,destRank,102,MPI_COMM_WORLD);
			//printf("%d\n",blockNum);
			for(i=bound-blockNum;i<recordNum;i++)
			{
				for(j=0;j<colNum;j++)
				{
					buffer[j]=fullSet[i][j];
					printf("%c ",buffer[j]);
				}
				MPI_Send(buffer,colNum,MPI_CHAR,destRank,tag,MPI_COMM_WORLD);
				printf("\n");
			}
			printf("\n");
		}

		m++;
	}


	/*int s,rItemNum;
	tItem *r;
	r=(struct tItem*)malloc(sizeof(struct tItem));

	s=0;
	MPI_Recv(&rItemNum,1,MPI_INT,1,1000,MPI_COMM_WORLD,&status);
	while(s<rItemNum)
	{
		for(j=0;j<=5;j++)
		{
			MPI_Recv(&r->itemset[j],1,MPI_CHAR,1,1001,MPI_COMM_WORLD,&status);
		}
		MPI_Recv(&r->count,1,MPI_INT,1,1002,MPI_COMM_WORLD,&status);
		MPI_Recv(&r->tag,1,MPI_INT,0,1003,MPI_COMM_WORLD,&status);
		s++;
	}*/

	endtime=MPI_Wtime();
	printf("The master process tooks %f seconds.\n",endtime-starttime);
	printf("\n");
}

static void slaveprocess(int rankNum)
{

	int rowNum,recordNum,colNum,tag,stag,rank,destRank,m,i,j;
	char **fSet,*buffer;
	double minSup=0.45;
    double minCon=0.50;
	double starttime,endtime;

	MPI_Status status;
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);
	starttime=MPI_Wtime();

	MPI_Recv(&colNum,1,MPI_INT,0,100,MPI_COMM_WORLD,&status);
	MPI_Recv(&recordNum,1,MPI_INT,0,101,MPI_COMM_WORLD,&status);
	//printf("%d\n",recordNum);
	//printf("%d\n",colNum);

	tag=rank;
	MPI_Recv(&rowNum,1,MPI_INT,0,tag,MPI_COMM_WORLD,&status);
	printf("\n");
	printf("\n");
    printf("The block in rank %d has %d records.\n",rank,rowNum);
    //MPI_Recv(&flag,1,MPI_INT,0,102,MPI_COMM_WORLD,&status);

	buffer=(char*)malloc(colNum*sizeof(char*));
	fSet=(char**)malloc(rowNum*sizeof(char*));
	for(m=0;m<rowNum;m++)
	{
		fSet[m]=(char*)malloc(colNum*sizeof(char));
	}

	//接收进程0发送来的初始矩阵的一块
	for(i=0;i<rowNum;i++)
	{
		MPI_Recv(buffer,colNum,MPI_CHAR,0,tag,MPI_COMM_WORLD,&status);
		for(j=0;j<colNum;j++)
		{
			fSet[i][j]=buffer[j];
		}
	}

	printf("\n");
	printf("Rank %d receives part of the matrix is as follow:\n",rank);
	printf("\n");
	for(i=0;i<rowNum;i++)
	{
		for(j=0;j<colNum;j++)
		{
			printf("%c ",fSet[i][j]);
		}
		printf("\n");
	}
	printf("\n");

	tItem *c;  //定义局部候选项集
	tItem *cr;  //接收其他进程发送来的局部候选集
	tItem *p;
	int count=0,s,rCount,rTag;
	char rItemset;


	c=(struct tItem*)malloc(sizeof(struct tItem));
	c=Find_candidate_1itemsets(fSet,minSup,rowNum,colNum);

	destRank=1;
	stag=1;
	
	p=c;
	while(p!=NULL)
	{
		count++;
		p=p->next;
	}

	for(i=1;i<=rankNum-1;i++)  //向其他进程发送局部候选1项集
	{
		p=c;
		if(i!=rank)
		{
			MPI_Send(&count,1,MPI_INT,destRank,14,MPI_COMM_WORLD);
			while(p!=NULL)
			{
				MPI_Send(&p->itemset[p->tag],1,MPI_CHAR,destRank,15,MPI_COMM_WORLD);
				MPI_Send(&p->count,1,MPI_INT,destRank,16,MPI_COMM_WORLD);
				MPI_Send(&p->tag,1,MPI_INT,destRank,17,MPI_COMM_WORLD);
				p=p->next;
			}
		}
		destRank++;
		stag++;	
	}
	
	int reRank=1,reTag=1,itemNum;
	for(j=1;j<=rankNum-1;j++)  //接收其他进程发送来的局部候选1项集
	{
		if(j!=rank)
		{
			s=0;
			MPI_Recv(&itemNum,1,MPI_INT,reRank,14,MPI_COMM_WORLD,&status);
			while(s<itemNum)
			{
				MPI_Recv(&rItemset,1,MPI_CHAR,reRank,15,MPI_COMM_WORLD,&status);
				MPI_Recv(&rCount,1,MPI_INT,reRank,16,MPI_COMM_WORLD,&status);
				MPI_Recv(&rTag,1,MPI_INT,reRank,17,MPI_COMM_WORLD,&status);
				
				c=Combine_candidate_itemsets(c,rItemset,rCount);

				//printf("  %c",rItemset);
				//printf("  %d",rCount);
				//printf("  %d",rTag);
				//printf("\n");
				s++;
			}
			printf("\n");
		}
		reRank++;
		reTag++;
	}

	//输出合并后得到的全部候选1项集
	p=c;
	printf("Now every rank get the whole candidate itemsets as follow:\n\n");
	while(p!=NULL)
	{
		printf("  %c",p->itemset[p->tag]);
		printf("  %d",p->count);
		printf("  %d",p->tag);
		printf("\n");
		p=p->next;
	}
	
	//由候选1项集生成频繁1项集 
	tItem *pp,*fHead,*fp,*temp;
	int minNum;

	fp=fHead=(struct tItem*)malloc(sizeof(struct tItem));
	minNum=(int)((recordNum*minSup)+1);
	//printf("%d",minNum);
	pp=c;

	while(pp!=NULL)
	{
		if(pp->count>minNum)
		{
			fHead->count=pp->count;
			fHead->tag=pp->tag;
			fHead->itemset[fHead->tag]=pp->itemset[pp->tag];
			fHead->next=NULL;
			break;
		}
		pp=pp->next;
	}
	
	fp=fHead;
	pp=pp->next;

	while(pp!=NULL)
	{
		if(pp->count>minNum)
		{
			temp=(struct tItem*)malloc(sizeof(struct tItem));
			temp->count=pp->count;
			temp->next=NULL;
			temp->tag=pp->tag;
			temp->itemset[temp->tag]=pp->itemset[pp->tag];
			fp->next=temp;
			fp=temp;
			//printf("%c",pp->itemset[pp->tag]);
		}
		pp=pp->next;
	}

	/*printf("\n");
	printf("--------The local frequence 1_itemsets--------\n");
	fp=fHead;
	while(fp!=NULL)
	{
		printf("  %c",fp->itemset[fp->tag]);
		printf("  %d",fp->count);
		printf("  %d",fp->tag);
		printf("\n");
		fp=fp->next;
	}*/

	tItem *f1,*kFrequence,*kf,*fis,*kfTemp;

	//kFrequence=(struct tItem*)malloc(sizeof(struct tItem));
	f1=(struct tItem*)malloc(sizeof(struct tItem));
	//f1=Find_frequence_1itemsets(fullSet,minSup);

	f1=fHead;

	//system("pause");
	//printf(" %d",rowNum);
	//printf(" %d*&*&*&\n",colNum);
	kFrequence=Gen_frequence(f1,rowNum,colNum,minNum,fSet);//经过连接和剪枝生成最终的频繁k项集

	i=2;
	while(i<=5)
	{
		kfTemp=Gen_frequence(kFrequence,rowNum,colNum,minNum,fSet);
		if(kfTemp==NULL)
		{
			break;
		}
		kFrequence=kfTemp;
		i++;
	}
	//MPI_Barrier(MPI_COMM_WORLD);
	/*printf("^^^^^^^^^^^^^^^\n");
	kf=kFrequence;
	while(kf!=NULL)
	{
		for(j=0;j<=kf->tag;j++)
		{
			printf("  %c",kf->itemset[j]);
		}
		printf("  %d",kf->count);
		printf("  %d",kf->tag);
		printf("\n");
		kf=kf->next;
	}
	printf("^^^^^^^^^^^^^^^\n");*/
	

	printf("\n");
	kf=kFrequence;
	printf("--------The frequence %d_itemsets--------\n",kf->tag+1);
	while(kf!=NULL)
	{
		for(j=0;j<=kf->tag;j++)
		{
			printf("  %c",kf->itemset[j]);
		}
		printf("  %d",kf->count);
		printf("  %d",kf->tag);
		printf("\n");
		kf=kf->next;
	}
	printf("\n");

	/*kf=kFrequence;
	int kItemNum=0;
	while(kf!=NULL)
	{
		kItemNum++;
		kf=kf->next;
	}

	kf=kFrequence;
	if(rank==1)
	{
		MPI_Send(&kItemNum,1,MPI_INT,0,1000,MPI_COMM_WORLD);
		while(kf!=NULL)
		{
			for(j=0;j<=kf->tag;j++)
			{
				MPI_Send(&kf->itemset[j],1,MPI_CHAR,0,1001,MPI_COMM_WORLD);
			}
			MPI_Send(&kf->count,1,MPI_INT,0,1002,MPI_COMM_WORLD);
			MPI_Send(&kf->tag,1,MPI_INT,0,1003,MPI_COMM_WORLD);
			kf=kf->next;
		}
	}*/

	kf=kFrequence;
	printf("\n\nThe subset of the frequence is:\n\n");
	while(kf!=NULL)
	{
		
		fis=Gen_kFrequence_subsets(kf);  //生成频繁k项集每项的子集的集合
		fis=Compute_candidateItem_Num(fis,rowNum,colNum,fSet); //计算每一个子集的支持度

		/*p=fis;
		p=p->next;
		while(p!=NULL)
		{
			for(i=0;i<=p->tag;i++)
			{
				printf("  %c",p->itemset[i]);
			}
			printf("  %d",p->count);
			printf("  %d",p->tag);
			printf("\n");
			p=p->next;
		}*/

		printf("\n\nNow we get the association rules:\n\n");
		p=fis;
		p=p->next;
		while(p!=NULL)
		{
			//printf("\n");
			Gen_association_rules(kf,p,minCon);
			printf("\n");
			p=p->next;
		}

		kf=kf->next;
	}
    endtime=MPI_Wtime();
	printf("The slave process %d tooks %f seconds.\n",rank,endtime-starttime);
	printf("\n");

}

struct tItem *Gen_frequence(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet)  //生成频繁项集
{

	tItem *candidate,*cp,*fp,*fpb,*p,*p1;
	int i,j,k,m,count=0,rank,rankNum,destRank,stag;
	//int i,m,n;
	//char *buffer;
	MPI_Status status;
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);
	MPI_Comm_size(MPI_COMM_WORLD,&rankNum);

	/*printf("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n");
	printf("##### %d\n",recordNum);
	printf("##### %d\n",colNum);
	printf("##### %d\n",minNum);
	for(i=0;i<recordNum;i++)
	{
		for(j=0;j<colNum;j++)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -