📄 dispatcher.java
字号:
package primecruncher;import net.jxta.peergroup.*;import net.jxta.discovery.*;import net.jxta.pipe.*;import net.jxta.exception.*;import net.jxta.protocol.*;import net.jxta.document.*;import net.jxta.id.*;import net.jxta.endpoint.*;import net.jxta.codat.*;import java.util.*;import java.io.*;import java.net.*;/** * 这个类作为服务的边缘代理 */public class Dispatcher implements DiscoveryListener{ private static PeerGroup group; private static DiscoveryService discoSvc; private static PipeService pipeSvc; private static Random random; //保持对等机的缓冲列表 private HashSet peerCache=new HashSet(); private OutputPipe outputPipe; private HashMap jobMap=new HashMap();/*****************************************************************/ public Dispatcher(){ startJxta(); random=new Random(); doLocalDiscovery(); //discoSvc.getRemoteAdvertisements(DiscoveryService.ADV ) //discoSvc.getRemoteAdvertisements(null,DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this); //discoSvc.getRemoteAdvertisements() discoSvc.getRemoteAdvertisements(DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this); }/****************************************************************/ //启动JXTA平台,创建对等组,并创建公告信息服务和管道服务 private void startJxta(){ try { group=PeerGroupFactory.newNetPeerGroup(); discoSvc=group.getDiscoveryService(); pipeSvc=group.getPipeService(); } catch(PeerGroupException e){ System.out.println("Can't create net peer group:"+e.getMessage()); System.exit(-1); } }/*****************************************************************/ public Dispatcher(PeerGroup peerGroup){ this.group=peerGroup; pipeSvc=group.getPipeService(); discoSvc=group.getDiscoveryService(); random=new Random(); doLocalDiscovery(); discoSvc.getRemoteAdvertisements(null,DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this); }/*****************************************************************/ //进行本地检索 private void doLocalDiscovery(){ Enumeration res=null; try{ res=discoSvc.getLocalAdvertisements(DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME); }catch (IOException e){ System.out.println("IO Exception."); } if(res != null){ while (res.hasMoreElements()){ //这将是一个ModuleSpecAdvertisement对象 ModuleSpecAdvertisement mspa=(ModuleSpecAdvertisement)res.nextElement(); //依次取对等节点公告信息放入mspa中 if(mspa!=null){ PeerInfoBundle bundle=new PeerInfoBundle(mspa); //调用了PeerInfoBundle类############### if(!peerCache.contains(bundle)){ peerCache.add(bundle); System.out.println("Discovered service locally, added to cache"); }else{ System.out.println("Discovered peer locally, but it's already cached."); } } } } }/*****************************************************************/ public void processPrimes(int low,int high,ResultListener listener){//建立所有已知对等机的列表 if(peerCache.size()==0){ doLocalDiscovery(); } PeerInfoBundle[] peers=(PeerInfoBundle[])peerCache.toArray(new PeerInfoBundle[peerCache.size()]);//将缓存中的每个对等点节点转化为PeerInfoBundle类型的数组 if(peers.length==0){ System.out.println("No peers, submit job again!"); return; } String jobID=null; // 当jobID空或者jobMap中已经存在这个jobID都要重新创建一个 jobID 1,2,3 2 while(jobID==null||(jobMap.containsKey(jobID))){ jobID=new Long(random.nextLong()).toString(); //随机函数创建一个字符串型的随机数赋给jobID } System.out.println("Allocated job id:"+jobID); PendingJob pendingJob=new PendingJob(jobID,this,listener); jobMap.put(jobID,pendingJob); System.out.println("Peers length is"+peers.length); int segment=high-low+1; //共有多少个数 int mod=segment%peers.length; //余数 int perPiece=(segment-mod)/peers.length; //每个对等节点处理的数据个数 int l=low; int h=0; int count=1; for(int i=0;i<peers.length;i++){ h=l+perPiece-1; if(i==peers.length-1){ //最后一个对等节点 h+=mod;//h=h+mod } DispatcherJob job=new DispatcherJob(jobID,count,l,h,peers[i],pendingJob,group); //count表示第几个任务 // job.create(); Thread thread=new Thread(job); thread.setName("Dispatcher job thread"+i); pendingJob.addJob(job); thread.start(); l += perPiece; count++; } }/*****************************************************************/ void jobComplete(PendingJob job, Map[] subResults){ String finalResultString=mergeResults(subResults); //subResults转换成String类型数据 ResultListener listener=job.getResultListener(); Map finalResult=new HashMap(); finalResult.put(ServiceConstants.RESULTSTRING,finalResultString); finalResult.put(ServiceConstants.JOBSTATS,makeJobStats(job.getID(),subResults)); //调用makeJobStats() listener.resultEvent(finalResult); //调用客户端PrimeClient类的resultEvent() jobMap.remove(job.getID()); System.out.println("Removed pending job from job map ."); }/*****************************************************************/ private ComputeStats makeJobStats(String jobID,Map[] subresults){ ComputeStats stats=new ComputeStats(jobID); //实例化ComputeStats类 for(int i=0;i<subresults.length;i++){ long computeTime=Long.parseLong((String)subresults[i].get(ServiceConstants.ENDTIME))- Long.parseLong((String)subresults[i].get(ServiceConstants.STARTTIME)); stats.addNodeStat(new Integer(i).toString(),Integer.parseInt((String)subresults[i].get(ServiceConstants.LOW_INT)), //调用ComputeStats 类的addNodeStat() Integer.parseInt((String)subresults[i].get(ServiceConstants.HIGH_INT)),computeTime,0,0); }return stats; }/*****************************************************************/ private String mergeResults(Map[] subRes){ //将结果数据转换,并添加一个“,”形成整个结果字符串 StringBuffer b=new StringBuffer(); for(int i=0;i<subRes.length;i++){ String resString=(String)subRes[i].get(ServiceConstants.RESULTSTRING); b.append(resString+","); } b.setLength(b.length()-1); return b.toString(); }/*****************************************************************/ public void discoveryEvent(DiscoveryEvent event){ System.out.println("DiscoveryEvent called"); DiscoveryResponseMsg mes=event.getResponse(); ModuleSpecAdvertisement moduleSpecAd=null; Enumeration en=mes.getResponses(); try { while(en.hasMoreElements()){ String st=(String)en.nextElement(); try { moduleSpecAd=(ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"),new ByteArrayInputStream(st.getBytes())); }catch(ClassCastException classEx){ } } }catch(IOException e){ e.printStackTrace(); } if(moduleSpecAd!= null){ PeerInfoBundle bundle=new PeerInfoBundle(moduleSpecAd); if(!peerCache.contains(bundle)){ peerCache.add(bundle); System.out.println("Discovered peer, added to cache"); }else{ System.out.println("Discovered peer, but it's already cached."); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -