⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 discoveryserviceimpl.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                }            }        }        DiscoveryEvent newevent = new DiscoveryEvent(srcAddress, res, response.getQueryId());        DiscoveryListener dl = listenerTable.get(new Integer(response.getQueryId()));        if (dl != null) {            try {                dl.discoveryEvent(new DiscoveryEvent(srcAddress, res, response.getQueryId()));            } catch (Throwable all) {                LOG.log(Level.SEVERE, "Uncaught Throwable in listener :" + Thread.currentThread().getName(), all);            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("processed a response for query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0));        }        // are there any registered discovery listeners,        // generate the event and callback.        t0 = System.currentTimeMillis();        DiscoveryListener[] allListeners = listeners.toArray(new DiscoveryListener[0]);        for (DiscoveryListener allListener : allListeners) {            try {                allListener.discoveryEvent(newevent);            } catch (Throwable all) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING                            ,                            "Uncaught Throwable in listener (" + allListener.getClass().getName() + ") :"                                    + Thread.currentThread().getName()                            ,                            all);                }            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Called all listenters to query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0));        }    }    /**     * {@inheritDoc}     */    public int processQuery(ResolverQueryMsg query) {        return processQuery(query, null);    }    /**     * {@inheritDoc}     */    public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddress) {        if (stopped) {            return ResolverService.OK;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            if (srcAddress != null) {                LOG.fine("Processing query #" + query.getQueryId() + " from:" + srcAddress);            } else {                LOG.fine("Processing query #" + query.getQueryId() + " from: unknown");            }        }        List<String> results;        List<Long> expirations = new ArrayList<Long>();        DiscoveryQuery dq;        long t0 = System.currentTimeMillis();        try {            XMLDocument asDoc = (XMLDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(query.getQuery()));            dq = new DiscoveryQuery(asDoc);        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Malformed query : ", e);            }            return ResolverService.OK;        }        if ((dq.getThreshold() < 0) || (dq.getDiscoveryType() < PEER) || (dq.getDiscoveryType() > ADV)) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Malformed query");            }            return ResolverService.OK;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine(                    "Got a " + dirname[dq.getDiscoveryType()] + " query #" + query.getQueryId() + " query :" + dq.getAttr()                            + " = " + dq.getValue());        }        /*         // Get the Peer Adv from the query and publish it.         PeerAdvertisement padv = dq.getPeerAdvertisement();         try {         if (!(padv.getPeerID().toString()).equals(localPeerId)) {         // publish others only. Since this one comes from outside,         // we must not keep it beyond its expiration time.         // FIXME: [jice@jxta.org 20011112] In theory there should         // be an expiration time associated with it in the msg, like         // all other items.         publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION);         }         } catch (Exception e) {         if (LOG.isLoggable(Level.FINE)) {         LOG.fine("Bad Peer Adv in Discovery Query", e);         }         }         */        /*         *  threshold==0 and type==PEER is a special case. In this case we are         *  responding for the purpose of providing our own adv only.         */        int thresh = Math.min(dq.getThreshold(), MAX_RESPONSES);        /*         *  threshold==0 and type==PEER is a special case. In this case we are         *  responding for the purpose of providing our own adv only.         */        if ((dq.getDiscoveryType() == PEER) && (0 == dq.getThreshold())) {            respond(query, dq, Collections.singletonList(group.getPeerAdvertisement().toString())                    ,                    Collections.singletonList(DiscoveryService.DEFAULT_EXPIRATION));            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Responding to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0));            }            return ResolverService.OK;        } else {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("start local search query" + dq.getAttr() + " " + dq.getValue());            }            results = search(dq.getDiscoveryType(), dq.getAttr(), dq.getValue(), thresh, true, expirations);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("start local search pruned " + results.size());            }        }        // We only share the advs with > 0 expiration time.        Iterator<Long> eachExpiration = expirations.iterator();        Iterator eachAdv = results.iterator();        while (eachExpiration.hasNext()) {            eachAdv.next();            if (eachExpiration.next() <= 0) {                eachAdv.remove();                eachExpiration.remove();            }        }        if (!results.isEmpty()) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue());            }            respond(query, dq, results, expirations);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Responded to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0));            }            return ResolverService.OK;        } else {            // If this peer is a rendezvous, simply let the resolver            // re-propagate the query.            // If this peer is not a rendez, just discard the query.            if (!group.isRendezvous()) {                return ResolverService.OK;            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Querying SrdiIndex query #" + query.getQueryId());            }            List<PeerID> res = srdiIndex.query(dirname[dq.getDiscoveryType()], dq.getAttr(), dq.getValue(), thresh);            if (!res.isEmpty()) {                srdi.forwardQuery(res, query, thresh);                return ResolverService.OK;            } else if (query.getHopCount() == 0) {                PeerID destPeer = srdi.getReplicaPeer(dirname[dq.getDiscoveryType()] + dq.getAttr() + dq.getValue());                // destPeer can be null in a small rpv (<3)                if (destPeer != null) {                    if (!destPeer.equals(group.getPeerID())) {                        srdi.forwardQuery(destPeer, query);                        return ResolverService.OK;                    } else {                        // start the walk since this peer is this the starting peer                        query.incrementHopCount();                    }                }            }        }        return ResolverService.Repropagate;    }    private void respond(ResolverQueryMsg query, DiscoveryQuery dq, List results, List<Long> expirations) {        if (localonly || stopped) {            return;        }        ResolverResponseMsg response;        DiscoveryResponse dresponse = new DiscoveryResponse();        // peer adv is optional, skip        dresponse.setDiscoveryType(dq.getDiscoveryType());        dresponse.setQueryAttr(dq.getAttr());        dresponse.setQueryValue(dq.getValue());        dresponse.setResponses(results);        dresponse.setExpirations(expirations);        // create a response from the query        response = query.makeResponse();        response.setCredential(credentialDoc);        response.setResponse(dresponse.toString());        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Responding to " + query.getSrcPeer());        }        resolver.sendResponse(query.getSrcPeer().toString(), response);    }    /**     * {@inheritDoc}     */    public synchronized void addDiscoveryListener(DiscoveryListener listener) {        listeners.add(listener);    }    /**     * {@inheritDoc}     */    public synchronized boolean removeDiscoveryListener(DiscoveryListener listener) {        Iterator<Map.Entry<Integer, DiscoveryListener>> e = listenerTable.entrySet().iterator();        while (e.hasNext()) {            Map.Entry<Integer, DiscoveryListener> anEntry = e.next();            if (listener == anEntry.getValue()) {                e.remove();            }        }        return (listeners.remove(listener));    }    /**     * {@inheritDoc}     */    public void remotePublish(String peerid, Advertisement adv, long timeout) {        if (localonly || stopped) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("localonly, no network operations performed");            }            return;        }        int type;        if (adv instanceof PeerAdvertisement) {            type = PEER;        } else if (adv instanceof PeerGroupAdvertisement) {            type = GROUP;        } else {            type = ADV;        }        remotePublish(peerid, adv, type, timeout);    }    /*     *  remote publish the advertisement     */    private void remotePublish(String peerid, Advertisement adv, int type, long expiration) {        if (localonly || stopped) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("localonly, no network operations performed");            }            return;        }        // In case this is invoked before startApp().        if (resolver == null) {            return;        }        switch (type) {            case PEER:                if (adv instanceof PeerAdvertisement) {                    break;                }                throw new IllegalArgumentException("Not a peer advertisement");            case GROUP:                if (adv instanceof PeerGroupAdvertisement) {                    break;                }                throw new IllegalArgumentException("Not a peergroup advertisement");            case ADV:                break;            default:                throw new IllegalArgumentException("Unknown advertisement type");        }        List<String> advert = new ArrayList<String>(1);        List<Long> expirations = new ArrayList<Long>(1);        advert.add(adv.toString());        expirations.add(expiration);        DiscoveryResponseMsg dresponse = new DiscoveryResponse();        dresponse.setDiscoveryType(type);        dresponse.setResponses(advert);        dresponse.setExpirations(expirations);        ResolverResponseMsg pushRes = new ResolverResponse();        pushRes.setHandlerName(handlerName);        pushRes.setCredential(credentialDoc);        pushRes.setQueryId(REMOTE_PUBLISH_QUERYID);        pushRes.setResponse(dresponse.toString());        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Remote publishing ");        }        resolver.sendResponse(peerid, pushRes);    }    /**     * Search for a doc, that matches attr, and value     * bytes is set to true if the caller wants wire format of the     * advertisement, or set to false if caller wants Advertisement     * objects.     *     * @param type        Discovery type PEER, GROUP, ADV     * @param threshold   the upper limit of responses from one peer     * @param bytes       flag to indicate how the results are returned-- advs, or streams     * @param expirations List containing the expirations associated with is returned     * @param attr        attribute name to narrow discovery to Valid values for     *                    this parameter are null (don't care), or exact element name in the     *                    advertisement of interest (e.g. "Name")     * @param value       Value     * @return list of results either as docs, or Strings     */    private List search(int type, String attr, String value, int threshold, boolean bytes, List<Long> expirations) {        if (stopped) {            return new ArrayList();        }        if (type == PEER) {            checkUpdatePeerAdv();        }        List results;        if (threshold <= 0) {            throw new IllegalArgumentException("threshold must be greater than zero");        }        if (expirations != null) {            expirations.clear();        }        if (attr != null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Searching for " + threshold + " entries of type : " + dirname[type]);            }            // a discovery query with a specific search criteria.            results = cm.search(dirname[type], attr, value, threshold, expirations);        } else {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Getting " + threshold + " entries of type : " + dirname[type]);            }            // Returning any entry that exists            results = cm.getRecords(dirname[type], threshold, expirations);        }        if (results.isEmpty() || bytes) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Returning " + results.size() + " results");            }            // nothing more to do;            return results;        }        // Convert the input streams returned by the cm into Advertisements.        List<Advertisement> advertisements = new ArrayList<Advertisement>();        for (int i = 0; i < results.size(); i++) {            InputStream bis = null;            try {                bis = (InputStream) results.get(i);                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, bis);                Advertisement adv = AdvertisementFactory.newAdvertisement(asDoc);                advertisements.add(adv);            } catch (Exception e) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "Failed building advertisment", e);                }                // we won't be including this advertisement so remove it's expiration.                if (null != expirations) {                    expirations.remove(i);                }            } finally {                if (null != bis) {                    try {                        bis.close();                    } catch (IOException ignored) {                        // ignored                    }                }                bis = null;            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -