📄 peerview.java
字号:
private OutputPipe localGroupWirePipeOutputPipe = null; /** * A task which monitors the up and down peers in the peerview. */ private WatchdogTask watchdogTask = null; /** * This is the accumulated view by an instance of this class. */ private final SortedSet<PeerViewDestination> localView = Collections.synchronizedSortedSet(new TreeSet<PeerViewDestination>()); /** * PVE for ourself. * <p/> * FIXME bondolo 20041015 This should be part of the local view. */ private final PeerViewElement self; private PeerViewElement upPeer = null; private PeerViewElement downPeer = null; private final PeerViewStrategy replyStrategy; private final PeerViewStrategy kickRecipientStrategy; private final PeerViewStrategy kickAdvertisementStrategy; private final PeerViewStrategy refreshRecipientStrategy; // PeerAdv tracking. private PeerAdvertisement lastPeerAdv = null; private int lastModCount = -1; private final PipeAdvertisement localGroupWirePipeAdv; private final PipeAdvertisement advGroupPropPipeAdv; /** * If <code>true</code> then this Peer View instance is closed and is * shutting down. */ private volatile boolean closed = false; /** * Get an instance of PeerView for the specified PeerGroup and Service. * * @param group Peer Group in which this Peer View instance operates. * @param advertisingGroup Peer Group in which this Peer View instance will * advertise and broadcast its existence. * @param rdvService The rdvService we are to use. * @param name The identifying name for this Peer View instance. */ public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name) { this.group = group; this.advertisingGroup = advertisingGroup; this.rdvService = rdvService; this.name = name; this.endpoint = group.getEndpointService(); this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString(); timer = new Timer("PeerView Timer for " + group.getPeerGroupID(), true); Advertisement adv = null; ConfigParams confAdv = group.getConfigAdvertisement(); // Get the config. If we do not have a config, we're done; we just keep // the defaults (edge peer/no auto-rdv) if (confAdv != null) { try { XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID()); if (null != configDoc) { adv = AdvertisementFactory.newAdvertisement(configDoc); } } catch (java.util.NoSuchElementException failed) {// ignored } } RdvConfigAdv rdvConfigAdv; if (!(adv instanceof RdvConfigAdv)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Creating new RdvConfigAdv for defaults."); } rdvConfigAdv = (RdvConfigAdv) AdvertisementFactory.newAdvertisement(RdvConfigAdv.getAdvertisementType()); } else { rdvConfigAdv = (RdvConfigAdv) adv; } if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) { seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay(); } useOnlySeeds = rdvConfigAdv.getUseOnlySeeds(); if (rdvConfigAdv.getMinHappyPeerView() > 0) { minHappyPeerView = rdvConfigAdv.getMinHappyPeerView(); } URISeedingManager seedingManager; if ((null == advertisingGroup) && rdvConfigAdv.getProbeRelays()) { seedingManager = new RelayReferralSeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name); } else { seedingManager = new URISeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name); } for (URI aSeeder : Arrays.asList(rdvConfigAdv.getSeedingURIs())) { seedingManager.addSeedingURI(aSeeder); } for (URI aSeed : Arrays.asList(rdvConfigAdv.getSeedRendezvous())) { seedingManager.addSeed(aSeed); } this.seedingManager = seedingManager; lastPeerAdv = group.getPeerAdvertisement(); lastModCount = lastPeerAdv.getModCount(); // create a new local RdvAdvertisement and set it to self. RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name); self = new PeerViewElement(endpoint, radv); // addPeerViewElement( self ); // setup endpoint listener endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId); // add rendezvous listener rdvService.addListener(this); // initialize strategies replyStrategy = new PeerViewRandomWithReplaceStrategy(localView); kickRecipientStrategy = new PeerViewRandomStrategy(localView); kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView); refreshRecipientStrategy = new PeerViewSequentialStrategy(localView); localGroupWirePipeAdv = makeWirePipeAdvertisement(group, group, name); if (null != advertisingGroup) { advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup, group, name); } else { advGroupPropPipeAdv = null; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info( "PeerView created for group \"" + group.getPeerGroupName() + "\" [" + group.getPeerGroupID() + "] name \"" + name + "\""); } } /** * {@inheritDoc} * <p/> * Listener for "PeerView"/<peergroup-unique-id> and propagate pipes. */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { // check what kind of message this is (response or not). boolean isResponse = false; MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME); if (me == null) { me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME); if (me == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Discarding damaged " + msg + "."); } return; } else { isResponse = true; } } Advertisement adv; try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me); adv = AdvertisementFactory.newAdvertisement(asDoc); } catch (RuntimeException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed); } return; } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed); } return; } if (!(adv instanceof RdvAdvertisement)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Response does not contain radv (" + adv.getAdvertisementType() + ")"); } return; } RdvAdvertisement radv = (RdvAdvertisement) adv; if (null == radv.getRouteAdv()) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Rdv Advertisement does not contain route."); } return; } // See if we can find a src route adv in the message. me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME); if (me != null) { try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me); Advertisement routeAdv = AdvertisementFactory.newAdvertisement(asDoc); if (!(routeAdv instanceof RouteAdvertisement)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Advertisement is not a RouteAdvertisement"); } } else { RouteAdvertisement rdvRouteAdv = radv.getRouteAdv().clone(); // XXX we stich them together even if in the end it gets optimized away RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv); radv.setRouteAdv(rdvRouteAdv); } } catch (RuntimeException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building route adv from message element", failed); } } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building route adv from message element", failed); } } } me = null; // Is this a message about ourself? if (group.getPeerID().equals(radv.getPeerID())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received a PeerView message about self. Discard."); } return; } // Collect the various flags. boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null); boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null); boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null); boolean isTrusted = isFromEdge || seedingManager.isAcceptablePeer(radv.getRouteAdv()); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { String srcPeer = srcAddr.toString(); if ("jxta".equals(srcAddr.getProtocolName())) { try { String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress(); ID asID = IDFactory.fromURI(new URI(idstr)); PeerViewElement pve = getPeerViewElement(asID); if (null != pve) { srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\""; } } catch (URISyntaxException failed) {// ignored } } LOG.fine( "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "") + (isFailure ? " failure" : "") + " message (" + msg.toString() + ")" + (isFromEdge ? " from edge" : "") + " regarding \"" + radv.getName() + "\" from " + srcPeer); } if (!isTrusted) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Rejecting peerview message from " + radv.getPeerID()); } return; } // if this is a notification failure. All we have to do is locally // process the failure if (isFailure) { notifyFailure(radv.getPeerID(), false); return; } handlePeerViewMessage(isResponse, isCached, isFromEdge, isTrusted, radv); } /** * Following the extraction of a peerview message from a */ private void handlePeerViewMessage(boolean isResponse, boolean isCached, boolean isFromEdge, boolean isTrusted, RdvAdvertisement radv) { // Figure out if we know that peer already. If we do, reuse the pve // that we have. boolean isNewbie = false; boolean added = false; PeerViewElement pve; synchronized (localView) { PeerViewElement newbie = new PeerViewElement(endpoint, radv); pve = getPeerViewElement(newbie); if (null == pve) { pve = newbie; isNewbie = true; } if (!isFromEdge && !isCached && isTrusted) { if (isNewbie) { added = addPeerViewElement(pve); } else { pve.setRdvAdvertisement(radv); } } } if (!isNewbie && isFromEdge && !isCached) { // The message stated that it is from an edge we believed was a // peerview member. Best thing to do is tell everyone that it's no // longer in peerview. notifyFailure(pve, true); // we continue processing because it's not the other peer's fault we had the wrong idea. } // Do the rest of the add related tasks out of synch. // We must not nest any possibly synchronized ops in // the LocalView lock; it's the lowest level. if (added) { // Notify local listeners generateEvent(PeerViewEvent.ADD, pve); } /* * Now, see what if any message we have to send as a result. * There are three kinds of messages we can send: * * - A response with ourselves, if we're being probed and we're
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -