📄 destinations.java
字号:
/* * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint.router;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.Messenger;import net.jxta.impl.util.TimeUtils;import java.util.logging.Level;import net.jxta.logging.Logging;import java.util.logging.Logger;import java.lang.ref.Reference;import java.lang.ref.SoftReference;import java.util.*;/** * This class is a repository of wisdom regarding destinations. It also provides * a messenger if there is one. Currently, the wisdom is very limited and is * only about direct destinations (for which a messenger once existed). The * wisdom that can be obtained is: * * <p/><ul> * <li> is there a messenger at hand (incoming or otherwise).</li> * * <li> is it likely that one can be made from this end, should the one we * have break. 
(the last attempt succeeded, not only incoming, and that was * not long ago).</li> * * <li> is either of the above true, (are we confident we can get a * messenger as of right now one way or the other).</li> * * <li> are we supposed to send a welcome to that destination (we can't * remember having done it).</li> * </ul> * * <p/>This could be extended to manage more of the life cycle, such as knowing * about messengers being resolved or having failed to. This primitive interface * is temporary; it is only meant to replace messengerPool without having to * change the router too much. */class Destinations { /** * Logger */ private final static transient Logger LOG = Logger.getLogger(Destinations.class.getName()); /** * Shared Timer which handles cleanup of expired Wisdom. */ private final static transient Timer cleanup = new Timer("Endpoint Destinations GC", true); private final Map<EndpointAddress, Wisdom> wisdoms = new HashMap<EndpointAddress, Wisdom>(64); /** * If {@code true} then we are shutting down. */ private volatile boolean stopped = false; /** * The endpoint service we are working for. */ private final EndpointService endpoint; /** * The wisdom GC task for this instance. */ private final WisdomGCTask wisdomGC; /** * Stores knowledge about one particular destination. * * <p/>It does not provide any synchronization. This is provided by the Destinations class. */ final class Wisdom { /** * How long we consider that a past outgoingMessenger is an indication * that one is possible in the future. */ static final long EXPIRATION = 10 * TimeUtils.AMINUTE; /** * The channel we last used, if any. They disappear faster than the * canonical, but, as long as the canonical is around, they can be * obtained at a near-zero cost. */ private Reference<Messenger> outgoingMessenger; /** * The channel we last used if it happens to be an incoming messenger. We keep * a strong reference to it. 
*/
        private Messenger incomingMessenger;

        /**
         * The transport destination address of the messenger we're caching (if
         * not incoming).
         */
        private EndpointAddress xportDest;

        /**
         * This tells when the outgoing messenger information expires. Incoming
         * messengers have no expiration per se. We draw no conclusion from
         * their past presence; only current presence. A wisdom is totally
         * expired (and may thus be removed) when its outgoing messenger
         * information is expired AND it has no incoming messenger.
         */
        private long expiresAt = 0;

        /**
         * When a new destination is added, we're supposed to send our welcome
         * along with the first message. This tells whether isWelcomeNeeded was
         * once invoked or not.
         */
        private boolean welcomeNeeded = true;

        /**
         * Creates the wisdom for a destination from the first messenger known for it.
         *
         * @param messenger The messenger to cache information about.
         * @param incoming  If true, this is an incoming messenger, which means
         *                  that if the channel is lost it cannot be re-obtained. It must
         *                  be strongly referenced until it closes for disuse, or breaks.
         */
        Wisdom(Messenger messenger, boolean incoming) {
            if (incoming) {
                addIncomingMessenger(messenger);
            } else {
                addOutgoingMessenger(messenger);
            }
        }

        /**
         * Reports whether a welcome message is needed. The first time we're
         * asked we say "true". Subsequently, always "false".
         *
         * <p/>This assumes that the welcome was sent following the first test.
         * (so, ask only if you'll send the welcome when told to).
         *
         * @return {@code true} if this is the first time this method is invoked.
         */
        synchronized boolean isWelcomeNeeded() {
            boolean res = welcomeNeeded;

            welcomeNeeded = false;
            return res;
        }

        /**
         * Offers a new incoming messenger for this destination.
         *
         * <p/>If no usable incoming messenger is currently held, the new one is
         * adopted unconditionally. Otherwise the new one replaces the current
         * one, unless the current one has a non-zero port in its destination
         * address and the new one does not (a zero port means the peer accepts
         * no incoming connections, so the current messenger looks better).
         *
         * @param m the candidate incoming messenger.
         * @return {@code true} if {@code m} is now the cached incoming messenger,
         *         {@code false} if the current one was kept.
         */
        boolean addIncomingMessenger(Messenger m) {

            // If we have no other incoming, we take it. No questions asked.
            Messenger currentIncoming = getIncoming();

            if (currentIncoming == null) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
                }

                incomingMessenger = m;
                return true;
            }

            // Now, check reachability (0 port number means no incoming connections).
            // If the old one looks better, prefer it.
            boolean reachable = hasNonZeroPort(m);
            boolean currentReachable = hasNonZeroPort(currentIncoming);

            // The new one is less reachable than the old one. Keep the old one.
            if (currentReachable && !reachable) {
                return false;
            }

            incomingMessenger = m;

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
            }

            return true;
        }

        /**
         * Tells whether the protocol address of the given messenger's
         * destination ends with a non-zero port number, i.e. with a
         * {@code ':'} followed by an integer other than 0.
         *
         * <p/>An address without any {@code ':'}, or whose trailing part is
         * not a valid integer, is treated as having port 0.
         *
         * @param messenger the messenger whose destination address is examined.
         * @return {@code true} if a non-zero port number could be read.
         */
        private boolean hasNonZeroPort(Messenger messenger) {
            String protoAddr = messenger.getDestinationAddress().getProtocolAddress();
            int colon = protoAddr.lastIndexOf(':');

            if (colon == -1) {
                return false;
            }

            try {
                return Integer.parseInt(protoAddr.substring(colon + 1)) != 0;
            } catch (NumberFormatException ignored) {
                // What follows the last ':' is not a port number (e.g. the tail
                // of an address literal). Rather than failing the caller, treat
                // it as "no port", which ranks the messenger as not reachable.
                return false;
            }
        }

        /**
         * Records a new outgoing messenger, unless {@link #getOutgoing()}
         * already yields one.
         *
         * <p/>On acceptance the messenger is held through a
         * {@link SoftReference}, its destination address is remembered in
         * {@code xportDest}, and {@code expiresAt} is recomputed from
         * {@link #EXPIRATION} with {@code TimeUtils.toAbsoluteTimeMillis}.
         *
         * @param m the candidate outgoing messenger.
         * @return {@code true} if {@code m} was recorded, {@code false} if an
         *         outgoing messenger was already available.
         */
        boolean addOutgoingMessenger(Messenger m) {
            if (getOutgoing() != null) {
                return false;
            }

            this.outgoingMessenger = new SoftReference<Messenger>(m);
            xportDest = m.getDestinationAddress();
            expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION);

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Accepted new outgoing messenger for " + xportDest);
            }

            return true;
        }

        /**
         * Forgets everything known about the outgoing messenger: the reference
         * and the transport address are cleared and the expiration is reset to 0.
         */
        void noOutgoingMessenger() {
            outgoingMessenger = null;
            xportDest = null;
            expiresAt = 0;
        }

        /**
         * Returns an incoming messenger if there is a working one available.
         * A cached incoming messenger that is no longer in a
         * {@code Messenger.USABLE} state is dropped as a side effect.
         *
         * @return an incoming messenger, null if there's none
         */
        private Messenger getIncoming() {
            if (incomingMessenger != null) {
                if ((incomingMessenger.getState() & Messenger.USABLE) != 0) {
                    return incomingMessenger;
                }

                // forget about this broken messenger.
                incomingMessenger = null;
            }
            return null;
        }

        /**
         * Returns an outgoingMessenger if there is one or one can be made without delay.
         * Renews a broken one if it can be. Refreshes expiration time if a messenger is returned.
         *
         * @return an outgoing messenger, null if there's none
         */
        private Messenger getOutgoing() {
            if (outgoingMessenger == null) {
                return null;
            }

            // (If messenger is not null, it means that we also have a xportDest).
            Messenger messenger = outgoingMessenger.get();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -