📄 pingpeer.java
字号:
package net.jxta.impl.shell.bin.ping;

import net.jxta.impl.shell.*;
import java.io.*;
import java.util.*;
import net.jxta.exception.*;
import net.jxta.discovery.*;
import net.jxta.endpoint.*;
import net.jxta.pipe.*;
import net.jxta.peergroup.*;
import net.jxta.document.*;
import net.jxta.id.*;
import net.jxta.protocol.*;

/**
 * Core of the shell ping implementation, built on top of the ping framework.
 *
 * This class supplies the protocol format and the ping workflow
 * (request/response). It instantiates the appropriate handlers
 * ({@link PingHandler}, {@link PingReplyHandler}), registers itself as their
 * listener, and fills in the missing work: building messages, locating pipe
 * advertisements and fanning events out to its own listeners.
 *
 * Wire format used by this class (compact, colon-separated strings):
 * <ul>
 *   <li>ping request, message element "ping": <code>peerName:timestamp</code></li>
 *   <li>ping reply, message element "reply":
 *       <code>peerName:differential:timestamp</code></li>
 * </ul>
 *
 * Pipe naming used by this class: <code>ping.&lt;peerName&gt;client</code> for
 * the pipe on which replies are received, <code>ping.&lt;peerName&gt;server</code>
 * for the unicast pipe on which requests are received, and
 * <code>ping.pingpropserver</code> for the propagate pipe.
 */
public class PingPeer implements PingListener, PingReplyListener {

    DiscoveryService discovery;
    PipeService groupPipe;
    PeerGroup group;

    /** Pause between two local-cache polls in {@link #findPeerPipe}, in milliseconds. */
    private static final int WaitingTime = 2000;

    /** Upper bound on the retry counter used by {@link #findPeerPipe}. */
    private static final int MAXRETRIES = 5;

    /**
     * Timeout handed to PipeService.createOutputPipe.
     * NOTE(review): presumably milliseconds -- confirm against the PipeService API.
     */
    private static final int OUTPUT_PIPE_TIMEOUT = 50000;

    String myname = "";

    /** Registered {@link PingReplyListener}s (raw Vector, as in the rest of this code base). */
    Vector replyListeners = new Vector();

    /** Registered {@link PingListener}s. */
    Vector listeners = new Vector();

    /** Client pipe: replies to our own pings arrive here. */
    InputPipe cPipeIn = null;

    /** Unicast server pipe: ping requests addressed to this peer arrive here. */
    InputPipe sPipeIn = null;

    /** Propagate server pipe: propagated ping requests arrive here. */
    InputPipe pPipeIn = null;

    ShellEnv env;

    /** Prefix of every pipe name created or looked up by this class. */
    private static final String PING_PREFIX = "ping.";

    /**
     * Creates a new PingPeer and immediately sets up its client (reply) pipe.
     *
     * @param d     discovery service used to publish and look up pipe advertisements
     * @param group peer group whose pipe service and peer name are used
     * @param env   shell environment in which the handlers are cached
     * @throws PingException if the client pipe cannot be created or published
     */
    public PingPeer(DiscoveryService d, PeerGroup group, ShellEnv env) throws PingException {
        discovery = d;
        groupPipe = group.getPipeService();
        this.group = group;
        myname = group.getPeerName();
        this.env = env;
        setup();
    }

    /**
     * When a reply to a ping arrives, this callback is invoked.
     * The event is forwarded to every registered reply listener.
     *
     * @param event The callback event
     */
    public void pingReply(PingReplyEvent event) {
        notifyReplyListeners(event);
    }

    /**
     * Called when a ping message is received. Sends a reply of the form
     * <code>myname:differential:timestamp</code> back to the pinging peer and
     * then forwards the event to every registered ping listener.
     *
     * @param event The callback event
     */
    public void pingEvent(PingEvent event) {
        String servname = event.getPeerName();
        String replyStr = myname + ":" + event.getDifferential() + ":" + event.getTimestamp();
        replyToPing(replyStr, servname);
        notifyListeners(event);
    }

    public void reset() {
        // Unimplemented
    }

    /**
     * Builds and sends a reply to a ping event. This is best effort: if the
     * client's reply pipe cannot be found or the send fails, the reply is
     * silently dropped.
     *
     * @param reply      Reply string to send
     * @param clientName Name of client to reply to as indicated in their
     *                   ping message
     */
    private void replyToPing(String reply, String clientName) {
        // Find pipe for the client pinging me
        PipeAdvertisement pipeAd = findPeerPipe(PING_PREFIX + clientName + "client");
        if (pipeAd == null) {
            // No reply pipe known for that client: nowhere to send the answer.
            return;
        }

        OutputPipe pipeOut = null;
        try {
            pipeOut = groupPipe.createOutputPipe(pipeAd, OUTPUT_PIPE_TIMEOUT);
            if (pipeOut == null) {
                return;
            }
            Message msg = groupPipe.createMessage();
            // NOTE(review): getBytes() uses the platform default charset; the
            // receiving handler is not visible here, so this is left unchanged.
            InputStream ip = new ByteArrayInputStream(reply.getBytes());
            MessageElement me =
                msg.newMessageElement("reply", new MimeMediaType("text/plain"), ip);
            msg.addElement(me);
            pipeOut.send(msg);
        } catch (Exception e) {
            // Deliberately ignored: a lost reply only means the pinging peer
            // sees no answer, which is acceptable for this command.
        } finally {
            // Always release the pipe, including when building/sending failed.
            closeQuietly(pipeOut);
        }
    }

    /**
     * Initializes the client side by creating its inbound listening pipe and
     * publishing it. Also creates a PingReplyHandler and stores it in the
     * shell environment under "pr3" so that later ping invocations find it
     * and do not create it again.
     *
     * @throws PingException if the pipe cannot be created or its
     *                       advertisement cannot be published
     */
    private void setup() throws PingException {
        if (env.get("pr3") != null) {
            // Reply handler already installed by an earlier invocation.
            return;
        }

        // Build an advertisement for my input pipe
        // and publish it, but only once
        PipeAdvertisement adv = null;
        try {
            adv = createPipeAd(PING_PREFIX + group.getPeerName() + "client",
                               PipeService.UnicastType);
            if (adv == null) {
                throw new PingException();
            }
            // Not really using the bundles, but could
            PipeBundle pb = new PipeBundle(adv, groupPipe);
            // This is my client pipe waiting to hear back from
            // pinged peers
            cPipeIn = pb.getInputPipe();
        } catch (Exception all) {
            throw new PingException();
        }

        PingReplyHandler pr3 = new PingReplyHandler(cPipeIn);
        pr3.addPingReplyListener(this);
        pr3.waitForMessage();
        env.add("pr3", new ShellObject("pr3", pr3));

        try {
            // Publish the ad for the client reply pipe; if it got lost,
            // replies could not reach us for the rest of the shell session.
            discovery.publish(adv, DiscoveryService.ADV);
            discovery.remotePublish(adv, DiscoveryService.ADV);
        } catch (Exception e) {
            throw new PingException();
        }
    }

    /**
     * Installs and runs the ping listeners: creates the unicast and propagate
     * input pipes, attaches a PingHandler to each (cached in the shell
     * environment under "ph1" and "ph2") and publishes both advertisements.
     *
     * @throws PingException if any pipe cannot be created or published
     */
    public void listen() throws PingException {
        PipeAdvertisement adv = null;
        PipeAdvertisement adv1 = null;
        try {
            // Create the unicast input pipe to this peer
            adv = createPipeAd(PING_PREFIX + group.getPeerName() + "server",
                               PipeService.UnicastType);
            if (adv == null) {
                throw new PingException();
            }
            PipeBundle pb1 = new PipeBundle(adv, groupPipe);
            sPipeIn = pb1.getInputPipe();

            // Create the propagate input pipe to this peer, reusing an
            // already published advertisement when one can be found.
            adv1 = findPeerPipe(PING_PREFIX + "pingpropserver");
            if (adv1 == null) {
                adv1 = createPipeAd(PING_PREFIX + "pingpropserver",
                                    PipeService.PropagateType);
            }
            if (adv1 == null) {
                throw new PingException();
            }
            PipeBundle pb2 = new PipeBundle(adv1, groupPipe);
            pPipeIn = pb2.getInputPipe();

            // Build PingHandlers for the unicast and propagate inbound pipes.
            // When a ping message arrives on either pipe, the
            // pingEvent(PingEvent e) method is invoked.
            if (env.get("ph1") == null) {
                PingHandler ph1 = new PingHandler(sPipeIn);
                ph1.addPingListener(this);
                ph1.waitForMessage();
                env.add("ph1", new ShellObject("ph1", ph1));
            }
            if (env.get("ph2") == null) {
                PingHandler ph2 = new PingHandler(pPipeIn);
                ph2.addPingListener(this);
                ph2.waitForMessage();
                env.add("ph2", new ShellObject("ph2", ph2));
            }

            // Publish the pipe advertisements.
            discovery.publish(adv, DiscoveryService.ADV);
            discovery.remotePublish(adv, DiscoveryService.ADV);
            discovery.publish(adv1, DiscoveryService.ADV);
            discovery.remotePublish(adv1, DiscoveryService.ADV);
        } catch (Exception e) {
            throw new PingException();
        }
    }

    /**
     * Sends a ping request across the supplied pipe advertisement.
     *
     * @param adv The pipe advertisement corresponding to a peer
     *            that has run 'ping -listen' to receive ping requests.
     * @throws PingException if adv is null, the output pipe cannot be
     *                       created, or the message cannot be sent
     */
    public void ping(PipeAdvertisement adv) throws PingException {
        if (adv == null) {
            throw new PingException();
        }

        OutputPipe pipeOut = null;
        try {
            pipeOut = groupPipe.createOutputPipe(adv, OUTPUT_PIPE_TIMEOUT);
            if (pipeOut == null) {
                throw new PingException();
            }
            Message msg = groupPipe.createMessage();
            long timestamp = new Date().getTime();
            // We've defined our protocol in a compact string format.
            // It could easily be done using message elements and probably
            // should -- a good side project for the reader.
            String data = myname + ":" + timestamp;
            InputStream ip = new ByteArrayInputStream(data.getBytes());
            MessageElement me =
                msg.newMessageElement("ping", new MimeMediaType("text/plain"), ip);
            msg.addElement(me);
            pipeOut.send(msg);
        } catch (Exception e) {
            throw new PingException();
        } finally {
            // Always release the pipe, including when building/sending failed.
            closeQuietly(pipeOut);
        }
    }

    /**
     * Sends a ping request across the propagate pipe. The pipe is looked up
     * by the name <code>ping.pingpropserver</code> (see {@link #ping(String)}),
     * which is the propagate pipe published by {@link #listen()}.
     *
     * @throws PingException if the pipe cannot be found or the send fails
     */
    public void propagate() throws PingException {
        ping("pingprop");
    }

    /**
     * Sends a ping message to the peer with the given name. The peer's
     * server pipe advertisement is looked up under
     * <code>ping.&lt;name&gt;server</code>.
     *
     * @param name Name of the peer to ping.
     * @throws PingException if the advertisement cannot be found or the
     *                       ping cannot be sent
     */
    public void ping(String name) throws PingException {
        try {
            PipeAdvertisement adv = findPeerPipe(PING_PREFIX + name + "server");
            ping(adv);
        } catch (Exception e) {
            throw new PingException();
        }
    }

    /**
     * Finds a pipe advertisement by name. The local cache is consulted
     * first; on a miss a remote discovery query is issued and the local
     * cache is polled, sleeping {@link #WaitingTime} ms between polls, until
     * the advertisement shows up or the retry counter exceeds
     * {@link #MAXRETRIES}.
     *
     * If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the search is abandoned.
     *
     * @param name Name of pipe
     * @return The PipeAdvertisement, or null if none was found
     */
    public PipeAdvertisement findPeerPipe(String name) {
        // Look to see if I have a valid ad locally
        PipeAdvertisement pipeAd = getLocalPipe(name);
        if (pipeAd != null) {
            return pipeAd;
        }

        // Search for remote ad; responses end up in the local cache
        discovery.getRemoteAdvertisements(null, DiscoveryService.ADV,
                                          PipeAdvertisement.NameTag, name, 2, null);

        int i = 0;
        while (true) {
            pipeAd = getLocalPipe(name);
            if (pipeAd != null) {
                return pipeAd;
            }
            if (i > MAXRETRIES) {
                break;
            }
            try {
                Thread.sleep(WaitingTime);
            } catch (InterruptedException e) {
                // Somebody wants this thread to stop: keep the interrupt
                // status for the caller and give up the search.
                Thread.currentThread().interrupt();
                return null;
            }
            i++;
        }
        return null;
    }

    /**
     * Looks up a pipe advertisement with the given name in the local
     * discovery cache.
     *
     * @param name Name of pipe
     * @return the first cached advertisement accepted by {@link #isValid},
     *         or null if there is none or the lookup fails
     */
    private PipeAdvertisement getLocalPipe(String name) {
        try {
            Enumeration ads = discovery.getLocalAdvertisements(
                DiscoveryService.ADV, PipeAdvertisement.NameTag, name);
            while ((ads != null) && ads.hasMoreElements()) {
                Object candidate = ads.nextElement();
                if (!(candidate instanceof PipeAdvertisement)) {
                    // Some other kind of advertisement; skip it.
                    continue;
                }
                PipeAdvertisement adv = (PipeAdvertisement) candidate;
                if (isValid(adv, name)) {
                    return adv;
                }
            }
        } catch (Exception e) {
            // Ok for our purposes: a failed cache lookup is treated as a miss
        }
        return null;
    }

    /**
     * Determines if a given advertisement is indeed valid, i.e. it is
     * non-null and carries exactly the requested name.
     *
     * @param adv  advertisement to check, may be null
     * @param name expected pipe name
     * @return true if adv is named name
     */
    private boolean isValid(PipeAdvertisement adv, String name) {
        if (adv == null) {
            return false;
        }
        String advName = adv.getName();
        return (advName != null) && advName.equals(name);
    }

    /**
     * Helper method to create input pipe advertisements. A fresh pipe ID is
     * generated inside this peer's group.
     *
     * @param name Name of pipe
     * @param type Type of pipe
     * @return The PipeAdvertisement, or null if it could not be created
     */
    private PipeAdvertisement createPipeAd(String name, String type) {
        try {
            // Create a pipe advertisement for this pipe.
            PipeAdvertisement adv = (PipeAdvertisement)
                AdvertisementFactory.newAdvertisement(
                    PipeAdvertisement.getAdvertisementType());
            // Associate pipe id with group id, creating a new id.
            adv.setPipeID(IDFactory.newPipeID((PeerGroupID) group.getPeerGroupID()));
            // Use peer name here
            adv.setName(name);
            adv.setType(type);
            return adv;
        } catch (Exception all) {
            return null;
        }
    }

    /**
     * Closes an output pipe, ignoring a null pipe and any failure raised
     * while closing.
     *
     * @param pipe pipe to close, may be null
     */
    private void closeQuietly(OutputPipe pipe) {
        if (pipe == null) {
            return;
        }
        try {
            pipe.close();
        } catch (Exception ignored) {
            // Nothing useful can be done if closing fails.
        }
    }

    /**
     * Notify listeners of a ping reply event.
     *
     * @param event The PingReplyEvent
     */
    protected void notifyReplyListeners(PingReplyEvent event) {
        // Bound the loop by replyListeners itself (not by the ping listeners).
        for (int i = 0; i < replyListeners.size(); i++) {
            ((PingReplyListener) replyListeners.elementAt(i)).pingReply(event);
        }
    }

    /**
     * Registers a listener for ping replies.
     *
     * @param ppml listener to add
     */
    public void addPingReplyListener(PingReplyListener ppml) {
        replyListeners.addElement(ppml);
    }

    /**
     * Unregisters a listener for ping replies.
     *
     * @param ppml listener to remove
     */
    public void removePingReplyListener(PingReplyListener ppml) {
        replyListeners.removeElement(ppml);
    }

    /**
     * Notify interested listeners that a PingEvent has occurred and send
     * them the ping event.
     *
     * @param event The PingEvent
     */
    private void notifyListeners(PingEvent event) {
        for (int i = 0; i < listeners.size(); i++) {
            ((PingListener) listeners.elementAt(i)).pingEvent(event);
        }
    }

    /**
     * Registers a listener for incoming pings.
     *
     * @param ppml listener to add
     */
    public void addPingListener(PingListener ppml) {
        listeners.addElement(ppml);
    }

    /**
     * Unregisters a listener for incoming pings.
     *
     * @param ppml listener to remove
     */
    public void removePingListener(PingListener ppml) {
        listeners.removeElement(ppml);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -