⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatcher.java

📁 javaP2P技术内幕课程111213141516源代码
💻 JAVA
字号:
/* * Created by IntelliJ IDEA. * User: fsommers * Date: Apr 7, 2002 * Time: 10:26:30 PM * To change template for new class use  * Code Style | Class Templates options (Tools | IDE Options). */package primecruncher;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.pipe.PipeService;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.PipeID;import net.jxta.exception.PeerGroupException;import net.jxta.protocol.DiscoveryResponseMsg;import net.jxta.protocol.ModuleSpecAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.PeerAdvertisement;import net.jxta.document.*;import net.jxta.id.IDFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.codat.Codat;import java.util.*;import java.io.*;import java.net.URL;import java.net.MalformedURLException;import java.net.UnknownServiceException;/** * This class acts as a client-side proxy for the service. */public class Dispatcher implements DiscoveryListener {    private static PeerGroup group;    private static DiscoveryService discoSvc;    private static PipeService pipeSvc;    private static Random random;    //Keep cached list of peers that offer the service    private HashSet peerCache = new HashSet();    private OutputPipe outputPipe;    private HashMap jobMap = new HashMap();    public Dispatcher() {        startJxta();        random = new Random();        doDiscovery();    }    private void doDiscovery() {        System.out.println("Starting service discovery...");  /*        System.out.println("Searching local cache for " +            ServiceConstants.SPEC_NAME + " advertisements");        Enumeration res = null;        try {            res = discoSvc.getLocalAdvertisements(DiscoveryService.ADV,                "Name", ServiceConstants.SPEC_NAME);        } catch (IOException e) {            System.out.println("IO Exception.");        }        if (res != null) {            while (res.hasMoreElements()) {                //this will be a ModuleSpecAdvertisement                //adverts.add(res.nextElement());            }        }    */        System.out.println("Starting remote discovery...");        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,                "Name", ServiceConstants.SPEC_NAME, 1, this);    }    //create request ID. break down list into smaller lists, according to metrics on each    //peer in the cache. Create a new dispatcher job for each peer in a new    //thread, associate that list with the request ID.    //resigter a result listener with each    //each dispatcher job will make a call-back to this class, when (a)    //it got the results, or (b) if the timeout for that peer has expired    //Once all results are received, we call ResultListener and produce the    //result document.    //    public void processPrimes(int low, int high, ResultListener listener) {        //since pending jobs are hashed against a job id, check if no existing job        //has the same id        String jobID = null;        while (jobID == null || (jobMap.containsKey(jobID))) {            jobID = new Long(random.nextLong()).toString();        }        System.out.println("Allocated job id: " + jobID);        PendingJob pendingJob = new PendingJob(jobID, this, listener);        jobMap.put(jobID, pendingJob);        //TODO: prune the cache here        //create array of all known peers        PeerInfoBundle[] peers = (PeerInfoBundle[])                   peerCache.toArray(new PeerInfoBundle[peerCache.size()]);        //TODO: What to do when there are no peers ready to take the job,        //i.e., peers.length == 0        //1. Create an empty PendingJob -- it collects all the DispatcherJobs        // when a job is ready, it calls its pendingJob. whenver a such a call        //arrives, PendingJob checks if all jobs are complete. If so, it calls        //this Dispatcher's jobComplete(). JobComplete() then normalizes the results,        //and calls result listener with the answer. A PendingJob is hashed againsta  JobID.        System.out.println("Peers length is " + peers.length);        //we include the highest number        int segment = high - low + 1;        int mod = segment % peers.length;        int perPiece = (segment - mod)/peers.length;        int l = low;        int h = 0;        int count = 1;        for (int i=0; i < peers.length; i++) {            h = l + perPiece -1;            if (i == peers.length - 1) {                h  += mod;            }            DispatcherJob job = new DispatcherJob(jobID, count, l, h, peers[i], pendingJob, group);            Thread thread = new Thread(job);            thread.setName("Dispatcher job thread " + i);            pendingJob.addJob(job);            thread.start();            l += perPiece;            count++;        }    }    /**     * All computations for a pending job have completed. We're ready.     */    void jobComplete(PendingJob job, Result[] subResults) {        //normalize results, we're done,        Result finalResult = normalizeResults(subResults);        //call result listener with result.        ResultListener listener = job.getResultListener();        listener.resultEvent(finalResult);        jobMap.remove(job.getID());        System.out.println("Removed pending job from job map.");        //shell we null out listener? It should destroy all subjobs as well    }    private Result normalizeResults(Result[] subRes) {        //create a string for the result        StringBuffer b = new StringBuffer();        for (int i=0; i < subRes.length; i++) {            Message mes = subRes[i].getMessage();            MessageElement el = mes.getElement(ServiceConstants.PRIMELIST);            b.append(new String(el.getBytesOffset()));        }        Result finRes = new Result(b.toString());        return finRes;    }    private void pruneCache() {     //do some cache management here - eliminate stale adverts         HashSet adCopy = null;         synchronized(peerCache) {             adCopy = (HashSet)peerCache.clone();         }         //do some pruning here         long currentTime = System.currentTimeMillis();         Iterator it = adCopy.iterator();         while (it.hasNext()) {             ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next();             if (ad.getLocalExpirationTime() < currentTime + (2 * 60 * 1000)) {                 adCopy.remove(ad);             }         }         //REMIND: Is this OK?         peerCache = adCopy;    }    private void startJxta() {        try {            group = PeerGroupFactory.newNetPeerGroup();            discoSvc = group.getDiscoveryService();            pipeSvc = group.getPipeService();        } catch (PeerGroupException e) {            System.out.println("Can't create net peer group: " + e.getMessage());            System.exit(-1);        }    }    public void discoveryEvent(DiscoveryEvent event) {        System.out.println("DiscoveryEvent called");        DiscoveryResponseMsg  mes = event.getResponse();        String padv = mes.getPeerAdv();        PeerAdvertisement peerAdv = null;        try {            peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(                    new MimeMediaType("text/xml"),                    new ByteArrayInputStream(padv.getBytes()));        } catch (IOException ex) {            ex.printStackTrace();        }        //get the ModuleSpecAdvs from this        ModuleSpecAdvertisement moduleSpecAd = null;        Enumeration en = mes.getResponses();        try {            //REMIND: can there be many???            while (en.hasMoreElements()) {                String st = (String)en.nextElement();                moduleSpecAd =                    (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(                            new MimeMediaType("text/xml"),                            new ByteArrayInputStream(st.getBytes()));            }        } catch (IOException e) {            e.printStackTrace();        }        if (peerAdv != null && moduleSpecAd != null) {            PeerInfoBundle bundle = new PeerInfoBundle(peerAdv, moduleSpecAd);            if (!peerCache.contains(bundle)) {                peerCache.add(bundle);                System.out.println("Discovered peer, added to cache");            } else {                System.out.println("Discovered peer, but it's already cached.");            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -