⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 discoveryserviceimpl.java

📁 jxta平台的开发包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                adv = (Advertisement) en.nextElement();                exp = ((Long) exps.nextElement()).longValue();                if (exp > 0 && adv != null) {                    try {                        publish(adv, exp, exp);                    } catch (Exception e) {                        if (LOG.isEnabledFor(Level.WARN)) {                            LOG.warn("Error publishing Advertisement", e);                        }                    }                }            }        } else {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Received empty responses");            }        }        DiscoveryEvent newevent = new DiscoveryEvent(srcAddress, res, response.getQueryId());        DiscoveryListener dl = (DiscoveryListener)                               listenerTable.get(new Integer(response.getQueryId()));        if (dl != null) {            try {                dl.discoveryEvent(new DiscoveryEvent(srcAddress, res, response.getQueryId()));            } catch (Throwable all) {                LOG.fatal("Uncaught Throwable in listener :" + Thread.currentThread().getName(), all);            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("processed a response for query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0));        }        // are there any registered discovery listeners,        // generate the event and callback.        t0 = System.currentTimeMillis();        DiscoveryListener[] allListeners = (DiscoveryListener[]) listeners.toArray(new DiscoveryListener[0]);        for (int eachListener = 0; eachListener < allListeners.length; eachListener++) {            try {                allListeners[eachListener].discoveryEvent(newevent);            } catch (Throwable all) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn(                        "Uncaught Throwable in listener (" + allListeners[eachListener].getClass().getName() + ") :"                        + Thread.currentThread().getName(),                        all);                }            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Called all listenters to query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0));        }    }    /**     *  {@inheritDoc}     */    public int processQuery(ResolverQueryMsg query) {        return processQuery(query, null);    }    /**     *  {@inheritDoc}     */    public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddress) {        if (LOG.isEnabledFor(Level.DEBUG)) {            if (srcAddress != null) {                LOG.debug("Processing query #" + query.getQueryId()+" from:"+srcAddress);            } else  {                LOG.debug("Processing query #" + query.getQueryId()+" from: unknown");            }        }        Vector results = null;        Vector expirations = new Vector();        DiscoveryQuery dq;        long t0 = System.currentTimeMillis();        try {            XMLDocument asDoc = (XMLDocument)                                StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(query.getQuery()));            dq = new DiscoveryQuery(asDoc);        } catch (Exception e) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Malformed query : ", e);            }            return ResolverService.OK;        }        if ((dq.getThreshold() < 0) || (dq.getDiscoveryType() < PEER) || (dq.getDiscoveryType() > ADV)) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Malformed query");            }            return ResolverService.OK;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Got a " + dirname[dq.getDiscoveryType()] + " query #" + query.getQueryId() + " query :" + dq.getAttr() + " = " + dq.getValue());        }        /*         // Get the Peer Adv from the query and publish it.         PeerAdvertisement padv = dq.getPeerAdvertisement();         try {         if (!(padv.getPeerID().toString()).equals(localPeerId)) {         // publish others only. Since this one comes from outside,         // we must not keep it beyond its expiration time.         // FIXME: [jice@jxta.org 20011112] In theory there should         // be an expiration time associated with it in the msg, like         // all other items.         publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION);         }         } catch (Exception e) {         if (LOG.isEnabledFor(Level.DEBUG)) {         LOG.debug("Bad Peer Adv in Discovery Query", e);         }         }         */        /*         *  threshold==0 and type==PEER is a special case. In this case we are         *  responding for the purpose of providing our own adv only.         */        int thresh = Math.min(dq.getThreshold(), MAX_RESPONSES);        /*         *  threshold==0 and type==PEER is a special case. In this case we are         *  responding for the purpose of providing our own adv only.         */        if ((dq.getDiscoveryType() == PEER) && (0 == dq.getThreshold())) {            results = new Vector();            results.add(group.getPeerAdvertisement().toString());            expirations.add(new Long(DiscoveryService.DEFAULT_EXPIRATION));            respond(query, dq, results, expirations);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Responding to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0));            }            return ResolverService.OK;        } else {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("start local search query" + dq.getAttr() + " " + dq.getValue());            }            results = search(dq.getDiscoveryType(), dq.getAttr(), dq.getValue(), thresh, true, expirations);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("start local search pruned " + results.size());            }        }        // We only share the advs with > 0 expiration time.        Iterator<Long> eachExpiration = (Iterator<Long>) expirations.iterator();        Iterator eachAdv = results.iterator();        while( eachExpiration.hasNext() ) {            eachAdv.next();            if( eachExpiration.next() <= 0 ) {                eachAdv.remove();                eachExpiration.remove();            }        }        if (!results.isEmpty()) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue());            }            respond(query, dq, results, expirations);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Responded to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0));            }            return ResolverService.OK;        } else {            // If this peer is a rendezvous, simply let the resolver            // re-propagate the query.            // If this peer is not a rendez, just discard the query.            if (!group.isRendezvous()) {                return ResolverService.OK;            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Querying SrdiIndex query #" + query.getQueryId());            }            Vector res = srdiIndex.query(dirname[dq.getDiscoveryType()], dq.getAttr(), dq.getValue(), thresh);            if (res.size() > 0) {                srdi.forwardQuery(res, query, thresh);                return ResolverService.OK;            } else if (query.getHopCount() == 0) {                PeerID destPeer = srdi.getReplicaPeer(dirname[dq.getDiscoveryType()] + dq.getAttr() + dq.getValue());                // destPeer can be null in a small rpv (<3)                if (destPeer != null) {                    if (!destPeer.equals(group.getPeerID())) {                        srdi.forwardQuery(destPeer, query);                        return ResolverService.OK;                    } else {                        // start the walk since this peer is this the starting peer                        query.incrementHopCount();                    }                }            }        }        return ResolverService.Repropagate;    }    private void respond(ResolverQueryMsg query,                         DiscoveryQuery dq,                         Vector results,                         Vector expirations) {        if (localonly) {            return;        }        ResolverResponseMsg response;        DiscoveryResponse dresponse = new DiscoveryResponse();        // peer adv is optional, skip        dresponse.setDiscoveryType(dq.getDiscoveryType());        dresponse.setQueryAttr(dq.getAttr());        dresponse.setQueryValue(dq.getValue());        dresponse.setResponses(results);        dresponse.setExpirations(expirations);        // create a response from the query        response = query.makeResponse();        response.setCredential(credentialDoc);        response.setResponse(dresponse.toString());        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Responding to " + query.getSrc());        }        resolver.sendResponse(query.getSrc(), response);    }    /**     *  {@inheritDoc}     */    public synchronized void addDiscoveryListener(DiscoveryListener listener) {        listeners.add(listener);    }    /**     *  {@inheritDoc}     */    public synchronized boolean removeDiscoveryListener(DiscoveryListener listener) {        Iterator e = listenerTable.entrySet().iterator();        while (e.hasNext()) {            Map.Entry anEntry = (Map.Entry) e.next();            if( listener == anEntry.getValue() ) {                e.remove();            }        }        return (listeners.remove(listener));    }    /**     *  {@inheritDoc}     */    public void remotePublish(String peerid, Advertisement adv, long timeout) {        if (localonly) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("localonly, no network operations performed");            }            return;        }        int type = -1;        if (adv instanceof PeerAdvertisement) {            type = PEER;        } else            if (adv instanceof PeerGroupAdvertisement) {                type = GROUP;            } else {                type = ADV;            }        remotePublish(peerid, adv, type, timeout);    }    /**     *  remote publish the advertisement     */    private void remotePublish(String peerid, Advertisement adv, int type, long expiration) {        if (localonly) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("localonly, no network operations performed");            }            return;        }        // In case this is invoked before startApp().        if (resolver == null) {            return;        }        switch (type) {        case PEER:            if (adv instanceof PeerAdvertisement) {                break;            }            throw new IllegalArgumentException("Not a peer advertisement");        case GROUP:            if (adv instanceof PeerGroupAdvertisement) {                break;            }            throw new IllegalArgumentException("Not a peergroup advertisement");        case ADV:            break;        default:            throw new IllegalArgumentException("Unknown advertisement type");        }        Vector advert = new Vector(1);        Vector expirations = new Vector(1);        advert.add(adv.toString());        expirations.add(new Long(expiration));        DiscoveryResponseMsg dresponse = new DiscoveryResponse();        dresponse.setDiscoveryType(type);        dresponse.setResponses(advert);        dresponse.setExpirations(expirations);        ResolverResponseMsg pushRes = new ResolverResponse();        pushRes.setHandlerName(handlerName);        pushRes.setCredential(credentialDoc);        pushRes.setQueryId(0);        pushRes.setResponse(dresponse.toString());        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Remote publishing ");        }        resolver.sendResponse(peerid, pushRes);    }    /**     *  Search for a doc, that matches attr, and value     * bytes is set to true if the caller wants wire format of the     * advertisement, or set to false if caller wants Advertisement     * objects.     *     * @param  type         Discovery type PEER, GROUP, ADV     * @param  threshold    the upper limit of responses from one peer     * @param  streams      flag to indicate how the results are returned-- advs, or streams     * @param  expirations  vector containing the expirations associated with is returned     * @param  attr         attribute name to narrow disocvery to Valid values for     *      this parameter are null (don't care), or exact element name in the     *      advertisement of interest (e.g. "Name")     * @param  value        Value     * @return              vector of results either as docs, or Strings     */    private Vector search(int type,                          String attr,                          String value,                          int threshold,                          boolean bytes,                          Vector expirations) {        if( type == PEER ) {            checkUpdatePeerAdv();        }        Vector results;        if (threshold <= 0) {            throw new IllegalArgumentException("threshold must be greater than zero");        }        if (expirations != null) {            expirations.clear();        }        if (attr != null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Searching for " + threshold + " entries of type : " + dirname[type]);            }            // a discovery query with a specific search criteria.            results = cm.search(dirname[type], attr, value, threshold, expirations);        } else {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Getting " + threshold + " entries of type : " + dirname[type]);            }            // Returning any entry that exists            results = cm.getRecords(dirname[type], threshold, null, expirations);        }        if ( results.isEmpty() || bytes) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Returning " + results.size() + " results");            }            // nothing more to do;            return results;        }        Vector advertisements = new Vector();        for (int i = 0; i < results.size(); i++) {            InputStream bis = null;            try {                bis = (InputStream) results.elementAt(i);                Advertisement adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, bis);                advertisements.addElement(adv);            } catch (Exception e) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Failed building advertisment", e);                }            } finally {                if (null != bis) {                    try {                        bis.close();                    } catch (IOException ignored) {                        ;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -