⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sendtest_conv.java

📁 MPI for java for Distributed Programming
💻 JAVA
字号:
package mpi.signals;import java.util.*;import mpi.*;import java.io.*;/** * Test program to emulate Pool of tasks.... * need to get back to this soon. */public class SendTest_Conv{        /**     * Serializes the given object and returns it as a byte aray     */    public static byte[] objToByte(Object obje){	System.out.println("objToByte called");	try{	    ByteArrayOutputStream baos = new ByteArrayOutputStream();	    ObjectOutputStream oos = new ObjectOutputStream(baos);	    System.out.println("before writeObject in objToByte");	    oos.writeObject(obje);	    System.out.println("before flush in objToByte");	    oos.flush(); // just to be sure;	    byte[] objectBytes = baos.toByteArray();	    System.out.println("objToByte Successful");	    baos.close();	    oos.close();	    return objectBytes;	}catch(Exception e){	    e.printStackTrace();	}	System.err.println("error in SendTest_conv.objToByte()");	return null;    }    /**     * Deserializes a byte array into an object     */    public static Object byteToObject(byte[] bin){		try{	    ByteArrayInputStream bais = new ByteArrayInputStream(bin);	    ObjectInputStream ois = new ObjectInputStream(bais);	    Object obj_in = ois.readObject();	    ois.close();	    bais.close();	    System.out.println("byteToObj Successful");	    return obj_in;	}catch(Exception e){	    e.printStackTrace();	}		System.err.println("error in SendTest_conv.byteToObj()");	return null;    }    /**     *  Custom implementation of blocking send for objects.     */    public static void Send(Object obj,int dest,int flag){	byte[] b = objToByte(obj);	int[] len = {b.length};	try{	    MPI.COMM_WORLD.Send(len,0,1,MPI.INT,dest,flag);	    MPI.COMM_WORLD.Send(b,0,len[0],MPI.BYTE,dest,flag);		}catch(Exception mpie){	    mpie.printStackTrace();	}    }    /**     *  Custom implementation of blocking Recv for objects     */    public static Object Recv(int src,int flag){	try{	    int[] len = { 0 }; 	    MPI.COMM_WORLD.Recv(len,0,1,MPI.INT,src,flag);	    System.out.println("expecting b_arr of len : "+ len[0]);	    byte[] br = new byte[len[0]];	    MPI.COMM_WORLD.Recv(br,0,len[0],MPI.BYTE,src,flag);	    return SendTest_Conv.byteToObject(br);	}catch(Exception mpie){	    mpie.printStackTrace();	}	System.err.println("Error in Recv");	return null;    }    /**     * Custom implementation of blocking Send for Objects     */    public static void Send(Object[] obj_arr,int offset,int no_items,int dest,int flag){	for(int i=offset;i<offset+no_items;i++)	    Send(obj_arr[i],dest,flag);    }    /**     * Custom implementation of blocking Recv for Objects     */    public static void Recv(Object[] obj_arr,int offset,int no_items,int src,int flag){	for(int i=offset;i<offset+no_items;i++)	    obj_arr[i]= Recv(src,flag);    }    public static void main(String[] args) throws MPIException{		MPI.Init(args);		int my_pe = MPI.COMM_WORLD.Rank(); 		int npes = MPI.COMM_WORLD.Size();								if(my_pe == 0){	    String proc_name = MPI.Get_processor_name();	    //Job[] j = new Job[2];	    Task[] t = new TestTask[2];	    //MyLinkList[] t = new MyLinkList[2];	    for(int i=0;i<2;i++){		t[i] = new TestTask(proc_name);		//t[i] = new MyLinkList(1,proc_name);		System.out.println("Created Task["+i+"] : \n "+t[i]);	    }	    for(int i=1;i<npes;i++){		System.out.println("Sending Task["+(i-1)+"]: \n "+ t[i-1]);				//MPI.COMM_WORLD.Send(t,i-1,1,MPI.OBJECT,i,0);		SendTest_Conv.Send(t[i-1],i,0);		/*		  byte[] b = SendTest_Conv.objToByte(t[i-1]);		  int[] len = {b.length};		  MPI.COMM_WORLD.Send(len,0,1,MPI.INT,i,0);		  MPI.COMM_WORLD.Send(b,0,len[0],MPI.BYTE,i,0);		*/	    }	    //recv loop	    	    Task[] t2 = new TestTask[2];	    //MyLinkList[] t2 = new MyLinkList[2];	    for(int i=1;i<npes;i++){		System.out.println("waiting to hear from proc "+ (i-1));				//MPI.COMM_WORLD.Recv(t2,i-1,1,MPI.OBJECT,i,0);				t2[i-1]= (TestTask)SendTest_Conv.Recv(i,0);		/*		  int[] len = { 0 }; 		  MPI.COMM_WORLD.Recv(len,0,1,MPI.INT,i,0);		  System.out.println("expecting b_arry of len : "+ len[0] +" on "+my_pe);		  byte[] b = new byte[len[0]];		  MPI.COMM_WORLD.Recv(b,0,len[0],MPI.BYTE,i,0);		  t2[i-1] = (TestTask)SendTest_Conv.byteToObject(b);		*/	    }	    	    System.out.println("Recieved Items: ");	    for(int i=1;i<npes;i++)		System.out.println(t2[i-1]);	    	    System.out.println("Exiting....");	    	}else{	    // recieve task object , 	    // call the run method , 	    // return the result object	    	    Object[] inbuf= new Object[1];	    //Job[] jin = new Job[1];	    	    System.out.println("before Recv on proc no "+my_pe);	    	    //Task t = null;	    //recieving the task	    try{		//Status stat = MPI.COMM_WORLD.Recv(inbuf,0,1,MPI.OBJECT,0,0);		//if(stat.Test_cancelled())		//    System.out.println("Communication was cancelled");		inbuf[0]= SendTest_Conv.Recv(0,0);		/*		  int[] len = { 0 }; 		  MPI.COMM_WORLD.Recv(len,0,1,MPI.INT,0,0);		  System.out.println("expecting b_arry of len : "+ len[0] +" on "+my_pe);		  byte[] b = new byte[len[0]];		  MPI.COMM_WORLD.Recv(b,0,len[0],MPI.BYTE,0,0);		  inbuf[0] = SendTest_Conv.byteToObject(b);		*/		//System.out.println(my_pe+" : " +stat.Get_count(MPI.OBJECT)); 				System.out.println("Recieved : " + (TestTask)inbuf[0] +" on " +my_pe);			    }catch(Exception e){		System.err.println(e);	    }	    	    String s = " "+ my_pe;	    	    ((TestTask)inbuf[0]).setAck(new String(s));	    //System.out.println( (MyLinkList)inbuf[0]);	    //MPI.COMM_WORLD.Send(inbuf,0,1,MPI.OBJECT,0,0);	    SendTest_Conv.Send(inbuf[0],0,0);	    /*	      byte[] b = SendTest_Conv.objToByte(inbuf[0]);	      int[] len = {b.length};	      MPI.COMM_WORLD.Send(len,0,1,MPI.INT,0,0);	      MPI.COMM_WORLD.Send(b,0,len[0],MPI.BYTE,0,0);	    */	}	MPI.Finalize();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -