⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 piperesolver.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            }
        }
        localInputPipes.clear();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Registration is refused when an input pipe is already recorded under the
     * same pipe ID, or when the endpoint declines the incoming message
     * listener. After a successful registration the pipe is announced through
     * SRDI and any listeners for the pipe ID are called.
     */
    public boolean register(InputPipe ip) {
        final PipeID id = (PipeID) ip.getPipeID();

        synchronized (this) {
            if (localInputPipes.containsKey(id)) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Existing registered InputPipe for " + id);
                }
                return false;
            }

            // Hook the pipe into the endpoint before recording it locally.
            final boolean listenerAdded =
                    endpoint.addIncomingMessageListener((EndpointListener) ip, "PipeService", id.toString());

            if (!listenerAdded) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.severe("Existing registered Endpoint Listener for " + id);
                }
                return false;
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Registering local InputPipe for " + id);
            }

            localInputPipes.put(id, ip);
        }

        // Outside the lock: tell SRDI that we are now listening on this pipe.
        pushSrdi(ip, true);

        // Outside the lock: call anyone waiting on this pipe ID (query id 0).
        callListener(0, id, ip.getType(), myGroup.getPeerID(), false);

        return true;
    }

    /**
     * Looks up the locally registered {@link net.jxta.pipe.InputPipe InputPipe}
     * for a {@link net.jxta.pipe.PipeID PipeID}.
     *
     * @param pipeID the ID of the pipe whose InputPipe is wanted.
     * @return the matching InputPipe, or {@code null} when the lookup in the
     *         local registry finds nothing for that ID.
     */
    public InputPipe findLocal(PipeID pipeID) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Find local InputPipe for " + pipeID);
        }

        final InputPipe match = localInputPipes.get(pipeID);

        if (match != null) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("found local InputPipe for " + pipeID);
            }
        }

        return match;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The removal is announced through SRDI first, then the pipe is dropped
     * from the local registry and, if an entry was actually removed, its
     * endpoint listener is unregistered as well.
     */
    public boolean forget(InputPipe pipe) {
        final PipeID id = (PipeID) pipe.getPipeID();

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Unregistering local InputPipe for " + id);
        }

        // Unconditionally announce the change to SRDI.
        // FIXME 20040529 bondolo This is overkill, it should be able to wait
        // until the deltas are pushed.
        pushSrdi(pipe, false);

        final InputPipe previous;

        synchronized (this) {
            previous = localInputPipes.remove(id);
        }

        // Also reached when nothing was registered at all (previous == null).
        if (previous != pipe) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Pipe removed was not the same as the pipe to be removed!");
            }
        }

        if (previous == null) {
            return false;
        }

        // Remove the queue for the general demux.
        final EndpointListener detached = endpoint.removeIncomingMessageListener("PipeService", id.toString());
        final boolean wrongListener = (detached == null) || (detached != pipe);

        if (wrongListener) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("removeIncomingMessageListener() did not remove correct pipe!");
            }
        }

        return true;
    }

    /**
     * Adds a pipe resolver listener for a pipe ID / query ID pair. An existing
     * listener for the same pair is never replaced.
     *
     * @param pipeID   The pipe the listener is interested in.
     * @param listener The listener to add.
     * @param queryID  The query the listener is associated with.
     * @return {@code true} if a listener was <em>already</em> present for this
     *         pipe ID / query ID pair (in which case nothing is changed),
     *         {@code false} if the supplied listener was newly added.
     */
    synchronized boolean addListener(ID pipeID, Listener listener, int queryID) {
        Map<Integer, Listener> listenersForPipe = outputpipeListeners.get(pipeID);

        // First listener for this pipe: create its per-query map.
        if (listenersForPipe == null) {
            listenersForPipe = new HashMap<Integer, Listener>();
            outputpipeListeners.put(pipeID, listenersForPipe);
        }

        if (listenersForPipe.containsKey(queryID)) {
            return true;
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("adding listener for " + pipeID + " / " + queryID);
        }

        listenersForPipe.put(queryID, listener);

        return false;
    }

    /**
     * Calls the listener for the specified pipe ID, informing it about the
     * specified peer. The listener registered under {@code qid} is tried
     * first; unless it reports the event as handled, the listener registered
     * under {@code ANYQUERY} is tried next. Listeners are invoked without
     * holding this object's monitor, and anything they throw is logged and
     * otherwise ignored.
     *
     * @param qid    The query this callback is being made in response to.
     * @param pipeID The pipe which is the subject of the event.
     * @param type   The type of the pipe which is the subject of the event.
     * @param peer   The peer on which the remote input pipe was found.
     * @param NAK    Whether the event is a negative acknowledgement.
     */
    void callListener(int qid, ID pipeID, String type, PeerID peer, boolean NAK) {
        // The event always carries the query id the caller supplied.
        final Event event = new Event(this, peer, pipeID, type, qid);

        int lookupId = qid;
        boolean handled = false;

        do {
            final Listener target;

            synchronized (this) {
                final Map<Integer, Listener> listenersForPipe = outputpipeListeners.get(pipeID);

                if (listenersForPipe == null) {
                    if ((ANYQUERY != lookupId) && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("No listener for pipe " + pipeID);
                    }
                    return;
                }

                target = listenersForPipe.get(lookupId);
            }

            if (target != null) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Calling Pipe resolver listener " + (NAK ? "NAK " : "") + "for " + pipeID);
                }

                try {
                    handled = NAK ? target.pipeNAKEvent(event) : target.pipeResolveEvent(event);
                } catch (Throwable ignored) {
                    // A misbehaving listener must not stop the fallback below.
                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                        LOG.log(Level.WARNING,
                                "Uncaught Throwable in listener for: " + pipeID + "(" + target.getClass().getName() + ")",
                                ignored);
                    }
                }
            }

            // If we have just tried ANYQUERY there is nothing left to try.
            if (ANYQUERY == lookupId) {
                return;
            }

            lookupId = ANYQUERY;
        } while (!handled);
    }

    /**
     * Removes a pipe resolver listener.
     *
     * @param pipeID  The pipe whose listener is to be removed.
     * @param queryID The query ID the listener was registered under.
     * @return the listener that was removed, or {@code null} if there was none
     *         for this pipe ID / query ID pair.
     */
    synchronized Listener removeListener(ID pipeID, int queryID) {
        final Map<Integer, Listener> listenersForPipe = outputpipeListeners.get(pipeID);

        if (listenersForPipe == null) {
            return null;
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("removing listener for " + pipeID + " / " + queryID);
        }

        final Listener removed = listenersForPipe.remove(queryID);

        // Drop the per-pipe map once its last listener is gone.
        if (listenersForPipe.isEmpty()) {
            outputpipeListeners.remove(pipeID);
        }

        return removed;
    }

    /**
     * Sends a request to find an input pipe.
     *
     * @param adv             the advertisement for the pipe we are seeking.
     * @param acceptablePeers the set of peers at which we wish the pipe to
     *                        be resolved. We will not accept responses from
     *                        peers other than those in this set. Empty set
     *                        means all peers are acceptable.
     * @param queryID         the query ID to use for the query. If zero then
     *                        a query ID will be generated.
     * @return the query id under which the request was sent
     */
    int sendPipeQuery(PipeAdvertisement adv, Set<? extends ID> acceptablePeers, int queryID) {
        // Choose a query id if none was pre-chosen.
        final int effectiveQueryId = (0 == queryID) ? getNextQueryID() : queryID;

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine((acceptablePeers.isEmpty() ? "Undirected" : "Directed") + " query (" + effectiveQueryId + ") for "
                    + adv.getPipeID());
        }

        Collection<? extends ID> recipients = new ArrayList<ID>(acceptablePeers);

        // Check the local SRDI index to see if we have a potential answer.
        final List<? extends ID> cachedLocations =
                srdiIndex.query(adv.getType(), PipeAdvertisement.IdTag, adv.getPipeID().toString(), 100);

        if (!cachedLocations.isEmpty()) {
            // We think we know where the pipe might be; only keep those
            // peers which are acceptable.
            if (!acceptablePeers.isEmpty()) {
                cachedLocations.retainAll(acceptablePeers);
            }

            // If any known location survived the filter, send a directed
            // query to ONLY those peers.
            if (!cachedLocations.isEmpty()) {
                recipients = cachedLocations;

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Using SRDI cache results for directed query (" + effectiveQueryId + ") for " + adv.getPipeID());
                }
            }
        }

        // Build the pipe query message.
        final PipeResolverMessage pipeQuery = new PipeResolverMsg();

        pipeQuery.setMsgType(MessageType.QUERY);
        pipeQuery.setPipeID(adv.getPipeID());
        pipeQuery.setPipeType(adv.getType());

        for (Object recipient : recipients) {
            pipeQuery.addPeerID((PeerID) recipient);
        }

        final StructuredTextDocument queryDoc = (StructuredTextDocument) pipeQuery.getDocument(MimeMediaType.XMLUTF8);

        // Wrap it in a resolver query.
        final ResolverQuery resolverQuery = new ResolverQuery();

        resolverQuery.setHandlerName(PipeResolverName);
        resolverQuery.setQueryId(effectiveQueryId);
        resolverQuery.setSrcPeer(myGroup.getPeerID());
        resolverQuery.setQuery(queryDoc.toString());

        // Read the field once so the null check and the use see the same value.
        final CurrentCredential credential = currentCredential;

        if (credential != null) {
            resolverQuery.setCredential(credential.credentialDoc);
        }

        if (!recipients.isEmpty()) {
            // Send it only to the peers whose result we would accept.
            for (ID recipient : recipients) {
                resolver.sendQuery(recipient.toString(), resolverQuery);
            }
            return effectiveQueryId;
        }

        // We have no idea where the pipe is, walk the tree.
        if (myGroup.isRendezvous()) {
            // We are a rendezvous, so send it to the replica peer if there is one.
            final PeerID replica = srdi.getReplicaPeer(
                    pipeQuery.getPipeType() + PipeAdvertisement.IdTag + pipeQuery.getPipeID().toString());

            if (replica != null) {
                srdi.forwardQuery(replica, resolverQuery);
                return effectiveQueryId;
            }
        }

        resolver.sendQuery(null, resolverQuery);

        return effectiveQueryId;
    }

    /**
     * Returns the SRDI index held by this resolver.
     *
     * @return the SRDI index.
     */
    SrdiIndex getSrdiIndex() {
        return srdiIndex;
    }

    /**
     * Pushes SRDI entries for every locally registered input pipe, one SRDI
     * message per pipe type.
     * <p/>
     * This implementation knows nothing of deltas, it just pushes it all;
     * {@code all} only affects the log message.
     *
     * @param peer the peer to push to; {@code null} is passed through to the
     *             SRDI service as a literal {@code null} destination.
     * @param all  whether a full push (rather than deltas) was requested.
     */
    private void pushSrdi(PeerID peer, boolean all) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Pushing " + (all ? "all" : "deltas") + " SRDI to " + peer);
        }

        final Map<String, List<Entry>> entriesByType = new HashMap<String, List<Entry>>();

        // Snapshot the registry under the lock; the messages are sent after
        // the lock has been released.
        synchronized (this) {
            for (InputPipe registered : localInputPipes.values()) {
                final Entry srdiEntry =
                        new Entry(PipeAdvertisement.IdTag, registered.getPipeID().toString(), Long.MAX_VALUE);
                final String pipeType = registered.getType();

                List<Entry> bucket = entriesByType.get(pipeType);

                if (bucket == null) {
                    bucket = new ArrayList<Entry>();
                    entriesByType.put(pipeType, bucket);
                }

                bucket.add(srdiEntry);
            }
        }

        for (Map.Entry<String, List<Entry>> group : entriesByType.entrySet()) {
            final String pipeType = group.getKey();
            final List<Entry> bucket = group.getValue();

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending a Pipe SRDI messsage in " + myGroup.getPeerGroupID() + " of " + bucket.size()
                        + " entries of type " + pipeType);
            }

            final SrdiMessage message = new SrdiMessageImpl(myGroup.getPeerID(), 1, pipeType, bucket);

            // NOTE(review): both branches are kept as written; the literal
            // null may matter for overload selection in srdi -- confirm
            // before merging them.
            if (peer == null) {
                srdi.pushSrdi(null, message);
            } else {
                srdi.pushSrdi(peer, message);
            }
        }
    }

    /**
     * Pushes the SRDI entry for the specified pipe. The local SRDI index is
     * updated first; the entry is then replicated when this peer is a
     * rendezvous, or pushed otherwise. Anything thrown while building or
     * sending the message is logged and not propagated.
     *
     * @param ip     the pipe whose entry we are pushing
     * @param adding {@code true} to add an entry for the pipe (expiration
     *               {@code Long.MAX_VALUE}), {@code false} to expire it
     *               (expiration 0).
     */
    private void pushSrdi(InputPipe ip, boolean adding) {
        final long expiration = adding ? Long.MAX_VALUE : 0;

        srdiIndex.add(ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), myGroup.getPeerID(), expiration);

        try {
            final SrdiMessage message = new SrdiMessageImpl(myGroup.getPeerID(), 1, // ttl
                    ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), expiration);

            if (!myGroup.isRendezvous()) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Sending a" + (adding ? "n add" : " remove") + " Pipe SRDI messsage for pipe [" + ip.getPipeID()
                            + "] of type " + ip.getType());
                }

                srdi.pushSrdi(null, message);
                return;
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Replicating a" + (adding ? "n add" : " remove") + " Pipe SRDI entry for pipe [" + ip.getPipeID()
                        + "] of type " + ip.getType());
            }

            srdi.replicateEntries(message);
        } catch (Throwable e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Uncaught throwable pushing SRDI entries", e);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -