chord.java

来自「High performance DB query」· Java 代码 · 共 1,010 行 · 第 1/3 页

JAVA
1,010
字号
            client.locationMapChange();        }    }    /**     * Method registerClient     *     * @param client     * @param applicationID     * @param answer     */    public void registerClient(LocationServiceClient client,                               long applicationID, InetSocketAddress answer) {        localClients.add(client);        Long appID = new Long(applicationID);        applications.put(appID, answer);        applicationsCallback.put(appID, client);    }    /**     * Method handleUDPNetwork     *     * @param source     * @param data     */    public void handleUDPNetwork(InetSocketAddress source, Payload data) {        if (data instanceof Join) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_JOIN,                                    SerializationManager.getPayloadSize(data));            processJoinRequest((Join) data);        }        if (data instanceof JoinResponse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_JOINRESPONSE,                                    SerializationManager.getPayloadSize(data));            processJoinResponse((JoinResponse) data);        }        if (data instanceof JoinRefuse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_JOINREFUSE,                                    SerializationManager.getPayloadSize(data));            processJoinRefuse((JoinRefuse) data);        }        if (data instanceof Lookup) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_LOOKUP,                                    SerializationManager.getPayloadSize(data));            processLookupRequest((Lookup) data);        }        if (data instanceof LookupResponse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_LOOKUPRESPONSE,                                    SerializationManager.getPayloadSize(data));            processLookupResponse((LookupResponse) data,                                  source.equals(address));        }        if (data instanceof Maintain) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_MAINTAIN,                                    SerializationManager.getPayloadSize(data));            processMaintainRequest((Maintain) data);        }        if (data instanceof MaintainResponse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_MAINTAINRESPONSE,                                    SerializationManager.getPayloadSize(data));            processMaintainResponse((MaintainResponse) data);        }        if (data instanceof Message) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_MESSAGE,                                    SerializationManager.getPayloadSize(data));            handleMessageUpCall(null, (Message) data, source.equals(address));        }        if (data instanceof Notify) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_NOTIFY,                                    SerializationManager.getPayloadSize(data));            processNotifyRequest((Notify) data);        }        if (data instanceof Route) {            handleRoute(source, (Route) data, source.equals(address));        }        if (data instanceof RouteResponse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_ROUTERESPONSE,                                    SerializationManager.getPayloadSize(data));            handleRouteResponse((RouteResponse) data);        }        if (data instanceof Stabilize) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_STABILIZE,                                    SerializationManager.getPayloadSize(data));            processStabilizeRequest((Stabilize) data);        }        if (data instanceof StabilizeResponse) {            StatCollector.addSample(StatVars.NETWORK_IN,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_STABILIZERESPONSE,                                    SerializationManager.getPayloadSize(data));            processStabilizeResponse((StabilizeResponse) data);        }    }    /**     * Method handleClock     *     * @param data     */    public void handleClock(Object data) {        Integer signal = (Integer) data;        if (signal == SIGNAL_STABILIZE) {            stabilize();        }        if (signal == SIGNAL_MAINTAIN) {            maintainFingers();        }    }    /**     * Method handleTimeout     *     * @param item     * @param currentTime     */    public void handleTimeout(Object item, double currentTime) {        Lookup request = Lookup.allocate(((Lookup) item));        timeoutManager.addTimeout(request, new Integer(request.getID()),                                  timeout, this);        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_LOOKUP,                                SerializationManager.getPayloadSize(request));        StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE,                                StatVars.LOOKUP_TIMEOUTS, 1);        routeMessage(request, request.getSearchID(), true, false);    }    // Routing Handling Functions    /* Put message in routing enevolpe and send it to the predecessor of the destination ID */    private void routeMessage(ChordMessage theMessage, BitID destination,                              boolean toSuccessor, boolean provideUpCalls) {        Route message = Route.allocate(messageID++, address, destination,                                       theMessage, recursive, toSuccessor,                                       provideUpCalls);        forwardMessage(message, true, false);    }    /* Forward message to next hop (recursive), or return to sender with additional routing info (iterative) */    private void forwardMessage(Route message, boolean firstSend,                                boolean lastHop) {        // Increment hop count        message.incrementHops(1);        // If the message has been routed through too many hops, drop it        if (message.getHopCount() > maxHops) {            return;        }        // Determine where to next route the message        Finger bestGuess;        if ( !lastHop) {            bestGuess =                normalFingers.getBestPredecessor(message.getDestinationID(),                                                 null);        } else {            bestGuess = normalFingers.getSuccessor();        }        if ((message.getToSuccessor())                && (message.getDestinationID().inRange(                    normalFingers.getPredecessor().getID(), id, false, true))) {            bestGuess = normalFingers.getSelfFinger();        }        // Charge the bandwidth appropriatly        int size = SerializationManager.getPayloadSize(message);        if (firstSend) {            size -=                SerializationManager.getPayloadSize(message.getTheMessage());        }        // Determine where the immediate destination is, nextHop for recusive, sender for iterative        if (message.getRecursive()) {            StatCollector.addSample(StatVars.NETWORK_OUT,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_ROUTE, size);            LocalNode.myUDPMessenger.send(address, bestGuess.getAddress(),                                          message);        } else {            RouteResponse response =                RouteResponse.allocate(messageID++, address, message,                                       bestGuess.getAddress(),                                       bestGuess.getID());            StatCollector.addSample(StatVars.NETWORK_OUT,                                    StatVars.LOCATION_SERVICE,                                    StatVars.CHORD_ROUTERESPONSE, size);            LocalNode.myUDPMessenger.send(address,                                          message.getSourceSocketAddress(),                                          response);        }    }    /* Determine whether a message had been routed to destination. */    private void handleRoute(InetSocketAddress previousHop, Route message,                             boolean local) {        boolean predecessorNode = message.getDestinationID().inRange(id,                                      normalFingers.getSuccessor().getID(),                                      false, true);        if ((predecessorNode && (message.getToSuccessor() == false))                || (message.getToSuccessor()                    && message.getDestinationID().inRange(                        normalFingers.getPredecessor().getID(), id, false,                        true))) {            // This message has reached its final destination            StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE,                                    StatVars.HOP_COUNT, message.getHopCount());            StatCollector.addSample(                StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE,                StatVars.CHORD_ROUTE,                SerializationManager.getPayloadSize(message)                - SerializationManager.getPayloadSize(message.getTheMessage()));            handleUDPNetwork(previousHop, message.getTheMessage());            return;        }        boolean oneMoreHop = predecessorNode && message.getToSuccessor();        // Forward to next hop        StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE,                                StatVars.CHORD_ROUTE,                                SerializationManager.getPayloadSize(message));        boolean upCall = false;        if (message.getProvideUpCalls()) {            upCall = handleMessageUpCall(message.getDestinationID(),                                         (Message) message.getTheMessage(),                                         local);        }        if ( !upCall) {            forwardMessage(message, false, oneMoreHop);        }    }    private boolean handleMessageUpCall(BitID destination, Message message,                                        boolean local) {        LocationServiceClient client =            (LocationServiceClient) applicationsCallback.get(                new Long(message.getApplicationID()));        if (client != null) {            return client.routeUpCall(destination, message.getMessage(), local);        }        return false;    }    private void handleRouteResponse(RouteResponse message) {        Route theRoute = message.getTheRoute();        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE,                                StatVars.CHORD_ROUTE,                                SerializationManager.getPayloadSize(theRoute));        LocalNode.myUDPMessenger.send(address,                                      message.getNextHopSocketAddress(),                                      theRoute);    }    private void sendJoinRequest() {        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{                "Sending a JOIN request FROM:",                address, " (",                id.toNumString(), "), TO:",                landmarkNode}));        }        Join message = Join.allocate(messageID++, address, id);        StatCollector.addSample(StatVars.NETWORK_OUT,                                StatVars.LOCATION_SERVICE, StatVars.CHORD_JOIN,                                SerializationManager.getPayloadSize(message));        LocalNode.myUDPMessenger.send(address, landmarkNode, message);    }    private void processJoinRequest(Join message) {        // Join messages may arrive via routing or directly, must check if this node should process        if (message.getSourceID().inRange(id,                                          normalFingers.getSuccessor().getID(),                                          false, true)) {            // Process Join, new node in this node's space            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?