📄 hungrypeer.java
字号:
import java.io.*;import java.util.Enumeration;import java.util.Vector;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.exception.PeerGroupException;import net.jxta.document.AdvertisementFactory;import net.jxta.document.Advertisement;import net.jxta.document.StructuredDocument;import net.jxta.document.Element;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.MimeMediaType;import net.jxta.discovery.DiscoveryService;import net.jxta.pipe.PipeService;import net.jxta.pipe.InputPipe;import net.jxta.pipe.PipeID;import net.jxta.pipe.OutputPipe;import net.jxta.endpoint.Message;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.PeerGroupAdvertisement;import net.jxta.id.IDFactory;import net.jxta.discovery.DiscoveryEvent;import net.jxta.discovery.DiscoveryListener;import net.jxta.protocol.DiscoveryResponseMsg;// The HungryPeer joins the RestoNet PeerGroup and searches for// RestoPeers. The HungryPeer then establishes a pipe connection to// all the RestoPeers that it discovered. The HungryPeer sends// auction requests for French fries to RestoPeers and then waits for// auction bids from RestoPeerspublic class HungryPeer implements DiscoveryListener { private PeerGroup netpg = null; // NetPeergroup private PeerGroup restoNet = null; // Resto Peergroup // Services within the RestoNet Peergroup private DiscoveryService disco; // Discovery Service private PipeService pipes; // Pipe Service private PipeAdvertisement myAdv; // Hungry peer pipe advertisement private InputPipe myPipe; // Input pipe to talk to hungry peer private MimeMediaType mimeType = new MimeMediaType("text", "xml"); private int timeout = 3000; // Discovery timeout private int rtimeout = 30000; // Pipe Resolver Timeout // All RestoPeers found private Vector restoPeerAdvs = new Vector(); private Vector restoPeerPipes = new Vector(); private String myIdentity = "Bill Joy"; // Identity of this HungryPeer private String friesRequest ="medium"; // Fries Auction request public static void main(String args[]) { HungryPeer myapp = new HungryPeer(); myapp.startJxta(); System.exit(0); } private void startJxta() { try { // Discover (or create) and join the default jxta NetPeerGroup netpg = PeerGroupFactory.newNetPeerGroup(); } catch (PeerGroupException e) { //Couldn't initialize; can't continue System.out.println("Fatal error : creating the NetPeerGroup"); System.exit(1); } // Discover and join the RestoNet Peergroup try { if (!joinRestoNet()) { System.out.println("Sorry could not find the RestoNet Peergroup"); System.exit(2); } } catch (Exception e) { System.out.println("Can't join RestoNet group"); System.exit(1); } // Set our HungryPeer communication pipe so RestoPeers // can talk to us if (!setHungryPeerPipe()) { System.out.println( "Aborting due to failure to create our HungryPeer pipe"); System.exit(1); } // Register for discovery events for pipe advertisements disco.addDiscoveryListener(this); disco.getRemoteAdvertisements(null, DiscoveryService.ADV, "name", "RestoNet:RestoPipe:*", 5, null); // Wait; processing events as they happen while(true) { receiveFriesBids(); } } public void discoveryEvent(DiscoveryEvent ev) { System.out.println("Processing discovery event"); DiscoveryResponseMsg msg = ev.getResponse(); // The enumeration contains all the pipe advertisements that // were found. Enumeration e = msg.getResponses(); while (e.hasMoreElements()) { try { String s = (String) e.nextElement(); PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"), new ByteArrayInputStream(s.getBytes())); connectAndSend(adv); } catch (Exception ex) { System.out.println("Can't connect to peer " + ex); continue; } } } // This method is used to discover the RestoNet Peergroup. // If found the peer will join the peergroup private boolean joinRestoNet() { int count = 3; // maximum number of attempts to discover System.out.println("Attempting to discover the RestoNet Peergroup"); // Get the Discovery service handle from the NetPeerGroup DiscoveryService hdisco = netpg.getDiscoveryService(); // All discovered RestoNet Peers Enumeration ae = null; // Loop until we find the "RestoNet" Peergroup advertisement // or we've exhausted the desired number of attempts while (count-- > 0) { try { // Check if we have the advertisement in the local // peer cache ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP, "Name", "RestoNet"); // If we found the RestoNet advertisement, we are done if ((ae != null) && ae.hasMoreElements()) break; // The RestoNet advertisement is not in the local // cache . Send a discovery request to search for it. hdisco.getRemoteAdvertisements(null, DiscoveryService.GROUP, "Name", "RestoNet", 1, null); // Wait to give peers a chance to respond try { Thread.sleep(timeout); } catch (InterruptedException ie) {} } catch (IOException e) { // Found nothing! Move on. } } // Check if we found the RestoNet advertisement if (ae == null || !ae.hasMoreElements()) { return false; } System.out.println("Found the RestoNet PeerGroup Advertisement"); // Get the advertisement PeerGroupAdvertisement adv = (PeerGroupAdvertisement) ae.nextElement(); try { // Call the PeerGroup Factory to instantiate a new // peergroup instance restoNet = netpg.newGroup(adv); // Get the Discovery and Pipe services to // be used within the RestoNet Peergroup disco = restoNet.getDiscoveryService(); pipes = restoNet.getPipeService(); } catch (Exception e) { System.out.println("Could not create RestoPeerGroup"); return false; } System.out.println("The HungryPeer joined the restoNet PeerGroup"); return true; } // Create the HungryPeer pipe to receive bid responses // from RestoPeers. The advertisement of this pipe is sent as part // of the auction request for RestoPeers to respond. private boolean setHungryPeerPipe() { try { // Create a pipe advertisement for our hungry peer. This // pipe will be used within the RestoNet peergroup for other // peers to talk to our hungry peer myAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( PipeAdvertisement.getAdvertisementType()); // Initialize the advertisement with unique peer information // So we can communicate myAdv.setPipeID(IDFactory.newPipeID(restoNet.getPeerGroupID())); myAdv.setName("restoNet:HungryPipe:" + myIdentity); // Set the pipe type to be unicast unidrectional myAdv.setType(PipeService.UnicastType); // Create the input pipe myPipe = pipes.createInputPipe(myAdv); } catch (Exception e) { System.out.println("Could not create the HungryPeer pipe"); return false; } return true; } // Method to connect and open output pipes to all the // RestoPeers that we have discovered. Each RestoPeer is // identified by its unique RestoPeer pipe advertisement. private void connectAndSend(PipeAdvertisement padv) throws Exception { System.out.println("Attempting to connect to discovered RestoPeer"); // Create an output pipe connection to the RestoPeer OutputPipe op = pipes.createOutputPipe(padv, rtimeout); // Check if we have a connected pipe if (op == null) { // Failed; go to next RestoPeer System.out.println("Failure to connect to RestoPeer Pipe:" + padv.getName()); return; } StructuredDocument request = StructuredDocumentFactory.newStructuredDocument(mimeType, "RestoNet:Request"); // Fill up the Fries auction request argument Element re; re = request.createElement("Name", myIdentity); request.appendChild(re); re = request.createElement("Fries", friesRequest); request.appendChild(re); // Create the pipe message to send Message msg = pipes.createMessage(); // Fill the first message element which is the HungryPeer // pipe advertisement return address. We need this // so RestoPeers can respond to us msg.addElement(msg.newMessageElement("HungryPeerPipe", mimeType, myAdv.getDocument(mimeType).getStream())); // Fill the second message element, which is // the fries request. Insert the document // in the message msg.addElement(msg.newMessageElement("Request", mimeType, request.getStream())); // Send the auction message to the RestoPeer op.send(msg); System.out.println("Sent Fries Auction Request (" + friesRequest + ") to connected peers"); } // Receive bid requests from RestoPeers on the // HungryPeer listening pipe private void receiveFriesBids() { // Continue until we get all answers while (true) { Message msg = null; // Pipe message received String price = null; // Fries price bid String brand = null; // RestoPeer name which offers the bid String specials = null; // Specials offer bid InputStream ip = null; // Input stream to read message element StructuredDocument bid = null; //Bid document received try { // Wait for a bid message to arrive from a RestoPeer // Will block until a message arrive msg = myPipe.waitForMessage(); // Check if the message is valid if (msg == null) { if (Thread.interrupted()) { // We have been asked to stop System.out.println( "Abort Receiving bid loop interrupted"); myPipe.close(); // Close the Pipe return; } } } catch (Exception ex) { // Error in receiving message myPipe.close(); System.out.println("Abort Receiving Error receiving bids"); return; } // We got a message from a RestoPeer. // Extract and display infomation about the bid received. try { // Extract the Bid document from the message ip = msg.getElement("Bid").getStream(); bid = StructuredDocumentFactory.newStructuredDocument( mimeType, ip); // Parse the document to extract bid information Enumeration enum = bid.getChildren(); while (enum.hasMoreElements()) { Element element = (Element) enum.nextElement(); String attr = (String) element.getKey(); String value = (String) element.getValue(); if (attr.equals("Price")) { price = value; continue; } if (attr.equals("Brand")) { brand = value; continue; } if (attr.equals("Specials")) { specials = value; continue; } } // We got a valid bid. Print it. System.out.println("Received Fries Bid from RestoPeers (" + brand + ") at a Price ($" + price + ") \nRestoPeers Special (" + specials + ")"); } catch (Exception e) { // Broken content System.out.println("Error extracting bid from the message"); continue; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -