📄 parallelapriori.cpp
字号:
#include "stdio.h"
#include <fstream.h>
#include <iostream.h>
#include "stdlib.h"
#include "string.h"
#include <iomanip.h>
#include <time.h>
#include "mpi.h"
/*
 * One node of a singly linked list of itemsets.
 *
 * Items are single characters stored in itemset[0..tag]; code in this file
 * prints an itemset by iterating j from 0 to tag and reports the itemset
 * size as tag+1.
 */
struct tItem
{
    int tag;              /* index of the last used slot in itemset */
    char itemset[15];     /* item characters; not NUL-terminated by the code shown */
    int count;            /* occurrence count, compared against the minimum support count */
    struct tItem *next;   /* next node, or NULL at the end of the list */
};
/* Size of one list node. Parenthesised and without a trailing semicolon so
   that it can be used inside expressions such as malloc(LEN). */
#define LEN (sizeof(struct tItem))
/*
 * Forward declarations.
 * The short descriptions are translated from the original comments. Several
 * of these functions are defined outside the part of the file reviewed here,
 * so their descriptions reflect the original author's wording only.
 */
int GetColNum(char fileName1[]); // read the number of columns of the source data
int GetRecordNum(char fileName2[]); // read the number of rows of the source data
int Index(char *List, char c, int ListLength);
void GetSourceFile(char fileName[],int recordNum,int colNum); // load the source data
void FindFrequence1itemsets(char **fullSet1,double sup);
void Gen_association_rules(tItem *kf, tItem *p, double minCon); // generate association rules
void SetminSup(); // set the minimum support
void SetminCon(); // set the minimum confidence
// NOTE(review): the original comment says "generate frequent 1-itemsets" although the name says "candidate" - confirm.
struct tItem *Find_candidate_1itemsets(char **fullSet2, double sup2,int recordNum,int colNum); // generate frequent 1-itemsets (original wording)
struct tItem *apriori_gen(tItem *freq); // generate the candidate itemsets
struct tItem *Gen_candidate_itemset(tItem *cp,tItem *cp1,tItem *cp2); // generate one candidate item
struct tItem *Gen_frequence(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet); // generate frequent itemsets
struct tItem *Gen_frequence1(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet);
struct tItem *Compute_candidateItem_Num(tItem *c,int recordNum, int colNum, char **fullSet); // compute the support count of each item in the candidate set
struct tItem *Gen_kFrequence_subsets(tItem *f); // generate the collection of subsets of each frequent k-itemset
struct tItem *SubSet(char *List, int m, char *Buffer, int flag, tItem *fis, int ListLength); // generate the subsets of one item of the frequent set
struct tItem *AddToSubset(char *Buffer, int flag, tItem *fis); // add one subset to the linked list
struct tItem *Combine_candidate_itemsets(tItem *lc, char rItemset, int rCount); // merge local candidate 1-itemsets into the global candidate 1-itemsets
struct tItem *Combine_candidate_k_itemsets(tItem *lc,tItem *r);
bool Has_infrequence_subset(tItem *pp, tItem *f); // check whether a subset of a candidate k-itemset is among the frequent (k-1)-itemsets
bool Is_before_equal(tItem *pp1, tItem *pp2); // check whether the first k-1 items of two itemsets are equal
static void masterprocess(int rankNum); // work done by rank 0; rankNum is the communicator size
static void slaveprocess(int rankNum); // work done by every other rank; rankNum is the communicator size
/* File-scope state. Note that masterprocess() and slaveprocess() declare
   locals named colNum, recordNum, minSup and minCon that shadow these. */
char **fullSet;      // encoded data table, allocated and filled by GetSourceFile()
int colNum=0;        // column count; incremented by GetColNum() and used as the inner loop bound in GetSourceFile()
int recordNum=0;     // row count; incremented by GetRecordNum()
double minSup=0.37;  // minimum support; overwritten by SetminSup()
double minCon=0.50;  // minimum confidence; overwritten by SetminCon()
/*
 * Program entry point: start MPI, let rank 0 run masterprocess() and every
 * other rank run slaveprocess(), then shut MPI down.
 * Both role functions receive the communicator size.
 */
