📄 endpointrouter.java
字号:
/* * * $Id: EndpointRouter.java,v 1.140 2006/06/07 17:26:53 hamada Exp $ * * Copyright (c) 2001-2006 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint.router;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.Collections;import java.util.Enumeration;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.Timer;import java.util.TimerTask;import java.util.Vector;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.document.Element;import net.jxta.document.TextElement;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageReceiver;import net.jxta.endpoint.MessageSender;import net.jxta.endpoint.MessageTransport;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.MessengerEvent;import net.jxta.endpoint.MessengerEventListener;import net.jxta.exception.PeerGroupException;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.endpoint.IllegalTransportLoopException;import net.jxta.impl.endpoint.LoopbackMessenger;import net.jxta.impl.util.FastHashMap;import net.jxta.impl.util.TimeUtils;import net.jxta.impl.util.TimerThreadNamer;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.platform.Module;import net.jxta.protocol.AccessPointAdvertisement;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.RouteAdvertisement;import org.apache.log4j.Level;import org.apache.log4j.Logger;public class EndpointRouter implements EndpointListener, MessageReceiver, MessageSender, MessengerEventListener, Module { /** * Log4J Logger */ private static transient final Logger LOG = Logger.getLogger(EndpointRouter.class.getName()); /** * Until we decide otherwise, the router is *by definition* handling * peerID addressed messages. */ private final static String routerPName = "jxta"; /** * Router Service Name */ public final static String routerSName = "EndpointRouter"; /** * These are peers which we know multi-hop routes for. * * <p/><ul> * <li>Key is peer id as a {@link et.jxta.endpoint.EndpointAddress}</li> * <li>value is a {@link Route}.</li> * </ul> */ private final Map routedRoutes = new FastHashMap(16); /** * A record of failures. * * <p/><ul> * <li>keys are {@link net.jxta.endpoint.EndpointAddress}.</li> * <li>values are the time of failure as {@link java.lang.Long}.</li> * </ul> */ private final Map triedAndFailed = new HashMap(); /** * local peer ID as a endpointAddress. */ protected EndpointAddress localPeerAddr = null; /** * local Peer ID */ private ID localPeerId = null; /** * endpointservice handle * */ EndpointService endpoint = null; /** * PeerGroup handle */ private PeerGroup group = null; /** * Whenever we initiate connectivity to a peer (creating a direct route). * we remember that we need to send our route adv to that peer. So that * it has a chance to re-establish the connection from its side, if need * be. The route adv is actually sent piggy-backed on the first message * that goes there. * * <p>Values are {@link net.jxta.endpoint.EndpointAddress}. */ private final Set newDestinations = Collections.synchronizedSet(new HashSet()); /** * A pool of messengers categorized by logical address. * This actually is the direct routes map. */ private Destinations destinations; /** * A record of expiration time of known bad routes we received a NACK route * * <p/><ul> * <li>Keys are {@link net.jxta.endpoint.EndpointAddress}.</li> * <li>Values are {@link net.jxta.impl.endpoint.router.BadRoute}.</li> * </ul> * */ private final Map badRoutes = new HashMap(); /** * We record queries when first started and keep them pending for * a while. Threads coming in the meanwhile wait for a result without * initiating a query. Thus threads may wait passed the point where * the query is no-longer pending, and, although they could initiate * a new one, they do not. However, other threads coming later may initiate * a new query. So a query is not re-initiated systematically on a fixed schedule. * This mechanism also serves to avoid infinite recursions if we're looking * for the route to a rendezvous (route queries will try to go there * themselves). * FIXME: jice@jxta.org 20020903 this is approximate. We can do * cleaner/better than that, but it's an inexpensive improvement over what * was there before. * FIXME: tra@jxta.org 20030818 the pending hashmap should be moved * in the routeResolver class as this will allow to only synchronize * on the routeResolver object rather than the router object. * <p/><ul> * <li>Keys are {@link net.jxta.endpoint.EndpointAddress}.</li> * <li>Values are {@link ClearPendingQuery}.</li> * </ul> */ protected final Map pendingQueries = new HashMap(); /** * Timer by which we schedule the clearing of pending queries. */ private final Timer timer = new Timer(true); /** * MAX timeout (seconds) for route discovery after that timeout * the peer will bail out from finding a route */ private final static long MAXFINDROUTE_TIMEOUT = 60L * TimeUtils.ASECOND; /** * How long do we wait (seconds) before retrying to make a connection * to an endpoint destination */ private final static long MAXASYNC_GETMESSENGER_RETRY = 30L * TimeUtils.ASECOND; /** * PeerAdv tracking. * The peer adv is modified every time a new public address is * enabled/disabled. One of such cases is the connection/disconnection * from a relay. Since these changes are to the embedded route adv * and since we may embbed our route adv in messages, we must keep it * up-to-date. */ private PeerAdvertisement lastPeerAdv = null; private int lastModCount = -1; /** * Route info for the local peer (updated along with lastPeerAdv). */ private RouteAdvertisement localRoute = null; /** * Route CM persistent cache */ private final RouteCM routeCM; /** * Route Resolver */ private final RouteResolver routeResolver; /** * MessageTransport Control operation */ public final static Integer GET_ROUTE_CONTROL = new Integer(0); // Return RouteControl Object public final static int RouteControlOp = 0; // Return RouteControl Object protected class ClearPendingQuery extends TimerTask { EndpointAddress pid; volatile boolean failed = false; long timeToRetry = 0; ClearPendingQuery(EndpointAddress pid) { this.pid = pid; // We schedule for one tick at one minute and another at 5 minutes // after the second, we cancel ourselves. timer.schedule(this, 1L * TimeUtils.AMINUTE, 5L * TimeUtils.AMINUTE); timeToRetry = TimeUtils.toAbsoluteTimeMillis(20L * TimeUtils.ASECOND); } /** * {@inheritDoc} */ public void run() { try { if (failed) { // Second tick. // This negative cache info is expired. synchronized (EndpointRouter.this) { pendingQueries.remove(pid); } this.cancel(); } else { // First timer tick. We're done trying. This is now a negative // cache info. For the next 5 minutes that destination fails // immediately unless it unexpectedly gets finaly resolved. failed = true; } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in timer task " + Thread.currentThread().getName() + " for " + pid, all); } } } public synchronized boolean isTimeToRetry() { if (TimeUtils.toRelativeTimeMillis(timeToRetry) > 0) { return false; } // timeToRetry is passed. Set the next time to retry from now. timeToRetry = TimeUtils.toAbsoluteTimeMillis(20L * TimeUtils.ASECOND); return true; } public boolean isFailed() { return failed; } } protected RouteAdvertisement getMyLocalRoute() { // Update our idea of the local peer adv. If it has change, // update our idea of the local route adv. // If nothing has changed, do not do any work. // In either case, return the local route adv as it is after this // refresh. // Race condition possible but tolerable: if two threads discover // the change in the same time, lastPeerAdv and lastModCount // could become inconsistent. That'll be straightened out the // next time someone looks. The inconsistency can only trigger // an extraneous update. PeerAdvertisement newPadv = group.getPeerAdvertisement(); int newModCount = newPadv.getModCount(); if ((lastPeerAdv != newPadv) || (lastModCount != newModCount) || (null == localRoute)) { lastPeerAdv = newPadv; lastModCount = newModCount; } else { // The current version is good. return localRoute; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -