chord.java
来自「High performance DB query」· Java 代码 · 共 1,010 行 · 第 1/3 页
JAVA
1,010 行
"Processing a JOIN request FROM:", message.getSourceSocketAddress(), " (", message.getSourceID().toNumString(), ")"})); } // Create response message, this node is the predecessor of new node, this node's successor is new node's successor Finger oldSuccessor = normalFingers.getSuccessor(); if ((message.getSourceID().equals( id)) || (message.getSourceID().equals( oldSuccessor.getID()))) { // If requested id is same as this node, refuse join. JoinRefuse refuse = JoinRefuse.allocate(message.getID(), address); StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOINREFUSE, SerializationManager.getPayloadSize(refuse)); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), refuse); } else { JoinResponse response = JoinResponse.allocate(message.getID(), address, id, address, oldSuccessor.getID(), oldSuccessor.getAddress()); StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOINRESPONSE, SerializationManager.getPayloadSize(response)); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), response); // This should be my new successor normalFingers.insertFinger(message.getSourceID(), message.getSourceSocketAddress(), defaultLatency); } } else { StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_JOINROUTE, SerializationManager.getPayloadSize(message)); routeMessage(message, message.getSourceID(), false, false); } } private void processJoinResponse(JoinResponse message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Processing a JOIN response FROM:", message.getPredecessorSocketAddress(), " (", message.getPredecessorID().toNumString(), ")"})); } normalFingers.insertFinger(message.getSuccessorID(), message.getSuccessorSocketAddress(), defaultLatency); normalFingers.insertFinger(message.getPredecessorID(), message.getPredecessorSocketAddress(), defaultLatency); stabilize(); maintainFingers(); notifyMappingChange(); } private void processJoinRefuse(JoinRefuse message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Processing a JOIN refuse FROM:", message.getSourceSocketAddress()})); } this.id = generateRandomID(); init(); sendJoinRequest(); } private void processLookupRequest(Lookup message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Received LOOKUP request ID: ", String.valueOf(message.getID()), ", FROM:", message.getSourceSocketAddress(), ", SUCC:", normalFingers.getSuccessor().getAddress().getAddress(), "(", normalFingers.getSuccessor().getID().toNumString(), ")"})); } LookupResponse response = LookupResponse.allocate( message.getID(), address, (InetSocketAddress) applications.get( new Long( message.getApplicationID()))); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_LOOKUPRESPONSE, SerializationManager.getPayloadSize(response)); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), response); } private void processLookupResponse(LookupResponse message, boolean local) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Received LOOKUP response ID: ", String.valueOf(message.getID()), ", FROM:", message.getSourceSocketAddress(), ", NODE:", message.getAnswer()})); } Integer lookupID = new Integer(message.getID()); LocationServiceClient client = (LocationServiceClient) lookupRequests.remove(lookupID); Object requestID = lookupRequestsID.remove(lookupID); if (timeout > 0) { timeoutManager.removeTimeout(lookupID); } if (client != null) { client.lookupResult(message.getAnswer(), requestID, local); } } private void maintainFingers() { if (doRouteMaintenance) { // Determine ID of finger to maintain long i = Math.round(LocalNode.myRandom.nextDouble() * numBits); BitID checkID = id.add(numBits, makeID((int) i)); Maintain request = Maintain.allocate(messageID++, address); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Sending MAINTAIN request for ID: ", checkID.toNumString()})); } routeMessage(request, checkID, false, false); } // Schedule next maintaince period double nextUpdate = (LocalNode.myRandom.nextDouble() * maintainDelay) + (maintainDelay / 2); LocalNode.myTimer.schedule(nextUpdate, SIGNAL_MAINTAIN, this); } private void processMaintainRequest(Maintain message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Procesing MAINTAIN request FROM:", message.getSourceSocketAddress()})); } MaintainResponse response = MaintainResponse.allocate(message.getID(), address, normalFingers.getSuccessor().getAddress(), normalFingers.getSuccessor().getID()); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_MAINTAINRESPONSE, SerializationManager.getPayloadSize(response)); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), response); } private void processMaintainResponse(MaintainResponse message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Received MAINTAIN response FROM:", message.getSourceSocketAddress(), " FINGER:", message.getSuccessorSocketAddress(), " (", message.getSuccessorID().toNumString(), ")"})); } normalFingers.insertFinger(message.getSuccessorID(), message.getSuccessorSocketAddress(), defaultLatency); } /** Notify our successor that we think we are its predecessor */ private void doNotify() { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Sending NOTIFY TO:", normalFingers.getSuccessor().getAddress()})); } Notify request = Notify.allocate(messageID++, address, id); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_NOTIFY, SerializationManager.getPayloadSize(request)); LocalNode.myUDPMessenger.send(address, normalFingers.getSuccessor().getAddress(), request); } private void processNotifyRequest(Notify message) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Received NOTIFY request FROM:", message.getSourceSocketAddress(), "(", message.getPredecessorID().toNumString(), ")"})); } if ((normalFingers.getPredecessor() == null) || message.getPredecessorID().inRange( normalFingers.getPredecessor().getID(), id, false, false)) { normalFingers.insertFinger(message.getPredecessorID(), message.getSourceSocketAddress(), defaultLatency); } notifyMappingChange(); } /** This sends a request to who we think is our successor. It will reply with who it thinks is its predecessor so we can make an update if it is not us */ private void stabilize() { if (doRouteMaintenance) { if ( !id.equals(normalFingers.getSuccessor().getID())) { Stabilize request = Stabilize.allocate(messageID++, address); InetSocketAddress currentSuccessorSocketAddress = normalFingers.getSuccessor().getAddress(); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Sending STABILIZE reuqest TO: ", currentSuccessorSocketAddress})); } StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_STABILIZE, SerializationManager.getPayloadSize(request)); LocalNode.myUDPMessenger.send(address, currentSuccessorSocketAddress, request); } } double nextUpdate = (LocalNode.myRandom.nextDouble() * stabilizeDelay) + (stabilizeDelay / 2); LocalNode.myTimer.schedule(nextUpdate, SIGNAL_STABILIZE, this); } private void processStabilizeRequest(Stabilize message) { InetSocketAddress predecessorSocketAddress = normalFingers.getPredecessor().getAddress(); BitID predecessorID = normalFingers.getPredecessor().getID(); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Procesing STABILIZE request FROM:", message.getSourceSocketAddress(), ", PRED:", predecessorSocketAddress + "(" + predecessorID.toNumString(), ")"})); } StabilizeResponse response = StabilizeResponse.allocate(message.getID(), address, predecessorSocketAddress, predecessorID); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.CHORD_STABILIZERESPONSE, SerializationManager.getPayloadSize(response)); LocalNode.myUDPMessenger.send(address, message.getSourceSocketAddress(), response); } private void processStabilizeResponse(StabilizeResponse message) { if ( !id.equals(message.getPredecessorID())) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Processing STABILIZE response OLD SUCC:", normalFingers.getSuccessor().getAddress(), "(", normalFingers.getSuccessor().getID().toNumString(), "), NEW SUCC:", message.getPredecessorSocketAddress(), "(", message.getPredecessorID().toNumString(), ")"})); } normalFingers.insertFinger(message.getPredecessorID(), message.getPredecessorSocketAddress(), defaultLatency); doNotify(); } } // ID Manipulation Functions /** * Method makeID * * @param i * @return */ public static BitID makeID(int i) { BitID oneBit = new BitID(); oneBit.set(i); return oneBit; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?