
📄 dispatcher.java

📁 By Robert Flenner, Michael Abbott, Toufic Boubez, Frand Cohen, Navaneeth Krishnan, Alan Moffet, Rajam Famamu
💻 JAVA
package primecruncher;

import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.discovery.DiscoveryService;
import net.jxta.discovery.DiscoveryListener;
import net.jxta.discovery.DiscoveryEvent;
import net.jxta.pipe.PipeService;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeID;
import net.jxta.exception.PeerGroupException;
import net.jxta.protocol.DiscoveryResponseMsg;
import net.jxta.protocol.ModuleSpecAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.document.*;
import net.jxta.id.IDFactory;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.codat.Codat;

import java.util.*;
import java.io.*;
import java.net.URL;
import java.net.MalformedURLException;
import java.net.UnknownServiceException;

/**
 * This class acts as a client-side proxy for the service.
 */
public class Dispatcher implements DiscoveryListener {

    private static PeerGroup group;
    private static DiscoveryService discoSvc;
    private static PipeService pipeSvc;
    private static Random random;

    //Keep cached list of peers that offer the service
    private HashSet peerCache = new HashSet();
    private OutputPipe outputPipe;
    private HashMap jobMap = new HashMap();

    public Dispatcher() {
        startJxta();
        random = new Random();
        doLocalDiscovery();
        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,
                "Name", ServiceConstants.SPEC_NAME, 1, this);
    }

    public Dispatcher(PeerGroup peerGroup) {
        this.group = peerGroup;
        pipeSvc = group.getPipeService();
        discoSvc = group.getDiscoveryService();
        random = new Random();
        doLocalDiscovery();
        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,
                "Name", ServiceConstants.SPEC_NAME, 1, this);
    }

    private void doLocalDiscovery() {
        Enumeration res = null;
        try {
            res =
                discoSvc.getLocalAdvertisements(DiscoveryService.ADV,
                        "Name", ServiceConstants.SPEC_NAME);
        } catch (IOException e) {
            System.out.println("IO Exception.");
        }
        if (res != null) {
            while (res.hasMoreElements()) {
                //this will be a ModuleSpecAdvertisement
                ModuleSpecAdvertisement mspa = (ModuleSpecAdvertisement)res.nextElement();
                if (mspa != null) {
                    PeerInfoBundle bundle = new PeerInfoBundle(mspa);
                    if (!peerCache.contains(bundle)) {
                        peerCache.add(bundle);
                        System.out.println("Discovered service locally, added to cache");
                    } else {
                        System.out.println("Discovered peer locally, but it's already cached.");
                    }
                }
            }
        }
    }

    //Create a request ID. Break the list down into smaller lists, according to
    //metrics on each peer in the cache. Create a new dispatcher job for each
    //peer in a new thread, and associate that list with the request ID.
    //Register a result listener with each job.
    //Each dispatcher job makes a call-back to this class when (a) it has
    //the results, or (b) the timeout for that peer has expired.
    //Once all results are received, we call ResultListener and produce the
    //result document.
    //
    public void processPrimes(int low, int high, ResultListener listener) {
        //create array of all known peers
        if (peerCache.size() == 0) {
            doLocalDiscovery();
        }
        PeerInfoBundle[] peers = (PeerInfoBundle[])
                peerCache.toArray(new PeerInfoBundle[peerCache.size()]);
        if (peers.length == 0) {
            System.out.println("No peers, submit job again!");
            return;
        }

        //since pending jobs are hashed against a job id, make sure no existing
        //job has the same id
        String jobID = null;
        while (jobID == null || (jobMap.containsKey(jobID))) {
            jobID = new Long(random.nextLong()).toString();
        }
        System.out.println("Allocated job id: " + jobID);
        PendingJob pendingJob = new PendingJob(jobID, this, listener);
        jobMap.put(jobID, pendingJob);

        //TODO: prune the cache here
        pruneCache();

        //1. Create an empty PendingJob -- it collects all the DispatcherJobs.
        //When a DispatcherJob is ready, it calls its PendingJob. Whenever such a
        //call arrives, PendingJob checks if all jobs are complete. If so, it calls
        //this Dispatcher's jobComplete(). jobComplete() then normalizes the results
        //and calls the result listener with the answer. A PendingJob is hashed
        //against a job ID.
        System.out.println("Peers length is " + peers.length);

        //we include the highest number
        int segment = high - low + 1;
        int mod = segment % peers.length;
        int perPiece = (segment - mod)/peers.length;
        int l = low;
        int h = 0;
        int count = 1;
        for (int i=0; i < peers.length; i++) {
            h = l + perPiece -1;
            if (i == peers.length - 1) {
                h  += mod;
            }
            DispatcherJob job = new DispatcherJob(jobID, count, l, h, peers[i], pendingJob, group);
            Thread thread = new Thread(job);
            thread.setName("Dispatcher job thread " + i);
            pendingJob.addJob(job);
            thread.start();
            l += perPiece;
            count++;
        }
    }

    /**
     * All computations for a pending job have completed. We're ready.
     */
    void jobComplete(PendingJob job, Map[] subResults) {
        //normalize results, we're done,
        String finalResultString = mergeResults(subResults);
        //call result listener with result.
        ResultListener listener = job.getResultListener();
        Map finalResult = new HashMap();
        finalResult.put(ServiceConstants.RESULTSTRING, finalResultString);
        finalResult.put(ServiceConstants.JOBSTATS, makeJobStats(job.getID(), subResults));
        listener.resultEvent(finalResult);
        jobMap.remove(job.getID());
        System.out.println("Removed pending job from job map.");
    }

    private ComputeStats makeJobStats(String jobID, Map[] subresults) {
        ComputeStats stats = new ComputeStats(jobID);
        for (int i=0; i < subresults.length; i++) {
            long computeTime =
                    Long.parseLong((String)subresults[i].get(ServiceConstants.ENDTIME)) -
                    Long.parseLong((String)subresults[i].get(ServiceConstants.STARTTIME));
            stats.addNodeStat(new Integer(i).toString(),
                Integer.parseInt((String)subresults[i].get(ServiceConstants.LOW_INT)),
                Integer.parseInt((String)subresults[i].get(ServiceConstants.HIGH_INT)),
                computeTime, 0, 0);
        }
        return stats;
    }

    private String mergeResults(Map[] subRes) {
        //create a string for the result
        StringBuffer b = new StringBuffer();
        for (int i=0; i < subRes.length; i++) {
            String resString = (String)subRes[i].get(ServiceConstants.RESULTSTRING);
            b.append(resString + ",");
        }
        //remove last ","; skipped when nothing was appended, because
        //setLength(-1) would throw
        if (b.length() > 0) {
            b.setLength(b.length()-1);
        }
        return b.toString();
    }

    private void pruneCache() {
        //do some cache management here - eliminate stale adverts
        HashSet adCopy = null;
        synchronized(peerCache) {
            adCopy = (HashSet)peerCache.clone();
            //}
            //do some pruning here
            long currentTime = System.currentTimeMillis();
            Iterator it = adCopy.iterator();
            while (it.hasNext()) {
                PeerInfoBundle b = (PeerInfoBundle)it.next();
                ModuleSpecAdvertisement ad = b.getModuleSpecAdvertisement();
                if (ad.getLocalExpirationTime() < currentTime + (60 * 1000)) {
                    System.out.println("Local expiration: " + ad.getLocalExpirationTime());
                    peerCache.remove(b);
                }
            }
        }
    }

    private void startJxta() {
        try {
            group = PeerGroupFactory.newNetPeerGroup();
            discoSvc = group.getDiscoveryService();
            pipeSvc = group.getPipeService();
        } catch (PeerGroupException e) {
            System.out.println("Can't create net peer group: " + e.getMessage());
            System.exit(-1);
        }
    }

    public void discoveryEvent(DiscoveryEvent event) {
        System.out.println("DiscoveryEvent called");
        DiscoveryResponseMsg  mes = event.getResponse();

        //get the ModuleSpecAdvs from this
        ModuleSpecAdvertisement moduleSpecAd = null;
        Enumeration en = mes.getResponses();
        try {
            //REMIND: can there be many???
            while (en.hasMoreElements()) {
                String st = (String)en.nextElement();
                try {
                    moduleSpecAd =
                        (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(
                            new MimeMediaType("text/xml"),
                            new ByteArrayInputStream(st.getBytes()));
                } catch (ClassCastException classEx) {
                    //it was not a module spec
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        if (moduleSpecAd != null) {
            PeerInfoBundle bundle = new PeerInfoBundle(moduleSpecAd);
            if (!peerCache.contains(bundle)) {
                peerCache.add(bundle);
                System.out.println("Discovered peer, added to cache");
            } else {
                System.out.println("Discovered peer, but it's already cached.");
            }
        }
    }
}
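
The range arithmetic in processPrimes() can be tried in isolation, without JXTA. The sketch below is not part of the original source: RangeSplitter and split() are hypothetical names introduced here for illustration. It follows the same scheme as the listing (equal pieces, with the remainder added to the last piece), with one deliberate difference, stated as an assumption: the number of pieces is capped at the number of values in the range. In the listing, when there are more peers than numbers, perPiece is 0 and every peer except the last receives a range whose high bound is below its low bound.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; it does not exist in the original source. */
final class RangeSplitter {

    /**
     * Splits the inclusive range [low, high] into contiguous inclusive
     * sub-ranges, each returned as {low, high}. Pieces are equal in size,
     * and the last piece also takes the remainder, as in processPrimes().
     */
    static List<int[]> split(int low, int high, int parts) {
        if (parts <= 0 || high < low) {
            throw new IllegalArgumentException("need parts > 0 and low <= high");
        }
        // long arithmetic so that very wide int ranges do not overflow
        long total = (long) high - low + 1;
        // assumption: never create more pieces than there are numbers
        int pieces = (int) Math.min(parts, total);
        long perPiece = total / pieces;
        long mod = total % pieces;

        List<int[]> ranges = new ArrayList<>();
        long l = low;
        for (int i = 0; i < pieces; i++) {
            long h = l + perPiece - 1;
            if (i == pieces - 1) {
                h += mod; // the last piece absorbs the remainder
            }
            ranges.add(new int[] {(int) l, (int) h});
            l += perPiece;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(1, 10, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

Giving the whole remainder to the last piece keeps the code short, but that piece can be noticeably larger than the others when the remainder is close to the number of pieces. Spreading the remainder one value at a time over the first pieces would balance the load better.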
