📄 peerview.java
字号:
* * @param seed the URI of the seed rendezvous. **/ public void addSeed(URI seed) { permanentSeedHosts.add(seed); activeSeedHosts.add(seed); } /** * Probe the specified peer immediately. * * <p/> Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored. **/ public boolean probeAddress(EndpointAddress address, Object hint) { PeerViewElement holdIt = null; synchronized(localView) { holdIt = self; } return send(address, hint, holdIt, false, false); } /** * Send our own advertisement to all of the seed rendezvous. */ public void seed() { long reseedRemaining = earliestReseed - TimeUtils.timeNow(); if (reseedRemaining > 0) { // Too early; the previous round is not even done. if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Still Seeding for " + reseedRemaining + "ms."); } return; } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("New Seeding..."); } // Schedule sending propagated query to our local network neighbors. timedSend(self, (EndpointAddress) null, DEFAULT_SEEDING_PERIOD * 2); // We start trying configured seed peers only after some time, so we // make sure that the topology will not tend to be centralized. if( (TimeUtils.timeNow() > nextSeedingURIrefreshTime) && (localView.size() < minHappyPeerView) ) { nextSeedingURIrefreshTime = TimeUtils.toAbsoluteTimeMillis(SEEDING_URI_REFRESH_PERIOD); List seedRdvs = new ArrayList(permanentSeedHosts); if (!seedingURIs.isEmpty() ) { boolean allLoadsFailed = true; Iterator allSeedingURIs = seedingURIs.iterator(); while(allSeedingURIs.hasNext()) { URI aURI = (URI) allSeedingURIs.next(); try { seedRdvs.addAll(Arrays.asList(loadSeeds(aURI))); allLoadsFailed = false; } catch( IOException failed ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed loading seeding list from : " + aURI ); } } } if( allLoadsFailed ) { // Allow for an early reload if we couldn't contact any of // the seeding URIS. 
nextSeedingURIrefreshTime = TimeUtils.toAbsoluteTimeMillis(SEEDING_URI_REFRESH_PERIOD / 5); } } synchronized( activeSeedHosts ) { activeSeedHosts.clear(); Collections.shuffle(seedRdvs); activeSeedHosts.addAll( seedRdvs ); } } long iterations = 0; if(localView.size() < minHappyPeerView) { // We only do these things if we don't have a "happy" Peer View. // If the Peer View is already "happy" then we will use only // Peer View referrals for learning of new entires. List seedRdvs = new ArrayList(activeSeedHosts); while ( !seedRdvs.isEmpty() ) { if (sendRandomByAddr(seedRdvs, DEFAULT_SEEDING_RDVPEERS, seedingRdvConnDelay + DEFAULT_SEEDING_PERIOD * iterations)) { ++iterations; } } if ( !useOnlySeeds ) { // If use only seeds, we're not allowed to put in the peerview // anything but our seed rdvs. So, we've done everything that // was required. // Schedule sending propagated query to our advertising group if (advertisingGroup != null) { // send it, but not immediately. scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2); } // send own advertisement to a random set of rendezvous List rdvs = discoverRdvAdverisements(); Collections.shuffle(rdvs); while ( !rdvs.isEmpty() ) { if (sendRandomByAdv(rdvs, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) { ++iterations; } } if (probeRelays) { List relays = getRelayPeers(); Collections.shuffle(relays); while ( !relays.isEmpty() ) { if (sendRandomByAddr(relays, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) { ++iterations; } } } } } earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations)); } /** * Evaluates if the given pve corresponds to one of our seed rdvs. This is * to support the useOnlySeeds flag. The test is not completely foolproof * since our list of seed rdvs is just transport addresses. We could be * given a pve that exhibits an address that corresponds to one of our seeds * but is fake. 
And we might later succeed in connecting to that rdv via one * the other, real addresses. As a result, useOnlySeeds is *not* a security * feature, just a convenience for certain kind of deployments. Seed * rdvs should include certificates for such a restriction to be a security * feature. */ private boolean isSeedRdv(RdvAdvertisement rdvAdv) { RouteAdvertisement radv = rdvAdv.getRouteAdv(); if (radv == null) { return false; } AccessPointAdvertisement apAdv = radv.getDest(); if (apAdv == null) { return false; } // The accessPointAdv returns a live (!) copy of the endpoint addresses. List addrList = new ArrayList(apAdv.getVectorEndpointAddresses()); if (addrList.isEmpty()) { return false; } ListIterator eachAddr = addrList.listIterator(); // convert each string to a URI while (eachAddr.hasNext()) { String anAddr = (String) eachAddr.next(); try { // Convert to URI to compare with seedHosts eachAddr.set( new URI(anAddr)); } catch (URISyntaxException badURI) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Skipping bad URI : " + anAddr, badURI); } } } // seedList must be treated as read-only. // We do what we want with addrVect addrList.retainAll(activeSeedHosts); // What's left is the intersection of seedHosts and the set of // endpoint addresses in the given pve. If it is non-empty, then we // accept the pve as that of a seed host. return (!addrList.isEmpty()); } /** * Make sure that the PeerView properly changes behavior, when switching * from edge mode to rdv mode, and vice-versa. * Since openWirePipes() requires some other services such as the Pipe * Service, and since updateStatus is invoked this work must happen in * background, giving a chance to other services to be started. 
**/
    private class OpenPipesTask extends TimerTask {

        /**
         * {@inheritDoc}
         **/
        public void run() {
            try {
                // Nothing to open once the peer view has been closed.
                if (!closed) {
                    openWirePipes();
                }
            } catch (Throwable all) {
                if (LOG.isEnabledFor(Level.FATAL)) {
                    LOG.fatal("Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
                }
            } finally {
                // Always unregister this task, whatever happened above.
                removeTask(this);
            }
        }
    }

    /**
     * Registers a new {@link OpenPipesTask} through <code>addTask</code>.
     *
     * @param delay the delay handed to <code>addTask</code>; logged as
     *        milliseconds.
     **/
    private void scheduleOpenPipes(long delay) {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Scheduling open pipes attempt in " + delay + "ms.");
        }

        OpenPipesTask openTask = new OpenPipesTask();
        addTask(openTask, delay, -1);
    }

    /**
     * Send a PeerView Message to the specified peer.
     *
     * @param dest the peer view element the message is sent through.
     * @param pve the peer view element used to build the message.
     * @param response indicates whether this is a response. Otherwise
     *        we may create a distributed loop where peers keep perpetually
     *        responding to each-other.
     * @param failure Construct the message as a failure notification.
     * @return the value returned by <code>dest.sendMessage</code>.
     **/
    private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) {
        final Message outgoing = makeMessage(pve, response, failure);
        final boolean sent = dest.sendMessage(outgoing, SERVICE_NAME, uniqueGroupId);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Sending " + outgoing + " to " + dest + " success = " + sent);
        }

        return sent;
    }

    /**
     * Send a PeerView Message to the specified address, or propagate it when
     * no address is given.
     *
     * @param dest the address to send to; when <code>null</code> the message
     *        is propagated instead.
     * @param hint handed to the endpoint when obtaining a messenger.
     * @param pve the peer view element used to build the message.
     * @param response indicates whether this is a response. Otherwise
     *        we may create a distributed loop where peers keep perpetually
     *        responding to each-other.
     * @param failure Construct the message as a failure notification.
**/ private boolean send(EndpointAddress dest, Object hint, PeerViewElement pve, boolean response, boolean failure) { Message msg = makeMessage(pve, response, failure); if (null != dest) { EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId); Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint); if (null != messenger) { try { boolean result = messenger.sendMessage(msg); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + " to " + dest + " success = " + result); } return result; } catch (IOException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not send " + msg + " to " + dest, failed); } return false; } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not get messenger for " + dest); } return false; } } else { // Else, propagate the message. try { endpoint.propagate(msg, SERVICE_NAME, uniqueGroupId); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sent " + msg + " via propagate"); } return true; } catch (IOException ez) { if (LOG.isEnabledFor(Level.WARN)) { // Pretty strange. This has little basis for failure... LOG.warn("Could not propagate " + msg, ez); } return false; } } } /** * Send a PeerView Message to the specified peer. * * @param response indicates whether this is a response. Otherwise * we may create a distributed loop where peers keep perpetually * responding to each-other. * @param failure Construct the message as a failure notification. 
**/ private boolean send(OutputPipe dest, PeerViewElement pve, boolean response, boolean failure) { Message msg = makeMessage(pve, response, failure); try { return dest.send(msg); } catch (IOException ez) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not send " + msg, ez); } return false; } } /** * Make a PeerView Message **/ private Message makeMessage(PeerViewElement content, boolean response, boolean failure) { Message msg = new Message(); // edge peers add an identifying element, RDV peers do not if (!rdvService.isRendezVous()) { msg.addMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -