treemaintainer.java
来自「High performance DB query」· Java 代码 · 共 840 行 · 第 1/2 页
JAVA
840 行
// NOTE(review): the lines down to the first method-closing brace are the tail of a
// method whose beginning precedes this section (presumably processUpstreamMessage,
// judging by the dispatch in messageResult). Only line structure is restored here.
            }
        } else {
            childEntry = (TreeNodeEntry) children.get(childID);

            int localSpareSlots = getSpareSlots(LOCAL_SLOTS);
            boolean tryRedirect = (childEntry == null)
                                  ? (localSpareSlots <= 0)
                                  : (localSpareSlots < 0);
            TreeNodeEntry redirectEntry = (tryRedirect == true)
                                          ? determineRedirect(childID)
                                          : null;

            if ((childEntry == null)
                    && ((tryRedirect == false) || (redirectEntry == null))) {
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Added New Tree Child Locally",new Object[]{"a",ns,"k",childID,"t",String.valueOf(tryRedirect)}, null));
                //J+
                childEntry = new TreeNodeEntry(childID, childSocketAddress,
                                               childAdvertisedSlots,
                                               upstreamLifetime);
                children.put(childID, childEntry);
                updateHeight(message.getHeight());
            }

            if ((childEntry != null)
                    && ((tryRedirect == false) || (redirectEntry == null))) {
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Updating Tree Child", new Object[]{"a",ns,"k",childID,"h",childSocketAddress,"o",String.valueOf(childAdvertisedSlots)}, new Object[]{"w",message}));
                //J+
                childEntry.updateLifetime(childAdvertisedSlots,
                                          upstreamLifetime);
                updateHeight(message.getHeight());
            }

            if ((tryRedirect == true) && (redirectEntry != null)) {
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirecting Tree Child",new Object[]{"a",ns,"k",childID,"n",String.valueOf(childEntry==null),"f",redirectEntry}, null));
                //J+
                TreeMessageUpstreamRedirect redirectMessage =
                    new TreeMessageUpstreamRedirect(
                        selfSocketAddress, selfID,
                        redirectEntry.getSocketAddress(),
                        redirectEntry.getID());

                StatCollector.addSample(
                    StatVars.NAMED, StatVars.TREE_PREFIX + ns,
                    StatVars.TREE_REDIRECT,
                    SerializationManager.getPayloadSize(redirectMessage));
                theProvider.send(childSocketAddress, childID, ns, selfIDStr,
                                 0, redirectMessage, 0, false, false);
                children.remove(childID);

                redirected = true;
            }
        }

        StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,
                                StatVars.TREE_CHILDREN, children.size(),
                                StatVars.STAT_DBL_VALUE);

        if ((message.getUserMessage() != null)
                && ((redirected == false)
                    || (message.getDeliverOnRedirect() == true))) {
            distributeToClients(true, childEntry, message.getUserMessage(),
                                !root);
        }
    }

    /**
     * Raises this node's recorded height to {@code childHeight + 1} when that
     * value exceeds the current height, and records the new height as a
     * statistic sample. The height is never lowered here.
     *
     * @param childHeight height reported by a child
     */
    private void updateHeight(byte childHeight) {
        if (childHeight + 1 > height) {
            height = (byte) (childHeight + 1);

            StatCollector.addSample(StatVars.NAMED,
                                    StatVars.TREE_PREFIX + ns,
                                    StatVars.TREE_HEIGHT, height,
                                    StatVars.STAT_DBL_VALUE);
        }
    }

    /**
     * Method processUpstreamRedirectMessage
     *
     * Handles a redirect notice: builds a new parent entry from the
     * "route via" ID and socket address carried by the message, installs it
     * as {@code parent}, and immediately sends upstream to it.
     *
     * @param message the redirect notice naming the node to route via
     */
    protected void processUpstreamRedirectMessage(
            TreeMessageUpstreamRedirect message) {
        BitID parentID = message.getRouteViaID();
        InetSocketAddress parentSocketAddress =
            message.getRouteViaSocketAddress();
        TreeNodeEntry newParent = new TreeNodeEntry(parentID,
                                                    parentSocketAddress, 0,
                                                    downstreamLifetime);

        //J-
        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Received Redirect Notice", new Object[] {"a",ns,"f",newParent}, new Object[]{"w",message}));
        //J+
        StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,
                                StatVars.TREE_IPPARENT, 1,
                                StatVars.STAT_DBL_VALUE);

        parent = newParent;

        doUpstreamSend(null, false, StatVars.TREE_IMMEDIATEUPSTREAM);
    }

    /**
     * Method processDownstreamMessage
     *
     * Handles a message from the parent: marks this node as non-root, adopts
     * a copy of the message's path to root (with this node's ID appended)
     * when one is supplied, refreshes the parent's lifetime, and hands any
     * user payload to the registered clients.
     *
     * @param message the downstream message received from the parent
     */
    protected void processDownstreamMessage(TreeMessageDownstream message) {
        // "midway" is true when this node currently has at least one child.
        boolean midway = children.size() != 0;

        this.root = false;

        //J-
        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Updating Tree Parent", new Object[]{"a",ns,"k",message.getSourceID(),"h",message.getSourceSocketAddress()}, new Object[]{"w",message}));
        //J+
        if (message.getPathToRoot() != null) {
            // Clone before appending so the list owned by the message is not
            // modified.
            pathToRoot = (ArrayList) message.getPathToRoot().clone();
            pathToRoot.add(selfID);

            StatCollector.addSample(StatVars.NAMED,
                                    StatVars.TREE_PREFIX + ns,
                                    StatVars.TREE_DEPTH,
                                    pathToRoot.size() - 1,
                                    StatVars.STAT_DBL_VALUE);
        }

        parent.updateLifetime(0, downstreamLifetime);

        if (message.getUserMessage() != null) {
            distributeToClients(false, parent, message.getUserMessage(),
                                midway);
        }
    }

    /**
     * Method distributeToClients
     *
     * Delivers a payload to every registered client. The client collection
     * is copied first and the callbacks are made while iterating the copy.
     *
     * @param upstream {@code true} to invoke {@code handleDataFromChild},
     *                 {@code false} to invoke {@code handleDataFromParent}
     * @param source   the tree node entry the payload came from (passed
     *                 through to the client unchanged)
     * @param message  the payload to deliver
     * @param midway   passed through to the client unchanged
     */
    protected void distributeToClients(boolean upstream,
                                       TreeNodeEntry source,
                                       Payload message, boolean midway) {
        // NOTE(review): the snapshot presumably lets clients register or
        // unregister from inside a callback without disturbing this
        // iteration -- confirm against the client registration code.
        ArrayList clientSet = new ArrayList(clients.size());
        Iterator entries = clients.iterator();

        while (entries.hasNext()) {
            TreeMaintainerClient client =
                (TreeMaintainerClient) entries.next();

            clientSet.add(client);
        }

        entries = clientSet.iterator();

        while (entries.hasNext()) {
            TreeMaintainerClient client =
                (TreeMaintainerClient) entries.next();

            if (upstream) {
                client.handleDataFromChild(source, message, midway);
            } else {
                client.handleDataFromParent(source, message, midway);
            }
        }
    }

    /**
     * Method periodicUpstreamSend
     *
     * Sends upstream if {@code upstreamRenewPeriod} has elapsed since the
     * parent's last send time, then schedules the next
     * {@code SIGNAL_UPSTREAM} timer event for when the following send
     * becomes due.
     */
    protected void periodicUpstreamSend() {
        long currentTime = LocalNode.myTimer.getCurrentTimeMS();
        long nextRequiredSend = parent.getLastSendTime()
                                + upstreamRenewPeriod;

        if (currentTime >= nextRequiredSend) {
            doUpstreamSend(null, false, StatVars.TREE_TIMEDUPSTREAM);

            // Re-read the last send time after the send before computing the
            // next due time.
            nextRequiredSend = parent.getLastSendTime()
                               + upstreamRenewPeriod;
        }

        long timeTillNextSend = nextRequiredSend - currentTime;

        LocalNode.myTimer.scheduleMS(timeTillNextSend, SIGNAL_UPSTREAM,
                                     this);
    }

    /**
     * Method periodicDownstreamSend
     *
     * Walks all children: drops those whose deletion time has passed, sends
     * downstream to those whose renew period has elapsed, and schedules the
     * next {@code SIGNAL_DOWNSTREAM} timer event for the earliest upcoming
     * send (at most {@code downstreamRenewPeriod} from now).
     */
    protected void periodicDownstreamSend() {
        long currentTime = LocalNode.myTimer.getCurrentTimeMS();
        long minTimeTillNextSend = downstreamRenewPeriod;
        Iterator entries = children.entrySet().iterator();

        while (entries.hasNext()) {
            TreeNodeEntry theChild =
                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();

            if (theChild.getDeletionTime() < currentTime) {
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Periodic Downstream Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.getDeletionTime())}, null));
                //J+
                // Remove through the iterator so the traversal stays valid.
                entries.remove();

                continue;
            }

            long nextRequiredSend = theChild.getLastSendTime()
                                    + downstreamRenewPeriod;

            if (currentTime >= nextRequiredSend) {
                doDownstreamSend(null, theChild,
                                 StatVars.TREE_TIMEDDOWNSTREAM);

                nextRequiredSend = theChild.getLastSendTime()
                                   + downstreamRenewPeriod;
            }

            long timeTillNextSend = nextRequiredSend - currentTime;

            if (timeTillNextSend < minTimeTillNextSend) {
                minTimeTillNextSend = timeTillNextSend;
            }
        }

        LocalNode.myTimer.scheduleMS(minTimeTillNextSend, SIGNAL_DOWNSTREAM,
                                     this);
    }

    /**
     * Method determineRedirect
     *
     * Chooses an existing child to which the given child can be redirected.
     * Candidates are children other than {@code childID} that advertise more
     * than zero slots; among them the one advertising the fewest slots is
     * chosen. On success the chosen child's slot count is decremented; on
     * failure a failed-redirect statistic sample is recorded.
     *
     * @param childID ID of the child that is to be redirected
     * @return the chosen redirect target, or {@code null} if no child
     *         qualifies
     */
    protected TreeNodeEntry determineRedirect(BitID childID) {
        Iterator entries = children.entrySet().iterator();
        // 0 doubles as "no candidate yet": candidates always advertise > 0.
        int currentBestCount = 0;
        TreeNodeEntry currentBestChild = null;

        while (entries.hasNext()) {
            TreeNodeEntry theChild =
                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();

            if ((theChild.getAdvertisedSlots() > 0)
                    && (!(theChild.getID().equals(childID)))) {
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this,"Redirect Preparation Found Possible Redirect Child",new Object[]{"a",ns,"k",childID,"f",theChild.getID(),"e",String.valueOf(theChild.getDeletionTime()),"z",String.valueOf(theChild.getAdvertisedSlots())}, null));
                //J+
                if ((currentBestCount == 0)
                        || (theChild.getAdvertisedSlots()
                            < currentBestCount)) {
                    currentBestCount = theChild.getAdvertisedSlots();
                    currentBestChild = theChild;
                }
            }
        }

        if (currentBestChild != null) {
            //J-
            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirect Preparation Found Redirect Child", new Object[]{"a",ns,"k",childID,"f",currentBestChild.getID(),"e",String.valueOf(currentBestChild.getDeletionTime()),"z",String.valueOf(currentBestChild.getAdvertisedSlots())}, null));
            //J+
            // Proactively update spare slots of redirect target to include
            // additional slots from the child being redirected, minus one to
            // account for the child itself
            currentBestChild.decrementSlots();

            return currentBestChild;
        } else {
            //J-
            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Redirect Not Possible", new Object[]{"a",ns,"k",childID}, null));
            //J+
            StatCollector.addSample(StatVars.NAMED,
                                    StatVars.TREE_PREFIX + ns,
                                    StatVars.TREE_FAILEDREDIRECT, 1);

            return null;
        }
    }

    /**
     * Method getSpareSlots
     *
     * Computes spare child slots. As a side effect, children whose deletion
     * time has passed are removed from {@code children} before counting.
     *
     * @param total {@code true} to return the total figure: this node's
     *              spare slots scaled by {@code advertiseChildrenFraction}
     *              (truncated to an int) plus the slots advertised by all
     *              remaining children; {@code false} to return only this
     *              node's unscaled spare slots
     *              ({@code maxChildren - children.size()})
     * @return the requested spare-slot count; the local figure is negative
     *         when there are more children than {@code maxChildren}
     */
    protected int getSpareSlots(boolean total) {
        long currentTime = LocalNode.myTimer.getCurrentTimeMS();
        int downstreamSpareSlots = 0;
        Iterator entries = children.entrySet().iterator();

        while (entries.hasNext()) {
            TreeNodeEntry theChild =
                (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();

            if (theChild.getDeletionTime() < currentTime) {
                entries.remove();
                //J-
                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Calculating Total Spare Slots - Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.getDeletionTime())}, null));
                //J+
            } else {
                downstreamSpareSlots += theChild.getAdvertisedSlots();
            }
        }

        // Add this node's slot capacity
        int localUsedSlots = children.size();
        int localSpareSlots = maxChildren - localUsedSlots;
        int localAdvertisedSpareSlots =
            (int) (localSpareSlots * advertiseChildrenFraction);
        int totalSpareSlots = localAdvertisedSpareSlots
                              + downstreamSpareSlots;

        //J-
        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Done Calculating Total Spare Slots", new Object[]{"a",ns,"t",String.valueOf(totalSpareSlots),"c",String.valueOf(localUsedSlots),"z",String.valueOf(localAdvertisedSpareSlots),"x",String.valueOf(downstreamSpareSlots)}, null));
        //J+
        return (total) ? totalSpareSlots : localSpareSlots;
    }

    /**
     * Method handleClock
     *
     * Timer callback. While in {@code STATE_JOINED}, runs the periodic
     * upstream or downstream send that corresponds to the signal object;
     * in any other state the event is ignored.
     *
     * @param clockData the signal object supplied when the timer was
     *                  scheduled; compared by reference against
     *                  {@code SIGNAL_UPSTREAM} and {@code SIGNAL_DOWNSTREAM}
     */
    public void handleClock(Object clockData) {
        if (state == STATE_JOINED) {
            if (clockData == SIGNAL_UPSTREAM) {
                periodicUpstreamSend();
            }

            if (clockData == SIGNAL_DOWNSTREAM) {
                periodicDownstreamSend();
            }
        }
    }

    /**
     * Method getResult
     *
     * Intentionally empty.
     *
     * @param ns   unused
     * @param rid  unused
     * @param item unused
     */
    public void getResult(String ns, String rid, Payload[] item) {}

    /**
     * Method putResult
     *
     * Intentionally empty.
     *
     * @param ns     unused
     * @param rid    unused
     * @param iid    unused
     * @param result unused
     */
    public void putResult(String ns, String rid, int iid, boolean result) {}

    /**
     * Method renewalResult
     *
     * Intentionally empty.
     *
     * @param ns     unused
     * @param rid    unused
     * @param iid    unused
     * @param result unused
     */
    public void renewalResult(String ns, String rid, int iid,
                              boolean result) {}

    /**
     * Method lscanResult
     *
     * Intentionally empty.
     *
     * @param ns   unused
     * @param rid  unused
     * @param item unused
     */
    public void lscanResult(String ns, String rid, Payload item) {}

    /**
     * Method newDataResult
     *
     * Intentionally empty.
     *
     * @param ns      unused
     * @param rid     unused
     * @param item    unused
     * @param renewed unused
     */
    public void newDataResult(String ns, String rid, Payload item,
                              boolean renewed) {}

    /**
     * Method messageResult
     *
     * Dispatches an incoming tree message to the matching handler. Messages
     * are only processed while in {@code STATE_JOINED}.
     *
     * @param ns     unused by this method
     * @param rid    unused by this method
     * @param item   the received payload; handled when it is a
     *               {@code TreeMessageDownstream}, {@code TreeMessageUpstream}
     *               or {@code TreeMessageUpstreamRedirect}
     * @param midway unused by this method
     * @param local  unused by this method
     * @return {@code true} if the payload was one of the handled message
     *         types and the node is joined; {@code false} otherwise
     */
    public boolean messageResult(String ns, String rid, Payload item,
                                 boolean midway, boolean local) {
        if (state == STATE_JOINED) {
            if (item instanceof TreeMessageDownstream) {
                processDownstreamMessage((TreeMessageDownstream) item);

                return true;
            }

            if (item instanceof TreeMessageUpstream) {
                processUpstreamMessage((TreeMessageUpstream) item);

                return true;
            }

            if (item instanceof TreeMessageUpstreamRedirect) {
                processUpstreamRedirectMessage(
                    (TreeMessageUpstreamRedirect) item);

                return true;
            }
        }

        return false;
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?