📄 Dispatcher.java
package primecruncher;

import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.discovery.DiscoveryService;
import net.jxta.discovery.DiscoveryListener;
import net.jxta.discovery.DiscoveryEvent;
import net.jxta.pipe.PipeService;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeID;
import net.jxta.exception.PeerGroupException;
import net.jxta.protocol.DiscoveryResponseMsg;
import net.jxta.protocol.ModuleSpecAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.document.*;
import net.jxta.id.IDFactory;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.codat.Codat;

import java.util.*;
import java.io.*;
import java.net.URL;
import java.net.MalformedURLException;
import java.net.UnknownServiceException;

/**
 * This class acts as a client-side proxy for the service.
 */
public class Dispatcher implements DiscoveryListener {

    private static PeerGroup group;
    private static DiscoveryService discoSvc;
    private static PipeService pipeSvc;
    private static Random random;

    // Keep a cached list of peers that offer the service.
    private HashSet peerCache = new HashSet();
    private OutputPipe outputPipe;
    private HashMap jobMap = new HashMap();

    public Dispatcher() {
        startJxta();
        random = new Random();
        doLocalDiscovery();
        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,
                "Name", ServiceConstants.SPEC_NAME, 1, this);
    }

    public Dispatcher(PeerGroup peerGroup) {
        this.group = peerGroup;
        pipeSvc = group.getPipeService();
        discoSvc = group.getDiscoveryService();
        random = new Random();
        doLocalDiscovery();
        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,
                "Name", ServiceConstants.SPEC_NAME, 1, this);
    }

    private void doLocalDiscovery() {
        Enumeration res = null;
        try {
            res = discoSvc.getLocalAdvertisements(DiscoveryService.ADV,
                    "Name", ServiceConstants.SPEC_NAME);
        } catch (IOException e) {
            System.out.println("IO Exception.");
        }
        if (res != null) {
            while (res.hasMoreElements()) {
                // This will be a ModuleSpecAdvertisement.
                ModuleSpecAdvertisement mspa = (ModuleSpecAdvertisement) res.nextElement();
                if (mspa != null) {
                    PeerInfoBundle bundle = new PeerInfoBundle(mspa);
                    if (!peerCache.contains(bundle)) {
                        peerCache.add(bundle);
                        System.out.println("Discovered service locally, added to cache");
                    } else {
                        System.out.println("Discovered peer locally, but it's already cached.");
                    }
                }
            }
        }
    }

    // Create a request ID. Break the range down into smaller ranges, according to the
    // metrics on each peer in the cache. Create a new dispatcher job for each peer in a
    // new thread, and associate that range with the request ID.
    // Register a result listener with each job.
    // Each dispatcher job will make a call-back to this class when (a) it got the
    // results, or (b) the timeout for that peer has expired.
    // Once all results are received, we call the ResultListener and produce the
    // result document.
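    // Worked example (added for illustration, not in the original listing): for
    // low = 1, high = 100 and three cached peers, segment = 100, mod = 1 and
    // perPiece = 33, so the dispatched sub-ranges are [1,33], [34,66] and [67,100];
    // the last peer absorbs the remainder.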
    public void processPrimes(int low, int high, ResultListener listener) {
        // Create an array of all known peers.
        if (peerCache.size() == 0) {
            doLocalDiscovery();
        }
        PeerInfoBundle[] peers =
                (PeerInfoBundle[]) peerCache.toArray(new PeerInfoBundle[peerCache.size()]);
        if (peers.length == 0) {
            System.out.println("No peers, submit job again!");
            return;
        }

        // Since pending jobs are hashed against a job ID, check that no existing job
        // has the same ID.
        String jobID = null;
        while (jobID == null || jobMap.containsKey(jobID)) {
            jobID = new Long(random.nextLong()).toString();
        }
        System.out.println("Allocated job id: " + jobID);

        PendingJob pendingJob = new PendingJob(jobID, this, listener);
        jobMap.put(jobID, pendingJob);

        // Prune the cache of stale advertisements before dispatching.
        pruneCache();

        // 1. Create an empty PendingJob -- it collects all the DispatcherJobs.
        // When a job is ready, it calls its PendingJob. Whenever such a call arrives,
        // the PendingJob checks whether all jobs are complete. If so, it calls this
        // Dispatcher's jobComplete(). jobComplete() then normalizes the results and
        // calls the result listener with the answer. A PendingJob is hashed against a job ID.
        System.out.println("Peers length is " + peers.length);

        // We include the highest number.
        int segment = high - low + 1;
        int mod = segment % peers.length;
        int perPiece = (segment - mod) / peers.length;
        int l = low;
        int h = 0;
        int count = 1;
        for (int i = 0; i < peers.length; i++) {
            h = l + perPiece - 1;
            if (i == peers.length - 1) {
                h += mod;
            }
            DispatcherJob job = new DispatcherJob(jobID, count, l, h, peers[i], pendingJob, group);
            Thread thread = new Thread(job);
            thread.setName("Dispatcher job thread " + i);
            pendingJob.addJob(job);
            thread.start();
            l += perPiece;
            count++;
        }
    }

    /**
     * All computations for a pending job have completed. We're ready.
     */
    void jobComplete(PendingJob job, Map[] subResults) {
        // Normalize the results; we're done.
        String finalResultString = mergeResults(subResults);
        // Call the result listener with the result.
        ResultListener listener = job.getResultListener();
        Map finalResult = new HashMap();
        finalResult.put(ServiceConstants.RESULTSTRING, finalResultString);
        finalResult.put(ServiceConstants.JOBSTATS, makeJobStats(job.getID(), subResults));
        listener.resultEvent(finalResult);
        jobMap.remove(job.getID());
        System.out.println("Removed pending job from job map.");
    }

    private ComputeStats makeJobStats(String jobID, Map[] subresults) {
        ComputeStats stats = new ComputeStats(jobID);
        for (int i = 0; i < subresults.length; i++) {
            long computeTime =
                    Long.parseLong((String) subresults[i].get(ServiceConstants.ENDTIME))
                    - Long.parseLong((String) subresults[i].get(ServiceConstants.STARTTIME));
            stats.addNodeStat(new Integer(i).toString(),
                    Integer.parseInt((String) subresults[i].get(ServiceConstants.LOW_INT)),
                    Integer.parseInt((String) subresults[i].get(ServiceConstants.HIGH_INT)),
                    computeTime, 0, 0);
        }
        return stats;
    }

    private String mergeResults(Map[] subRes) {
        // Create a single comma-separated string from the sub-results.
        StringBuffer b = new StringBuffer();
        for (int i = 0; i < subRes.length; i++) {
            String resString = (String) subRes[i].get(ServiceConstants.RESULTSTRING);
            b.append(resString + ",");
        }
        // Remove the last ",".
        b.setLength(b.length() - 1);
        return b.toString();
    }

    private void pruneCache() {
        // Do some cache management here -- eliminate stale advertisements.
        HashSet adCopy = null;
        synchronized (peerCache) {
            adCopy = (HashSet) peerCache.clone();
            long currentTime = System.currentTimeMillis();
            Iterator it = adCopy.iterator();
            while (it.hasNext()) {
                PeerInfoBundle b = (PeerInfoBundle) it.next();
                ModuleSpecAdvertisement ad = b.getModuleSpecAdvertisement();
                // Drop advertisements that expire within the next minute.
                if (ad.getLocalExpirationTime() < currentTime + (60 * 1000)) {
                    System.out.println("Local expiration: " + ad.getLocalExpirationTime());
                    peerCache.remove(b);
                }
            }
        }
    }

    private void startJxta() {
        try {
            group = PeerGroupFactory.newNetPeerGroup();
            discoSvc = group.getDiscoveryService();
            pipeSvc = group.getPipeService();
        } catch (PeerGroupException e) {
            System.out.println("Can't create net peer group: " + e.getMessage());
            System.exit(-1);
        }
    }

    public void discoveryEvent(DiscoveryEvent event) {
        System.out.println("DiscoveryEvent called");
        DiscoveryResponseMsg mes = event.getResponse();
        // Get the ModuleSpecAdvertisements from this response.
        ModuleSpecAdvertisement moduleSpecAd = null;
        Enumeration en = mes.getResponses();
        try {
            // REMIND: can there be many?
            while (en.hasMoreElements()) {
                String st = (String) en.nextElement();
                try {
                    moduleSpecAd = (ModuleSpecAdvertisement) AdvertisementFactory.newAdvertisement(
                            new MimeMediaType("text/xml"),
                            new ByteArrayInputStream(st.getBytes()));
                } catch (ClassCastException classEx) {
                    // It was not a module spec advertisement.
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (moduleSpecAd != null) {
            PeerInfoBundle bundle = new PeerInfoBundle(moduleSpecAd);
            if (!peerCache.contains(bundle)) {
                peerCache.add(bundle);
                System.out.println("Discovered peer, added to cache");
            } else {
                System.out.println("Discovered peer, but it's already cached.");
            }
        }
    }
}
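For context, a minimal usage sketch follows. It is not part of the listing above and rests on a few assumptions: that ResultListener is an interface declaring a single resultEvent(Map) callback (as implied by the call in jobComplete()), that ServiceConstants.RESULTSTRING keys the merged prime list in the result map, and that a JXTA platform configuration already exists so PeerGroupFactory.newNetPeerGroup() can start; the DispatcherDemo class name is illustrative.

📄 DispatcherDemo.java (usage sketch)

// Hypothetical caller -- DispatcherDemo and the assumptions above are illustrative.
package primecruncher;

import java.util.Map;

public class DispatcherDemo {

    public static void main(String[] args) {
        // Starts the JXTA net peer group and kicks off local and remote discovery.
        Dispatcher dispatcher = new Dispatcher();

        // Ask for all primes between 1 and 100; the listener fires once every
        // DispatcherJob has reported back or timed out.
        // Assumes ResultListener is an interface with resultEvent(Map).
        dispatcher.processPrimes(1, 100, new ResultListener() {
            public void resultEvent(Map result) {
                System.out.println("Primes: " + result.get(ServiceConstants.RESULTSTRING));
            }
        });
    }
}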