⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pingpeer.java

📁 Java p2p程序设计2002年版
💻 JAVA
字号:
package net.jxta.impl.shell.bin.ping;import net.jxta.impl.shell.*;import java.io.*;import java.util.*;import net.jxta.exception.*;import net.jxta.discovery.*;import net.jxta.endpoint.*;import net.jxta.pipe.*;import net.jxta.peergroup.*;import net.jxta.document.*;import net.jxta.id.*;import net.jxta.protocol.*;import java.util.*;/** * * This class provides all the core capability of our ping implementation * on top of the ping framework. This class provides the protocol format, * and ping workflow (e.g. request-response). It does this by instantiating * the appropriate handlers and implementing the appropriate interfaces and * then filling in the missing work (i.e. protocol format, workflow etc.) */public class PingPeer implements PingListener, PingReplyListener {        DiscoveryService discovery;    PipeService groupPipe;    PeerGroup group;    private static final int WaitingTime = 2000;    private static final int  MAXRETRIES = 5;    String myname = "";    Vector replyListeners = new Vector();    Vector listeners = new Vector();    InputPipe cPipeIn = null;    InputPipe sPipeIn = null;    InputPipe pPipeIn = null;    ShellEnv env;    private static final String PING_PREFIX = "ping.";    /** Creates new PingPeer */    public PingPeer(DiscoveryService d, PeerGroup group, ShellEnv env) throws PingException {        discovery = d;        groupPipe = group.getPipeService();        this.group = group;        myname = group.getPeerName();        this.env = env;        setup();    }        /**     * When a reply to a ping arrives, this callback is invoked.     * @param event The callback event     */    public void pingReply(PingReplyEvent event) {        notifyReplyListeners(event);    }        /**     * Called when a ping message is received.     * @param event The callback event     */    public void pingEvent(PingEvent event) {                String servname = event.getPeerName();        long otstamp = event.getTimestamp();                String replyStr = myname+":"+event.getDifferential()+":"+event.getTimestamp();        replyToPing(replyStr,servname);        notifyListeners(event);    }        public void reset() {        // Unimplemented           }        /**     * Builds and sends a reply to a ping event     * @param reply Reply string to send     * @param clientName Name of client to reply to as indicated in their     * ping message     */    private void replyToPing(String reply, String clientName) {                // Find pipe for the client pinging me        PipeAdvertisement pipeAd = findPeerPipe(PING_PREFIX+clientName+"client");                OutputPipe pipeOut = null;        try {            try {                pipeOut = groupPipe.createOutputPipe(pipeAd,  50000);                if (pipeOut == null) {                    return;                }            } catch (Exception e) {                return;            }            Message msg = null;            msg = groupPipe.createMessage();            long timestamp = new Date().getTime();            InputStream ip = new ByteArrayInputStream(reply.getBytes());            MessageElement me = msg.newMessageElement("reply", new MimeMediaType("text/plain"),  ip);            msg.addElement(me);            pipeOut.send(msg);            pipeOut.close();                    } catch (Exception e) {        }            }        /**     * Initialize the client by creating initializing it's inbound listening     * pipe and publishing it. Also, create a PingReplyHandler and store it     * in the environment so it can be found in future ping invocations and     * not recreated in that case.     */    private void setup() throws PingException {        PipeAdvertisement adv = null;                        ShellObject ph3Obj = env.get("pr3");        if(ph3Obj==null) {            // Build an advertisement for my input pipe            // and publish it, but only once            try {                                adv = createPipeAd(PING_PREFIX+group.getPeerName()+"client",PipeService.UnicastType);                if(adv==null) throw new PingException();                // Not really using the bundles, but could                PipeBundle pb = new PipeBundle(adv,groupPipe);                                // This is my client pipe waiting to hear back from                // pinged peers                cPipeIn = pb.getInputPipe();                            } catch( Exception all ) {                throw new PingException();            }                        PingReplyHandler pr3 = new PingReplyHandler(cPipeIn);            pr3.addPingReplyListener(this);            pr3.waitForMessage();            env.add("pr3",new ShellObject("pr3",pr3));            try {                // So republish the ad for the client reply pipe in case it gets lost                // which means it won't work the rest of the shell session.                discovery.publish(adv, DiscoveryService.ADV);                discovery.remotePublish(adv, DiscoveryService.ADV);            } catch (Exception e) {                throw new PingException();            }        }            }            /**     * install and run the ping listeners.     */    public void listen() throws PingException {                PipeAdvertisement adv = null;        PipeAdvertisement adv1 = null;                try {                        /**             * Create the unicast input pipe to this peer             */            adv = createPipeAd(PING_PREFIX+group.getPeerName()+"server",PipeService.UnicastType);            PipeBundle pb1 = new PipeBundle(adv,groupPipe);            sPipeIn = pb1.getInputPipe();                        /**             * Create the propagate input pipe to this peer             */            adv1 = findPeerPipe(PING_PREFIX+"pingpropserver");            if(adv1 == null)                 adv1 = createPipeAd(PING_PREFIX+"pingpropserver",PipeService.PropagateType);            PipeBundle pb2 = new PipeBundle(adv1,groupPipe);            pPipeIn = pb2.getInputPipe();                                    /**             * Build PingHandlers for the unicast and propagate inbound pipes             * When a ping message arrives on either pipe, the pingEvent(PingEvent e)             * method is invoked.             */            ShellObject ph1Obj = env.get("ph1");            if(ph1Obj==null) {                PingHandler ph1 = new PingHandler(sPipeIn);                ph1.addPingListener(this);                ph1.waitForMessage();                env.add("ph1",new ShellObject("ph1",ph1));            }            ShellObject ph2Obj = env.get("ph2");            if(ph2Obj==null) {                PingHandler ph2 = new PingHandler(pPipeIn);                ph2.addPingListener(this);                ph2.waitForMessage();                env.add("ph2",new ShellObject("ph2",ph2));            }            /**             * Publish the pipe advertisements.             */            discovery.publish(adv, DiscoveryService.ADV);            discovery.remotePublish(adv, DiscoveryService.ADV);            discovery.publish(adv1, DiscoveryService.ADV);            discovery.remotePublish(adv1, DiscoveryService.ADV);        } catch (Exception e) {            throw new PingException();        }    }        /**     * Send a ping request across the supplied pipe advertisement.     * @param adv The pipe advertisement corresponding to a peer     * that has run 'ping -listen' to receive ping requests.     */    public void ping(PipeAdvertisement adv) throws PingException {        OutputPipe pipeOut;                try {                        pipeOut = groupPipe.createOutputPipe(adv, 50000);            if (pipeOut == null) {                throw new PingException();            }            Message msg = null;            msg = groupPipe.createMessage();            long timestamp = new Date().getTime();                        // We've defined our protocol in a compact string format            // It could easily be done using message elements and probably            // should -- a good side project for the reader.            String data = new String(myname+":"+timestamp);            InputStream ip = new ByteArrayInputStream(data.getBytes());            MessageElement me = msg.newMessageElement("ping", new MimeMediaType("text/plain"),  ip);            msg.addElement(me);            pipeOut.send(msg);            pipeOut.close();                    } catch (Exception e) {            throw new PingException() ;        }    }        /**     * Send a ping request across a pipe named 'pingpropagate' which is published     * to be a propagate pipe. The ping request should then be received by all     * input pipes bound to the ad.     */    public void propagate() throws PingException {        ping("pingprop");    }        /**     * Send a ping message to a peer listening on an input pipe represented     * by the supplied advertisement.     * @param adv The pipe advertisement of the peer to ping.     */    public void ping(String name) throws PingException {                OutputPipe pipeOut = null;        try {            PipeAdvertisement adv = findPeerPipe(PING_PREFIX+name+"server");            ping(adv);                    } catch (Exception e) {            throw new PingException() ;        }    }        /**     * Find a pipe advertisement by name     * @param name Name of pipe     * @return The PipeAdvertisement     */    public PipeAdvertisement findPeerPipe(String name) {                // Look to see if I have a valid ad locally        PipeAdvertisement pipeAd = getLocalPipe(name);        if(pipeAd==null) {            // Search for remote ad and store it in local cache            discovery.getRemoteAdvertisements(null, DiscoveryService.ADV,PipeAdvertisement.NameTag, name,2,null);            int i=0;            while (true) {                try {                    pipeAd = getLocalPipe(name);                    if (pipeAd!=null){                        return pipeAd;                    }                    if(i>MAXRETRIES) break;                    Thread.sleep(WaitingTime);                    i++;                } catch (Exception e) {                    // Just drop out if an error occurs                    // Ok for our purposes                }            }                    } else return pipeAd;                        return null;    }        private PipeAdvertisement getLocalPipe(String name) {        Enumeration enum = null;                try {            enum = discovery.getLocalAdvertisements(DiscoveryService.ADV, PipeAdvertisement.NameTag, name);                        if ((enum != null) && (enum.hasMoreElements())) {                PipeAdvertisement adv = null;                                while (enum.hasMoreElements()) {                                        try {                        adv = (PipeAdvertisement) enum.nextElement();                    } catch(Exception e) {                        continue;                    }                    if(isValid(adv,name)) {                        return adv;                    }                }            }                    } catch (Exception e) {            // Ok for our purposes        }        return null;    }            /**     * Determine if a given advertisement is indeed valid.     */    private boolean isValid(PipeAdvertisement adv, String name) {                if (adv == null) return false;        if (adv.getName() == null) return false;        if (adv.getName().equals(name)) return true;        return false;    }        /**     * Helper method to create input pipe advertisements.     * @param name Name of pipe     * @param type Type of pipe     * @return The PipeAdvertisement     */    private PipeAdvertisement createPipeAd(String name, String type) {        PipeAdvertisement adv;                try {            // Create a pipe advertisement for this pipe.            adv = (PipeAdvertisement)            AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType() );            // Associate pipe id with group id, creating a new id.            // (PeerGroupID) group.getPeerGroupID()            adv.setPipeID( IDFactory.newPipeID((PeerGroupID) group.getPeerGroupID()) );            // Use peer name here            adv.setName(name);            adv.setType(type);            return adv;        }        catch( Exception all ) {            return null;        }    }        /**     * Notify listeners of a ping reply event.     * @param event The PingReplyEvent     */    protected void notifyReplyListeners(PingReplyEvent event) {        for(int i=0;i<listeners.size();i++)            ((PingReplyListener)replyListeners.elementAt(i)).pingReply(event);    }        public void addPingReplyListener(PingReplyListener ppml) {        replyListeners.addElement(ppml);    }        public void removePingReplyListener(PingReplyListener ppml) {        replyListeners.removeElement(ppml);    }        /**     * Notify interested listeners that a PingEvent has occured and send the     * ping event.     */    private void notifyListeners(PingEvent event) {        for(int i=0;i<listeners.size();i++)            ((PingListener)listeners.elementAt(i)).pingEvent(event);    }        public void addPingListener(PingListener ppml) {        listeners.addElement(ppml);    }        public void removePingListener(PingListener ppml) {        listeners.removeElement(ppml);    }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -