📄 discoveryserviceimpl.java
字号:
} return search(type, attribute, value, Integer.MAX_VALUE, false, null).elements(); } /** * {@inheritDoc} */ public void init(PeerGroup pg, ID assignedID, Advertisement impl) throws PeerGroupException { group = pg; handlerName = assignedID.toString(); implAdvertisement = (ModuleImplAdvertisement) impl; localPeerId = group.getPeerID().toString(); ConfigParams confAdv = (ConfigParams) pg.getConfigAdvertisement(); // Get the config. If we do not have a config, we're done; we just keep // the defaults (edge peer/no auto-rdv) if (confAdv != null) { Advertisement adv = null; try { XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(assignedID); if (null != configDoc) { adv = AdvertisementFactory.newAdvertisement(configDoc); } } catch (NoSuchElementException failed) { ; } if (adv instanceof DiscoveryConfigAdv) { DiscoveryConfigAdv discoConfigAdv = (DiscoveryConfigAdv) adv; alwaysUseReplicaPeer = discoConfigAdv.getForwardAlwaysReplica(); localonly |= discoConfigAdv.getLocalOnly(); if (LOG.isEnabledFor(Level.DEBUG)) { if (localonly) { LOG.debug("localonly set to true via service parameters"); } if (alwaysUseReplicaPeer) { LOG.debug("alwaysUseReplicaPeer set to true via service parameters"); } } } } cm = ((StdPeerGroup) group).getCacheManager(); cm.setTrackDeltas(!localonly); // Initialize the peer adv tracking. checkUpdatePeerAdv(); if (LOG.isEnabledFor(Level.INFO)) { StringBuffer configInfo = new StringBuffer("Configuring Discovery Service : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: " + implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : " + implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : " + implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : " + implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : " + group.getPeerGroupName()); configInfo.append("\n\t\tGroup ID : " + group.getPeerGroupID()); configInfo.append("\n\t\tPeer ID : " + group.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tLocal Only : " + localonly); configInfo.append("\n\t\tAlways Use ReplicaPeer : " + alwaysUseReplicaPeer); LOG.info(configInfo); } } /** * {@inheritDoc} */ public int startApp(String[] arg) { // Now we know that the resolver is going to be there. // The cm needs the resolver. The code is arranged so that // until the resolver and the cm are created, we just pretend // to be working. We have no requirement to be operational before // startApp() is called, but we must tolerate our public methods // being invoked. The reason for it is that services are registered // upon return from init() so that other services startApp() methods // can find them. (all startApp()s are called after all init()s - with // a few exceptions). resolver = group.getResolverService(); if (null == resolver) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a resolver service"); } return Module.START_AGAIN_STALLED; } membership = group.getMembershipService(); if (null == membership) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a membership service"); } return Module.START_AGAIN_STALLED; } rendezvous = group.getRendezVousService(); if (null == rendezvous) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a rendezvous service"); } return Module.START_AGAIN_STALLED; } // local only discovery if (!localonly) { resolver.registerHandler(handlerName, this); } // Get the initial credential doc synchronized (this) { membershipCredListener = new CredentialListener(); membership.addPropertyChangeListener("defaultCredential", membershipCredListener); try { membershipCredListener.propertyChange( new PropertyChangeEvent( membership, "defaultCredential", null, membership.getDefaultCredential() ) ); } catch (Exception all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not get credential", all); } } } if (rendezvous.isRendezVous()) { beRendezvous(); } else { beEdge(); } rendezvous.addListener(this); started = true; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Discovery service started"); } return 0; } /** * {@inheritDoc} * * <p/>Detach from the resolver */ public void stopApp() { boolean failed = false; membership.removePropertyChangeListener("defaultCredential", membershipCredListener); membershipCredListener = null; credential = null; credentialDoc = null; rendezvous.removeListener(this); if (resolver.unregisterHandler(handlerName) == null) { failed = true; } if (rendezvous.isRendezVous()) { if (resolver.unregisterSrdiHandler(handlerName) == null) { failed = true; } } if (LOG.isEnabledFor(Level.WARN) && failed) { LOG.warn("failed to unregister discovery from resolver."); } // stop the DiscoverySrdiThread if (srdiThread != null) { srdi.stop(); srdi = null; } // Reset values in order to avoid cross-reference problems with GC resolver = null; group = null; membership = null; srdiIndex = null; srdiThread = null; rendezvous = null; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Discovery service stopped."); } } /** * {@inheritDoc} */ public void flushAdvertisements(String id, int type) throws IOException { if ( (type >= PEER) && (type <= ADV) ) { if( null != id ) { ID advID = ID.create(URI.create(id)); String advName = advID.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("flushing adv " + advName + " of type " + dirname[type]); } cm.remove(dirname[type], advName); } else { // XXX bondolo 20050902 For historical purposes we ignore null if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Flush request by type IGNORED. You must delete advertisements individually."); } } } else { throw new IllegalArgumentException( "Invalid Advertisement type." ); } } /** * {@inheritDoc} */ public void flushAdvertisement(Advertisement adv) throws IOException { int type = 0; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } ID id = adv.getID(); String advName = null; if (id != null && !id.equals(ID.nullID)) { advName = id.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Flushing adv " + advName + " of type " + dirname[type]); } } else { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { IOException failure = new IOException("Failure removing Advertisement"); failure.initCause(everything); throw failure; } advName = Cm.createTmpName(doc); } if (advName != null) { cm.remove(dirname[type], advName); } } /** * {@inheritDoc} */ public void publish(Advertisement adv) throws IOException { publish(adv, DiscoveryService.DEFAULT_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void publish(Advertisement adv, long lifetime, long expiration) throws IOException { ID advID = null; String advName = null; int type = -1; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } advID = adv.getID(); // if we dont have a unique id for the adv, use the hash method if ((null == advID) || advID.equals(ID.nullID)) { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to generated document from advertisement", everything); } IOException failure = new IOException("Failed to generate document from advertisement"); failure.initCause(everything); throw failure; } try { advName = Cm.createTmpName(doc); } catch (IllegalStateException ise) { IOException failure = new IOException("Failed to generate tempname from advertisement"); failure.initCause(ise); throw failure; } } else { advName = advID.getUniqueValue().toString(); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Publishing a " + adv.getAdvType() + " as " + dirname[type] + " / " + advName + "\n\texpiration : " + expiration + "\tlifetime :" + lifetime); } // save it cm.save(dirname[type], advName, adv, lifetime, expiration); } /** * {@inheritDoc} */ public void remotePublish(Advertisement adv) { remotePublish(null, adv, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void remotePublish(Advertisement adv, long expiration) { remotePublish(null, adv, expiration); } /** * {@inheritDoc} */ public void remotePublish(String peerid, Advertisement adv) { remotePublish(peerid, adv, DiscoveryService.DEFAULT_EXPIRATION); } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response) { processResponse(response, null); } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddress) { long t0 = System.currentTimeMillis(); DiscoveryResponse res; try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(response.getResponse())); res = new DiscoveryResponse(asDoc); } catch (Exception e) { // we don't understand this msg, let's skip it if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to Read Deiscovery Response", e); } return; } /* PeerAdvertisement padv = res.getPeerAdvertisement(); if (padv == null) return; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Got a " + dirname[res.getDiscoveryType()] + " from "+padv.getName()+ " response : " + res.getQueryAttr() + " = " + res.getQueryValue()); } try { // The sender does not put an expiration on that one, but // we do not want to keep it around for more than the // default duration. It may get updated or become invalid. publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(e, e); } return; } */ Advertisement adv; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Processing responses for query #" + response.getQueryId()); } Enumeration en = res.getAdvertisements(); Enumeration exps = res.getExpirations(); long exp; if (en != null) { while (en.hasMoreElements()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -