⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatcher.java

📁 服务器模块主要负责分配计算任务
💻 JAVA
字号:
package primecruncher;import net.jxta.peergroup.*;import net.jxta.discovery.*;import net.jxta.pipe.*;import net.jxta.exception.*;import net.jxta.protocol.*;import net.jxta.document.*;import net.jxta.id.*;import net.jxta.endpoint.*;import net.jxta.codat.*;import java.util.*;import java.io.*;import java.net.*;/** * 这个类作为服务的边缘代理 */public class Dispatcher implements DiscoveryListener{    private static PeerGroup group;    private static DiscoveryService discoSvc;    private static PipeService pipeSvc;    private static Random random;    //保持对等机的缓冲列表    private HashSet peerCache=new HashSet();    private OutputPipe outputPipe;    private HashMap jobMap=new HashMap();/*****************************************************************/    public Dispatcher(){        startJxta();        random=new Random();        doLocalDiscovery();        //discoSvc.getRemoteAdvertisements(DiscoveryService.ADV )        //discoSvc.getRemoteAdvertisements(null,DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this);        //discoSvc.getRemoteAdvertisements()         discoSvc.getRemoteAdvertisements(DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this);    }/****************************************************************/     //启动JXTA平台,创建对等组,并创建公告信息服务和管道服务    private void startJxta(){        try {            group=PeerGroupFactory.newNetPeerGroup();            discoSvc=group.getDiscoveryService();            pipeSvc=group.getPipeService();        } catch(PeerGroupException e){            System.out.println("Can't create net peer group:"+e.getMessage());            System.exit(-1);        }    }/*****************************************************************/    public Dispatcher(PeerGroup peerGroup){        this.group=peerGroup;        pipeSvc=group.getPipeService();        discoSvc=group.getDiscoveryService();        random=new Random();        doLocalDiscovery();        discoSvc.getRemoteAdvertisements(null,DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME,1,this);    }/*****************************************************************/   //进行本地检索    private void doLocalDiscovery(){        Enumeration res=null;        try{            res=discoSvc.getLocalAdvertisements(DiscoveryService.ADV,"Name",ServiceConstants.SPEC_NAME);        }catch (IOException e){            System.out.println("IO Exception.");        }        if(res != null){            while (res.hasMoreElements()){                //这将是一个ModuleSpecAdvertisement对象                ModuleSpecAdvertisement mspa=(ModuleSpecAdvertisement)res.nextElement(); //依次取对等节点公告信息放入mspa中                if(mspa!=null){                    PeerInfoBundle bundle=new PeerInfoBundle(mspa);    //调用了PeerInfoBundle类###############                    if(!peerCache.contains(bundle)){                        peerCache.add(bundle);                        System.out.println("Discovered service locally, added to cache");                    }else{                        System.out.println("Discovered peer locally, but it's already cached.");                    }                }            }        }    }/*****************************************************************/    public void processPrimes(int low,int high,ResultListener listener){//建立所有已知对等机的列表        if(peerCache.size()==0){            doLocalDiscovery();        }        PeerInfoBundle[] peers=(PeerInfoBundle[])peerCache.toArray(new PeerInfoBundle[peerCache.size()]);//将缓存中的每个对等点节点转化为PeerInfoBundle类型的数组        if(peers.length==0){            System.out.println("No peers, submit job again!");            return;        }        String jobID=null;                            // 当jobID空或者jobMap中已经存在这个jobID都要重新创建一个 jobID  1,2,3   2        while(jobID==null||(jobMap.containsKey(jobID))){            jobID=new Long(random.nextLong()).toString();  //随机函数创建一个字符串型的随机数赋给jobID        }        System.out.println("Allocated job id:"+jobID);        PendingJob pendingJob=new PendingJob(jobID,this,listener);        jobMap.put(jobID,pendingJob);        System.out.println("Peers length is"+peers.length);        int segment=high-low+1;  //共有多少个数        int mod=segment%peers.length;  //余数        int perPiece=(segment-mod)/peers.length;  //每个对等节点处理的数据个数        int l=low;        int h=0;        int count=1;        for(int i=0;i<peers.length;i++){            h=l+perPiece-1;            if(i==peers.length-1){  //最后一个对等节点                h+=mod;//h=h+mod            }            DispatcherJob job=new DispatcherJob(jobID,count,l,h,peers[i],pendingJob,group);  //count表示第几个任务           // job.create();          Thread thread=new Thread(job);           thread.setName("Dispatcher job thread"+i);            pendingJob.addJob(job);           thread.start();            l += perPiece;            count++;        }    }/*****************************************************************/    void jobComplete(PendingJob job, Map[] subResults){        String finalResultString=mergeResults(subResults);    //subResults转换成String类型数据        ResultListener listener=job.getResultListener();        Map finalResult=new HashMap();        finalResult.put(ServiceConstants.RESULTSTRING,finalResultString);        finalResult.put(ServiceConstants.JOBSTATS,makeJobStats(job.getID(),subResults));     //调用makeJobStats()        listener.resultEvent(finalResult);                     //调用客户端PrimeClient类的resultEvent()        jobMap.remove(job.getID());        System.out.println("Removed pending job from job map .");    }/*****************************************************************/    private ComputeStats makeJobStats(String jobID,Map[] subresults){        ComputeStats stats=new ComputeStats(jobID);                      //实例化ComputeStats类        for(int i=0;i<subresults.length;i++){            long computeTime=Long.parseLong((String)subresults[i].get(ServiceConstants.ENDTIME))-                    Long.parseLong((String)subresults[i].get(ServiceConstants.STARTTIME));            stats.addNodeStat(new Integer(i).toString(),Integer.parseInt((String)subresults[i].get(ServiceConstants.LOW_INT)),   //调用ComputeStats 类的addNodeStat()                Integer.parseInt((String)subresults[i].get(ServiceConstants.HIGH_INT)),computeTime,0,0);        }return stats;    }/*****************************************************************/    private String mergeResults(Map[] subRes){        //将结果数据转换,并添加一个“,”形成整个结果字符串        StringBuffer b=new StringBuffer();        for(int i=0;i<subRes.length;i++){            String resString=(String)subRes[i].get(ServiceConstants.RESULTSTRING);            b.append(resString+",");        }        b.setLength(b.length()-1);        return b.toString();    }/*****************************************************************/    public void discoveryEvent(DiscoveryEvent event){        System.out.println("DiscoveryEvent called");        DiscoveryResponseMsg mes=event.getResponse();        ModuleSpecAdvertisement moduleSpecAd=null;        Enumeration en=mes.getResponses();        try {              while(en.hasMoreElements()){                String st=(String)en.nextElement();                try {                  moduleSpecAd=(ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(                            new MimeMediaType("text/xml"),new ByteArrayInputStream(st.getBytes()));                }catch(ClassCastException classEx){              }            }        }catch(IOException e){            e.printStackTrace();        }        if(moduleSpecAd!= null){            PeerInfoBundle bundle=new PeerInfoBundle(moduleSpecAd);            if(!peerCache.contains(bundle)){                peerCache.add(bundle);                System.out.println("Discovered peer, added to cache");            }else{                System.out.println("Discovered peer, but it's already cached.");            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -