📄 discoveryserviceimpl.java
字号:
adv = (Advertisement) en.nextElement(); exp = ((Long) exps.nextElement()).longValue(); if (exp > 0 && adv != null) { try { publish(adv, exp, exp); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Error publishing Advertisement", e); } } } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received empty responses"); } } DiscoveryEvent newevent = new DiscoveryEvent(srcAddress, res, response.getQueryId()); DiscoveryListener dl = (DiscoveryListener) listenerTable.get(new Integer(response.getQueryId())); if (dl != null) { try { dl.discoveryEvent(new DiscoveryEvent(srcAddress, res, response.getQueryId())); } catch (Throwable all) { LOG.fatal("Uncaught Throwable in listener :" + Thread.currentThread().getName(), all); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processed a response for query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } // are there any registered discovery listeners, // generate the event and callback. t0 = System.currentTimeMillis(); DiscoveryListener[] allListeners = (DiscoveryListener[]) listeners.toArray(new DiscoveryListener[0]); for (int eachListener = 0; eachListener < allListeners.length; eachListener++) { try { allListeners[eachListener].discoveryEvent(newevent); } catch (Throwable all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "Uncaught Throwable in listener (" + allListeners[eachListener].getClass().getName() + ") :" + Thread.currentThread().getName(), all); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Called all listenters to query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query) { return processQuery(query, null); } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddress) { if (LOG.isEnabledFor(Level.DEBUG)) { if (srcAddress != null) { LOG.debug("Processing query #" + query.getQueryId()+" from:"+srcAddress); } else { LOG.debug("Processing query #" + query.getQueryId()+" from: unknown"); } } Vector results = null; Vector expirations = new Vector(); DiscoveryQuery dq; long t0 = System.currentTimeMillis(); try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(query.getQuery())); dq = new DiscoveryQuery(asDoc); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Malformed query : ", e); } return ResolverService.OK; } if ((dq.getThreshold() < 0) || (dq.getDiscoveryType() < PEER) || (dq.getDiscoveryType() > ADV)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Malformed query"); } return ResolverService.OK; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Got a " + dirname[dq.getDiscoveryType()] + " query #" + query.getQueryId() + " query :" + dq.getAttr() + " = " + dq.getValue()); } /* // Get the Peer Adv from the query and publish it. PeerAdvertisement padv = dq.getPeerAdvertisement(); try { if (!(padv.getPeerID().toString()).equals(localPeerId)) { // publish others only. Since this one comes from outside, // we must not keep it beyond its expiration time. // FIXME: [jice@jxta.org 20011112] In theory there should // be an expiration time associated with it in the msg, like // all other items. publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION); } } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Bad Peer Adv in Discovery Query", e); } } */ /* * threshold==0 and type==PEER is a special case. In this case we are * responding for the purpose of providing our own adv only. */ int thresh = Math.min(dq.getThreshold(), MAX_RESPONSES); /* * threshold==0 and type==PEER is a special case. In this case we are * responding for the purpose of providing our own adv only. */ if ((dq.getDiscoveryType() == PEER) && (0 == dq.getThreshold())) { results = new Vector(); results.add(group.getPeerAdvertisement().toString()); expirations.add(new Long(DiscoveryService.DEFAULT_EXPIRATION)); respond(query, dq, results, expirations); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Responding to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } return ResolverService.OK; } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("start local search query" + dq.getAttr() + " " + dq.getValue()); } results = search(dq.getDiscoveryType(), dq.getAttr(), dq.getValue(), thresh, true, expirations); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("start local search pruned " + results.size()); } } // We only share the advs with > 0 expiration time. Iterator<Long> eachExpiration = (Iterator<Long>) expirations.iterator(); Iterator eachAdv = results.iterator(); while( eachExpiration.hasNext() ) { eachAdv.next(); if( eachExpiration.next() <= 0 ) { eachAdv.remove(); eachExpiration.remove(); } } if (!results.isEmpty()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue()); } respond(query, dq, results, expirations); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Responded to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } return ResolverService.OK; } else { // If this peer is a rendezvous, simply let the resolver // re-propagate the query. // If this peer is not a rendez, just discard the query. if (!group.isRendezvous()) { return ResolverService.OK; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Querying SrdiIndex query #" + query.getQueryId()); } Vector res = srdiIndex.query(dirname[dq.getDiscoveryType()], dq.getAttr(), dq.getValue(), thresh); if (res.size() > 0) { srdi.forwardQuery(res, query, thresh); return ResolverService.OK; } else if (query.getHopCount() == 0) { PeerID destPeer = srdi.getReplicaPeer(dirname[dq.getDiscoveryType()] + dq.getAttr() + dq.getValue()); // destPeer can be null in a small rpv (<3) if (destPeer != null) { if (!destPeer.equals(group.getPeerID())) { srdi.forwardQuery(destPeer, query); return ResolverService.OK; } else { // start the walk since this peer is this the starting peer query.incrementHopCount(); } } } } return ResolverService.Repropagate; } private void respond(ResolverQueryMsg query, DiscoveryQuery dq, Vector results, Vector expirations) { if (localonly) { return; } ResolverResponseMsg response; DiscoveryResponse dresponse = new DiscoveryResponse(); // peer adv is optional, skip dresponse.setDiscoveryType(dq.getDiscoveryType()); dresponse.setQueryAttr(dq.getAttr()); dresponse.setQueryValue(dq.getValue()); dresponse.setResponses(results); dresponse.setExpirations(expirations); // create a response from the query response = query.makeResponse(); response.setCredential(credentialDoc); response.setResponse(dresponse.toString()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Responding to " + query.getSrc()); } resolver.sendResponse(query.getSrc(), response); } /** * {@inheritDoc} */ public synchronized void addDiscoveryListener(DiscoveryListener listener) { listeners.add(listener); } /** * {@inheritDoc} */ public synchronized boolean removeDiscoveryListener(DiscoveryListener listener) { Iterator e = listenerTable.entrySet().iterator(); while (e.hasNext()) { Map.Entry anEntry = (Map.Entry) e.next(); if( listener == anEntry.getValue() ) { e.remove(); } } return (listeners.remove(listener)); } /** * {@inheritDoc} */ public void remotePublish(String peerid, Advertisement adv, long timeout) { if (localonly) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("localonly, no network operations performed"); } return; } int type = -1; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } remotePublish(peerid, adv, type, timeout); } /** * remote publish the advertisement */ private void remotePublish(String peerid, Advertisement adv, int type, long expiration) { if (localonly) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("localonly, no network operations performed"); } return; } // In case this is invoked before startApp(). if (resolver == null) { return; } switch (type) { case PEER: if (adv instanceof PeerAdvertisement) { break; } throw new IllegalArgumentException("Not a peer advertisement"); case GROUP: if (adv instanceof PeerGroupAdvertisement) { break; } throw new IllegalArgumentException("Not a peergroup advertisement"); case ADV: break; default: throw new IllegalArgumentException("Unknown advertisement type"); } Vector advert = new Vector(1); Vector expirations = new Vector(1); advert.add(adv.toString()); expirations.add(new Long(expiration)); DiscoveryResponseMsg dresponse = new DiscoveryResponse(); dresponse.setDiscoveryType(type); dresponse.setResponses(advert); dresponse.setExpirations(expirations); ResolverResponseMsg pushRes = new ResolverResponse(); pushRes.setHandlerName(handlerName); pushRes.setCredential(credentialDoc); pushRes.setQueryId(0); pushRes.setResponse(dresponse.toString()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Remote publishing "); } resolver.sendResponse(peerid, pushRes); } /** * Search for a doc, that matches attr, and value * bytes is set to true if the caller wants wire format of the * advertisement, or set to false if caller wants Advertisement * objects. * * @param type Discovery type PEER, GROUP, ADV * @param threshold the upper limit of responses from one peer * @param streams flag to indicate how the results are returned-- advs, or streams * @param expirations vector containing the expirations associated with is returned * @param attr attribute name to narrow disocvery to Valid values for * this parameter are null (don't care), or exact element name in the * advertisement of interest (e.g. "Name") * @param value Value * @return vector of results either as docs, or Strings */ private Vector search(int type, String attr, String value, int threshold, boolean bytes, Vector expirations) { if( type == PEER ) { checkUpdatePeerAdv(); } Vector results; if (threshold <= 0) { throw new IllegalArgumentException("threshold must be greater than zero"); } if (expirations != null) { expirations.clear(); } if (attr != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Searching for " + threshold + " entries of type : " + dirname[type]); } // a discovery query with a specific search criteria. results = cm.search(dirname[type], attr, value, threshold, expirations); } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Getting " + threshold + " entries of type : " + dirname[type]); } // Returning any entry that exists results = cm.getRecords(dirname[type], threshold, null, expirations); } if ( results.isEmpty() || bytes) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Returning " + results.size() + " results"); } // nothing more to do; return results; } Vector advertisements = new Vector(); for (int i = 0; i < results.size(); i++) { InputStream bis = null; try { bis = (InputStream) results.elementAt(i); Advertisement adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, bis); advertisements.addElement(adv); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed building advertisment", e); } } finally { if (null != bis) { try { bis.close(); } catch (IOException ignored) { ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -