⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 piperesolver.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        localInputPipes.clear();
    }

    /**
     * Registers a local input pipe with this resolver.
     *
     * <p/>Registration is refused if an input pipe is already registered
     * under the same pipe ID or, for pipes whose type is not
     * {@code PipeService.PropagateType}, if the endpoint service declines to
     * add the pipe as an incoming message listener. After a successful
     * registration the resolver listeners for the pipe are called with
     * query id 0 and the local peer ID.
     *
     * @param ip the input pipe to register.
     * @return {@code true} if the pipe was registered, otherwise {@code false}.
     **/
    public boolean register(InputPipe ip) {

        PipeID pipeID = (PipeID) ip.getPipeID();

        synchronized (this) {
            // Only one input pipe may be registered per pipe id.
            if (localInputPipes.containsKey(pipeID)) {
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Existing registered InputPipe for " + pipeID);
                }

                return false;
            }

            // Pipes of every type except propagate are also added to the
            // endpoint service as incoming message listeners.
            boolean needsEndpointListener = !ip.getType().equals(PipeService.PropagateType);

            if (needsEndpointListener
                    && !myGroup.getEndpointService().addIncomingMessageListener((EndpointListener) ip, "PipeService", pipeID.toString())) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Existing registered Endpoint Listener for " + pipeID);
                }

                return false;
            }

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Registering local InputPipe for " + pipeID);
            }

            localInputPipes.put(pipeID, ip);
        }

        // Tell anyone waiting on this pipe that it is now available locally.
        callListener(0, pipeID, ip.getType(), myGroup.getPeerID(), false);

        return true;
    }

    /**
     *  Looks up the locally registered {@link net.jxta.pipe.InputPipe InputPipe}
     *  for the specified {@link net.jxta.pipe.PipeID PipeID}.
     *
     * @param   pipeID  the PipeID whose InputPipe is desired.
     * @return  the registered InputPipe, or {@code null} if none is
     *  registered under that ID.
     **/
    public InputPipe findLocal(PipeID pipeID) {

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Find local InputPipe for " + pipeID);
        }

        InputPipe localPipe = (InputPipe) localInputPipes.get(pipeID);

        if (null != localPipe) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("found local InputPipe for " + pipeID);
            }
        }

        return localPipe;
    }

    /**
     * Unregisters a local input pipe.
     *
     * <p/>If a pipe was registered under the ID and its type is not
     * {@code PipeService.PropagateType}, the matching incoming message
     * listener is also removed from the endpoint service. A warning is
     * logged when the listener removed is not the pipe that was passed in.
     *
     * @param pipe the input pipe to unregister.
     * @return {@code true} if an input pipe was registered under the ID of
     * the provided pipe, otherwise {@code false}.
     **/
    public boolean forget(InputPipe pipe) {

        PipeID pipeID = (PipeID) pipe.getPipeID();

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Unregistering local InputPipe for " + pipeID);
        }

        InputPipe forgotten;

        synchronized (this) {
            forgotten = (InputPipe) localInputPipes.remove(pipeID);
        }

        if (null == forgotten) {
            // Nothing was registered under this id.
            return false;
        }

        if (!forgotten.getType().equals(PipeService.PropagateType)) {
            // remove the queue for the general demux
            EndpointListener removed = myGroup.getEndpointService().removeIncomingMessageListener("PipeService", pipeID.toString());

            boolean removedCorrectPipe = (null != removed) && (pipe == removed);

            if (!removedCorrectPipe) {
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("removeIncomingMessageListener() did not remove correct pipe!");
                }
            }
        }

        return true;
    }

    /**
     *  Adds a pipe resolver listener for a pipe ID / query ID pair. An
     *  existing listener for the same pair is left in place.
     *
     * @param  pipeID    the pipe the listener is interested in.
     * @param  listener  the listener to add.
     * @param  queryID   the query ID the listener is registered under.
     * @return {@code true} if a listener was already present for the pair
     *  (the new listener is NOT added), otherwise {@code false}.
     **/
    synchronized boolean addListener(PipeID pipeID, Listener listener, int queryID) {

        Map listenersByQuery = (Map) outputpipeListeners.get(pipeID);

        // First listener for this pipe id: create its per-pipe map.
        if (null == listenersByQuery) {
            listenersByQuery = new HashMap();

            outputpipeListeners.put(pipeID, listenersByQuery);
        }

        Integer queryKey = new Integer(queryID);

        if (listenersByQuery.containsKey(queryKey)) {
            return true;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("adding listener for " + pipeID + " / " + queryID);
        }

        listenersByQuery.put(queryKey, listener);

        return false;
    }

    /**
     *  Calls the listener for the specified pipe id informing it about the
     *  specified peer.
     *
     *  <p/>The listener registered under {@code qid} is tried first. If
     *  there is none, or it does not report the event as handled, the
     *  listener registered under {@code ANYQUERY} is tried. Anything thrown
     *  by a listener is logged and treated as "not handled".
     *
     *  @param qid      The query this callback is being made in response to.
     *  @param pipeID   The pipe which is the subject of the event.
     *  @param type     The type of the pipe which is the subject of the event.
     *  @param peer     The peer on which the remote input pipe was found.
     *  @param NAK      If {@code true} the listener's NAK callback is used
     *  instead of its resolve callback.
     **/
    void callListener(int qid, PipeID pipeID, String type, PeerID peer, boolean NAK) {

        Event newevent = new Event(this, peer, pipeID, type, qid);

        int currentQid = qid;

        for (;;) {
            Listener target;

            synchronized (this) {
                Map listenersByQuery = (Map) outputpipeListeners.get(pipeID);

                if (null == listenersByQuery) {
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("No listener for pipe: " + pipeID);
                    }
                    return;
                }

                target = (Listener) listenersByQuery.get(new Integer(currentQid));
            }

            boolean handled = false;

            if (null != target) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Calling Pipe resolver listener " + (NAK ? "NAK " : "") + "for pipe : " + pipeID);
                }

                // The listener is invoked outside of the lock.
                try {
                    handled = NAK ? target.pipeNAKEvent(newevent) : target.pipeResolveEvent(newevent);
                } catch (Throwable ignored) {
                    if (LOG.isEnabledFor(Level.WARN)) {
                        LOG.warn("Uncaught Throwable in listener for: " + pipeID + "(" + target.getClass().getName() + ")", ignored);
                    }
                }
            }

            // Done once handled, or once the ANYQUERY listener has had its turn.
            if (handled || (ANYQUERY == currentQid)) {
                return;
            }

            currentQid = ANYQUERY;
        }
    }

    /**
     *  Removes a pipe resolver listener. When the last listener for a pipe
     *  is removed the per-pipe listener map is discarded as well.
     *
     * @param pipeID the pipe whose listener is to be removed.
     * @param queryID the query ID the listener was registered under.
     * @return the listener which was removed, or {@code null} if there was
     *  no listener for the pipe ID / query ID pair.
     **/
    synchronized Listener removeListener(PipeID pipeID, int queryID) {

        Map listenersByQuery = (Map) outputpipeListeners.get(pipeID);

        if (null == listenersByQuery) {
            return null;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("removing listener for :" + pipeID + " / " + queryID);
        }

        Listener removedListener = (Listener) listenersByQuery.remove(new Integer(queryID));

        if (listenersByQuery.isEmpty()) {
            outputpipeListeners.remove(pipeID);
        }

        return removedListener;
    }

    /**
     *  Sends a request to find an input pipe.
     *
     *  <p/>The local SRDI index is consulted first. If it yields locations
     *  (restricted to the acceptable peers when that set is non-empty) the
     *  query is directed to those peers only. Otherwise it is directed to
     *  the acceptable peers or, when there are none, sent undirected.
     *
     *  @param  adv the advertisement for the pipe we are seeking.
     *  @param  acceptablePeers the set of peers at which we wish the pipe to
     *  be resolved. We will not accept responses from peers other than those
     *  in this set. Empty set means all peers are acceptable.
     *  @param queryID the query ID to use for the query. If zero then a query
     *  ID will be generated.
     *  @return the query id under which the request was sent.
     **/
    int sendPipeQuery(PipeAdvertisement adv, Set acceptablePeers, int queryID) {

        // choose a query id if none was pre-chosen.
        if (0 == queryID) {
            queryID = getNextQueryID();
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug((acceptablePeers.isEmpty() ? "Undirected" : "Directed") + " query (" + queryID + ") for " + adv.getPipeID());
        }

        Collection targetPeers = acceptablePeers;

        // See whether the local srdi index already has a potential answer.
        List srdiHits = srdiIndex.query(adv.getType(), PipeAdvertisement.IdTag, adv.getPipeID().toString(), 100);

        if (!srdiHits.isEmpty()) {
            if (!acceptablePeers.isEmpty()) {
                // Drop any location the caller would not accept.
                srdiHits.retainAll(acceptablePeers);
            }

            // Whatever is left gets a directed query, and ONLY those peers.
            if (!srdiHits.isEmpty()) {
                targetPeers = srdiHits;

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Using SRDI cache results for directed query (" + queryID + ") for " + adv.getPipeID());
                }
            }
        }

        // build the pipe query message.
        PipeResolverMessage pipeQry = new PipeResolverMsg();

        pipeQry.setMsgType(MessageType.QUERY);
        pipeQry.setPipeID(adv.getPipeID());
        pipeQry.setPipeType(adv.getType());

        for (Iterator eachPeer = targetPeers.iterator(); eachPeer.hasNext();) {
            pipeQry.addPeerID((PeerID) eachPeer.next());
        }

        StructuredTextDocument asDoc = (StructuredTextDocument) pipeQry.getDocument(MimeMediaType.XMLUTF8);

        // build the resolver query
        ResolverQuery query = new ResolverQuery();

        query.setHandlerName(PipeResolverName);
        query.setCredential(credentialDoc);
        query.setQueryId(queryID);
        query.setSrc(myGroup.getPeerID().toString());
        query.setQuery(asDoc.toString());

        if (!targetPeers.isEmpty()) {
            // send it only to the peers whose result we would accept.
            for (Iterator eachPeer = targetPeers.iterator(); eachPeer.hasNext();) {
                resolver.sendQuery(eachPeer.next().toString(), query);
            }

            return queryID;
        }

        // we have no idea where the pipe is, walk the tree

        if (myGroup.isRendezvous()) {
            // We are a rdv, so hand the query to the replica peer if there is one.
            PeerID peer = srdi.getReplicaPeer(pipeQry.getPipeType() + PipeAdvertisement.IdTag + pipeQry.getPipeID().toString());

            if (null != peer) {
                srdi.forwardQuery(peer.toString(), query);
                return queryID;
            }
        }

        resolver.sendQuery(null, query);

        return queryID;
    }

    /**
     *  Returns the SRDI index used by this resolver.
     *
     *  @return the SRDI index.
     **/
    SrdiIndex getSrdiIndex() {
        return srdiIndex;
    }

    /**
     *  Pushes SRDI entries for all of the locally registered input pipes,
     *  one SRDI message per pipe type.
     *
     *  <p/>This implementation knows nothing of deltas, it just pushes it all.
     *
     *  @param peer the peer to push to; {@code null} is passed through to
     *  the SRDI service as the destination.
     *  @param all  only used for logging in this implementation.
     **/
    private void pushSrdi(PeerID peer, boolean all) {

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Pushing " + (all ? "all" : "deltas") + " SRDI to " + peer);
        }

        // pipe type -> entries for every local input pipe of that type
        Map entriesByType = new HashMap();

        synchronized (this) {
            for (Iterator eachPipe = localInputPipes.values().iterator(); eachPipe.hasNext();) {
                InputPipe ip = (InputPipe) eachPipe.next();
                Entry entry = new Entry(PipeAdvertisement.IdTag, ip.getPipeID().toString(), Long.MAX_VALUE);

                String pipeType = ip.getType();

                List bucket = (List) entriesByType.get(pipeType);

                if (null == bucket) {
                    bucket = new Vector();
                    entriesByType.put(pipeType, bucket);
                }

                bucket.add(entry);
            }
        }

        for (Iterator eachType = entriesByType.keySet().iterator(); eachType.hasNext();) {
            String type = (String) eachType.next();

            Vector entries = (Vector) entriesByType.get(type);

            // The map is drained as each type is sent.
            eachType.remove();

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Sending a Pipe SRDI messsage in " + myGroup.getPeerGroupID() + " of " + entries.size() + " entries of type " + type);
            }

            SrdiMessage srdiMsg = new SrdiMessageImpl(myGroup.getPeerID(), 1, // ttl
                    type, entries);

            if (null == peer) {
                srdi.pushSrdi(null, srdiMsg);
            } else {
                srdi.pushSrdi(peer, srdiMsg);
            }
        }
    }

    /**
     * Pushes the SRDI entry for the specified pipe.
     *
     * <p/>The local SRDI index is updated first. The entry is then
     * replicated if this peer is a rendezvous, or pushed otherwise.
     * Anything thrown while building or sending the SRDI message is logged
     * and not propagated.
     *
     * @param   ip  the pipe whose entry we are pushing.
     * @param adding    {@code true} to add an entry for the pipe (expiration
     * {@code Long.MAX_VALUE}), {@code false} to expire it (expiration 0).
     **/
    void pushSrdi(InputPipe ip, boolean adding) {

        long expiration = adding ? Long.MAX_VALUE : 0;

        srdiIndex.add(ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), myGroup.getPeerID(), expiration);

        try {
            SrdiMessage srdiMsg = new SrdiMessageImpl(myGroup.getPeerID(), 1, // ttl
                    ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), expiration);

            if (!myGroup.isRendezvous()) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug(
                            "Sending a" + (adding ? "n add" : " remove") + " Pipe SRDI messsage for pipe [" + ip.getPipeID() + "] of type "
                            + ip.getType());
                }

                srdi.pushSrdi(null, srdiMsg);
            } else {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug(
                            "Replicating a" + (adding ? "n add" : " remove") + " Pipe SRDI entry for pipe [" + ip.getPipeID() + "] of type "
                            + ip.getType());
                }

                srdi.replicateEntries(srdiMsg);
            }
        } catch (Throwable e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Uncaught throwable pushing SRDI entries", e);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -