📄 can.java
字号:
/* * @(#)$Id: Can.java,v 1.17 2004/12/31 19:53:43 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package overlay.location.can;import org.apache.log4j.Logger;import overlay.location.LocationService;import overlay.location.LocationServiceClient;import overlay.location.can.payload.*;import services.LocalNode;import services.Output;import services.network.Payload;import services.network.udp.UDPClient;import services.stats.StatCollector;import services.stats.StatVars;import services.timer.TimerClient;import util.BitID;import util.logging.LogMessage;import util.network.serialization.SerializationManager;import util.timer.timeouts.TimeoutManager;import util.timer.timeouts.TimeoutManagerClient;import java.net.InetAddress;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;/** * A resource location layer protocol. It implements the Content-Addressable Network of Ratnasamy, Francis, Shenker, Handley and Karp * A node may only participate in one CAN at any one time in this implementation. Also, none of the optimizations are included. This is a bare-bones CAN. */public class Can implements LocationService, UDPClient, TimerClient, TimeoutManagerClient { private static Logger logger = Logger.getLogger(Can.class); private Integer portNumber; private InetSocketAddress mySocketAddress; private Zone zone; private int version; private Neighborhood neighborhood; /** Waiting for this newcomer to take part of my zone */ private InetSocketAddress pendingNewcomer; private Zone pendingNewcomerZone; private Zone myPendingZone; /** The interval arrays used in splits and leaves */ private double[][] myTempIntervals; private double[][] otherTempIntervals; /** The next split dimension */ private int nextSplit; /** The join for which I'm still expecting a response. Null if I'm not seeking to join anywhere. */ private int joinID; /** The leave for which I'm still expecting a response. Null if I'm not seeking to leave anywhere. */ private int leaveID; /** Node of first contact, stored in case of a retry */ private InetSocketAddress landmarkNode; /** My update count. When it reaches the DEEP_UPDATE_COUNT it is reset to 0. */ private int updateCount; /** Global outgoing message ID for all client requests */ private int messageID; /** Pending callbacks lookup table. */ private HashMap lookupResponseClient; private HashMap lookupResponseIdentifier; /** Clients that get notified of mapping change */ private ArrayList localClients; /** Application mappings */ private HashMap applications, applicationsCallback; /** Timeout manager */ private TimeoutManager timeoutManager; /** active part of CAN */ private boolean active; private boolean doRouteMaintenance = true; public static int DIM; public static int idLength; private double MIN_UPDATE; private double MAX_UPDATE; private double MIN_RETRY; private double MAX_RETRY; private int DEEP_UPDATE_COUNT; private int DEPTH; private double MAX_MUTE; private double CLEANUP; private double TIMEOUT; private static final Integer UPDATE_SIGNAL = new Integer(0); private static final Integer CLEANUP_SIGNAL = new Integer(1); private static final Integer RETRY_SIGNAL = new Integer(2); private static final Integer RETRY_SIGNAL_LEAVE = new Integer(3); private static final int MAX_BITS = 64; /** * Create a new CAN node given its network, StatCollector and clock layers, as well as its port number. * * @param canPortNumber * @param canDim * @param idLength * @param canMinUpdate * @param canMaxUpdate * @param canMinRetry * @param canMaxRetry * @param canDeepUpdateCount * @param canUpdateDepth * @param canMaxMute * @param canCleanupInterval * @param canTimeout */ public Can(int canPortNumber, int canDim, int idLength, double canMinUpdate, double canMaxUpdate, double canMinRetry, double canMaxRetry, int canDeepUpdateCount, int canUpdateDepth, double canMaxMute, double canCleanupInterval, double canTimeout) { this.portNumber = new Integer(canPortNumber); this.DIM = canDim; this.idLength = (idLength > MAX_BITS) ? MAX_BITS : idLength; this.MIN_UPDATE = canMinUpdate; this.MAX_UPDATE = canMaxUpdate; this.MIN_RETRY = canMinRetry; this.MAX_RETRY = canMaxRetry; this.DEEP_UPDATE_COUNT = canDeepUpdateCount; this.DEPTH = canUpdateDepth; this.MAX_MUTE = canMaxMute; this.CLEANUP = canCleanupInterval; this.TIMEOUT = canTimeout; mySocketAddress = new InetSocketAddress(LocalNode.myIPAddress, canPortNumber); myTempIntervals = new double[DIM][2]; otherTempIntervals = new double[DIM][2]; zone = Zone.FULL; version = 0; neighborhood = new Neighborhood(); nextSplit = 0; pendingNewcomer = null; joinID = -1; // -1 for no current joins leaveID = -1; // -1 for no current leaves updateCount = 0; messageID = 0; localClients = new ArrayList(); applications = new HashMap(); applicationsCallback = new HashMap(); lookupResponseClient = new HashMap(); lookupResponseIdentifier = new HashMap(); timeoutManager = new TimeoutManager(); active = false; } /** * Method fakeJoin * * @param newZone * @return */ public Object[] fakeJoin(Zone newZone) { this.zone = newZone; StatCollector.addSample(StatVars.MISC_B, StatVars.LOCATION_SERVICE, StatVars.ID_SPACE, this.zone.getArea()); version++; join((InetSocketAddress) null); Object[] retArray = new Object[3]; retArray[0] = newZone; retArray[1] = mySocketAddress; retArray[2] = neighborhood; return retArray; } /** * Instruct the location service to startup and join the network * @param landmarkNode the network address of a node in the system to bootstrap with */ public void join(InetSocketAddress landmarkNode) { SerializationManager.registerClass("overlay.location.can.Neighbor"); SerializationManager.registerClass("overlay.location.can.Street"); SerializationManager.registerClass("overlay.location.can.Zone"); SerializationManager.registerClass( "overlay.location.can.payload.Accept"); SerializationManager.registerClass( "overlay.location.can.payload.AcceptLeave"); SerializationManager.registerClass( "overlay.location.can.payload.CANMessage"); SerializationManager.registerClass( "overlay.location.can.payload.Introduce"); SerializationManager.registerClass( "overlay.location.can.payload.Leave"); SerializationManager.registerClass( "overlay.location.can.payload.LeaveComplete"); SerializationManager.registerClass( "overlay.location.can.payload.LeaveUpdate"); SerializationManager.registerClass( "overlay.location.can.payload.Lookup"); SerializationManager.registerClass( "overlay.location.can.payload.LookupResponse"); SerializationManager.registerClass( "overlay.location.can.payload.Message"); SerializationManager.registerClass( "overlay.location.can.payload.Refuse"); SerializationManager.registerClass( "overlay.location.can.payload.RefuseLeave"); SerializationManager.registerClass( "overlay.location.can.payload.Route"); SerializationManager.registerClass( "overlay.location.can.payload.Update"); SerializationManager.registerClass( "overlay.location.can.payload.Welcome"); if (LocalNode.myUDPMessenger.listen(portNumber, this) == false) { throw new RuntimeException("Duplicate use of port number " + portNumber); } if (landmarkNode != null) { this.landmarkNode = landmarkNode; // Keep a record of the join node in case re-join neccessary if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Sending INTRODUCE message to landmark ", landmarkNode})); } sendIntroduce(); } else { StatCollector.addSample(StatVars.MISC_B, StatVars.LOCATION_SERVICE, StatVars.ID_SPACE, 1); active = true; // Begin processing route/lookups/etc LocalNode.myTimer.schedule(CLEANUP, CLEANUP_SIGNAL, this); // Schedule the first cleanup LocalNode.myTimer.schedule( (MAX_UPDATE - MIN_UPDATE) * LocalNode.myRandom.nextDouble() + MIN_UPDATE, UPDATE_SIGNAL, this); } } /** * Method join * * @param landmarkNodes */ public void join(InetSocketAddress[] landmarkNodes) { if (landmarkNodes.length > 0) { join(landmarkNodes[0]); } } /** * Method getLocationID * @return */ public BitID getLocationID() { return getBitID(zone.getCenter()); } /** Send a new INTRODUCE message to my node of first contact */ private void sendIntroduce() { // Pick random coordinates double[] coordinates = new double[DIM]; for (int i = 0; i < DIM; i++) { coordinates[i] = LocalNode.myRandom.nextDouble(); } joinID = messageID++; Introduce introduce = Introduce.allocate(joinID, mySocketAddress); Route route = Route.allocate(joinID, mySocketAddress, coordinates, introduce, false); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Sending INTRODUCE message to ", landmarkNode})); } StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CAN_INTRODUCE, SerializationManager.getPayloadSize(introduce)); StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CAN_ROUTE,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -