📄 discoveryserviceimpl.java
字号:
}
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Returning " + advertisements.size() + " advertisements");
        }
        return advertisements;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns -1 when the service is stopped or when {@code id} is null or
     * the null ID; otherwise delegates to the cache manager.
     */
    public long getAdvExpirationTime(ID id, int type) {
        if (stopped) {
            return -1;
        }
        String advName;
        if (id != null && !id.equals(ID.nullID)) {
            advName = id.getUniqueValue().toString();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Getting expiration time of " + advName + " of type " + dirname[type]);
            }
        } else {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("invalid attempt to get advertisement expiration time of NullID");
            }
            return -1;
        }
        return cm.getExpirationtime(dirname[type], advName);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns -1 when the service is stopped or when {@code id} is null or
     * the null ID; otherwise delegates to the cache manager.
     */
    public long getAdvLifeTime(ID id, int type) {
        // A stopped service is not an invalid-ID condition, so bail out
        // before the NullID warning (same order as getAdvExpirationTime).
        if (stopped) {
            return -1;
        }
        if (id == null || id.equals(ID.nullID)) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("invalid attempt to get advertisement lifetime of a NullID");
            }
            return -1;
        }
        String advName = id.getUniqueValue().toString();
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Getting lifetime of " + advName + " of type " + dirname[type]);
        }
        return cm.getLifetime(dirname[type], advName);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The advertisement is classified as PEER, GROUP or ADV from its runtime
     * type. When it has no usable ID, the cache name is derived from its XML
     * document via {@code Cm.createTmpName}. Returns -1 when the service is
     * stopped or the document cannot be obtained.
     */
    public long getAdvExpirationTime(Advertisement adv) {
        if (stopped) {
            return -1;
        }
        int type;
        if (adv instanceof PeerAdvertisement) {
            type = PEER;
        } else if (adv instanceof PeerGroupAdvertisement) {
            type = GROUP;
        } else {
            type = ADV;
        }
        String advName;
        ID id = adv.getID();
        if (id != null && !id.equals(ID.nullID)) {
            advName = id.getUniqueValue().toString();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("attempting to getAdvExpirationTime on " + advName + " of type " + dirname[type]);
            }
        } else {
            XMLDocument doc;
            try {
                doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8);
            } catch (Exception everything) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Failed to get document", everything);
                }
                return -1;
            }
            advName = Cm.createTmpName(doc);
        }
        return cm.getExpirationtime(dirname[type], advName);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The advertisement is classified as PEER, GROUP or ADV from its runtime
     * type. When it has no usable ID, the cache name is derived from its XML
     * document via {@code Cm.createTmpName}. Returns -1 when the service is
     * stopped or the document cannot be obtained.
     */
    public long getAdvLifeTime(Advertisement adv) {
        if (stopped) {
            return -1;
        }
        int type;
        if (adv instanceof PeerAdvertisement) {
            type = PEER;
        } else if (adv instanceof PeerGroupAdvertisement) {
            type = GROUP;
        } else {
            type = ADV;
        }
        ID id = adv.getID();
        String advName;
        if (id != null && !id.equals(ID.nullID)) {
            advName = id.getUniqueValue().toString();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("attempting to getAdvLifeTime " + advName + " of type " + dirname[type]);
            }
        } else {
            XMLDocument doc;
            try {
                doc = (XMLDocument) adv.getDocument(MimeMediaType.XMLUTF8);
            } catch (Exception everything) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Failed to get document", everything);
                }
                return -1;
            }
            advName = Cm.createTmpName(doc);
        }
        return cm.getLifetime(dirname[type], advName);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns true without doing anything when the service is stopped, and
     * false when the payload cannot be parsed as an SRDI message. Otherwise
     * every entry is added to the SRDI index under the sender's peer ID and
     * the message is handed to {@code srdi.replicateEntries}.
     */
    public boolean processSrdi(ResolverSrdiMsg message) {
        if (stopped) {
            return true;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("[" + group.getPeerGroupID() + "] Received an SRDI messsage");
        }

        SrdiMessage srdiMsg;
        try {
            XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8,
                    new StringReader(message.getPayload()));
            srdiMsg = new SrdiMessageImpl(asDoc);
        } catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed parsing srdi message", e);
            }
            return false;
        }

        PeerID pid = srdiMsg.getPeerID();
        for (Object o : srdiMsg.getEntries()) {
            SrdiMessage.Entry entry = (SrdiMessage.Entry) o;
            srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, pid, entry.expiration);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(
                        "Primary Key [" + srdiMsg.getPrimaryKey() + "] key [" + entry.key + "] value [" + entry.value
                        + "] exp [" + entry.expiration + "]");
            }
        }
        srdi.replicateEntries(srdiMsg);
        return true;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Removes the given peer from the SRDI index, if an index exists.
     */
    public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {
        // Read the field once: beEdge() sets srdiIndex to null under the
        // object lock, and this method does not hold that lock.
        SrdiIndex index = srdiIndex;
        if (index != null) {
            index.remove(peerid);
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Pushes the PEER, GROUP and ADV entries in turn, with no target peer.
     */
    public void pushEntries(boolean all) {
        pushSrdi(null, PEER, all);
        pushSrdi(null, GROUP, all);
        pushSrdi(null, ADV, all);
    }

    /**
     * Pushes the SRDI entries of one advertisement type. Does nothing when
     * the service is stopped; failures while building or pushing the message
     * are logged and not propagated.
     *
     * @param peer peer id handed to {@code srdi.pushSrdi}; may be null
     *             (as passed by {@link #pushEntries(boolean)})
     * @param type index into {@code dirname} selecting the advertisement type
     * @param all  if true push all entries, otherwise just deltas
     */
    protected void pushSrdi(ID peer, int type, boolean all) {
        if (stopped) {
            return;
        }

        List<SrdiMessage.Entry> entries;
        if (all) {
            entries = cm.getEntries(dirname[type], true);
        } else {
            entries = cm.getDeltas(dirname[type]);
        }

        if (!entries.isEmpty()) {
            SrdiMessage srdiMsg;
            try {
                srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // ttl of 1, ensure it is replicated
                        dirname[type], entries);
                if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                    LOG.finer("Pushing " + entries.size() + (all ? " entries" : " deltas") + " of type " + dirname[type]);
                }
                srdi.pushSrdi(peer, srdiMsg);
            } catch (Exception e) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Exception pushing SRDI Entries", e);
                }
            }
        } else {
            if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                LOG.finer("No" + (all ? " entries" : " deltas") + " of type " + dirname[type] + " to push");
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Rendezvous connect/reconnect turns delta tracking on, rendezvous
     * failure/disconnect turns it off, and role changes switch this service
     * between rendezvous and edge behavior. Client events are ignored.
     */
    public synchronized void rendezvousEvent(RendezvousEvent event) {
        int theEventType = event.getType();

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("[" + group.getPeerGroupName() + "] Processing " + event);
        }

        switch (theEventType) {
            case RendezvousEvent.RDVCONNECT:
            case RendezvousEvent.RDVRECONNECT:
                // start tracking deltas
                cm.setTrackDeltas(true);
                break;

            case RendezvousEvent.CLIENTCONNECT:
            case RendezvousEvent.CLIENTRECONNECT:
                break;

            case RendezvousEvent.RDVFAILED:
            case RendezvousEvent.RDVDISCONNECT:
                // stop tracking deltas until we connect again
                cm.setTrackDeltas(false);
                break;

            case RendezvousEvent.CLIENTFAILED:
            case RendezvousEvent.CLIENTDISCONNECT:
                break;

            case RendezvousEvent.BECAMERDV:
                beRendezvous();
                break;

            case RendezvousEvent.BECAMEEDGE:
                beEdge();
                break;

            default:
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning(MessageFormat.format("[{0}] Unexpected RDV event : {1}", group.getPeerGroupName(), event));
                }
                break;
        }
    }

    /**
     * Checks to see if the local peer advertisement has been updated and if
     * it has then republish it to the CM.
     * <p/>
     * "Updated" means the group returned a different advertisement object
     * than last time, or one with a higher modification count. Publish
     * failures are logged and otherwise ignored.
     */
    private void checkUpdatePeerAdv() {
        PeerAdvertisement newPadv = group.getPeerAdvertisement();
        int newModCount = newPadv.getModCount();
        boolean updated = false;

        synchronized (checkPeerAdvLock) {
            if ((lastPeerAdv != newPadv) || (lastModCount < newModCount)) {
                lastPeerAdv = newPadv;
                lastModCount = newModCount;
                updated = true;
            }

            if (updated) {
                // Publish the local Peer Advertisement
                try {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("publishing local advertisement");
                    }
                    // This is our own; we can publish it for a long time in our cache
                    publish(newPadv, INFINITE_LIFETIME, DEFAULT_EXPIRATION);
                } catch (Exception ignoring) {
                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                        LOG.log(Level.WARNING, "Could not publish local peer advertisement: ", ignoring);
                    }
                }
            }
        }
    }

    /**
     * Change the behavior to be a rendezvous Peer Discovery Service.
     * If the Service was acting as an Edge peer, cleanup.
     * <p/>
     * Stops delta tracking, creates the SRDI index if there is none, and
     * replaces any existing SRDI instance. Unless the service is local-only,
     * a new SRDI is created and this service is registered with the resolver
     * as the SRDI handler.
     */
    private synchronized void beRendezvous() {
        if (isRdv && (srdi != null || srdiIndex != null)) {
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Already a rendezvous -- No Switch is needed");
            }
            return;
        }

        isRdv = true;
        // rdv peers do not need to track deltas
        cm.setTrackDeltas(false);

        if (srdiIndex == null) {
            srdiIndex = new SrdiIndex(group, srdiIndexerFileName);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("srdiIndex created");
            }
        }

        // Kill SRDI, create a new one.
        if (srdi != null) {
            srdi.stop();
            srdiThread = null;
            srdi = null;
        }

        if (!localonly) {
            srdi = new Srdi(group, handlerName, this, srdiIndex, initialDelay, runInterval);
            resolver.registerSrdiHandler(handlerName, this);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("srdi created, and registered as an srdi handler ");
            }
        }

        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Switched to Rendezvous peer role.");
        }
    }

    /**
     * Change the behavior to be an Edge Peer Discovery Service.
     * If the Service was acting as a Rendezvous, cleanup.
     * <p/>
     * Stops and drops the SRDI index (unregistering from the resolver),
     * replaces any existing SRDI instance and, unless the service is
     * local-only, starts a daemon thread running a new SRDI.
     */
    private synchronized void beEdge() {
        // make sure we have been here before
        if (!isRdv && srdiThread != null) {
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Already an Edge peer -- No Switch is needed.");
            }
            return;
        }

        isRdv = false;
        if (!rendezvous.getConnectedPeerIDs().isEmpty()) {
            // if we have a rendezvous connection track deltas, otherwise wait
            // for a connect event to set this option
            cm.setTrackDeltas(true);
        }

        if (srdiIndex != null) {
            srdiIndex.stop();
            srdiIndex = null;
            resolver.unregisterSrdiHandler(handlerName);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("stopped cache and unregistered from resolver");
            }
        }

        // Kill SRDI
        if (srdi != null) {
            srdi.stop();
            srdiThread = null;
            srdi = null;
        }

        if (!localonly) {
            // Create a new SRDI
            srdi = new Srdi(group, handlerName, this, null, initialDelay, runInterval);

            // only edge peers distribute srdi
            srdiThread = new Thread(group.getHomeThreadGroup(), srdi, "Discovery Srdi Thread");
            srdiThread.setDaemon(true);
            srdiThread.start();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Started SRDIThread");
            }
        }

        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Switched to a Edge peer role.");
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -