chord.java

来自「High performance DB query」· Java 代码 · 共 1,010 行 · 第 1/3 页

JAVA
1,010
字号
                    "Processing a JOIN request FROM:",                    message.getSourceSocketAddress(),                    " (",                    message.getSourceID().toNumString(),                    ")"}));            }            // Create response message, this node is the predecessor of new node, this node's successor is new node's successor            Finger oldSuccessor = normalFingers.getSuccessor();            if ((message.getSourceID().equals(                    id)) || (message.getSourceID().equals(                    oldSuccessor.getID()))) {    // If requested id is same as this node, refuse join.                JoinRefuse refuse = JoinRefuse.allocate(message.getID(),                                                        address);                StatCollector.addSample(                    StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE,                    StatVars.CHORD_JOINREFUSE,                    SerializationManager.getPayloadSize(refuse));                LocalNode.myUDPMessenger.send(address,                                              message.getSourceSocketAddress(),                                              refuse);            } else {                JoinResponse response =                    JoinResponse.allocate(message.getID(), address, id,                                          address, oldSuccessor.getID(),                                          oldSuccessor.getAddress());                StatCollector.addSample(                    StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE,                    StatVars.CHORD_JOINRESPONSE,                    SerializationManager.getPayloadSize(response));                LocalNode.myUDPMessenger.send(address,                                              message.getSourceSocketAddress(),                                              response);                // This should be my new successor                normalFingers.insertFinger(message.getSourceID(),                                           message.getSourceSocketAddress(),                                           defaultLatency);            }        } else {            StatCollector.addSample(                StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE,                StatVars.CHORD_JOINROUTE,                SerializationManager.getPayloadSize(message));            routeMessage(message, message.getSourceID(), false, false);        }    }    private void processJoinResponse(JoinResponse message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Processing a JOIN response FROM:",                message.getPredecessorSocketAddress(),                " (",                message.getPredecessorID().toNumString(),                ")"}));        }        normalFingers.insertFinger(message.getSuccessorID(),                                   message.getSuccessorSocketAddress(),                                   defaultLatency);        normalFingers.insertFinger(message.getPredecessorID(),                                   message.getPredecessorSocketAddress(),                                   defaultLatency);        stabilize();        maintainFingers();        notifyMappingChange();    }    private void processJoinRefuse(JoinRefuse message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Processing a JOIN refuse FROM:",                message.getSourceSocketAddress()}));        }        this.id = generateRandomID();        init();        sendJoinRequest();    }    private void processLookupRequest(Lookup message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Received LOOKUP request ID: ",                String.valueOf(message.getID()),                ", FROM:",                message.getSourceSocketAddress(),                ", SUCC:",                normalFingers.getSuccessor().getAddress().getAddress(),                "(",                normalFingers.getSuccessor().getID().toNumString(),                ")"}));        }        LookupResponse response = LookupResponse.allocate(                                      message.getID(), address,                                      (InetSocketAddress) applications.get(                                          new Long(                                              message.getApplicationID())));        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_LOOKUPRESPONSE,                                SerializationManager.getPayloadSize(response));        LocalNode.myUDPMessenger.send(address,                                      message.getSourceSocketAddress(),                                      response);    }    private void processLookupResponse(LookupResponse message, boolean local) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Received LOOKUP response ID: ",                String.valueOf(message.getID()),                ", FROM:",                message.getSourceSocketAddress(),                ", NODE:",                message.getAnswer()}));        }        Integer lookupID = new Integer(message.getID());        LocationServiceClient client =            (LocationServiceClient) lookupRequests.remove(lookupID);        Object requestID = lookupRequestsID.remove(lookupID);        if (timeout > 0) {            timeoutManager.removeTimeout(lookupID);        }        if (client != null) {            client.lookupResult(message.getAnswer(), requestID, local);        }    }    private void maintainFingers() {        if (doRouteMaintenance) {            // Determine ID of finger to maintain            long i = Math.round(LocalNode.myRandom.nextDouble() * numBits);            BitID checkID = id.add(numBits, makeID((int) i));            Maintain request = Maintain.allocate(messageID++, address);            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{                    "Sending MAINTAIN request for ID: ",                    checkID.toNumString()}));            }            routeMessage(request, checkID, false, false);        }        // Schedule next maintaince period        double nextUpdate = (LocalNode.myRandom.nextDouble() * maintainDelay)                            + (maintainDelay / 2);        LocalNode.myTimer.schedule(nextUpdate, SIGNAL_MAINTAIN, this);    }    private void processMaintainRequest(Maintain message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Procesing MAINTAIN request FROM:",                message.getSourceSocketAddress()}));        }        MaintainResponse response =            MaintainResponse.allocate(message.getID(), address,                                      normalFingers.getSuccessor().getAddress(),                                      normalFingers.getSuccessor().getID());        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_MAINTAINRESPONSE,                                SerializationManager.getPayloadSize(response));        LocalNode.myUDPMessenger.send(address,                                      message.getSourceSocketAddress(),                                      response);    }    private void processMaintainResponse(MaintainResponse message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Received MAINTAIN response FROM:",                message.getSourceSocketAddress(),                " FINGER:",                message.getSuccessorSocketAddress(),                " (",                message.getSuccessorID().toNumString(),                ")"}));        }        normalFingers.insertFinger(message.getSuccessorID(),                                   message.getSuccessorSocketAddress(),                                   defaultLatency);    }    /** Notify our successor that we think we are its predecessor */    private void doNotify() {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"Sending NOTIFY TO:",                                                     normalFingers.getSuccessor().getAddress()}));        }        Notify request = Notify.allocate(messageID++, address, id);        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_NOTIFY,                                SerializationManager.getPayloadSize(request));        LocalNode.myUDPMessenger.send(address,                                      normalFingers.getSuccessor().getAddress(),                                      request);    }    private void processNotifyRequest(Notify message) {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Received NOTIFY request FROM:",                message.getSourceSocketAddress(),                "(",                message.getPredecessorID().toNumString(),                ")"}));        }        if ((normalFingers.getPredecessor() == null)                || message.getPredecessorID().inRange(                    normalFingers.getPredecessor().getID(), id, false, false)) {            normalFingers.insertFinger(message.getPredecessorID(),                                       message.getSourceSocketAddress(),                                       defaultLatency);        }        notifyMappingChange();    }    /** This sends a request to who we think is our successor.  It will reply with who it thinks is its predecessor so we can make an update if it is not us */    private void stabilize() {        if (doRouteMaintenance) {            if ( !id.equals(normalFingers.getSuccessor().getID())) {                Stabilize request = Stabilize.allocate(messageID++, address);                InetSocketAddress currentSuccessorSocketAddress =                    normalFingers.getSuccessor().getAddress();                if (Output.debuggingEnabled) {                    logger.debug(new LogMessage(new Object[]{                        "Sending STABILIZE reuqest TO: ",                        currentSuccessorSocketAddress}));                }                StatCollector.addSample(                    StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE,                    StatVars.CHORD_STABILIZE,                    SerializationManager.getPayloadSize(request));                LocalNode.myUDPMessenger.send(address,                                              currentSuccessorSocketAddress,                                              request);            }        }        double nextUpdate = (LocalNode.myRandom.nextDouble() * stabilizeDelay)                            + (stabilizeDelay / 2);        LocalNode.myTimer.schedule(nextUpdate, SIGNAL_STABILIZE, this);    }    private void processStabilizeRequest(Stabilize message) {        InetSocketAddress predecessorSocketAddress =            normalFingers.getPredecessor().getAddress();        BitID predecessorID = normalFingers.getPredecessor().getID();        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Procesing STABILIZE request FROM:",                message.getSourceSocketAddress(),                ", PRED:",                predecessorSocketAddress + "(" + predecessorID.toNumString(),                ")"}));        }        StabilizeResponse response = StabilizeResponse.allocate(message.getID(),                                         address, predecessorSocketAddress,                                         predecessorID);        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_STABILIZERESPONSE,                                SerializationManager.getPayloadSize(response));        LocalNode.myUDPMessenger.send(address,                                      message.getSourceSocketAddress(),                                      response);    }    private void processStabilizeResponse(StabilizeResponse message) {        if ( !id.equals(message.getPredecessorID())) {            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{                    "Processing STABILIZE response OLD SUCC:",                    normalFingers.getSuccessor().getAddress(),                    "(",                    normalFingers.getSuccessor().getID().toNumString(),                    "), NEW SUCC:",                    message.getPredecessorSocketAddress(),                    "(",                    message.getPredecessorID().toNumString(),                    ")"}));            }            normalFingers.insertFinger(message.getPredecessorID(),                                       message.getPredecessorSocketAddress(),                                       defaultLatency);            doNotify();        }    }    // ID Manipulation Functions    /**     * Method makeID     *     * @param i     * @return     */    public static BitID makeID(int i) {        BitID oneBit = new BitID();        oneBit.set(i);        return oneBit;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?