int main(int argc,char *argv[])
{
    int myRank;
    int worldSize;

    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD,&myRank);
    MPI_Comm_size(MPI_COMM_WORLD,&worldSize);

    if(myRank!=0)
    {
        slaveprocess(worldSize);
    }
    else
    {
        masterprocess(worldSize);
    }

    MPI_Finalize();
    return 0;
}
/* Prompt on standard output and read the global minSup from standard input. */
void SetminSup()
{
    const char *prompt="Please input the minSup: ";

    cout << prompt;
    cin >> minSup;
}
/* Prompt on standard output and read the global minCon from standard input. */
void SetminCon()
{
    const char *prompt="Please input the minCon: ";

    cout << prompt;
    cin >> minCon;
}
/*
 * Count the data columns of the source file and store the result in the
 * global colNum.
 *
 * The count is the number of commas on the first line. As in the original
 * scan, the single character following each comma is consumed as that
 * column's value and is never itself treated as a separator.
 *
 * fileName1: path of the source data file.
 * Returns the column count (also written to the global colNum, which
 * GetSourceFile callers pass on); 0 if the file cannot be opened.
 *
 * Compared with the previous version: the character variable is no longer
 * read before being assigned, the scan stops at end of file instead of
 * looping forever when the first line has no newline, and the global is
 * assigned rather than accumulated, so repeated calls give the same answer.
 */
int GetColNum(char fileName1[])
{
    ifstream file(fileName1);
    int columns=0;
    int ch;   /* int, not char, so that EOF is distinguishable from data */

    if(!file)
    {
        fprintf(stderr,"GetColNum: cannot open %s\n",fileName1);
        colNum=0;
        return colNum;
    }

    ch=file.get();
    while(ch!=EOF&&ch!='\n')
    {
        if(ch==',')
        {
            columns++;
            ch=file.get();   /* consume the one-character value after the comma */
            if(ch==EOF||ch=='\n')
            {
                break;       /* line ended right after a comma */
            }
        }
        ch=file.get();
    }
    file.close();

    colNum=columns;
    return colNum;
}
int GetRecordNum(char fileName2[]) //读取源数据的行数
{
ifstream file(fileName2);
char ch=NULL;
//int recordNum=0;
while(!file.eof())
{
if(ch=='\n')
{
recordNum++;
ch=file.get();
}
ch=file.get();
}
file.close();
return recordNum;
}
/*
 * Load the source data into the global table fullSet (rNum rows of cNum
 * characters each).
 *
 * Encoding: within a row, the value after the k-th comma (k starting at 1)
 * is stored as (char)('A'+k) when it is 'T' and as (char)('a'+k) when it is
 * 'F'. Any other value leaves the cell at 0.
 *
 * fileName: path of the source data file.
 * rNum:     number of rows to read.
 * cNum:     number of columns per row.
 *
 * Compared with the previous version: the character variable is initialised
 * before use; the column loop is bounded by cNum (the same value used to
 * size each row) instead of the global colNum; reading stops at end of file
 * instead of looping forever on a short or unreadable file; cells are
 * zero-initialised; allocation failures are reported.
 *
 * The table is owned by the global fullSet and is not freed here.
 */
void GetSourceFile(char fileName[],int rNum,int cNum)
{
    ifstream file(fileName);
    int ch=0;               /* int so that EOF is distinguishable from data */
    int item=0;             /* 1-based index of the current column in the row */
    int i,j;
    bool exhausted=false;   /* set once end of file has been reached */

    if(!file)
    {
        fprintf(stderr,"GetSourceFile: cannot open %s\n",fileName);
    }

    fullSet=(char**)malloc((rNum>0?rNum:1)*sizeof(char*));
    if(fullSet==NULL)
    {
        fprintf(stderr,"GetSourceFile: out of memory\n");
        exit(EXIT_FAILURE);
    }
    for(i=0;i<rNum;i++)
    {
        fullSet[i]=(char*)calloc(cNum>0?cNum:1,sizeof(char));
        if(fullSet[i]==NULL)
        {
            fprintf(stderr,"GetSourceFile: out of memory\n");
            exit(EXIT_FAILURE);
        }
    }

    for(i=0;i<rNum&&!exhausted;i++)
    {
        item=0;
        j=0;
        while(j<cNum)
        {
            if(ch==',')
            {
                item++;
                ch=file.get();   /* the one-character value of this column */
                if(ch=='T')
                {
                    fullSet[i][j]=(char)(item+'A');
                }
                else if(ch=='F')
                {
                    fullSet[i][j]=(char)(item+'a');
                }
                j++;
            }
            if(ch==EOF)
            {
                exhausted=true;
                break;
            }
            ch=file.get();
        }
        ch=file.get();           /* step past the character that ended the row */
    }
    file.close();
}
/*
 * Work performed by rank 0: read the data file, print its dimensions, and
 * distribute the rows of fullSet to the worker ranks 1..rankNum-1.
 *
 * rankNum: size of MPI_COMM_WORLD.
 *
 * Each worker receives, in this order:
 *   tag 100        colNum
 *   tag 101        recordNum (total rows in the file)
 *   tag = its rank number of rows in its block
 *   tag = its rank one message of colNum chars per row
 * Every worker gets recordNum/(rankNum-1) consecutive rows; the last worker
 * additionally gets the remainder.
 *
 * Compared with the previous version:
 *  - running with a single process no longer divides by zero;
 *  - the last block starts at the correct row (the old code computed the
 *    start from the already-enlarged block size, so it resent rows of the
 *    previous block and sent more rows than it had announced);
 *  - every worker is sent a (possibly empty) block even when there are
 *    fewer records than workers, so no worker is left waiting;
 *  - rows are sent straight from fullSet, removing the leaked copy buffer.
 *
 * Collecting results back from the workers is not implemented here.
 */
static void masterprocess(int rankNum)
{
    char inputFileName[]="c:\\shujuku.txt";
    int colNum,recordNum,blockNum;
    int workers,remainder,firstRow,rows,dest,i,j;
    double starttime,endtime;

    starttime=MPI_Wtime();

    colNum=GetColNum(inputFileName);
    recordNum=GetRecordNum(inputFileName);
    printf("recordNum= %d", recordNum);
    printf("\n");
    printf("colNum= %d",colNum);
    printf("\n");
    GetSourceFile(inputFileName,recordNum,colNum);

    workers=rankNum-1;
    if(workers<1)
    {
        fprintf(stderr,"masterprocess: at least 2 MPI processes are required\n");
    }
    else
    {
        blockNum=recordNum/workers;
        remainder=recordNum%workers;
        firstRow=0;
        for(dest=1;dest<=workers;dest++)
        {
            rows=blockNum;
            if(dest==workers)
            {
                rows+=remainder;   /* last worker takes the leftover rows */
            }
            MPI_Send(&colNum,1,MPI_INT,dest,100,MPI_COMM_WORLD);
            MPI_Send(&recordNum,1,MPI_INT,dest,101,MPI_COMM_WORLD);
            MPI_Send(&rows,1,MPI_INT,dest,dest,MPI_COMM_WORLD);
            for(i=firstRow;i<firstRow+rows;i++)
            {
                for(j=0;j<colNum;j++)
                {
                    printf("%c ",fullSet[i][j]);
                }
                MPI_Send(fullSet[i],colNum,MPI_CHAR,dest,dest,MPI_COMM_WORLD);
                printf("\n");
            }
            printf("\n");
            firstRow+=rows;
        }
    }

    endtime=MPI_Wtime();
    printf("The master process tooks %f seconds.\n",endtime-starttime);
    printf("\n");
}
static void slaveprocess(int rankNum)
{
int rowNum,recordNum,colNum,tag,stag,rank,destRank,m,i,j;
char **fSet,*buffer;
double minSup=0.45;
double minCon=0.50;
double starttime,endtime;
MPI_Status status;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
starttime=MPI_Wtime();
MPI_Recv(&colNum,1,MPI_INT,0,100,MPI_COMM_WORLD,&status);
MPI_Recv(&recordNum,1,MPI_INT,0,101,MPI_COMM_WORLD,&status);
//printf("%d\n",recordNum);
//printf("%d\n",colNum);
tag=rank;
MPI_Recv(&rowNum,1,MPI_INT,0,tag,MPI_COMM_WORLD,&status);
printf("\n");
printf("\n");
printf("The block in rank %d has %d records.\n",rank,rowNum);
//MPI_Recv(&flag,1,MPI_INT,0,102,MPI_COMM_WORLD,&status);
buffer=(char*)malloc(colNum*sizeof(char*));
fSet=(char**)malloc(rowNum*sizeof(char*));
for(m=0;m<rowNum;m++)
{
fSet[m]=(char*)malloc(colNum*sizeof(char));
}
//接收进程0发送来的初始矩阵的一块
for(i=0;i<rowNum;i++)
{
MPI_Recv(buffer,colNum,MPI_CHAR,0,tag,MPI_COMM_WORLD,&status);
for(j=0;j<colNum;j++)
{
fSet[i][j]=buffer[j];
}
}
printf("\n");
printf("Rank %d receives part of the matrix is as follow:\n",rank);
printf("\n");
for(i=0;i<rowNum;i++)
{
for(j=0;j<colNum;j++)
{
printf("%c ",fSet[i][j]);
}
printf("\n");
}
printf("\n");
tItem *c; //定义局部候选项集
tItem *cr; //接收其他进程发送来的局部候选集
tItem *p;
int count=0,s,rCount,rTag;
char rItemset;
c=(struct tItem*)malloc(sizeof(struct tItem));
c=Find_candidate_1itemsets(fSet,minSup,rowNum,colNum);
destRank=1;
stag=1;
p=c;
while(p!=NULL)
{
count++;
p=p->next;
}
for(i=1;i<=rankNum-1;i++) //向其他进程发送局部候选1项集
{
p=c;
if(i!=rank)
{
MPI_Send(&count,1,MPI_INT,destRank,14,MPI_COMM_WORLD);
while(p!=NULL)
{
MPI_Send(&p->itemset[p->tag],1,MPI_CHAR,destRank,15,MPI_COMM_WORLD);
MPI_Send(&p->count,1,MPI_INT,destRank,16,MPI_COMM_WORLD);
MPI_Send(&p->tag,1,MPI_INT,destRank,17,MPI_COMM_WORLD);
p=p->next;
}
}
destRank++;
stag++;
}
int reRank=1,reTag=1,itemNum;
for(j=1;j<=rankNum-1;j++) //接收其他进程发送来的局部候选1项集
{
if(j!=rank)
{
s=0;
MPI_Recv(&itemNum,1,MPI_INT,reRank,14,MPI_COMM_WORLD,&status);
while(s<itemNum)
{
MPI_Recv(&rItemset,1,MPI_CHAR,reRank,15,MPI_COMM_WORLD,&status);
MPI_Recv(&rCount,1,MPI_INT,reRank,16,MPI_COMM_WORLD,&status);
MPI_Recv(&rTag,1,MPI_INT,reRank,17,MPI_COMM_WORLD,&status);
c=Combine_candidate_itemsets(c,rItemset,rCount);
//printf(" %c",rItemset);
//printf(" %d",rCount);
//printf(" %d",rTag);
//printf("\n");
s++;
}
printf("\n");
}
reRank++;
reTag++;
}
//输出合并后得到的全部候选1项集
p=c;
printf("Now every rank get the whole candidate itemsets as follow:\n\n");
while(p!=NULL)
{
printf(" %c",p->itemset[p->tag]);
printf(" %d",p->count);
printf(" %d",p->tag);
printf("\n");
p=p->next;
}
//由候选1项集生成频繁1项集
tItem *pp,*fHead,*fp,*temp;
int minNum;
fp=fHead=(struct tItem*)malloc(sizeof(struct tItem));
minNum=(int)((recordNum*minSup)+1);
//printf("%d",minNum);
pp=c;
while(pp!=NULL)
{
if(pp->count>minNum)
{
fHead->count=pp->count;
fHead->tag=pp->tag;
fHead->itemset[fHead->tag]=pp->itemset[pp->tag];
fHead->next=NULL;
break;
}
pp=pp->next;
}
fp=fHead;
pp=pp->next;
while(pp!=NULL)
{
if(pp->count>minNum)
{
temp=(struct tItem*)malloc(sizeof(struct tItem));
temp->count=pp->count;
temp->next=NULL;
temp->tag=pp->tag;
temp->itemset[temp->tag]=pp->itemset[pp->tag];
fp->next=temp;
fp=temp;
//printf("%c",pp->itemset[pp->tag]);
}
pp=pp->next;
}
/*printf("\n");
printf("--------The local frequence 1_itemsets--------\n");
fp=fHead;
while(fp!=NULL)
{
printf(" %c",fp->itemset[fp->tag]);
printf(" %d",fp->count);
printf(" %d",fp->tag);
printf("\n");
fp=fp->next;
}*/
tItem *f1,*kFrequence,*kf,*fis,*kfTemp;
//kFrequence=(struct tItem*)malloc(sizeof(struct tItem));
f1=(struct tItem*)malloc(sizeof(struct tItem));
//f1=Find_frequence_1itemsets(fullSet,minSup);
f1=fHead;
//system("pause");
//printf(" %d",rowNum);
//printf(" %d*&*&*&\n",colNum);
kFrequence=Gen_frequence(f1,rowNum,colNum,minNum,fSet);//经过连接和剪枝生成最终的频繁k项集
i=2;
while(i<=5)
{
kfTemp=Gen_frequence(kFrequence,rowNum,colNum,minNum,fSet);
if(kfTemp==NULL)
{
break;
}
kFrequence=kfTemp;
i++;
}
//MPI_Barrier(MPI_COMM_WORLD);
/*printf("^^^^^^^^^^^^^^^\n");
kf=kFrequence;
while(kf!=NULL)
{
for(j=0;j<=kf->tag;j++)
{
printf(" %c",kf->itemset[j]);
}
printf(" %d",kf->count);
printf(" %d",kf->tag);
printf("\n");
kf=kf->next;
}
printf("^^^^^^^^^^^^^^^\n");*/
printf("\n");
kf=kFrequence;
printf("--------The frequence %d_itemsets--------\n",kf->tag+1);
while(kf!=NULL)
{
for(j=0;j<=kf->tag;j++)
{
printf(" %c",kf->itemset[j]);
}
printf(" %d",kf->count);
printf(" %d",kf->tag);
printf("\n");
kf=kf->next;
}
printf("\n");
/*kf=kFrequence;
int kItemNum=0;
while(kf!=NULL)
{
kItemNum++;
kf=kf->next;
}
kf=kFrequence;
if(rank==1)
{
MPI_Send(&kItemNum,1,MPI_INT,0,1000,MPI_COMM_WORLD);
while(kf!=NULL)
{
for(j=0;j<=kf->tag;j++)
{
MPI_Send(&kf->itemset[j],1,MPI_CHAR,0,1001,MPI_COMM_WORLD);
}
MPI_Send(&kf->count,1,MPI_INT,0,1002,MPI_COMM_WORLD);
MPI_Send(&kf->tag,1,MPI_INT,0,1003,MPI_COMM_WORLD);
kf=kf->next;
}
}*/
kf=kFrequence;
printf("\n\nThe subset of the frequence is:\n\n");
while(kf!=NULL)
{
fis=Gen_kFrequence_subsets(kf); //生成频繁k项集每项的子集的集合
fis=Compute_candidateItem_Num(fis,rowNum,colNum,fSet); //计算每一个子集的支持度
/*p=fis;
p=p->next;
while(p!=NULL)
{
for(i=0;i<=p->tag;i++)
{
printf(" %c",p->itemset[i]);
}
printf(" %d",p->count);
printf(" %d",p->tag);
printf("\n");
p=p->next;
}*/
printf("\n\nNow we get the association rules:\n\n");
p=fis;
p=p->next;
while(p!=NULL)
{
//printf("\n");
Gen_association_rules(kf,p,minCon);
printf("\n");
p=p->next;
}
kf=kf->next;
}
endtime=MPI_Wtime();
printf("The slave process %d tooks %f seconds.\n",rank,endtime-starttime);
printf("\n");
}
struct tItem *Gen_frequence(tItem *frequence, int recordNum, int colNum, int minNum, char **fullSet) //生成频繁项集
{
tItem *candidate,*cp,*fp,*fpb,*p,*p1;
int i,j,k,m,count=0,rank,rankNum,destRank,stag;
//int i,m,n;
//char *buffer;
MPI_Status status;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&rankNum);
/*printf("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n");
printf("##### %d\n",recordNum);
printf("##### %d\n",colNum);
printf("##### %d\n",minNum);
for(i=0;i<recordNum;i++)
{
for(j=0;j<colNum;j++)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -