treemaintainer.java
来自「High performance DB query」· Java 代码 · 共 840 行 · 第 1/2 页
JAVA
840 行
/* * @(#)$Id: TreeMaintainer.java,v 1.16 2005/09/08 18:08:41 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package pier.helpers.trees;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.log4j.Logger;import overlay.provider.Provider;import overlay.provider.ProviderClient;import services.LocalNode;import services.Output;import services.network.Payload;import services.stats.StatCollector;import services.stats.StatVars;import services.timer.TimerClient;import util.BitID;import util.logging.StructuredLogMessage;import util.network.serialization.SerializationManager;/** * Class TreeMaintainer * */public class TreeMaintainer implements TimerClient, ProviderClient { private static Logger logger = Logger.getLogger(TreeMaintainer.class); private static final Integer SIGNAL_UPSTREAM = new Integer(0); private static final Integer SIGNAL_DOWNSTREAM = new Integer(1); private static final boolean TOTAL_SLOTS = true; private static final boolean LOCAL_SLOTS = false; private static final int STATE_NEW = 0; private static final int STATE_READY = 1; private static final int STATE_JOINED = 2; private static final int STATE_DONE = 3; private int state; protected Provider theProvider; protected BitID rootID; protected int maxChildren; protected double advertiseChildrenFraction; protected String ns; protected HashMap children; protected TreeNodeEntry parent; protected BitID selfID; protected InetSocketAddress selfSocketAddress; protected String selfIDStr; protected TreeNodeEntry selfEntry; protected byte height; protected long upstreamRenewPeriod; protected long upstreamLifetime; protected long downstreamRenewPeriod; protected long downstreamLifetime; protected TreeDetails treeDetails; protected ArrayList clients; protected ArrayList pathToRoot; protected boolean root; /** * Constructor TreeMaintainer * */ public TreeMaintainer() { this.state = STATE_NEW; this.clients = new ArrayList(); //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State New", null, null)); //J+ } /** * Method init * * @param theProvider * @param rootID * @param maxChildren * @param advertiseChildrenFraction * @param ns * @param upstreamRenewPeriod * @param upstreamLifetime * @param downstreamRenewPeriod * @param downstreamLifetime */ public void init(Provider theProvider, BitID rootID, int maxChildren, double advertiseChildrenFraction, String ns, long upstreamRenewPeriod, long upstreamLifetime, long downstreamRenewPeriod, long downstreamLifetime) { this.theProvider = theProvider; this.rootID = rootID; this.maxChildren = maxChildren; this.advertiseChildrenFraction = advertiseChildrenFraction; this.ns = ns; this.upstreamRenewPeriod = upstreamRenewPeriod; this.upstreamLifetime = upstreamLifetime; this.downstreamRenewPeriod = downstreamRenewPeriod; this.downstreamLifetime = downstreamLifetime; this.treeDetails = new TreeDetails(rootID, maxChildren, advertiseChildrenFraction, upstreamRenewPeriod, upstreamLifetime); this.parent = new TreeNodeEntry(rootID, null, 0, downstreamLifetime); this.children = new HashMap(); this.selfID = theProvider.getNodeID(); this.selfSocketAddress = theProvider.getNodeSocketAddress(); this.selfIDStr = selfID.toNumString(); this.selfEntry = new TreeNodeEntry(selfID, selfSocketAddress, 0, 0); this.height = 0; this.root = false; this.state = STATE_READY; //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Ready", new Object[]{"a",ns,"k",rootID}, null)); //J+ enterTree(); } /** * Method registerClient * * @param client */ public void registerClient(TreeMaintainerClient client) { clients.add(client); enterTree(); } /** * Method deregisterClient * * @param client * @return */ public boolean deregisterClient(TreeMaintainerClient client) { clients.remove(client); if (clients.size() == 0) { exitTree(); return true; } else { return false; } } /** * Method enterTree */ protected void enterTree() { if ((state == STATE_READY) && (clients.size() > 0)) { state = STATE_JOINED; //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Joined", new Object[]{"a",ns,"k",rootID}, null)); //J+ LocalNode.myTimer.scheduleMS(0, SIGNAL_UPSTREAM, this); LocalNode.myTimer.scheduleMS(0, SIGNAL_DOWNSTREAM, this); theProvider.message(ns, this, true, true); } } /** * Method exitTree */ protected void exitTree() { if (state != STATE_DONE) { state = STATE_DONE; //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Done", new Object[]{"a",ns,"k",rootID}, null)); //J+ theProvider.message(ns, this, true, false); } } /** * Method isFullInit * @return */ public boolean isFullInit() { return (state == STATE_JOINED); } /** * Method isRoot * @return */ public boolean isRoot() { return root; } /** * Method getParent * @return */ public TreeNodeEntry getParent() { if (state == STATE_JOINED) { if ((root == false) && (parent.getDeletionTime() < LocalNode.myTimer.getCurrentTimeMS())) { //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "GetParent Preparation Deleting Parent",new Object[]{"a",ns,"k",parent.getID(),"e",String.valueOf(parent.deletionTime)}, null)); //J+ this.parent = new TreeNodeEntry(rootID, null, 0, downstreamLifetime); } return parent; } else { return null; } } /** * Method getChildren * @return */ public HashMap getChildren() { if (state == STATE_JOINED) { long currentTime = LocalNode.myTimer.getCurrentTimeMS(); Iterator entries = children.entrySet().iterator(); while (entries.hasNext()) { TreeNodeEntry theChild = (TreeNodeEntry) ((Map.Entry) entries.next()).getValue(); if (theChild.getDeletionTime() < currentTime) { //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "GetChildren Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.deletionTime)}, null)); //J+ entries.remove(); } } return children; } else { return null; } } /** * Method sendToParent * * @param message * @param deliverOnRedirect */ public void sendToParent(Payload message, boolean deliverOnRedirect) { if (state == STATE_JOINED) { doUpstreamSend(message, deliverOnRedirect, StatVars.TREE_DATAUPSTREAM); } } /** * Method sentToChildren * * @param message */ public void sendToChildren(Payload message) { if (state == STATE_JOINED) { long currentTime = LocalNode.myTimer.getCurrentTimeMS(); Iterator entries = children.entrySet().iterator(); while (entries.hasNext()) { TreeNodeEntry theChild = (TreeNodeEntry) ((Map.Entry) entries.next()).getValue(); if (theChild.getDeletionTime() < currentTime) { //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Downstream Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.deletionTime)}, null)); //J+ entries.remove(); } else { doDownstreamSend(message, theChild, StatVars.TREE_DATADOWNSTREAM); } } } } /** * Method sendToClients * * @param message * @param upstream * @param midway */ public void sendToClients(Payload message, boolean upstream, boolean midway) { if (state == STATE_JOINED) { distributeToClients(upstream, selfEntry, message, midway); } } /** * Method doUpstreamSend * * @param message * @param deliverOnRedirect * @param statgroup */ protected void doUpstreamSend(Payload message, boolean deliverOnRedirect, int statgroup) { int totalSpareSlots = getSpareSlots(TOTAL_SLOTS); TreeMessageUpstream treeMessage = new TreeMessageUpstream(selfSocketAddress, selfID, totalSpareSlots, height, message, deliverOnRedirect, null); int interiorNode = (children.size() == 0) ? 1 : 0; StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns, StatVars.TREE_INTERIORNODE, interiorNode, StatVars.STAT_DBL_VALUE); //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Upstream Send to Parent",new Object[]{"a",ns,"k",parent.getID()}, new Object[]{"w",message})); //J+ if ((root == false) && (parent.getDeletionTime() < LocalNode.myTimer.getCurrentTimeMS())) { //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Upstream Send Preparation Deleting Parent",new Object[]{"a",ns,"k",parent.getID(),"e",String.valueOf(parent.deletionTime)}, null)); //J+ this.parent = new TreeNodeEntry(rootID, null, 0, downstreamLifetime); } doSend(treeMessage, parent, statgroup); } /** * Method doDowstreamSend * * @param message * @param child * @param statgroup */ protected void doDownstreamSend(Payload message, TreeNodeEntry child, int statgroup) { TreeMessageDownstream treeMessage = new TreeMessageDownstream(selfSocketAddress, selfID, message, pathToRoot); //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Downstream Send to Child",new Object[]{"a",ns,"k",child.getID()}, new Object[]{"w",message})); //J+ doSend(treeMessage, child, statgroup); } /** * Method doSend * * @param treeMessage * @param node * @param statgroup */ protected void doSend(TreeMessage treeMessage, TreeNodeEntry node, int statgroup) { InetSocketAddress nodeSocketAddress = node.getSocketAddress(); BitID nodeID = node.getID(); StatCollector.addSample( StatVars.NAMED, StatVars.TREE_PREFIX + ns, statgroup, SerializationManager.getPayloadSize(treeMessage)); if (nodeSocketAddress != null) { theProvider.send(nodeSocketAddress, nodeID, ns, selfIDStr, 0, treeMessage, upstreamLifetime, false, false); } else { theProvider.send(nodeID, ns, selfIDStr, 0, treeMessage, upstreamLifetime, true, false); } node.updateLastSendTime(); } /** * Method processJoinMessage * * @param message */ protected void processUpstreamMessage(TreeMessageUpstream message) { boolean redirected = false; BitID childID = message.getSourceID(); InetSocketAddress childSocketAddress = message.getSourceSocketAddress(); int childAdvertisedSlots = message.getTotalAdvertisedSlots(); TreeNodeEntry childEntry = null; // Do not process message from self, this condition happens at root. if (childID.equals(selfID)) { //J- if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this,"Root Tree Insert Message",new Object[]{"a",ns,"k",childID,"h",childSocketAddress,"z",String.valueOf(childAdvertisedSlots)}, new Object[]{"w",message})); //J+ childEntry = selfEntry; if (root == false) { root = true; pathToRoot = new ArrayList(); pathToRoot.add(selfID);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?