treemaintainer.java

来自「High performance DB query」· Java 代码 · 共 840 行 · 第 1/2 页

JAVA
840
字号
            }        } else {            childEntry = (TreeNodeEntry) children.get(childID);            int localSpareSlots = getSpareSlots(LOCAL_SLOTS);            boolean tryRedirect = (childEntry == null)                                  ? (localSpareSlots <= 0)                                  : (localSpareSlots < 0);            TreeNodeEntry redirectEntry = (tryRedirect == true)                                          ? determineRedirect(childID)                                          : null;            if ((childEntry == null)                    && ((tryRedirect == false) || (redirectEntry == null))) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Added New Tree Child Locally",new Object[]{"a",ns,"k",childID,"t",String.valueOf(tryRedirect)}, null));                //J+                childEntry = new TreeNodeEntry(childID, childSocketAddress,                                               childAdvertisedSlots,                                               upstreamLifetime);                children.put(childID, childEntry);                updateHeight(message.getHeight());            }            if ((childEntry != null)                    && ((tryRedirect == false) || (redirectEntry == null))) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Updating Tree Child", new Object[]{"a",ns,"k",childID,"h",childSocketAddress,"o",String.valueOf(childAdvertisedSlots)}, new Object[]{"w",message}));                //J+                childEntry.updateLifetime(childAdvertisedSlots,                                          upstreamLifetime);                updateHeight(message.getHeight());            }            if ((tryRedirect == true) && (redirectEntry != null)) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirecting Tree Child",new Object[]{"a",ns,"k",childID,"n",String.valueOf(childEntry==null),"f",redirectEntry}, null));                //J+                TreeMessageUpstreamRedirect redirectMessage =                    new TreeMessageUpstreamRedirect(                        selfSocketAddress, selfID,                        redirectEntry.getSocketAddress(),                        redirectEntry.getID());                StatCollector.addSample(                    StatVars.NAMED, StatVars.TREE_PREFIX + ns,                    StatVars.TREE_REDIRECT,                    SerializationManager.getPayloadSize(redirectMessage));                theProvider.send(childSocketAddress, childID, ns, selfIDStr, 0,                                 redirectMessage, 0, false, false);                children.remove(childID);                redirected = true;            }        }        StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                StatVars.TREE_CHILDREN, children.size(),                                StatVars.STAT_DBL_VALUE);        if ((message.getUserMessage() != null)                && ((redirected == false)                    || (message.getDeliverOnRedirect() == true))) {            distributeToClients(true, childEntry, message.getUserMessage(),                                !root);        }    }    private void updateHeight(byte childHeight) {        if (childHeight + 1 > height) {            height = (byte) (childHeight + 1);            StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                    StatVars.TREE_HEIGHT, height,                                    StatVars.STAT_DBL_VALUE);        }    }    /**     * Method processJoinRedirectMessage     *     * @param message     */    protected void processUpstreamRedirectMessage(            TreeMessageUpstreamRedirect message) {        BitID parentID = message.getRouteViaID();        InetSocketAddress parentSocketAddress =            message.getRouteViaSocketAddress();        TreeNodeEntry newParent = new TreeNodeEntry(parentID,                                                    parentSocketAddress, 0,                                                    downstreamLifetime);        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Received Redirect Notice", new Object[] {"a",ns,"f",newParent}, new Object[]{"w",message}));        //J+        StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                StatVars.TREE_IPPARENT, 1,                                StatVars.STAT_DBL_VALUE);        parent = newParent;        doUpstreamSend(null, false, StatVars.TREE_IMMEDIATEUPSTREAM);    }    /**     * Method processDownstreamMessage     *     * @param message     */    protected void processDownstreamMessage(TreeMessageDownstream message) {        boolean midway = (children.size() == 0)                         ? false                         : true;        this.root = false;        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Updating Tree Parent", new Object[]{"a",ns,"k",message.getSourceID(),"h",message.getSourceSocketAddress()}, new Object[]{"w",message}));        //J+        if (message.getPathToRoot() != null) {            pathToRoot = (ArrayList) message.getPathToRoot().clone();            pathToRoot.add(selfID);            StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                    StatVars.TREE_DEPTH, pathToRoot.size() - 1,                                    StatVars.STAT_DBL_VALUE);        }        parent.updateLifetime(0, downstreamLifetime);        if (message.getUserMessage() != null) {            distributeToClients(false, parent, message.getUserMessage(),                                midway);        }    }    /**     * Method distributeToClients     *     * @param upstream     * @param source     * @param message     * @param midway     */    protected void distributeToClients(boolean upstream, TreeNodeEntry source,                                       Payload message, boolean midway) {        ArrayList clientSet = new ArrayList(clients.size());        Iterator entries = clients.iterator();        while (entries.hasNext()) {            TreeMaintainerClient client = (TreeMaintainerClient) entries.next();            clientSet.add(client);        }        entries = clientSet.iterator();        while (entries.hasNext()) {            TreeMaintainerClient client = (TreeMaintainerClient) entries.next();            if (upstream) {                client.handleDataFromChild(source, message, midway);            } else {                client.handleDataFromParent(source, message, midway);            }        }    }    /**     * Method periodicUpstreamSend     */    protected void periodicUpstreamSend() {        long currentTime = LocalNode.myTimer.getCurrentTimeMS();        long nextRequiredSend = parent.getLastSendTime() + upstreamRenewPeriod;        if (currentTime >= nextRequiredSend) {            doUpstreamSend(null, false, StatVars.TREE_TIMEDUPSTREAM);            nextRequiredSend = parent.getLastSendTime() + upstreamRenewPeriod;        }        long timeTillNextSend = nextRequiredSend - currentTime;        LocalNode.myTimer.scheduleMS(timeTillNextSend, SIGNAL_UPSTREAM, this);    }    /**     * Method periodicDownstreamSend     */    protected void periodicDownstreamSend() {        long currentTime = LocalNode.myTimer.getCurrentTimeMS();        long minTimeTillNextSend = downstreamRenewPeriod;        Iterator entries = children.entrySet().iterator();        while (entries.hasNext()) {            TreeNodeEntry theChild =                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();            if (theChild.getDeletionTime() < currentTime) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Periodic Downstream Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.deletionTime)}, null));                //J+                entries.remove();                continue;            }            long nextRequiredSend = theChild.lastSendTime                                    + downstreamRenewPeriod;            if (currentTime >= nextRequiredSend) {                doDownstreamSend(null, theChild, StatVars.TREE_TIMEDDOWNSTREAM);                nextRequiredSend = theChild.getLastSendTime()                                   + downstreamRenewPeriod;            }            long timeTillNextSend = nextRequiredSend - currentTime;            if (timeTillNextSend < minTimeTillNextSend) {                minTimeTillNextSend = timeTillNextSend;            }        }        LocalNode.myTimer.scheduleMS(minTimeTillNextSend, SIGNAL_DOWNSTREAM,                                     this);    }    /**     * Method determineRedirect     *     * @param childID     * @return     */    protected TreeNodeEntry determineRedirect(BitID childID) {        Iterator entries = children.entrySet().iterator();        int currentBestCount = 0;        TreeNodeEntry currentBestChild = null;        while (entries.hasNext()) {            TreeNodeEntry theChild =                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();            if ((theChild.getAdvertisedSlots() > 0)                    && ( !(theChild.getID().equals(childID)))) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this,"Redirect Preparation Found Possible Redirect Child",new Object[]{"a",ns,"k",childID,"f",theChild.getID(),"e",String.valueOf(theChild.deletionTime),"z",String.valueOf(theChild.getAdvertisedSlots())}, null));                //J+                if ((currentBestCount == 0)                        || (theChild.getAdvertisedSlots() < currentBestCount)) {                    currentBestCount = theChild.getAdvertisedSlots();                    currentBestChild = theChild;                }            }        }        if (currentBestChild != null) {            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirect Preparation Found Redirect Child", new Object[]{"a",ns,"k",childID,"f",currentBestChild.getID(),"e",String.valueOf(currentBestChild.deletionTime),"z",String.valueOf(currentBestChild.getAdvertisedSlots())}, null));            //J+            // Proactively update spare slots of redirect target to include additional slots from the child being redirect, minus one to account for the child itself            currentBestChild.decrementSlots();            return currentBestChild;        } else {            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirect Not Possible", new Object[]{"a",ns,"k",childID}, null));            //J+            StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                    StatVars.TREE_FAILEDREDIRECT, 1);            return null;        }    }    /**     * Method getTotalSpareSlots     *     * @param total     * @return     */    protected int getSpareSlots(boolean total) {        long currentTime = LocalNode.myTimer.getCurrentTimeMS();        int downstreamSpareSlots = 0;        Iterator entries = children.entrySet().iterator();        while (entries.hasNext()) {            TreeNodeEntry theChild =                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();            if (theChild.getDeletionTime() < currentTime) {                entries.remove();                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Calculating Total Spare Slots - Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.getDeletionTime())}, null));                //J+            } else {                downstreamSpareSlots += theChild.getAdvertisedSlots();            }        }        // Add this nodes slot capacity        int localUsedSlots = children.size();        int localSpareSlots = maxChildren - localUsedSlots;        int localAdvertisedSpareSlots = (int) (localSpareSlots                                               * advertiseChildrenFraction);        int totalSpareSlots = localAdvertisedSpareSlots + downstreamSpareSlots;        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Done Calculating Total Spare Slots", new Object[]{"a",ns,"t",String.valueOf(totalSpareSlots),"c",String.valueOf(localUsedSlots),"z",String.valueOf(localAdvertisedSpareSlots),"x",String.valueOf(downstreamSpareSlots)}, null));        //J+        return (total)               ? totalSpareSlots               : localSpareSlots;    }    /**     * Method handleClock     *     * @param clockData     */    public void handleClock(Object clockData) {        if (state == STATE_JOINED) {            if (clockData == SIGNAL_UPSTREAM) {                periodicUpstreamSend();            }            if (clockData == SIGNAL_DOWNSTREAM) {                periodicDownstreamSend();            }        }    }    /**     * Method getResult     *     * @param ns     * @param rid     * @param item     */    public void getResult(String ns, String rid, Payload[] item) {}    /**     * Method putResult     *     * @param ns     * @param rid     * @param iid     * @param result     */    public void putResult(String ns, String rid, int iid, boolean result) {}    /**     * Method renewalResult     *     * @param ns     * @param rid     * @param iid     * @param result     */    public void renewalResult(String ns, String rid, int iid, boolean result) {}    /**     * Method lscanResult     *     * @param ns     * @param rid     * @param item     */    public void lscanResult(String ns, String rid, Payload item) {}    /**     * Method newDataResult     *     * @param ns     * @param rid     * @param item     * @param renewed     */    public void newDataResult(String ns, String rid, Payload item,                              boolean renewed) {}    /**     * Method messageResult     *     * @param ns     * @param rid     * @param item     * @param midway     * @param local     * @return     */    public boolean messageResult(String ns, String rid, Payload item,                                 boolean midway, boolean local) {        if (state == STATE_JOINED) {            if (item instanceof TreeMessageDownstream) {                processDownstreamMessage((TreeMessageDownstream) item);                return true;            }            if (item instanceof TreeMessageUpstream) {                processUpstreamMessage((TreeMessageUpstream) item);                return true;            }            if (item instanceof TreeMessageUpstreamRedirect) {                processUpstreamRedirectMessage(                    (TreeMessageUpstreamRedirect) item);                return true;            }        }        return false;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?