📄 edgepeerrdvservice.java
字号:
/* * $Id: EdgePeerRdvService.java,v 1.80 2006/05/30 21:01:50 hamada Exp $ * * Copyright (c) 2001-2004 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ========================================================= * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.rendezvous.edge;import java.io.IOException;import java.io.InputStream;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.Arrays;import java.util.Collections;import java.util.Enumeration;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.TimerTask;import java.util.Vector;import net.jxta.discovery.DiscoveryService;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.document.XMLDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.TextDocumentMessageElement;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.protocol.RdvConfigAdv;import net.jxta.impl.rendezvous.PeerConnection;import net.jxta.impl.rendezvous.RendezVousPropagateMessage;import net.jxta.impl.rendezvous.RendezVousServiceImpl;import net.jxta.impl.rendezvous.StdRendezVousService;import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousConnectionMeter;import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousMeterBuildSettings;import net.jxta.impl.rendezvous.rpv.PeerViewElement;import net.jxta.impl.rendezvous.rpv.PeerViewEvent;import net.jxta.impl.rendezvous.rpv.PeerViewListener;import net.jxta.impl.util.TimeUtils;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.protocol.ConfigParams;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.RdvAdvertisement;import net.jxta.protocol.RouteAdvertisement;import net.jxta.rendezvous.RendezvousEvent;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * A JXTA {@link net.jxta.rendezvous.RendezvousService} implementation which * implements the client portion of the standard JXTA Rendezvous Protocol (RVP). * * @see net.jxta.rendezvous.RendezvousService * @see <a href="http://spec.jxta.org/nonav/v1.0/docbook/JXTAProtocols.html#proto-rvp" target="_blank">JXTA Protocols Specification : Rendezvous Protocol</a> */public class EdgePeerRdvService extends StdRendezVousService implements PeerViewListener { /** * Log4J Logger */ private final static transient Logger LOG = Logger.getLogger(EdgePeerRdvService.class.getName()); // period. private final static long MONITOR_INTERVAL = 20 * TimeUtils.ASECOND; private final static long ADDEVENT_DELAY = 3 * TimeUtils.ASECOND; private final static long CHALLENGE_TIMEOUT = 90 * TimeUtils.ASECOND; /** * Number of rendezvous we will try to connect to. */ private final int MAX_RDV_CONNECTIONS = 1; private boolean useOnlySeeds = false; private long LEASE_MARGIN = 5 * TimeUtils.AMINUTE; private long maxChoiceDelay = ADDEVENT_DELAY; /** * This the time in absolute milliseconds at which the monitor is scheduled * to start.The monitor will not be scheduled at all until there is at * least one item in the peerview. The more items in the peerview, * the earlier we start. Once there are at least rdvConfig.minHappyPeerView items * it guaranteed that we start immediately because the start date * is in the past. */ private long monitorStartAt = -1; /** * Once choice delay has reached zero, any ADD event could trigger * a attempt at connecting to one of the rdvs. If these events come * in bursts while we're not yet connected, we might end-up doing * many parallel attempts, which is a waste of bandwidth. Instead * we refrain from doing more than one attempt every ADDEVENT_DELAY */ private long monitorNotBefore = -1; /** * <p/><ul> * <li>Keys are {@link net.jxta.peer.ID}.</li> * <li>Values are {@link net.jxta.impl.rendezvous.RdvConnection}.</li> * </ul> */ private final Map rendezVous = Collections.synchronizedMap(new HashMap()); /** * <p/><ul> * <li>Keys are {@link net.jxta.peer.PeerID}.</li> * <li>Values are {@link java.lang.Long} containing the time at which * the rendezvous disconnected.</li> * </ul> */ private final Set disconnectedRendezVous = Collections.synchronizedSet(new HashSet()); /** * Constructor * * @param group Description of Parameter * @param rdvService Description of Parameter */ public EdgePeerRdvService(PeerGroup group, RendezVousServiceImpl rdvService) { super(group, rdvService); ConfigParams confAdv = group.getConfigAdvertisement(); // Get the config. If we do not have a config, we're done; we just keep // the defaults (edge peer/no auto-rdv) if (confAdv != null) { Advertisement adv = null; try { XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID()); if (null != configDoc) { // XXX 20041027 backwards compatibility configDoc.addAttribute( "type", RdvConfigAdv.getAdvertisementType() ); adv = AdvertisementFactory.newAdvertisement(configDoc); } } catch (java.util.NoSuchElementException failed) { ; } if (adv instanceof RdvConfigAdv) { RdvConfigAdv rdvConfigAdv = (RdvConfigAdv) adv; if (-1 != rdvConfigAdv.getMaxTTL()) { MAX_TTL = rdvConfigAdv.getMaxTTL(); } useOnlySeeds = rdvConfigAdv.getUseOnlySeeds(); if (0 != rdvConfigAdv.getLeaseMargin()) { LEASE_MARGIN = rdvConfigAdv.getLeaseMargin(); } if( rdvConfigAdv.getMinHappyPeerView() > 0) { maxChoiceDelay = rdvConfigAdv.getMinHappyPeerView() * ADDEVENT_DELAY; } else { maxChoiceDelay = ADDEVENT_DELAY; } } } // If edge peers ever use a walker, here is a good point to instantiate // one with: // For protocol bw compatibility, edge peers use the legacy // rdv protocol rather than the walker. // walker = new LimitedRangeWalk(group, pName, pParam, rdvService.rpv); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("RendezVous Service is initialized for " + group.getPeerGroupID() + " as an Edge peer."); } } /** * Listener for * * <assignedID> */ private class StdRdvEdgeProtocolListener implements StdRendezVousService.StdRdvProtocolListener { /** * {@inheritDoc} */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupID() + "] processing " + msg); } if (msg.getMessageElement("jxta", RdvAdvReply) != null) { processRdvAdvReply(msg); } if ((msg.getMessageElement("jxta", ConnectedPeerReply) != null) || (msg.getMessageElement("jxta", ConnectedRdvAdvReply) != null)) { processConnectedReply(msg); } if (msg.getMessageElement("jxta", DisconnectRequest) != null) { processDisconnectRequest(msg); } } } /** * {@inheritDoc} */ protected int startApp(String[] arg) { super.startApp(arg, new StdRdvEdgeProtocolListener()); // The other services may not be fully functional but they're there // so we can start our subsystems. // As for us, it does not matter if our methods are called between init // and startApp(). if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.startEdge(); } rdvService.generateEvent(RendezvousEvent.BECAMEEDGE, group.getPeerID()); long choiceDelay; // When quickStart is true, start-up is fast. This is typically // used when useOnlySeeds is true, on the grounds // that the choice is normaly very limited and that all candidates // are queried early on, and at roughly the same time. So, the // first one that responds will do just as well. // Else, we do compute a dead line that will allow for the // peerview to fill up a bit. if (useOnlySeeds) { choiceDelay = 0; } else { // If there are already peers in the peer view, then it is // worth scheduling the monitor; else, wait for an ADD event. // Else we'll schedule it according to how many peers we // already have in the peerview. If there are enough we could // go immediately. List rpv = rdvService.getLocalWalkView(); int rpvSize = rpv.size(); choiceDelay = Math.max(0, maxChoiceDelay - (rpvSize * ADDEVENT_DELAY)); } monitorStartAt = TimeUtils.toAbsoluteTimeMillis(choiceDelay); timer.schedule(new MonitorTask(), choiceDelay, MONITOR_INTERVAL); rdvService.rpv.addListener(this); return 0; } /** * {@inheritDoc} */ protected synchronized void stopApp() { if (closed) { return; } closed = true; disconnectFromAllRendezVous(); // If edge peers are ever converted to use a walker, // remember to stop the walker here, with: walker.stop(); rdvService.rpv.removeListener(this); super.stopApp(); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.stopEdge(); } } /** * {@inheritDoc} */ public Vector getConnectedPeerIDs() { return new Vector(); } /** * {@inheritDoc}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -