
📄 dispatcherjob.java

/*
 * Created by IntelliJ IDEA.
 * User: fsommers
 * Date: Apr 10, 2002
 * Time: 10:00:18 PM
 * To change template for new class use
 * Code Style | Class Templates options (Tools | IDE Options).
 */
package primecruncher;

import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.pipe.*;
import net.jxta.peergroup.PeerGroup;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.ModuleSpecAdvertisement;
import net.jxta.document.*;
import net.jxta.id.IDFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Enumeration;
import java.io.PrintWriter;
import java.io.IOException;
import java.io.InputStream;

/**
 * An instance of this object is created for each client request. This object
 * does the following:
 * <ol>
 * <li>Extract the high, low numbers of the list of primes to be produced. DONE</li>
 * <li>Divide the list into subtasks, one subtask corresponding to each peer
 *     specified in the peers array of the constructor. How the work is divided
 *     is handled by an instance of <code>JobPartitioner</code>, which returns
 *     an array of <code>StructuredTextDocument</code>s, each corresponding to a piece
 *     of the list to be computed by one peer.</li>
 * <li>Create an input pipe for the results. Add that pipe's advertisement to the
 *     <code>StructuredTextDocument</code>s produced by <code>JobPartitioner</code>.</li>
 * <li>For each peer, create a Message that contains the
 *     <code>StructuredTextDocument</code>. Assign a message ID to each message.</li>
 * <li>Send that message to the peer, and mark when the message was sent.</li>
 * <li>When a response arrives via the pipe, mark the message corresponding to that result
 *     as being answered.</li>
 * <li>When all messages are answered, sort the sublists into a master list. Create a
 *     result document, and call the <code>ResultListener</code> with that message.</li>
 * </ol>
 * REMIND: What to do if not all the requests are answered before the timeout?
 */
class DispatcherJob implements Runnable, PipeMsgListener, OutputPipeListener, Comparable {

    private int low = 0;
    private int high = 0;
    private String jobID;
    private PeerInfoBundle peerInfoBundle;
    private PendingJob pendingJob;
    private int count = 0;
    private PeerGroup group;
    // Per-instance pipe service; taken from the peer group passed to the constructor.
    private PipeService pService = null;
    private InputPipe inPipe = null;
    private PipeAdvertisement inPipeAdv = null;

    /**
     * Start a new job for a compute request.
     *
     * @param jobID the ID of the job this request belongs to
     * @param count the value used by {@link #compareTo(Object)} to order jobs
     * @param low the low end of the range of primes to compute
     * @param high the high end of the range of primes to compute
     * @param peerInfoBundle describes the peer that will do the work
     * @param job what to notify when the result is in
     * @param group our current peer group
     */
    public DispatcherJob(String jobID, int count, int low, int high,
                         PeerInfoBundle peerInfoBundle,
                         PendingJob job,
                         PeerGroup group) {
        this.high = high;
        this.low = low;
        this.jobID = jobID;
        this.count = count;
        this.group = group;
        this.peerInfoBundle = peerInfoBundle;
        this.pendingJob = job;
        this.pService = group.getPipeService();
    }

    /**
     * Create a pipe advert for an input pipe so that the other side can get back to us.
     * Open an output pipe to the peer, and send it the request Message.
     * The peer then sends a message back with the response via the input pipe.
     * When we get an event from the input pipe, we process it, then call
     * the pending job's gotResult.
     */
    public void run() {
        System.out.println("Starting dispatcher job execution with low=" + low +
                " and high=" + high);

        // Create the input pipe that the other peer will use to send back a result,
        // and add ourselves as a listener on that pipe.
        inPipeAdv =
                (PipeAdvertisement) AdvertisementFactory.newAdvertisement(
                        PipeAdvertisement.getAdvertisementType());
        PipeID pid = IDFactory.newPipeID(group.getPeerGroupID());
        inPipeAdv.setPipeID(pid);

        try {
            inPipe = pService.createInputPipe(inPipeAdv, this);

            // Open a pipe to the other peer. The request message itself is built
            // and sent in outputPipeEvent(), once the output pipe is connected.
            ModuleSpecAdvertisement adv = peerInfoBundle.getModuleSpecAdvertisement();
            PipeAdvertisement otherPipe = adv.getPipeAdvertisement();
            pService.createOutputPipe(otherPipe, this);
        } catch (IOException e) {
            System.out.println("An IO Exception occurred:" + e.getMessage());
            e.printStackTrace();
        }
        //pendingJob.gotResult(this, result);
    }

    /** Called when the output pipe to the other peer is ready; sends the request. */
    public void outputPipeEvent(OutputPipeEvent event) {
        System.out.println("Connected to other peer's pipe");
        try {
            Message message = pService.createMessage();

            // Add the high, low, job ID, and the input pipe advert to the message.
            StructuredTextDocument doc = (StructuredTextDocument)
                    inPipeAdv.getDocument(new MimeMediaType("text/xml"));
            InputStream is = doc.getStream();
            MessageElement advElement = message.newMessageElement(
                    ServiceConstants.PIPEADV,
                    new MimeMediaType("text/xml"), is);
            MessageElement highNum = message.newMessageElement(ServiceConstants.HIGH_INT,
                    new MimeMediaType("text/plain"),
                    Integer.toString(high).getBytes());
            MessageElement lowNum = message.newMessageElement(ServiceConstants.LOW_INT,
                    new MimeMediaType("text/plain"),
                    Integer.toString(low).getBytes());
            MessageElement idEl = message.newMessageElement(ServiceConstants.JOBID,
                    new MimeMediaType("text/plain"),
                    jobID.getBytes());

            message.addElement(advElement);
            message.addElement(highNum);
            message.addElement(lowNum);
            message.addElement(idEl);

            event.getOutputPipe().send(message);
            System.out.println("Sent the message");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Orders jobs by count; anything that is not a DispatcherJob compares as equal. */
    public int compareTo(Object o) {
        if (o instanceof DispatcherJob) {
            DispatcherJob other = (DispatcherJob) o;
            if (count < other.count) return -1;
            else if (count > other.count) return 1;
        }
        return 0;
    }

    /** Called when a result arrives from the other peer on our input pipe. */
    public void pipeMsgEvent(PipeMsgEvent event) {
        System.out.println("Received a message!");
        Message mes = event.getMessage();
        pendingJob.gotResult(this, new Result(mes));
    }
}
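
The class above compares jobs by `count`, and its Javadoc mentions sorting the sublists into a master list once all answers are in. A minimal, JXTA-free sketch of that ordering follows. It assumes `count` is the position of a sub-range within the overall job, which the source does not state; `SubJobOrdering`, `SubJob`, and `inDispatchOrder` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SubJobOrdering {

    /** Hypothetical stand-in for DispatcherJob, reduced to the fields used for ordering. */
    static final class SubJob implements Comparable<SubJob> {
        final int count; // assumed: position of this sub-range within the overall job
        final int low;
        final int high;

        SubJob(int count, int low, int high) {
            this.count = count;
            this.low = low;
            this.high = high;
        }

        @Override
        public int compareTo(SubJob other) {
            // Same ordering as DispatcherJob.compareTo: ascending by count.
            return Integer.compare(count, other.count);
        }
    }

    /** Returns a sorted copy, leaving the arrival-order list untouched. */
    static List<SubJob> inDispatchOrder(List<SubJob> arrived) {
        List<SubJob> sorted = new ArrayList<>(arrived);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // Results may arrive from peers in any order.
        List<SubJob> arrived = Arrays.asList(
                new SubJob(2, 201, 300),
                new SubJob(0, 1, 100),
                new SubJob(1, 101, 200));
        for (SubJob job : inDispatchOrder(arrived)) {
            System.out.println(job.count + ": " + job.low + ".." + job.high);
        }
    }
}
```

Sorting a copy keeps the arrival order available for logging or timeout bookkeeping; whether the original project needs that is not clear from this file.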
