📄 aspbroadcastpart.java
字号:
package com.ibm.user.examples;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.io.Serializable;
import com.ibm.dthreads.DThread;
import com.ibm.dthreads.Logger;
import com.ibm.dthreads.MP.ObjectMP;
import com.ibm.DataPartition.*;
//Application class has to extend DThread or implement DRunnable
public class ASPBroadcastPart extends DThread{
int[][] data;
int low;
int high;
int size;
int extra;
int id;
int total;
int noofvertices;
//Constructor that initializes the array
ASPBroadcastPart(int noofvertices)
{
this.noofvertices = noofvertices;
data = new int[noofvertices][noofvertices];
for(int i=0;i<noofvertices;i++)
{
for(int j=0;j<noofvertices;j++)
{
data[i][j]=2;
if(i==j) data[i][j]=0;
}
}
}
//Extends the run() method of the DThread class
public void run()
{
try
{
long start,end;
int totalthreads;
int temp;
start=System.currentTimeMillis();
int rank=getContext().getIdentity();
totalthreads = getContext().getNumberOfThreads();
ObjectMP messagepasser = getContext().getMP();
id=rank;
int size=noofvertices/totalthreads;
int extra=(noofvertices%totalthreads);
int[] broadcastRow=new int[data.length];
int bSourceid;
low=id*size;
high=low+size;
int totalRows=data.length;
if(id==totalthreads-1)
high=noofvertices;
System.out.println("Full data length is " + data.length);
System.out.println("My DThread ID is "+id+"\n My Data Size is "+size+"\n My Lower Limit is "+low+"& Higher Limit is "+high);
//Does the partitioning of the 2D data array
Partition2D part=new PartitionInt2D(data,data.length,data.length,id,totalthreads,Config.ROW_WISE_DECOMPOSITION);
//Getting the partial 1D data array of this DThread
SpmdIntData2D partData=(SpmdIntData2D)part.getPartition(id);
int[][] tempArray=new int[high-low][data.length];
int index=0;
for(int i=0;i<high-low;i++)
{
for(int j=0;j<data.length;j++)
{
tempArray[i][j]=partData.data[index++];
}
}
//DThread with rank 0 is considered as the Root node for this application
if(rank==0)
{
System.out.println("Rank 0 Started");
for(int k=0;k<data.length;k++)
{
//If kth row is present in this DThread, then broadcast the row to all the other DThreads running
if(k>=low && k<high)
{
BroadcastData2 bData=new BroadcastData2(tempArray[k-low]);
messagepasser.broadcast(bData,Integer.toString(k));
broadcastRow=data[k];
}
//If kth row is not present in this DThread, then get the row from the DThread which has the kth row
else
{
messagepasser.barrier();
bSourceid=0;
temp=k;
while(temp>=size)
{
bSourceid++;
temp-=size;
}
if(bSourceid>=totalthreads) bSourceid=totalthreads-1;
BroadcastData2 bRow=(BroadcastData2)messagepasser.get(bSourceid,Integer.toString(k));
broadcastRow=bRow.data;
}
//Computation part
for(int i=low;i<high;i++)
{
for(int j=0;j<data.length;j++)
{
data[i][j]=Math.min(data[i][j],(data[i][k]+broadcastRow[j]));
}
}
}
//Create the Output file in the current path
File output=new File("outputDthread.dat");
PrintWriter out=new PrintWriter(new FileWriter(output));
//Get the partial result from all the Other Dthreads and compute the final result matrix
if(totalthreads!=1)
{
messagepasser.barrier();
for (int i=1;i<totalthreads;i++)
{
PartialResult3 iobj = (PartialResult3) messagepasser.get(i,"0");
int rowIndex=0;
for(int j=iobj.low;j<iobj.high;j++)
data[j]=iobj.data[rowIndex++];
}
}
//Write the result matrix into the output file
for(int i=0;i<data.length;i++)
{
for(int j=0;j<data.length;j++)
{
out.println(new Integer(data[i][j]).toString());
}
}
out.close();
System.out.println("Please see outputDthread.dat for the result");
}
//If the DThread is not root
else
{
System.out.println("Full data length is " + data.length);
for(int k=0;k<data.length;k++)
{
//If kth row is present in this DThread, then broadcast the row to all the other DThreads running
if(k>=low && k<high)
{
BroadcastData2 bData=new BroadcastData2(tempArray[k-low]);
messagepasser.broadcast(bData,Integer.toString(k));
broadcastRow=data[k];
}
//If kth row is not present in this DThread, then get the row from the DThread which has the kth row
else
{
messagepasser.barrier();
bSourceid=0;
temp=k;
while(temp>=size)
{
bSourceid++;
temp-=size;
}
if(bSourceid>=totalthreads) bSourceid=totalthreads-1;
BroadcastData2 bRow=(BroadcastData2)messagepasser.get(bSourceid,Integer.toString(k));
broadcastRow=bRow.data;
}
//Computation part
for(int i=low;i<high;i++)
{
for(int j=0;j<data.length;j++)
{
data[i][j]=Math.min(data[i][j],(data[i][k]+broadcastRow[j]));
}
}
}
//Build the Partial result and send to the root node ( in my case it is DThread with rank 0)
PartialResult3 presult=new PartialResult3(data,low,high);
messagepasser.put(presult,rank,0,"0");
messagepasser.barrier();
}
System.out.println(" I with id "+id+"Completed");
end=System.currentTimeMillis();
System.out.println("Time taken is "+(end-start)+"milliSeconds");
}
catch(Exception e)
{
System.out.println("Exception ::" + e.getMessage());
e.printStackTrace();
}
}
}
//Class which will hold the partial result
class PartialResult3 implements Serializable
{
int[][] data;
int row,col;
int low;
int high;
PartialResult3(int[][] data,int low,int high)
{
row=high-low;
col=data.length;
this.data=new int[row][col];
for(int i=low,m=0;i<high;i++,m++)
{
for(int j=0,n=0;j<col;j++,n++)
{
this.data[m][n]=data[i][j];
}
}
this.low=low;
this.high=high;
}
}
//Class to have the Row to be broadcasted
class BroadcastData2 implements Serializable
{
int[] data;
BroadcastData2(int[] data)
{
this.data = data;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -