📄 peerview.java
字号:
// NOTE(review): this fragment begins mid-method (the enclosing try { ... } and
// method header are above this view) and ends mid-javadoc; only comments have
// been added/repaired here — all code tokens are unchanged.
PeerViewElement pve = getPeerViewElement( asID );
if( null != pve ) {
    // Quote the known peer's advertised name for the debug trace below.
    srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\"";
}
} catch( URISyntaxException failed ) {
    // Sender ID could not be parsed; srcPeer keeps its prior value — debug-trace only.
}
}
LOG.debug( "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "") + (isFailure ? " failure" : "") + " message" + (isFromEdge ? " from edge" : "") + " regarding \"" + radv.getName() + "\" from " + srcPeer);
}

// If this is a failure notification, all we have to do is locally
// process the failure; nothing further to learn from this message.
if (isFailure) {
    notifyFailure(radv.getPeerID(), false);
    return;
}

// First-hand, trusted adv from another rendezvous: republish it locally so
// discovery can serve it to others.
if (!isFromEdge && !isCached && isTrusted) {
    DiscoveryService discovery = group.getDiscoveryService();
    if (discovery != null) {
        try {
            discovery.publish(radv, DEFAULT_RADV_LIFETIME, DEFAULT_RADV_EXPIRATION);
        } catch (IOException ex) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Could not publish " + radv.getName(), ex);
            }
        }
    }
}

// Figure out if we know that peer already. If we do, reuse the pve
// that we have.
boolean isNewbie = false;
boolean added = false;
PeerViewElement pve;
int viewSize = 0;

synchronized (localView) {
    PeerViewElement newbie = new PeerViewElement(endpoint, radv);

    pve = getPeerViewElement(newbie);
    if (null == pve) {
        pve = newbie;
        isNewbie = true;
    }

    // Only a first-hand, trusted, non-edge message may update the view —
    // and only if seed restrictions allow this peer.
    if (!isFromEdge && !isCached && isTrusted) {
        if (!useOnlySeeds || isSeedRdv(radv)) {
            if (isNewbie) {
                added = addPeerViewElement(pve);
            } else {
                // Known peer: refresh its advertisement in place.
                pve.setRdvAdvertisement(radv);
            }
        }
    }

    viewSize = localView.size();
}

if( !isNewbie && isFromEdge && !isCached ) {
    // The message stated that it is from an edge we believed was a
    // peerview member. Best thing to do is tell everyone that it's no
    // longer in peerview.
    notifyFailure(pve, !localIsEdge);
    // We continue processing because it's not the other peer's fault we had
    // the wrong idea.
}

// Do the rest of the add related tasks out of synch.
// We must not nest any possibly synchronized ops in
// the LocalView lock; it's the lowest level.
if (added) {
    // Notify local listeners
    generateEvent(PeerViewEvent.ADD, pve);
}

/*
 * Now, see what if any message we have to send as a result.
 * There are three kinds of messages we can send:
 *
 * - A response with ourselves, if we're being probed and we're
 *   a rdv.
 *
 * - A probe to the peer whose adv we received, because we want
 *   confirmation that it's alive.
 *
 * - A response with a random adv from our cache if we're being probed.
 *
 * We may send more than one message.
 */

// Type 1: respond with self.
// We need to do that whenever we're being probed and we're an rdv,
// and the adv we got is that of the sender (!cached - otherwise we
// can't respond to the sender, it's a kick message that tells us about
// another peer, which is handled by Type 2 below).
// This could happen along with Type 2 below.
if (!isCached && !localIsEdge && !isResponse) {
    boolean status = send(pve, self, true, false);

    if (LOG.isEnabledFor(Level.DEBUG)) {
        LOG.debug("Type 1 (Respond with self) : Sent to " + pve + " result=" + status);
    }
}

// Type 2: probe it.
// We need to probe if we do not have it in our view and it is
// a cached adv (from a third party, so non-authoritative) which
// can be found either in a response or in a kick message.
// OR, if it is a probe from a peer that we do not know (in which
// case we will probe here if it pretends to be an rdv, and also
// respond (see Type 1, above) - only if it is NOT a response).
// If isNewbie && added, isCached cannot be true, so we do not
// need to check for added; (isCached && isNewbie) is enough.
// If isNewbie && added, response cannot be false, so there is
// no need to check for added; (isNewbie && !response) is enough.
// Watch out: do not always probe cached things we got in a response
// because we'd likely get another response with another cached
// adv, thus cascading through all rdvs.
// What we do is to use the information only if our view is way small,
// in order to guarantee connectivity for the future.
// If useOnlySeeds, we're not allowed to use other than our
// seed rdvs, so do not probe anything we learn from 3rd party.
if (!useOnlySeeds && isNewbie ) {
    if ( (isCached && !isResponse) || (!isCached && !isResponse && !isFromEdge) ) {
        boolean status = send(pve, self, false, false);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Type 2 (Probe PVE) : Probed " + pve + " result=" + status);
        }
    }
}

// Type 3: respond with random cached adv because being probed (likely
// by an edge, but we don't care, even another rdv could benefit from it).
// This could happen along with Type 2 above although it is rather
// unlikely.
// Respond with a strategized adv from our view.
//
// This always happens along with Type 1 and sometimes along with
// Type 2 at the same time: we could send three messages. That would
// be if we receive a probe from another rdv that we do not know
// yet and we're an rdv ourselves. So, we'll respond with ourselves,
// we will probe the sender since it pretends to be an rdv, and we
// also will pick a chosen rdv from our view and send it (done here).
// Note, it could mean a cascade of probes to all rdvs: the recipient
// of our cached adv would then probe it, thus receiving another
// cached adv, which it would probe, etc.
// This phenomenon is prevented in Type 2 by probing cached peers from
// such responses only if the view is small enough.
// NOTE: rdv peers do not need this all that much and we could
// avoid it for them, but it should not cause much problems so we
// might as well leave it for now.
if (!isCached && !isResponse) {
    // Someone is looking for rdvs. Try to help.
    PeerViewElement sendpve = replyStrategy.next();

    // Never echo back the asker's own element or our self element.
    if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) {
        boolean status = send(pve, sendpve, true, false);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Type 3 (Respond with random pve) : Sent " + sendpve + " to " + pve + " result=" + status);
        }
    }
}
}

/**
 * {@inheritDoc}
 *
 * <p/>Reacts to rendezvous role/connection changes: restarts or cancels the
 * watchdog task and (re)opens the wire pipes when this peer becomes a
 * rendezvous or an edge.
 **/
public void rendezvousEvent(RendezvousEvent event) {
    if (closed) {
        return;
    }

    boolean notifyFailure = false;

    synchronized (this) {
        int theEventType = event.getType();

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("[" + group.getPeerGroupName() + "] Processing " + event);
        }

        refreshSelf();

        if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
            // Role change: kill any existing watchdog task before (possibly)
            // starting a new one below.
            if (null != watchdogTask) {
                removeTask(watchdogTask);
                watchdogTask.cancel();
                watchdogTask = null;
            }
        }

        switch (theEventType) {
            // Connection-state events require no peerview action.
            case RendezvousEvent.RDVCONNECT:
            case RendezvousEvent.RDVRECONNECT:
            case RendezvousEvent.CLIENTCONNECT:
            case RendezvousEvent.CLIENTRECONNECT:
            case RendezvousEvent.RDVFAILED:
            case RendezvousEvent.RDVDISCONNECT:
            case RendezvousEvent.CLIENTFAILED:
            case RendezvousEvent.CLIENTDISCONNECT:
                break;

            case RendezvousEvent.BECAMERDV:
                openWirePipes();

                // Only a rendezvous runs the watchdog.
                watchdogTask = new WatchdogTask();
                addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
                rescheduleKick(true);
                break;

            case RendezvousEvent.BECAMEEDGE:
                openWirePipes();

                if (!localView.isEmpty()) {
                    // FIXME bondolo 20040229 since we likely don't have a
                    // rendezvous connection, it is kind of silly to be sending
                    // this now. probably should wait until we get a rendezvous
                    // connection.
                    notifyFailure = true;
                }
                rescheduleKick(true);
                break;

            default:
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
                }
                break;
        }
    }

    // We can't do the notification under synchronization:
    // notifyFailure may call out to listeners / send messages.
    if (notifyFailure) {
        notifyFailure(self, true);
    }
}

/**
 * Starts this peerview. Currently a no-op:
 */
public void start() {
    // do nothing for now... all the good stuff happens as a result of
    // rendezvous events.
}

/**
 * Shuts this peerview down: tells peers we are going away (if we are a
 * rendezvous), unregisters listeners, cancels all scheduled tasks, closes
 * the wire pipes, and clears all local state. Idempotent — only the first
 * caller performs the shutdown.
 */
public void stop() {
    synchronized (this) {
        // Only one thread gets to perform the shutdown.
        if (closed) {
            return;
        }

        closed = true;
    }

    // Notify other rendezvous peers that we are going down (only
    // if this peer is a rendezvous). Done outside the lock: it sends
    // messages / fires listeners.
    if (rdvService.isRendezVous()) {
        notifyFailure(self, true);
    }

    // From now on we can nullify everything we want. Other threads check
    // the closed flag where it matters.
    synchronized (this) {
        if (watchdogTask != null) {
            removeTask(watchdogTask);
            watchdogTask.cancel();
            watchdogTask = null;
        }

        // Remove message listener.
        endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);

        // Remove rendezvous listener.
        rdvService.removeListener(this);

        // Cancel and remove all our pending scheduled tasks.
        // Removal is done through the iterator, which is safe while iterating.
        synchronized (scheduledTasks) {
            Iterator eachTask = scheduledTasks.iterator();

            while (eachTask.hasNext()) {
                try {
                    TimerTask task = (TimerTask) eachTask.next();

                    task.cancel();
                    eachTask.remove();
                } catch (Exception ez1) {
                    // Best-effort: keep cancelling the remaining tasks.
                    if (LOG.isEnabledFor(Level.WARN)) {
                        LOG.warn("Cannot cancel task: ", ez1);
                    }
                    continue;
                }
            }
        }

        // Make sure that we close our WirePipes.
        closeWirePipes();

        // Let go of the up and down peers.
        downPeer = null;
        upPeer = null;

        localView.clear();

        timer.cancel();

        rpvListeners.clear();
    }
}

/**
 * Registers the task in our bookkeeping list and schedules it on the timer.
 *
 * @param task     the task to schedule
 * @param delay    delay in milliseconds before first execution
 * @param interval repeat period in milliseconds; values below 1 schedule a
 *                 one-shot execution
 */
protected void addTask(TimerTask task, long delay, long interval) {
    synchronized (scheduledTasks) {
        if (scheduledTasks.contains(task)) {
            // NOTE(review): we warn about the duplicate but still add it,
            // leaving two entries in the list — confirm this is intended.
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Task list already contains specified task.");
            }
        }

        scheduledTasks.add(task);
    }

    if (interval >= 1) {
        timer.schedule(task, delay, interval);
    } else {
        timer.schedule(task, delay);
    }
}

/**
 * Removes the task from our bookkeeping list. Does not cancel it; callers
 * cancel separately.
 *
 * NOTE(review): unlike addTask, this does not synchronize on scheduledTasks —
 * presumably scheduledTasks is a synchronized collection; verify, otherwise
 * this races with the iteration in stop().
 *
 * @param task the task to remove
 */
protected void removeTask(TimerTask task) {
    scheduledTasks.remove(task);
}

/**
 * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
 * effect, this seed may now be used, as if it was part of the initial
 * configuration.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -