📄 discoveryserviceimpl.java
字号:
} } } DiscoveryEvent newevent = new DiscoveryEvent(srcAddress, res, response.getQueryId()); DiscoveryListener dl = listenerTable.get(new Integer(response.getQueryId())); if (dl != null) { try { dl.discoveryEvent(new DiscoveryEvent(srcAddress, res, response.getQueryId())); } catch (Throwable all) { LOG.log(Level.SEVERE, "Uncaught Throwable in listener :" + Thread.currentThread().getName(), all); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("processed a response for query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } // are there any registered discovery listeners, // generate the event and callback. t0 = System.currentTimeMillis(); DiscoveryListener[] allListeners = listeners.toArray(new DiscoveryListener[0]); for (DiscoveryListener allListener : allListeners) { try { allListener.discoveryEvent(newevent); } catch (Throwable all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING , "Uncaught Throwable in listener (" + allListener.getClass().getName() + ") :" + Thread.currentThread().getName() , all); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Called all listenters to query #" + response.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query) { return processQuery(query, null); } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddress) { if (stopped) { return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (srcAddress != null) { LOG.fine("Processing query #" + query.getQueryId() + " from:" + srcAddress); } else { LOG.fine("Processing query #" + query.getQueryId() + " from: unknown"); } } List<String> results; List<Long> expirations = new ArrayList<Long>(); DiscoveryQuery dq; long t0 = System.currentTimeMillis(); try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(query.getQuery())); dq = new DiscoveryQuery(asDoc); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Malformed query : ", e); } return ResolverService.OK; } if ((dq.getThreshold() < 0) || (dq.getDiscoveryType() < PEER) || (dq.getDiscoveryType() > ADV)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Malformed query"); } return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Got a " + dirname[dq.getDiscoveryType()] + " query #" + query.getQueryId() + " query :" + dq.getAttr() + " = " + dq.getValue()); } /* // Get the Peer Adv from the query and publish it. PeerAdvertisement padv = dq.getPeerAdvertisement(); try { if (!(padv.getPeerID().toString()).equals(localPeerId)) { // publish others only. Since this one comes from outside, // we must not keep it beyond its expiration time. // FIXME: [jice@jxta.org 20011112] In theory there should // be an expiration time associated with it in the msg, like // all other items. publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION); } } catch (Exception e) { if (LOG.isLoggable(Level.FINE)) { LOG.fine("Bad Peer Adv in Discovery Query", e); } } */ /* * threshold==0 and type==PEER is a special case. In this case we are * responding for the purpose of providing our own adv only. */ int thresh = Math.min(dq.getThreshold(), MAX_RESPONSES); /* * threshold==0 and type==PEER is a special case. In this case we are * responding for the purpose of providing our own adv only. */ if ((dq.getDiscoveryType() == PEER) && (0 == dq.getThreshold())) { respond(query, dq, Collections.singletonList(group.getPeerAdvertisement().toString()) , Collections.singletonList(DiscoveryService.DEFAULT_EXPIRATION)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responding to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } return ResolverService.OK; } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("start local search query" + dq.getAttr() + " " + dq.getValue()); } results = search(dq.getDiscoveryType(), dq.getAttr(), dq.getValue(), thresh, true, expirations); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("start local search pruned " + results.size()); } } // We only share the advs with > 0 expiration time. Iterator<Long> eachExpiration = expirations.iterator(); Iterator eachAdv = results.iterator(); while (eachExpiration.hasNext()) { eachAdv.next(); if (eachExpiration.next() <= 0) { eachAdv.remove(); eachExpiration.remove(); } } if (!results.isEmpty()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue()); } respond(query, dq, results, expirations); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responded to query #" + query.getQueryId() + " in :" + (System.currentTimeMillis() - t0)); } return ResolverService.OK; } else { // If this peer is a rendezvous, simply let the resolver // re-propagate the query. // If this peer is not a rendez, just discard the query. if (!group.isRendezvous()) { return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Querying SrdiIndex query #" + query.getQueryId()); } List<PeerID> res = srdiIndex.query(dirname[dq.getDiscoveryType()], dq.getAttr(), dq.getValue(), thresh); if (!res.isEmpty()) { srdi.forwardQuery(res, query, thresh); return ResolverService.OK; } else if (query.getHopCount() == 0) { PeerID destPeer = srdi.getReplicaPeer(dirname[dq.getDiscoveryType()] + dq.getAttr() + dq.getValue()); // destPeer can be null in a small rpv (<3) if (destPeer != null) { if (!destPeer.equals(group.getPeerID())) { srdi.forwardQuery(destPeer, query); return ResolverService.OK; } else { // start the walk since this peer is this the starting peer query.incrementHopCount(); } } } } return ResolverService.Repropagate; } private void respond(ResolverQueryMsg query, DiscoveryQuery dq, List results, List<Long> expirations) { if (localonly || stopped) { return; } ResolverResponseMsg response; DiscoveryResponse dresponse = new DiscoveryResponse(); // peer adv is optional, skip dresponse.setDiscoveryType(dq.getDiscoveryType()); dresponse.setQueryAttr(dq.getAttr()); dresponse.setQueryValue(dq.getValue()); dresponse.setResponses(results); dresponse.setExpirations(expirations); // create a response from the query response = query.makeResponse(); response.setCredential(credentialDoc); response.setResponse(dresponse.toString()); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responding to " + query.getSrcPeer()); } resolver.sendResponse(query.getSrcPeer().toString(), response); } /** * {@inheritDoc} */ public synchronized void addDiscoveryListener(DiscoveryListener listener) { listeners.add(listener); } /** * {@inheritDoc} */ public synchronized boolean removeDiscoveryListener(DiscoveryListener listener) { Iterator<Map.Entry<Integer, DiscoveryListener>> e = listenerTable.entrySet().iterator(); while (e.hasNext()) { Map.Entry<Integer, DiscoveryListener> anEntry = e.next(); if (listener == anEntry.getValue()) { e.remove(); } } return (listeners.remove(listener)); } /** * {@inheritDoc} */ public void remotePublish(String peerid, Advertisement adv, long timeout) { if (localonly || stopped) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("localonly, no network operations performed"); } return; } int type; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } remotePublish(peerid, adv, type, timeout); } /* * remote publish the advertisement */ private void remotePublish(String peerid, Advertisement adv, int type, long expiration) { if (localonly || stopped) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("localonly, no network operations performed"); } return; } // In case this is invoked before startApp(). if (resolver == null) { return; } switch (type) { case PEER: if (adv instanceof PeerAdvertisement) { break; } throw new IllegalArgumentException("Not a peer advertisement"); case GROUP: if (adv instanceof PeerGroupAdvertisement) { break; } throw new IllegalArgumentException("Not a peergroup advertisement"); case ADV: break; default: throw new IllegalArgumentException("Unknown advertisement type"); } List<String> advert = new ArrayList<String>(1); List<Long> expirations = new ArrayList<Long>(1); advert.add(adv.toString()); expirations.add(expiration); DiscoveryResponseMsg dresponse = new DiscoveryResponse(); dresponse.setDiscoveryType(type); dresponse.setResponses(advert); dresponse.setExpirations(expirations); ResolverResponseMsg pushRes = new ResolverResponse(); pushRes.setHandlerName(handlerName); pushRes.setCredential(credentialDoc); pushRes.setQueryId(REMOTE_PUBLISH_QUERYID); pushRes.setResponse(dresponse.toString()); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Remote publishing "); } resolver.sendResponse(peerid, pushRes); } /** * Search for a doc, that matches attr, and value * bytes is set to true if the caller wants wire format of the * advertisement, or set to false if caller wants Advertisement * objects. * * @param type Discovery type PEER, GROUP, ADV * @param threshold the upper limit of responses from one peer * @param bytes flag to indicate how the results are returned-- advs, or streams * @param expirations List containing the expirations associated with is returned * @param attr attribute name to narrow discovery to Valid values for * this parameter are null (don't care), or exact element name in the * advertisement of interest (e.g. "Name") * @param value Value * @return list of results either as docs, or Strings */ private List search(int type, String attr, String value, int threshold, boolean bytes, List<Long> expirations) { if (stopped) { return new ArrayList(); } if (type == PEER) { checkUpdatePeerAdv(); } List results; if (threshold <= 0) { throw new IllegalArgumentException("threshold must be greater than zero"); } if (expirations != null) { expirations.clear(); } if (attr != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Searching for " + threshold + " entries of type : " + dirname[type]); } // a discovery query with a specific search criteria. results = cm.search(dirname[type], attr, value, threshold, expirations); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Getting " + threshold + " entries of type : " + dirname[type]); } // Returning any entry that exists results = cm.getRecords(dirname[type], threshold, expirations); } if (results.isEmpty() || bytes) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Returning " + results.size() + " results"); } // nothing more to do; return results; } // Convert the input streams returned by the cm into Advertisements. List<Advertisement> advertisements = new ArrayList<Advertisement>(); for (int i = 0; i < results.size(); i++) { InputStream bis = null; try { bis = (InputStream) results.get(i); XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, bis); Advertisement adv = AdvertisementFactory.newAdvertisement(asDoc); advertisements.add(adv); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building advertisment", e); } // we won't be including this advertisement so remove it's expiration. if (null != expirations) { expirations.remove(i); } } finally { if (null != bis) { try { bis.close(); } catch (IOException ignored) { // ignored } } bis = null; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -