📄 discoveryserviceimpl.java
字号:
// Get the config. If we do not have a config, we're done; we just keep // the defaults (edge peer/no auto-rdv) if (confAdv != null) { Advertisement adv = null; try { XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(assignedID); if (null != configDoc) { adv = AdvertisementFactory.newAdvertisement(configDoc); } } catch (NoSuchElementException failed) {// ignored } if (adv instanceof DiscoveryConfigAdv) { DiscoveryConfigAdv discoConfigAdv = (DiscoveryConfigAdv) adv; alwaysUseReplicaPeer = discoConfigAdv.getForwardAlwaysReplica(); localonly |= discoConfigAdv.getLocalOnly(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (localonly) { LOG.fine("localonly set to true via service parameters"); } if (alwaysUseReplicaPeer) { LOG.fine("alwaysUseReplicaPeer set to true via service parameters"); } } } } cm = ((StdPeerGroup) group).getCacheManager(); cm.setTrackDeltas(!localonly); // Initialize the peer adv tracking. checkUpdatePeerAdv(); if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring Discovery Service : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : ").append(group.getPeerGroupName()); configInfo.append("\n\t\tGroup ID : ").append(group.getPeerGroupID()); configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tLocal Only : ").append(localonly); configInfo.append("\n\t\tAlways Use ReplicaPeer : ").append(alwaysUseReplicaPeer); LOG.config(configInfo.toString()); } } /** * {@inheritDoc} */ public int startApp(String[] arg) { // Now we know that the resolver is going to be there. // The cm needs the resolver. The code is arranged so that // until the resolver and the cm are created, we just pretend // to be working. We have no requirement to be operational before // startApp() is called, but we must tolerate our public methods // being invoked. The reason for it is that services are registered // upon return from init() so that other services startApp() methods // can find them. (all startApp()s are called after all init()s - with // a few exceptions). resolver = group.getResolverService(); if (null == resolver) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Stalled until there is a resolver service"); } return Module.START_AGAIN_STALLED; } membership = group.getMembershipService(); if (null == membership) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Stalled until there is a membership service"); } return Module.START_AGAIN_STALLED; } rendezvous = group.getRendezVousService(); if (null == rendezvous) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Stalled until there is a rendezvous service"); } return Module.START_AGAIN_STALLED; } // local only discovery if (!localonly) { resolver.registerHandler(handlerName, this); } // Get the initial credential doc synchronized (this) { membershipCredListener = new CredentialListener(); membership.addPropertyChangeListener("defaultCredential", membershipCredListener); try { membershipCredListener.propertyChange( new PropertyChangeEvent(membership, "defaultCredential", null, membership.getDefaultCredential())); } catch (Exception all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not get credential", all); } } } if (rendezvous.isRendezVous()) { beRendezvous(); } else { beEdge(); } rendezvous.addListener(this); stopped = false; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Discovery service started"); } return Module.START_OK; } /** * {@inheritDoc} * <p/> * <p/>Detach from the resolver */ public void stopApp() { stopped = true; boolean failed = false; membership.removePropertyChangeListener("defaultCredential", membershipCredListener); membershipCredListener = null; credential = null; credentialDoc = null; rendezvous.removeListener(this); if (resolver.unregisterHandler(handlerName) == null) { failed = true; } if (rendezvous.isRendezVous()) { if (resolver.unregisterSrdiHandler(handlerName) == null) { failed = true; } } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING) && failed) { LOG.warning("failed to unregister discovery from resolver."); } // stop the DiscoverySrdiThread if (srdiThread != null) { srdi.stop(); srdi = null; } // Reset values in order to avoid cross-reference problems with GC resolver = null; group = null; membership = null; srdiIndex = null; srdiThread = null; rendezvous = null; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Discovery service stopped."); } } /** * {@inheritDoc} */ public void flushAdvertisements(String id, int type) throws IOException { if (stopped) { return; } if ((type >= PEER) && (type <= ADV)) { if (null != id) { ID advID = ID.create(URI.create(id)); String advName = advID.getUniqueValue().toString(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("flushing adv " + advName + " of type " + dirname[type]); } cm.remove(dirname[type], advName); } else { // XXX bondolo 20050902 For historical purposes we ignore null if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Flush request by type IGNORED. You must delete advertisements individually."); } } } else { throw new IllegalArgumentException("Invalid Advertisement type."); } } /** * {@inheritDoc} */ public void flushAdvertisement(Advertisement adv) throws IOException { if (stopped) { return; } int type; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } ID id = adv.getID(); String advName; if (id != null && !id.equals(ID.nullID)) { advName = id.getUniqueValue().toString(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Flushing adv " + advName + " of type " + dirname[type]); } } else { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { IOException failure = new IOException("Failure removing Advertisement"); failure.initCause(everything); throw failure; } advName = Cm.createTmpName(doc); } if (advName != null) { cm.remove(dirname[type], advName); } } /** * {@inheritDoc} */ public void publish(Advertisement adv) throws IOException { publish(adv, DiscoveryService.DEFAULT_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void publish(Advertisement adv, long lifetime, long expiration) throws IOException { if (stopped) { return; } ID advID; String advName; int type; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } advID = adv.getID(); // if we dont have a unique id for the adv, use the hash method if ((null == advID) || advID.equals(ID.nullID)) { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed to generated document from advertisement", everything); } IOException failure = new IOException("Failed to generate document from advertisement"); failure.initCause(everything); throw failure; } try { advName = Cm.createTmpName(doc); } catch (IllegalStateException ise) { IOException failure = new IOException("Failed to generate tempname from advertisement"); failure.initCause(ise); throw failure; } } else { advName = advID.getUniqueValue().toString(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Publishing a " + adv.getAdvType() + " as " + dirname[type] + " / " + advName + "\n\texpiration : " + expiration + "\tlifetime :" + lifetime); } // save it cm.save(dirname[type], advName, adv, lifetime, expiration); } /** * {@inheritDoc} */ public void remotePublish(Advertisement adv) { remotePublish(null, adv, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void remotePublish(Advertisement adv, long expiration) { remotePublish(null, adv, expiration); } /** * {@inheritDoc} */ public void remotePublish(String peerid, Advertisement adv) { remotePublish(peerid, adv, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response) { processResponse(response, null); } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddress) { if (stopped) { return; } long t0 = System.currentTimeMillis(); DiscoveryResponse res; try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8 , new StringReader(response.getResponse())); res = new DiscoveryResponse(asDoc); } catch (Exception e) { // we don't understand this msg, let's skip it if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed to Read Discovery Response", e); } return; } /* PeerAdvertisement padv = res.getPeerAdvertisement(); if (padv == null) return; if (LOG.isLoggable(Level.FINE)) { LOG.fine("Got a " + dirname[res.getDiscoveryType()] + " from "+padv.getName()+ " response : " + res.getQueryAttr() + " = " + res.getQueryValue()); } try { // The sender does not put an expiration on that one, but // we do not want to keep it around for more than the // default duration. It may get updated or become invalid. publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION); } catch (Exception e) { if (LOG.isLoggable(Level.FINE)) { LOG.fine(e, e); } return; } */ Advertisement adv; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Processing responses for query #" + response.getQueryId()); } Enumeration<Advertisement> en = res.getAdvertisements(); Enumeration<Long> exps = res.getExpirations(); while (en.hasMoreElements()) { adv = en.nextElement(); long exp = exps.nextElement(); if (exp > 0 && adv != null) { try { publish(adv, exp, exp); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Error publishing Advertisement", e); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -