📄 peerview.java
字号:
* this can mean that the PeerView is held around for a long time.</li> * * <li>java.util.Timer is not only not real-time (which is more or less fine * for the PeerView, but it sequentially invokes tasks (only one Thread * per Timer). As a result, tasks that takes a long time to run delays * other tasks.</li> * </ul> * * <p/>The PeerView would function better with a better Timer, but JDK does * not provide a standard timer that would fullfill the needs of the * PeerView. Maybe we should implement a JXTA Timer, since lots of the JXTA * services, by being very asynchronous, rely on the same kind of timer * semantics as the PeerView. Until then, creating a Timer per instance of * the PeerView (i.e. per instance of joined PeerGroup) is the best * workaround. **/ private final Timer timer; /** * A random number generator. */ private final static Random random = new Random(); /** * List of scheduled tasks **/ private final Set scheduledTasks = Collections.synchronizedSet(new HashSet()); /** * Describes the frequency and amount of effort we will spend updating * the peerview. **/ private int bootLevel = MIN_BOOTLEVEL; /** * Earliest absolute time in milliseconds at which we will allow a reseed * to take place. **/ private long earliestReseed = 0L; private final String uniqueGroupId; /** * Listeners for PeerView Events. * * <ul> * <li>Values are {@link PeerViewListener}. * </ul> **/ private final Set rpvListeners = Collections.synchronizedSet(new HashSet()); /** * Used for querying for pves. **/ private InputPipe wirePipeInputPipe = null; /** * Used for querying for pves. **/ private OutputPipe wirePipeOutputPipe = null; /** * Used for notifications about pve failures. **/ private InputPipe localGroupWirePipeInputPipe = null; /** * Used for notifications about pve failures. **/ private OutputPipe localGroupWirePipeOutputPipe = null; /** * A task which monitors the up and down peers in the peerview. **/ private WatchdogTask watchdogTask = null; /** * This is the accumulated view by an instance of this class. * * <p/>Values are {@see net.jxta.impl.rendezvous.rpv.PeerViewElement} */ private final SortedSet localView = Collections.synchronizedSortedSet(new TreeSet()); /** * PVE for ourself. * * FIXME bondolo 20041015 This should be part of the local view. **/ private final PeerViewElement self; private PeerViewElement upPeer = null; private PeerViewElement downPeer = null; private final PeerViewStrategy replyStrategy; private final PeerViewStrategy kickRecipientStrategy; private final PeerViewStrategy kickAdvertisementStrategy; private final PeerViewStrategy refreshRecipientStrategy; protected final AccessList acl = new AccessList(); protected File aclFile; protected long aclFileLastModified = 0; protected long nextACLrefreshTime =0; // PeerAdv tracking. private PeerAdvertisement lastPeerAdv = null; private int lastModCount = -1; private final PipeAdvertisement localGroupWirePipeAdv; private final PipeAdvertisement advGroupPropPipeAdv; /** * If <code>true</code> then this Peer View instance is closed and is * shutting down. **/ private volatile boolean closed = false; /** * Get an instance of PeerView for the specified PeerGroup and Service. * * @param group Peer Group in which this Peer View instance operates. * @param advertisingGroup Peer Group in which this Peer View instance will * advertise and broadcast its existance. * @param rdvService The rdvService we are to use. * @param name The identifying name for this Peer View instance. **/ public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name ) { this.group = group; this.advertisingGroup = advertisingGroup; this.rdvService = rdvService; this.name = name; this.endpoint = group.getEndpointService(); aclFile = new File( new File(group.getStoreHome()), "rendezvousACL.xml"); aclFileLastModified = aclFile.lastModified(); try { acl.init(aclFile); this.nextACLrefreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD; } catch (IOException io) { acl.setGrantAll(true); this.nextACLrefreshTime = Long.MAX_VALUE; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("PeerView Access Control permanently granting all permissions"); } } this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString(); timer = new Timer(true); timer.schedule(new TimerThreadNamer("PeerView Timer for " + group.getPeerGroupID()), 0); ConfigParams confAdv = group.getConfigAdvertisement(); // Get the config. If we do not have a config, we're done; we just keep // the defaults (edge peer/no auto-rdv) if (confAdv != null) { Advertisement adv = null; try { XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID()); if (null != configDoc) { // XXX 20041027 backwards compatibility configDoc.addAttribute( "type", RdvConfigAdv.getAdvertisementType() ); adv = AdvertisementFactory.newAdvertisement(configDoc); } } catch (java.util.NoSuchElementException failed) { ; } if (adv instanceof RdvConfigAdv) { RdvConfigAdv rdvConfigAdv = (RdvConfigAdv) adv; permanentSeedHosts.addAll(Arrays.asList(rdvConfigAdv.getSeedRendezvous())); useOnlySeeds = rdvConfigAdv.getUseOnlySeeds(); seedingURIs.addAll(Arrays.asList(rdvConfigAdv.getSeedingURIs())); if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) { seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay(); } probeRelays = rdvConfigAdv.getProbeRelays(); if (rdvConfigAdv.getMinHappyPeerView() > 0) { minHappyPeerView = rdvConfigAdv.getMinHappyPeerView(); } } } lastPeerAdv = group.getPeerAdvertisement(); lastModCount = lastPeerAdv.getModCount(); // create a new local RdvAdvertisement and set it to self. RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name); self = new PeerViewElement(endpoint, radv); // if ( rdvService.isRendezVous() ) { // addPeerViewElement( self ); // } // setup endpoint listener endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId); // add rendezvous listener rdvService.addListener(this); // initialize strategies replyStrategy = new PeerViewRandomWithReplaceStrategy(localView); kickRecipientStrategy = new PeerViewRandomStrategy(localView); kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView); refreshRecipientStrategy = new PeerViewSequentialStrategy(localView); localGroupWirePipeAdv = makeWirePipeAdvertisement(group); if (null != advertisingGroup) { advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup); } else { advGroupPropPipeAdv = null; } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("PeerView created for group \"" + group.getPeerGroupName() + "\" [" + group.getPeerGroupID() + "] name \"" + name + "\""); } } /** * {@inheritDoc} * * Listener for "PeerView"/<peergroup-unique-id> and propagate pipes. **/ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { // IsEdgePeer is confusing because the is* predicates are used // to refer to the properties of the message we're processing. boolean localIsEdge = !rdvService.isRendezVous(); // check what kind of message this is (response or not). boolean isResponse = false; MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME); if (me == null) { me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME); if (me == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Discarding damaged " + msg + "."); } return; } else { isResponse = true; } } Advertisement adv; try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me); adv = AdvertisementFactory.newAdvertisement(asDoc); } catch (RuntimeException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed building rdv advertisement from message element", failed); } return; } catch (IOException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed building rdv advertisement from message element", failed); } return; } if (!(adv instanceof RdvAdvertisement)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Response does not contain radv (" + adv.getAdvertisementType() + ")"); } return; } RdvAdvertisement radv = (RdvAdvertisement) adv; // See if we can find a src route adv in the message.s me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME); if (me != null) { try { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me); Advertisement routeAdv = AdvertisementFactory.newAdvertisement(asDoc); if (!(routeAdv instanceof RouteAdvertisement)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Advertisement is not a RouteAdvertisement"); } } else { RouteAdvertisement rdvRouteAdv = (RouteAdvertisement) radv.getRouteAdv().clone(); // XXX we stich them together even if in the end it gets optimized away RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv); radv.setRouteAdv(rdvRouteAdv); } } catch (RuntimeException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed building route adv from message element", failed); } } catch (IOException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed building route adv from message element", failed); } } } me = null; // Is this a message about ourself? if (group.getPeerID().equals(radv.getPeerID())) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received a PeerView message about self. Discard."); } return; } // Collect the various flags. boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null); boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null); boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null); boolean isTrusted = acl.isAllowed(radv.getPeerID()); if (!localIsEdge && !isFromEdge && !isTrusted) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Rejecting peerview entry for " + radv.getPeerID()); } return; } if (LOG.isEnabledFor(Level.DEBUG)) { String srcPeer = srcAddr.toString(); if( "jxta".equals(srcAddr.getProtocolName())) { try { String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress(); ID asID = IDFactory.fromURI( new URI(idstr) );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -