📄 peerview.java
字号:
* a rdv. * * - A probe to the peer whose adv we received, because we want * confirmation that it's alive. * * - A response with a random adv from our cache if we're being probed * * We may send more than one message. */ boolean status; if (!isCached) { if (!isResponse) { // Type 1: Respond to probe // // We are being probed by an edge peer or peerview member. We respond // with our own advertisement. status = send(pve, self, true, false); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Type 1 (Respond with self PVE) : Sent to " + pve + " result =" + status); } // Type 3: Respond with random entry from our PV when we are probed. // // Respond with a strategized adv from our view. PeerViewElement sendpve = replyStrategy.next(); if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) { status = send(pve, sendpve, true, false); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Type 3 (Respond with random PVE) : Sent " + sendpve + " to " + pve + " result=" + status); } } } else { // Heartbeat: do nothing. } } else if (isResponse) { if (isNewbie && !useOnlySeeds && !isFromEdge) { // Type 2: Probe a peer we have just learned about from a referral. // // If useOnlySeeds, we're not allowed to talk to peers other than our // seeds, so do not probe anything we learn from 3rd party. (Probing of // seeds happens as part of the "kick" strategy). status = send(pve, self, false, false); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Type 2 (Probe PVE) : Probed " + pve + " result=" + status); } } else { // Already known or ignoring: do nothing. } } else { // Invalid : do nothing. 
} } // closes the trailing else-branch and the method body that begins before this section

    /**
     * {@inheritDoc}
     * <p/>
     * Ignored once this instance is closed. For {@code BECAMERDV} and
     * {@code BECAMEEDGE} any existing watchdog task is cancelled, the wire
     * pipes are opened and the kick is rescheduled; {@code BECAMERDV}
     * additionally starts a new watchdog task. Connection and disconnection
     * events are accepted but require no action here.
     */
    @SuppressWarnings("fallthrough")
    public void rendezvousEvent(RendezvousEvent event) {

        if (closed) {
            return;
        }

        boolean notifyFailure = false;

        synchronized (this) {
            int theEventType = event.getType();

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("[" + group.getPeerGroupName() + "] Processing " + event);
            }

            refreshSelf();

            if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
                // kill any existing watchdog task
                if (null != watchdogTask) {
                    removeTask(watchdogTask);
                    watchdogTask.cancel();
                    watchdogTask = null;
                }
            }

            switch (theEventType) {
            case RendezvousEvent.RDVCONNECT:
            case RendezvousEvent.RDVRECONNECT:
            case RendezvousEvent.CLIENTCONNECT:
            case RendezvousEvent.CLIENTRECONNECT:
            case RendezvousEvent.RDVFAILED:
            case RendezvousEvent.RDVDISCONNECT:
            case RendezvousEvent.CLIENTFAILED:
            case RendezvousEvent.CLIENTDISCONNECT:
                // Nothing to do for connection state changes.
                break;

            case RendezvousEvent.BECAMERDV:
                openWirePipes();
                watchdogTask = new WatchdogTask();
                addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
                rescheduleKick(true);
                break;

            case RendezvousEvent.BECAMEEDGE:
                openWirePipes();
                if (!localView.isEmpty()) {
                    // FIXME bondolo 20040229 since we likely don't have a
                    // rendezvous connection, it is kind of silly to be sending
                    // this now. probably should wait until we get a rendezvous
                    // connection.
                    notifyFailure = true;
                }
                rescheduleKick(true);
                break;

            default:
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
                }
                break;
            }
        }

        // we can't do the notification under synchronization.
        if (notifyFailure) {
            notifyFailure(self, true);
        }
    }

    /**
     * Starts the peer view. Currently a no-op.
     */
    public void start() {
        // do nothing for now... all the good stuff happens as a result of
        // rendezvous events.
    }

    /**
     * Shuts the peer view down. Only the first caller performs the shutdown;
     * later calls return immediately.
     * <p/>
     * The failure notification for {@code self} is sent between the two
     * synchronized sections, i.e. without holding this object's monitor.
     */
    public void stop() {

        synchronized (this) {
            // Only one thread gets to perform the shutdown.
            if (closed) {
                return;
            }
            closed = true;
        }

        // notify other rendezvous peers that we are going down
        notifyFailure(self, true);

        // From now on we can nullify everything we want. Other threads check
        // the closed flag where it matters.
        synchronized (this) {
            if (watchdogTask != null) {
                removeTask(watchdogTask);
                watchdogTask.cancel();
                watchdogTask = null;
            }

            // Remove message listener.
            endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);

            // Remove rendezvous listener.
            rdvService.removeListener(this);

            // Remove all our pending scheduled tasks. Removal goes through
            // the iterator so the collection can be emptied while it is
            // being traversed.
            synchronized (scheduledTasks) {
                Iterator<TimerTask> eachTask = scheduledTasks.iterator();

                while (eachTask.hasNext()) {
                    try {
                        TimerTask task = eachTask.next();

                        task.cancel();
                        eachTask.remove();
                    } catch (Exception ez1) {
                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                            LOG.log(Level.WARNING, "Cannot cancel task: ", ez1);
                        }
                    }
                }
            }

            // Make sure that we close our WirePipes
            closeWirePipes();

            // Let go of the up and down peers.
            downPeer = null;
            upPeer = null;
            localView.clear();

            timer.cancel();

            rpvListeners.clear();
        }
    }

    /**
     * Records the task in {@code scheduledTasks} and schedules it on the
     * peer view timer.
     * <p/>
     * If the task is already recorded a warning is logged, but the task is
     * still added and scheduled again.
     * NOTE(review): if {@code timer} is a {@code java.util.Timer}, scheduling
     * an already scheduled task throws {@code IllegalStateException} - confirm
     * whether callers rely on that.
     *
     * @param task     the task to schedule.
     * @param delay    delay before the first execution, passed to the timer.
     * @param interval period between executions; a value below 1 schedules
     *                 the task for a single execution only.
     */
    protected void addTask(TimerTask task, long delay, long interval) {

        synchronized (scheduledTasks) {
            if (scheduledTasks.contains(task)) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Task list already contains specified task.");
                }
            }
            scheduledTasks.add(task);
        }

        if (interval >= 1) {
            timer.schedule(task, delay, interval);
        } else {
            timer.schedule(task, delay);
        }
    }

    /**
     * Removes the task from {@code scheduledTasks}. The task itself is not
     * cancelled here; callers do that separately.
     *
     * @param task the task to forget.
     */
    protected void removeTask(TimerTask task) {
        // Same lock as addTask() and stop() use when touching the task list.
        synchronized (scheduledTasks) {
            scheduledTasks.remove(task);
        }
    }

    /**
     * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
     * effect, this seed may now be used, as if it was part of the initial
     * configuration.
     *
     * @param seed the URI of the seed rendezvous.
     */
    public void addSeed(URI seed) {
        // Only URI based seeding managers accept additional seeds; for any
        // other seeding manager the call is silently ignored.
        if (seedingManager instanceof URISeedingManager) {
            ((URISeedingManager) seedingManager).addSeed(seed);
        }
    }

    /**
     * Probe the specified peer immediately.
     * <p/>
     * Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored.
     *
     * @param address the address to send the probe to.
     * @param hint    route advertisement handed to {@code send} along with the address.
     * @return the result reported by {@code send}.
     */
    public boolean probeAddress(EndpointAddress address, RouteAdvertisement hint) {

        PeerViewElement holdIt;

        // Read our own element while holding the local view lock.
        synchronized (localView) {
            holdIt = self;
        }

        return send(address, hint, holdIt, false, false);
    }

    /**
     * Send our own advertisement to all of the seed rendezvous.
     * <p/>
     * Does nothing (beyond logging) while the earliest permitted reseed time
     * has not been reached. The seeds are only contacted while the local view
     * holds fewer than {@code minHappyPeerView} entries.
     */
    public void seed() {

        long reseedRemaining = earliestReseed - TimeUtils.timeNow();

        if (reseedRemaining > 0) {
            // Too early; the previous round is not even done.
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Still Seeding for " + reseedRemaining + "ms.");
            }
            return;
        }

        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("New Seeding...");
        }

        // Schedule sending propagated query to our local network neighbors.
        send(null, null, self, false, false);

        // Never modified below, so it contributes nothing to the reseed delay
        // computed at the end of this method.
        long iterations = 0;

        if (localView.size() < minHappyPeerView) {
            // We only do these things if we don't have a "happy" Peer View.
            // If the Peer View is already "happy" then we will use only
            // Peer View referrals for learning of new entries.
            List<RouteAdvertisement> seedRdvs = new ArrayList<RouteAdvertisement>(
                    Arrays.asList(seedingManager.getActiveSeedRoutes()));

            while (!seedRdvs.isEmpty()) {
                RouteAdvertisement aSeed = seedRdvs.remove(0);

                if (null == aSeed.getDestPeerID()) {
                    // It is an incomplete route advertisement. We are going to assume that it is only a wrapper for a single ea.
                    Vector<String> seed_eas = aSeed.getDest().getVectorEndpointAddresses();

                    if (!seed_eas.isEmpty()) {
                        EndpointAddress aSeedHost = new EndpointAddress(seed_eas.get(0));

                        // XXX 20061220 bondolo We could check all of our current PVEs to make sure that this address is not already known.
                        send(aSeedHost, null, self, false, false);
                    }
                } else {
                    // We have a full route, send it to the virtual address of the route!
                    // FIXME malveaux 20070816 Second part of conjunct can be removed once 'self' is included in the peerview
                    if ((null == getPeerViewElement(aSeed.getDestPeerID())) && !group.getPeerID().equals(aSeed.getDestPeerID())) {
                        EndpointAddress aSeedHost = new EndpointAddress("jxta", aSeed.getDestPeerID().getUniqueValue().toString(), null, null);

                        send(aSeedHost, aSeed, self, false, false);
                    }
                }
            }

            if (!useOnlySeeds) {
                // If use only seeds, we're not allowed to put in the peerview
                // anything but our seed rdvs. So, we've done everything that
                // was required.

                // Schedule sending propagated query to our advertising group
                if (advertisingGroup != null) {
                    // send it, but not immediately.
                    scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2);
                }
            }
        }

        earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations));
    }

    /**
     * Make sure that the PeerView properly changes behavior, when switching
     * from edge mode to rdv mode, and vice-versa.
     * Since openWirePipes() requires some other services such as the Pipe
     * Service, and since updateStatus is invoked this work must happen in
     * background, giving a chance to other services to be started.
     */
    private class OpenPipesTask extends TimerTask {

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            try {
                if (closed) {
                    return;
                }

                openWirePipes();
            } catch (Throwable all) {
                // Log and swallow so the failure does not escape the task.
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
                }
            } finally {
                // Always drop this task from the scheduled task list, even on
                // the early return above.
                removeTask(this);
            }
        }
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -