📄 peerview.java
字号:
private void scheduleOpenPipes(long delay) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Scheduling open pipes attempt in " + delay + "ms."); } addTask(new OpenPipesTask(), delay, -1); } /** * Send a PeerView Message to the specified peer. * * @param response indicates whether this is a response. Otherwise * we may create a distributed loop where peers keep perpetually * responding to each-other. * @param failure Construct the message as a failure notification. */ private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) { Message msg = makeMessage(pve, response, failure); boolean result = dest.sendMessage(msg, SERVICE_NAME, uniqueGroupId); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " to " + dest + " success = " + result); } return result; } /** * Send a PeerView Message to the specified peer. * * @param response indicates whether this is a response. Otherwise * we may create a distributed loop where peers keep perpetually * responding to each-other. * @param failure Construct the message as a failure notification. */ private boolean send(EndpointAddress dest, RouteAdvertisement hint, PeerViewElement pve, boolean response, boolean failure) { Message msg = makeMessage(pve, response, failure); if (null != dest) { EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId); Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint); if (null != messenger) { try { boolean result = messenger.sendMessage(msg); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " to " + dest + " success = " + result); } return result; } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not send " + msg + " to " + dest, failed); } return false; } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Could not get messenger for " + dest); } return false; } } else { // Else, propagate the message. try { endpoint.propagate(msg, SERVICE_NAME, uniqueGroupId); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent " + msg + " via propagate"); } return true; } catch (IOException ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { // Pretty strange. This has little basis for failure... LOG.log(Level.WARNING, "Could not propagate " + msg, ez); } return false; } } } /** * Send a PeerView Message to the specified peer. * * @param response indicates whether this is a response. Otherwise * we may create a distributed loop where peers keep perpetually * responding to each-other. * @param failure Construct the message as a failure notification. * @param dest destination output pipe * @param pve the peer view element * @return true if successful */ private boolean send(OutputPipe dest, PeerViewElement pve, boolean response, boolean failure) { Message msg = makeMessage(pve, response, failure); try { return dest.send(msg); } catch (IOException ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not send " + msg, ez); } return false; } } /** * Make a PeerView Message * * @param content the peer view element * @param response the response * @param failure whether to create a message based on a failure * @return the message */ private Message makeMessage(PeerViewElement content, boolean response, boolean failure) { Message msg = new Message(); // // edge peers add an identifying element, RDV peers do not // if (!rdvService.isRendezVous()) { // msg.addMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT); // } // if (failure) { // This is a failure notification. msg.addMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT); } refreshSelf(); RdvAdvertisement radv = content.getRdvAdvertisement(); XMLDocument doc = (XMLDocument) radv.getDocument(MimeMediaType.XMLUTF8); String msgName = response ? RESPONSE_ELEMENT_NAME : MESSAGE_ELEMENT_NAME; MessageElement msge = new TextDocumentMessageElement(msgName, doc, null); msg.addMessageElement(MESSAGE_NAMESPACE, msge); if (!content.equals(self)) { // This is a cached RdvAdvertisement msg.addMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT); // This message contains an RdvAdvertisement which is not ourself. In that // case, it is wise to also send the local route advertisement (as the optional // SrcRdvAdv) so the destination might have a better change to access the "content" // RendezvousAdv (this peer will then act as a hop). RouteAdvertisement localra = EndpointUtils.extractRouteAdv(lastPeerAdv); if (localra != null) { try { XMLDocument radoc = (XMLDocument) localra.getDocument(MimeMediaType.XMLUTF8); msge = new TextDocumentMessageElement(SRCROUTEADV_ELEMENT_NAME, radoc, null); msg.addMessageElement(MESSAGE_NAMESPACE, msge); } catch (Exception ez1) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not create optional src route adv for " + content, ez1); } } } } return msg; } /** * Invoked by anyone in order to inform the PeerView of a failure * of one of the member peers. * * @param pid ID of the peer which failed. * @param propagateFailure If <tt>true</tt>then broadcast the failure to * other peers otherwise only update the local peerview. */ public void notifyFailure(PeerID pid, boolean propagateFailure) { PeerViewElement pve = getPeerViewElement(pid); if (null != pve) { notifyFailure(pve, propagateFailure); } } /** * Invoked when a peerview member peer becomes unreachable. * * @param pve The peer which failed. * @param propagateFailure If {@code true} then broadcast the failure to * other peers otherwise only update the local peerview. */ void notifyFailure(PeerViewElement pve, boolean propagateFailure) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Notifying failure of " + pve); } try { boolean removedFromPeerView = removePeerViewElement(pve); // only propagate if we actually knew of the peer propagateFailure &= (removedFromPeerView || (self == pve)); // Notify local listeners if (removedFromPeerView) { generateEvent(PeerViewEvent.FAIL, pve); } boolean emptyPeerView = localView.isEmpty(); // If the local view has become empty, reset the kicker into // a seeding mode. if (emptyPeerView && removedFromPeerView) { rescheduleKick(true); } if (propagateFailure) { // Notify other rendezvous peers that there has been a failure. OutputPipe op = localGroupWirePipeOutputPipe; if (null != op) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Propagating failure of " + pve); } send(op, pve, true, true); } } } catch (Exception ez) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure while generating noficiation of failure of PeerView : " + pve, ez); } } } /** * Invoked by the Timer thread to cause each PeerView to initiate * a Peer Advertisement exchange. */ private void kick() { try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Begun kick() in " + group.getPeerGroupID()); } // Use seed strategy. (it has its own throttling and resource limiting). seed(); // refresh ourself to a peer in our view PeerViewElement refreshee = refreshRecipientStrategy.next(); if ((refreshee != null) && (self != refreshee)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Refresh " + refreshee); } send(refreshee, self, false, false); } // now share an adv from our local view to another peer from our // local view. PeerViewElement recipient = kickRecipientStrategy.next(); if (recipient == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No recipient to send adv "); } return; } PeerViewElement rpve = kickAdvertisementStrategy.next(); if (rpve == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No adv to send"); } return; } if (rpve.equals(recipient) || self.equals(recipient)) { // give up: no point in sending a peer its own adv if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("adv to send is same as recipient: Nothing to do."); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending adv " + rpve + " to " + recipient); } send(recipient, rpve, true, false); } finally { rescheduleKick(false); } } /** * Choose a boot level appropriate for the current configuration and state. * * @return the new boot level. */ private int adjustBootLevel() { boolean areWeHappy = localView.size() >= minHappyPeerView; // increment boot level faster if we have a reasonable peerview. int increment = areWeHappy ? BOOTLEVEL_INCREMENT : BOOTLEVEL_INCREMENT * 2; // if we don't have a reasonable peerview, we continue to try harder. int maxbootlevel = MAX_BOOTLEVEL - (areWeHappy ? 0 : BOOTLEVEL_INCREMENT); bootLevel = Math.min(maxbootlevel, bootLevel + increment); return bootLevel; } private synchronized void rescheduleKick(boolean now) { if (closed) { return; } // Set the next iteration try { if (now) { bootLevel = MIN_BOOTLEVEL; } else { adjustBootLevel(); } long tilNextKick = DEFAULT_BOOTSTRAP_KICK_INTERVAL * ((1L << bootLevel) - 1); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Scheduling kick in " + (tilNextKick / TimeUtils.ASECOND) + " seconds at bootLevel " + bootLevel + " in group " + group.getPeerGroupID()); } KickerTask task = new KickerTask(); addTask(task, tilNextKick, -1); } catch (Exception ez1) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Cannot set timer. RPV will not work.", ez1); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -