📄 discoveryserviceimpl.java
字号:
} } bis = null; } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Returning " + advertisements.size() + " advertisements"); } return advertisements; } /** * {@inheritDoc} */ public long getAdvExpirationTime(ID id, int type) { String advName = null; if (id != null && !id.equals(ID.nullID)) { advName = id.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Getting expiration time of " + advName + " of type " + dirname[type]); } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("invalid attempt to get advertisement expiration time of NullID"); } return -1; } return cm.getExpirationtime(dirname[type], advName); } /** * {@inheritDoc} */ public long getAdvLifeTime(ID id, int type) { if (id == null || id.equals(ID.nullID)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("invalid attempt to get advertisement lifetime of a NullID"); } return -1; } String advName = id.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Getting lifetime of " + advName + " of type " + dirname[type]); } return cm.getLifetime(dirname[type], advName); } /** * {@inheritDoc} */ public long getAdvExpirationTime(Advertisement adv) { int type = 0; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } String advName = null; ID id = adv.getID(); if (id != null && !id.equals(ID.nullID)) { advName = id.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("attempting to getAdvExpirationTime on " + advName + " of type " + dirname[type]); } } else { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to get document", everything); } return -1; } advName = Cm.createTmpName(doc); } return cm.getExpirationtime(dirname[type], advName); } /** * {@inheritDoc} */ public long getAdvLifeTime(Advertisement adv) { int type = 0; if (adv instanceof PeerAdvertisement) { type = PEER; } else if (adv instanceof PeerGroupAdvertisement) { type = GROUP; } else { type = ADV; } ID id = adv.getID(); String advName = null; if (id != null && !id.equals(ID.nullID)) { advName = id.getUniqueValue().toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("attempting to getAdvLifeTime " + advName + " of type " + dirname[type]); } } else { XMLDocument doc; try { doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8); } catch (Exception everything) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to get document", everything); } return -1; } advName = Cm.createTmpName(doc); } return cm.getLifetime(dirname[type], advName); } /** * {@inheritDoc} */ public boolean processSrdi(ResolverSrdiMsg message) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupID() + "] Received an SRDI messsage"); } SrdiMessage srdiMsg; try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload())); srdiMsg = new SrdiMessageImpl(asDoc); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed parsing srdi message", e); } return false; } PeerID pid = srdiMsg.getPeerID(); Iterator eachEntry = srdiMsg.getEntries().iterator(); while (eachEntry.hasNext()) { SrdiMessage.Entry entry = (SrdiMessage.Entry) eachEntry.next(); srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, pid, entry.expiration); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [" + entry.key + "] value [" + entry.value + "] exp ["+ entry.expiration + "]"); } } srdi.replicateEntries(srdiMsg); return true; } /** * {@inheritDoc} */ public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) { if (srdiIndex != null) { srdiIndex.remove(peerid); } } /** * {@inheritDoc} */ public void pushEntries(boolean all) { pushSrdi(null, PEER, all); pushSrdi(null, GROUP, all); pushSrdi(null, ADV, all); } /** * push srdi entries * *@param all if true push all entries, otherwise just deltas */ protected void pushSrdi(ID peer, int type, boolean all) { List entries; if (all) { entries = cm.getEntries(dirname[type], true); } else { entries = cm.getDeltas(dirname[type]); } if (!entries.isEmpty()) { SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // ttl of 1, ensure it is replicated dirname[type], entries); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pushing " + entries.size() + (all ? " entries" : " deltas") + " of type " + dirname[type]); } srdi.pushSrdi(peer, srdiMsg); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception pushing SRDI Entries", e); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No" + (all ? " entries" : " deltas") + " of type " + dirname[type] + " to push"); } } } /** * {@inheritDoc} */ public synchronized void rendezvousEvent(RendezvousEvent event) { int theEventType = event.getType(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + "] Processing " + event); } switch (theEventType) { case RendezvousEvent.RDVCONNECT: case RendezvousEvent.RDVRECONNECT: // start tracking deltas cm.setTrackDeltas(true); break; case RendezvousEvent.CLIENTCONNECT: case RendezvousEvent.CLIENTRECONNECT: break; case RendezvousEvent.RDVFAILED: case RendezvousEvent.RDVDISCONNECT: // stop tracking deltas until we connect again cm.setTrackDeltas(false); break; case RendezvousEvent.CLIENTFAILED: case RendezvousEvent.CLIENTDISCONNECT: break; case RendezvousEvent.BECAMERDV: beRendezvous(); break; case RendezvousEvent.BECAMEEDGE: beEdge(); break; default: if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event); } break; } } /** * this used internally to insure we use a locally (and the current session) * unique query id * * @return next query id to use */ private synchronized static int nextQid() { return qid++; } /** * Checks to see if the local peer advertisement has been updated and if * it has then republish it to the CM. */ private void checkUpdatePeerAdv() { PeerAdvertisement newPadv = group.getPeerAdvertisement(); int newModCount = newPadv.getModCount(); boolean updated = false; synchronized(checkPeerAdvLock) { if ((lastPeerAdv != newPadv) || (lastModCount < newModCount) ) { lastPeerAdv = newPadv; lastModCount = newModCount; updated = true; } if( updated ) { // Publish the local Peer Advertisement try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("publishing local advertisement"); } // This is our own; we can publish it for a long time in our cache publish(newPadv, INFINITE_LIFETIME, DEFAULT_EXPIRATION); } catch (Exception ignoring) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not publish local peer advertisement: ", ignoring); } } } } } /** * Change the behavior to be an rendezvous Peer Discovery Service. * If the Service was acting as an Edge peer, cleanup. */ private synchronized void beRendezvous() { if (isRdv && (srdi != null || srdiIndex != null)) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Already a rendezvous -- No Switch is needed"); } return; } isRdv = true; // rdv peers do not need to track deltas cm.setTrackDeltas(false); if (srdiIndex == null) { srdiIndex = new SrdiIndex(group, srdiIndexerFileName); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("srdiIndex created"); } } // Kill SRDI, create a new one. if (srdi != null) { srdi.stop(); if (srdiThread != null) { srdiThread = null; } srdi = null; } if (!localonly) { srdi = new Srdi(group, handlerName, this, srdiIndex, initialDelay, runInterval); resolver.registerSrdiHandler(handlerName, this); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("srdi created, and registered as an srdi handler "); } } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Switched to Rendezvous peer role."); } } /** * Change the behavior to be an Edge Peer Discovery Service. * If the Service was acting as a Rendezvous, cleanup. */ private synchronized void beEdge() { // make sure we have been here before if (!isRdv && srdiThread != null) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Already an Edge peer -- No Switch is needed."); } return; } isRdv = false; if (rendezvous.getConnectedRendezVous().hasMoreElements()) { // if we have a rendezvous connection track deltas, otherwise wait // for a connect event to set this option cm.setTrackDeltas(true); } if (srdiIndex != null) { srdiIndex.stop(); srdiIndex = null; resolver.unregisterSrdiHandler(handlerName); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("stopped cache and unregistered from resolver"); } } // Kill SRDI if (srdi != null) { srdi.stop(); if (srdiThread != null) { srdiThread = null; } srdi = null; } if (!localonly) { // Create a new SRDI srdi = new Srdi(group, handlerName, this, null, initialDelay, runInterval); // only edge peers distribute srdi srdiThread = new Thread(group.getHomeThreadGroup(), srdi, "Discovery Srdi Thread"); srdiThread.setDaemon(true); srdiThread.start(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Started SRDIThread"); } } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Switched to a Edge peer role."); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -