📄 primepeer.java
字号:
package primecruncher;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.peergroup.PeerGroupID;import net.jxta.discovery.DiscoveryService;import net.jxta.pipe.*;import net.jxta.exception.PeerGroupException;import net.jxta.protocol.ModuleClassAdvertisement;import net.jxta.protocol.ModuleSpecAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.document.*;import net.jxta.platform.ModuleClassID;import net.jxta.platform.ModuleSpecID;import net.jxta.id.IDFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import java.io.*;import java.util.Enumeration;/** * A peer that generates lists of prime numbers between any 2 * integers. This peer advertises a module spec, which contains a * pipe advertisement. Other peers can connect to it via that pipe, * and send a message, containing two integers. Upon computing the list * of all prime numbers between those integers, this peer will send back * the resulting list to the requesting peers. */public class PrimePeer { private static PeerGroup group; private static DiscoveryService discoSvc; private static PipeService pipeSvc; private InputPipe inputPipe; //the file name to save a pipe advertisement to //This advertisement must be saved so that the service will //create a semantically identical pipe when it is rebooted private static final String PIPE_ADV_FILE = "primeserver_pipe.adv"; /** * The main method to start the service. */ public static void main(String[] argv) { PrimePeer server = new PrimePeer(); server.startJxta(); server.doAdvertise(); server.startService(); } public PrimePeer() { } /** * Create a module class advertisement for the service. Then, * create a module spec advertisement, and add a pipe advertisement to it. * Other peers can connect to back via that pipe, and send request messages. */ private void doAdvertise() { ModuleClassAdvertisement classAd = (ModuleClassAdvertisement) AdvertisementFactory.newAdvertisement( ModuleClassAdvertisement.getAdvertisementType()); ModuleClassID classID = IDFactory.newModuleClassID(); classAd.setModuleClassID(classID); classAd.setName(ServiceConstants.CLASS_NAME); classAd.setDescription("A prime number crunching service."); try { discoSvc.publish(classAd, DiscoveryService.ADV); discoSvc.remotePublish(classAd, DiscoveryService.ADV); System.out.println("Published module class adv."); } catch (IOException e) { System.out.println("Trouble publishing module class adv: " + e.getMessage()); } ModuleSpecAdvertisement specAd = (ModuleSpecAdvertisement) AdvertisementFactory.newAdvertisement( ModuleSpecAdvertisement.getAdvertisementType()); ModuleSpecID specID = IDFactory.newModuleSpecID(classID); specAd.setModuleSpecID(specID); specAd.setName(ServiceConstants.SPEC_NAME); specAd.setDescription("Specification for a prime number crunching service"); specAd.setCreator("Sams Publishing"); specAd.setSpecURI("http://www.samspublishing.com/p2p/primecruncher"); specAd.setVersion("Version 1.0"); PipeAdvertisement pipeAd = null; //if we already have a saved pipe advertisement, use that //Otherwise, create a new advertisement. try { FileInputStream is = new FileInputStream(PIPE_ADV_FILE); pipeAd = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"), is); is.close(); System.out.println("Read pipe advert from disk."); } catch (IOException e) { pipeAd = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( PipeAdvertisement.getAdvertisementType()); PipeID pid = IDFactory.newPipeID(group.getPeerGroupID()); pipeAd.setPipeID(pid); pipeAd.setName("JXTAPIPE:com.sams.p2p.primecruncher"); //save pipeAd in file Document pipeAdDoc = pipeAd.getDocument(new MimeMediaType("text/xml")); try { FileOutputStream os = new FileOutputStream(PIPE_ADV_FILE); pipeAdDoc.sendToStream(os); os.flush(); os.close(); System.out.println("Wrote pipe advertisement to disk."); } catch (IOException ex) { System.out.println("Can't save pipe advertisement to file " + PIPE_ADV_FILE); System.exit(-1); } } //add the pipe advertisement to the module spec advertisement specAd.setPipeAdvertisement(pipeAd); //to make sure we've created the right advertisements, print them //on the standard output. try { StructuredTextDocument doc = (StructuredTextDocument) specAd.getDocument(new MimeMediaType("text/plain")); StringWriter out = new StringWriter(); doc.sendToWriter(out); System.out.println(out); out.close(); } catch (IOException e) { e.printStackTrace(); } //publish the module spec advertisement both locally and remotely try { discoSvc.publish(specAd, DiscoveryService.ADV); discoSvc.remotePublish(specAd, DiscoveryService.ADV, ServiceConstants.ADVLENGTH); System.out.println("Published module spec adv"); } catch (IOException e) { System.out.println("Trouble publishing module spec adv: " + e.getMessage()); } //create an input pipe based on the advertisement try { inputPipe = pipeSvc.createInputPipe(pipeAd); System.out.println("Created input pipe"); } catch (IOException e) { System.out.println("Can't create input pipe. " + e.getMessage()); } } /** * This method just waits for incoming message on the pipe */ private void startService() { while (true) { Message msg = null; String highInt = null; String lowInt = null; String jobID = null; try { msg = inputPipe.waitForMessage(); System.out.println("Received a message"); } catch (InterruptedException ex) { } try { MessageElement hiElem = msg.getElement(ServiceConstants.HIGH_INT); MessageElement loElem = msg.getElement(ServiceConstants.LOW_INT); MessageElement idEl = msg.getElement(ServiceConstants.JOBID); StructuredTextDocument hiDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"), hiElem.getStream()); StructuredTextDocument lowDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"), loElem.getStream()); StructuredTextDocument idDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"), idEl.getStream()); highInt = hiDoc.getTextValue(); lowInt = lowDoc.getTextValue(); jobID = idDoc.getTextValue(); } catch (IOException e) { e.printStackTrace(); } if (highInt != null || lowInt != null) { final Message res = pipeSvc.createMessage(); StructuredTextDocument resD = processInput(jobID, highInt, lowInt); try { MessageElement resElem = res.newMessageElement(ServiceConstants.RESULT, new MimeMediaType("text/xml"), resD.getStream()); res.addElement(resElem); } catch (IOException e) { e.printStackTrace(); } if (res != null) { try { //open a pipe back to the client, and send that message down the pipe MessageElement adEl = msg.getElement(ServiceConstants.PIPEADV); PipeAdvertisement backPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"), adEl.getStream()); pipeSvc.createOutputPipe(backPipeAdv, new OutputPipeListener() { public void outputPipeEvent(OutputPipeEvent event) { System.out.println("PrimePeer connected back to client"); OutputPipe outPipe = event.getOutputPipe(); try { outPipe.send(res); System.out.println("Sent response message!"); } catch (IOException e) { e.printStackTrace(); } } }); } catch (IOException e) { e.printStackTrace(); } } } } } private StructuredTextDocument processInput(String jobID, String high, String low) { StructuredTextDocument resDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(new MimeMediaType("text/xml"), ServiceConstants.RESULT); try { long startTime = System.currentTimeMillis(); int[] res = PrimeSearch.findPrimes(Integer.parseInt(low), Integer.parseInt(high)); long endTime = System.currentTimeMillis(); //response message elements: //jobID, high, low //starttime, end time, string containing comma-separated list of primes TextElement idEl = resDoc.createElement(ServiceConstants.JOBID, jobID); TextElement loEl = resDoc.createElement(ServiceConstants.LOW_INT, low); TextElement hiEl = resDoc.createElement(ServiceConstants.HIGH_INT, high); TextElement startTimeE = resDoc.createElement(ServiceConstants.STARTTIME, new Long(startTime).toString()); TextElement endTimeE = resDoc.createElement(ServiceConstants.ENDTIME, new Long(endTime).toString()); StringBuffer b = new StringBuffer(); for (int i = 0; i < res.length; i++) { b.append(new Integer(res[i]).toString() + ","); } //for the last one, chop off comma b.setLength(b.length() - 1); TextElement resultStringE = resDoc.createElement(ServiceConstants.RESULTSTRING, b.toString()); resDoc.appendChild(idEl); resDoc.appendChild(loEl); resDoc.appendChild(hiEl); resDoc.appendChild(startTimeE); resDoc.appendChild(endTimeE); resDoc.appendChild(resultStringE); } catch (NumberFormatException e) { System.out.println("Number format is wrong:" + e.getMessage()); } return resDoc; } /** * Initialize the JXTA runtime */ private void startJxta() { try { group = PeerGroupFactory.newNetPeerGroup(); discoSvc = group.getDiscoveryService(); pipeSvc = group.getPipeService(); } catch (PeerGroupException e) { System.out.println("Cannot create Net Peer Group: " + e.getMessage()); System.exit(-1); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -