📄 socketsourceroutemanager.java
字号:
/*************************************************************************"FreePastry" Peer-to-Peer Application Development Substrate Copyright 2002, Rice University. All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:- Redistributions of source code must retain the above copyrightnotice, this list of conditions and the following disclaimer.- Redistributions in binary form must reproduce the above copyrightnotice, this list of conditions and the following disclaimer in thedocumentation and/or other materials provided with the distribution.- Neither the name of Rice University (RICE) nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.This software is provided by RICE and the contributors on an "as is"basis, without any representations or warranties of any kind, expressor implied including, but not limited to, representations orwarranties of non-infringement, merchantability or fitness for aparticular purpose. 
In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.pastry.socket;import java.io.*;import java.lang.ref.WeakReference;import java.net.*;import java.nio.*;import java.nio.channels.*;import java.util.*;import rice.environment.logging.Logger;import rice.environment.params.Parameters;import rice.environment.random.RandomSource;//import rice.p2p.commonapi.MessageDeserializer;//import rice.p2p.commonapi.AppSocketReceiver;import rice.p2p.commonapi.appsocket.AppSocketReceiver;import rice.p2p.commonapi.exception.NodeIsDeadException;import rice.p2p.commonapi.rawserialization.MessageDeserializer;import rice.pastry.*;import rice.pastry.messaging.*;import rice.pastry.routing.*;import rice.pastry.socket.messaging.*;import rice.selector.*;import rice.selector.TimerTask;/** * Class which keeps track of the best routes to remote nodes. This class is * also therefore in charge of declaring node death and liveness. 
*
 * @version $Id: SocketSourceRouteManager.java 3274 2006-05-15 16:17:47Z jeffh $
 * @author Alan Mislove
 */
public class SocketSourceRouteManager {

  /**
   * The minimum amount of time between check-dead checks on dead routes.
   * Loaded by the constructor from the
   * "pastry_socket_srm_check_dead_throttle" parameter.
   */
  public long CHECK_DEAD_THROTTLE;

  /**
   * The minimum amount of time between pings. Loaded by the constructor from
   * the "pastry_socket_srm_ping_throttle" parameter.
   */
  public long PING_THROTTLE;

  /**
   * Loaded by the constructor from the
   * "pastry_socket_srm_num_source_route_attempts" parameter. getAllRoutes()
   * passes it to the leaf set's neighborSet() call, so it bounds how many
   * neighbors are considered as intermediate hops.
   */
  public int NUM_SOURCE_ROUTE_ATTEMPTS;

  // the local pastry node
  private SocketPastryNode spn;

  // the socket manager below this manager
  private SocketCollectionManager manager;

  // the address of this local node (the constructor stores bindAddress here)
  private EpochInetSocketAddress localAddress;

  // logger obtained from the node's environment for this class
  private Logger logger;

  /**
   * Invariant: if an AM has a message in its queue, it is in here, if not, it
   * isn't. This prevents outgoing messages from being lost.
   * ("AM" presumably abbreviates AddressManager - confirm.)
   */
  HashSet hardLinks = new HashSet();

  /**
   * EpochInetSocketAddress -> WeakReference(NodeHandle). Note that it is
   * critical to keep the key == NodeHandle.eaddress. And I mean the same
   * object!!! not .equals(). The whole memory management will get confused if
   * this is not the case.
*/ WeakHashMap nodeHandles = new WeakHashMap(); /** * Constructor * * @param node The local node * @param bindAddress The address which the node should bind to * @param proxyAddress The address which the node should advertise as it's * address * @param random DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected SocketSourceRouteManager(SocketPastryNode node, EpochInetSocketAddress bindAddress, EpochInetSocketAddress proxyAddress, RandomSource random) throws IOException { this.spn = node; Parameters p = node.getEnvironment().getParameters(); CHECK_DEAD_THROTTLE = p.getLong("pastry_socket_srm_check_dead_throttle"); PING_THROTTLE = p.getLong("pastry_socket_srm_ping_throttle"); NUM_SOURCE_ROUTE_ATTEMPTS = p.getInt("pastry_socket_srm_num_source_route_attempts"); this.logger = node.getEnvironment().getLogManager().getLogger(SocketSourceRouteManager.class, null); this.manager = new SocketCollectionManager(node, this, bindAddress, proxyAddress, random); this.localAddress = bindAddress; } /** * Gets the Best attribute of the SocketSourceRouteManager object * * @return The Best value */ public HashMap getBest() { HashMap result = new HashMap(); synchronized (nodeHandles) { Iterator i = nodeHandles.keySet().iterator(); while (i.hasNext()) { Object addr = i.next(); WeakReference wr = (WeakReference) nodeHandles.get(addr); if (wr != null) { SocketNodeHandle snh = (SocketNodeHandle) wr.get(); if (snh != null) { AddressManager am = snh.addressManager; if (am != null) { if (am.getLiveness() < SocketNodeHandle.LIVENESS_DEAD) { result.put(addr, am.best); } } } } } } return result; } /** * Method which returns the internal manager * * @return The internal manager */ public SocketCollectionManager getManager() { return manager; } /** * Internal method which returns (or builds) the manager associated with an * address * * @param address The remote address * @param search DESCRIBE THE PARAMETER * @return The AddressManager value */ protected AddressManager 
getAddressManager(EpochInetSocketAddress address, boolean search) { synchronized (nodeHandles) { AddressManager manager = getAddressManager(address); if (manager == null) { manager = putAddressManager(address, search); } return manager; } } /** * Gets the NodeHandle attribute of the SocketSourceRouteManager object * * @param address DESCRIBE THE PARAMETER * @return The NodeHandle value */ public SocketNodeHandle getNodeHandle(EpochInetSocketAddress address) { synchronized (nodeHandles) { WeakReference wr = (WeakReference) nodeHandles.get(address); if (wr == null) { return null; } SocketNodeHandle ret = (SocketNodeHandle) wr.get(); if (ret == null) { return null; } if (ret.getNodeId() == null) { return null; } return ret; } } /** * Gets the AddressManager attribute of the SocketSourceRouteManager object * * @param address DESCRIBE THE PARAMETER * @return The AddressManager value */ public AddressManager getAddressManager(EpochInetSocketAddress address) { WeakReference wr = (WeakReference) nodeHandles.get(address); if (wr == null) { return null; } SocketNodeHandle snh = (SocketNodeHandle) wr.get(); if (snh == null) { return null; } return snh.addressManager; } /** * Method which returns the last cached liveness value for the given address. * If there is no cached value, then LIVENESS_ALIVE * * @param address The address to return the value for * @return The liveness value */ public int getLiveness(EpochInetSocketAddress address) { return getAddressManager(address, true).getLiveness(); } /** * Internal method which returns a list of all possible routes to a given * address. Currently, this method simply sees if any of the leafset members * are able to reach the node. 
* * @param destination DESCRIBE THE PARAMETER * @return All possible source routes to the destination */ protected SourceRoute[] getAllRoutes(EpochInetSocketAddress destination) { NodeSet nodes = spn.getLeafSet().neighborSet(NUM_SOURCE_ROUTE_ATTEMPTS); nodes.randomize(spn.getEnvironment().getRandomSource()); Vector result = new Vector(); result.add(SourceRoute.build(destination)); for (int i = 0; i < nodes.size(); i++) { SocketNodeHandle handle = (SocketNodeHandle) nodes.get(i); if ((!handle.isLocal()) && (!handle.getEpochAddress().equals(destination)) && (getBestRoute(handle.getEpochAddress()) != null) && (!getBestRoute(handle.getEpochAddress()).goesThrough(destination))) { result.add(getBestRoute(handle.getEpochAddress()).append(destination)); } } return (SourceRoute[]) result.toArray(new SourceRoute[0]); } /** * Internal method which returns the best known route to the given destination * * @param address The address * @return The BestRoute value */ protected SourceRoute getBestRoute(EpochInetSocketAddress address) { AddressManager am = getAddressManager(address); if ((am == null) || (am.getLiveness() == SocketNodeHandle.LIVENESS_DEAD) || (am.getLiveness() == SocketNodeHandle.LIVENESS_DEAD_FOREVER)) { return null; } else { return am.best; } } /** * Makes this node resign from the network. Is designed to be used for * debugging and testing. 
* * @exception IOException DESCRIBE THE EXCEPTION */ public void destroy() throws IOException { spn.getEnvironment().getSelectorManager().invoke( new Runnable() { public void run() { try { manager.destroy(); } catch (IOException ioe) { if (logger.level <= Logger.WARNING) { logger.logException("Exception while destrying SocketSourceRouteManager", ioe); } } } }); } /** * DESCRIBE THE METHOD * * @param newHandle DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public NodeHandle coalesce(NodeHandle newHandle) { SocketNodeHandle snh = (SocketNodeHandle) newHandle; synchronized (nodeHandles) { WeakReference wr = (WeakReference) nodeHandles.get(snh.eaddress); if (wr == null) { addNodeHandle(snh); return snh; } else { SocketNodeHandle ret = (SocketNodeHandle) wr.get(); if (ret == null) { // if this happens, then the handle got collected, but not the // eaddress yet. Grumble... addNodeHandle(snh); return snh; } else { // inflates a stub NodeHandle if (ret.getNodeId() == null) { ret.setNodeId(newHandle.getNodeId()); } return ret; } } } } /** * Adds a feature to the NodeHandle attribute of the SocketSourceRouteManager * object * * @param snh The feature to be added to the NodeHandle attribute */ private void addNodeHandle(SocketNodeHandle snh) { WeakReference wr = new WeakReference(snh); nodeHandles.put(snh.eaddress, wr); snh.setLocalNode(spn); } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -