chord.java
来自「High performance DB query」· Java 代码 · 共 1,010 行 · 第 1/3 页
JAVA
1,010 行
client.locationMapChange(); } } /** * Method registerClient * * @param client * @param applicationID * @param answer */ public void registerClient(LocationServiceClient client, long applicationID, InetSocketAddress answer) { localClients.add(client); Long appID = new Long(applicationID); applications.put(appID, answer); applicationsCallback.put(appID, client); } /** * Method handleUDPNetwork * * @param source * @param data */ public void handleUDPNetwork(InetSocketAddress source, Payload data) { if (data instanceof Join) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOIN, SerializationManager.getPayloadSize(data)); processJoinRequest((Join) data); } if (data instanceof JoinResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOINRESPONSE, SerializationManager.getPayloadSize(data)); processJoinResponse((JoinResponse) data); } if (data instanceof JoinRefuse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOINREFUSE, SerializationManager.getPayloadSize(data)); processJoinRefuse((JoinRefuse) data); } if (data instanceof Lookup) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_LOOKUP, SerializationManager.getPayloadSize(data)); processLookupRequest((Lookup) data); } if (data instanceof LookupResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_LOOKUPRESPONSE, SerializationManager.getPayloadSize(data)); processLookupResponse((LookupResponse) data, source.equals(address)); } if (data instanceof Maintain) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_MAINTAIN, SerializationManager.getPayloadSize(data)); processMaintainRequest((Maintain) data); } if (data instanceof MaintainResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_MAINTAINRESPONSE, SerializationManager.getPayloadSize(data)); processMaintainResponse((MaintainResponse) data); } if (data instanceof Message) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_MESSAGE, SerializationManager.getPayloadSize(data)); handleMessageUpCall(null, (Message) data, source.equals(address)); } if (data instanceof Notify) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_NOTIFY, SerializationManager.getPayloadSize(data)); processNotifyRequest((Notify) data); } if (data instanceof Route) { handleRoute(source, (Route) data, source.equals(address)); } if (data instanceof RouteResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTERESPONSE, SerializationManager.getPayloadSize(data)); handleRouteResponse((RouteResponse) data); } if (data instanceof Stabilize) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_STABILIZE, SerializationManager.getPayloadSize(data)); processStabilizeRequest((Stabilize) data); } if (data instanceof StabilizeResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_STABILIZERESPONSE, SerializationManager.getPayloadSize(data)); processStabilizeResponse((StabilizeResponse) data); } } /** * Method handleClock * * @param data */ public void handleClock(Object data) { Integer signal = (Integer) data; if (signal == SIGNAL_STABILIZE) { stabilize(); } if (signal == SIGNAL_MAINTAIN) { maintainFingers(); } } /** * Method handleTimeout * * @param item * @param currentTime */ public void handleTimeout(Object item, double currentTime) { Lookup request = Lookup.allocate(((Lookup) item)); timeoutManager.addTimeout(request, new Integer(request.getID()), timeout, this); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_LOOKUP, SerializationManager.getPayloadSize(request)); StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE, StatVars.LOOKUP_TIMEOUTS, 1); routeMessage(request, request.getSearchID(), true, false); } // Routing Handling Functions /* Put message in routing enevolpe and send it to the predecessor of the destination ID */ private void routeMessage(ChordMessage theMessage, BitID destination, boolean toSuccessor, boolean provideUpCalls) { Route message = Route.allocate(messageID++, address, destination, theMessage, recursive, toSuccessor, provideUpCalls); forwardMessage(message, true, false); } /* Forward message to next hop (recursive), or return to sender with additional routing info (iterative) */ private void forwardMessage(Route message, boolean firstSend, boolean lastHop) { // Increment hop count message.incrementHops(1); // If the message has been routed through too many hops, drop it if (message.getHopCount() > maxHops) { return; } // Determine where to next route the message Finger bestGuess; if ( !lastHop) { bestGuess = normalFingers.getBestPredecessor(message.getDestinationID(), null); } else { bestGuess = normalFingers.getSuccessor(); } if ((message.getToSuccessor()) && (message.getDestinationID().inRange( normalFingers.getPredecessor().getID(), id, false, true))) { bestGuess = normalFingers.getSelfFinger(); } // Charge the bandwidth appropriatly int size = SerializationManager.getPayloadSize(message); if (firstSend) { size -= SerializationManager.getPayloadSize(message.getTheMessage()); } // Determine where the immediate destination is, nextHop for recusive, sender for iterative if (message.getRecursive()) { StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTE, size); LocalNode.myUDPMessenger.send(address, bestGuess.getAddress(), message); } else { RouteResponse response = RouteResponse.allocate(messageID++, address, message, bestGuess.getAddress(), bestGuess.getID()); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTERESPONSE, size); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), response); } } /* Determine whether a message had been routed to destination. */ private void handleRoute(InetSocketAddress previousHop, Route message, boolean local) { boolean predecessorNode = message.getDestinationID().inRange(id, normalFingers.getSuccessor().getID(), false, true); if ((predecessorNode && (message.getToSuccessor() == false)) || (message.getToSuccessor() && message.getDestinationID().inRange( normalFingers.getPredecessor().getID(), id, false, true))) { // This message has reached its final destination StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE, StatVars.HOP_COUNT, message.getHopCount()); StatCollector.addSample( StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTE, SerializationManager.getPayloadSize(message) - SerializationManager.getPayloadSize(message.getTheMessage())); handleUDPNetwork(previousHop, message.getTheMessage()); return; } boolean oneMoreHop = predecessorNode && message.getToSuccessor(); // Forward to next hop StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTE, SerializationManager.getPayloadSize(message)); boolean upCall = false; if (message.getProvideUpCalls()) { upCall = handleMessageUpCall(message.getDestinationID(), (Message) message.getTheMessage(), local); } if ( !upCall) { forwardMessage(message, false, oneMoreHop); } } private boolean handleMessageUpCall(BitID destination, Message message, boolean local) { LocationServiceClient client = (LocationServiceClient) applicationsCallback.get( new Long(message.getApplicationID())); if (client != null) { return client.routeUpCall(destination, message.getMessage(), local); } return false; } private void handleRouteResponse(RouteResponse message) { Route theRoute = message.getTheRoute(); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_ROUTE, SerializationManager.getPayloadSize(theRoute)); LocalNode.myUDPMessenger.send(address, message.getNextHopSocketAddress(), theRoute); } private void sendJoinRequest() { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Sending a JOIN request FROM:", address, " (", id.toNumString(), "), TO:", landmarkNode})); } Join message = Join.allocate(messageID++, address, id); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOIN, SerializationManager.getPayloadSize(message)); LocalNode.myUDPMessenger.send(address, landmarkNode, message); } private void processJoinRequest(Join message) { // Join messages may arrive via routing or directly, must check if this node should process if (message.getSourceID().inRange(id, normalFingers.getSuccessor().getID(), false, true)) { // Process Join, new node in this node's space if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?