treemaintainer.java

来自「High performance DB query」· Java 代码 · 共 840 行 · 第 1/2 页

JAVA
840
字号
/* * @(#)$Id: TreeMaintainer.java,v 1.16 2005/09/08 18:08:41 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package pier.helpers.trees;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.log4j.Logger;import overlay.provider.Provider;import overlay.provider.ProviderClient;import services.LocalNode;import services.Output;import services.network.Payload;import services.stats.StatCollector;import services.stats.StatVars;import services.timer.TimerClient;import util.BitID;import util.logging.StructuredLogMessage;import util.network.serialization.SerializationManager;/** * Class TreeMaintainer * */public class TreeMaintainer implements TimerClient, ProviderClient {    private static Logger logger = Logger.getLogger(TreeMaintainer.class);    private static final Integer SIGNAL_UPSTREAM = new Integer(0);    private static final Integer SIGNAL_DOWNSTREAM = new Integer(1);    private static final boolean TOTAL_SLOTS = true;    private static final boolean LOCAL_SLOTS = false;    private static final int STATE_NEW = 0;    private static final int STATE_READY = 1;    private static final int STATE_JOINED = 2;    private static final int STATE_DONE = 3;    private int state;    protected Provider theProvider;    protected BitID rootID;    protected int maxChildren;    protected double advertiseChildrenFraction;    protected String ns;    protected HashMap children;    protected TreeNodeEntry parent;    protected BitID selfID;    protected InetSocketAddress selfSocketAddress;    protected String selfIDStr;    protected TreeNodeEntry selfEntry;    protected byte height;    protected long upstreamRenewPeriod;    protected long upstreamLifetime;    protected long downstreamRenewPeriod;    protected long downstreamLifetime;    protected TreeDetails treeDetails;    protected ArrayList clients;    protected ArrayList pathToRoot;    protected boolean root;    /**     * Constructor TreeMaintainer     *     */    public TreeMaintainer() {        this.state = STATE_NEW;        this.clients = new ArrayList();        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State New", null, null));        //J+    }    /**     * Method init     *     * @param theProvider     * @param rootID     * @param maxChildren     * @param advertiseChildrenFraction     * @param ns     * @param upstreamRenewPeriod     * @param upstreamLifetime     * @param downstreamRenewPeriod     * @param downstreamLifetime     */    public void init(Provider theProvider, BitID rootID, int maxChildren,                     double advertiseChildrenFraction, String ns,                     long upstreamRenewPeriod, long upstreamLifetime,                     long downstreamRenewPeriod, long downstreamLifetime) {        this.theProvider = theProvider;        this.rootID = rootID;        this.maxChildren = maxChildren;        this.advertiseChildrenFraction = advertiseChildrenFraction;        this.ns = ns;        this.upstreamRenewPeriod = upstreamRenewPeriod;        this.upstreamLifetime = upstreamLifetime;        this.downstreamRenewPeriod = downstreamRenewPeriod;        this.downstreamLifetime = downstreamLifetime;        this.treeDetails = new TreeDetails(rootID, maxChildren,                                           advertiseChildrenFraction,                                           upstreamRenewPeriod,                                           upstreamLifetime);        this.parent = new TreeNodeEntry(rootID, null, 0, downstreamLifetime);        this.children = new HashMap();        this.selfID = theProvider.getNodeID();        this.selfSocketAddress = theProvider.getNodeSocketAddress();        this.selfIDStr = selfID.toNumString();        this.selfEntry = new TreeNodeEntry(selfID, selfSocketAddress, 0, 0);        this.height = 0;        this.root = false;        this.state = STATE_READY;        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Ready", new Object[]{"a",ns,"k",rootID}, null));        //J+        enterTree();    }    /**     * Method registerClient     *     * @param client     */    public void registerClient(TreeMaintainerClient client) {        clients.add(client);        enterTree();    }    /**     * Method deregisterClient     *     * @param client     * @return     */    public boolean deregisterClient(TreeMaintainerClient client) {        clients.remove(client);        if (clients.size() == 0) {            exitTree();            return true;        } else {            return false;        }    }    /**     * Method enterTree     */    protected void enterTree() {        if ((state == STATE_READY) && (clients.size() > 0)) {            state = STATE_JOINED;            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Joined", new Object[]{"a",ns,"k",rootID}, null));            //J+            LocalNode.myTimer.scheduleMS(0, SIGNAL_UPSTREAM, this);            LocalNode.myTimer.scheduleMS(0, SIGNAL_DOWNSTREAM, this);            theProvider.message(ns, this, true, true);        }    }    /**     * Method exitTree     */    protected void exitTree() {        if (state != STATE_DONE) {            state = STATE_DONE;            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Tree State Done", new Object[]{"a",ns,"k",rootID}, null));            //J+            theProvider.message(ns, this, true, false);        }    }    /**     * Method isFullInit     * @return     */    public boolean isFullInit() {        return (state == STATE_JOINED);    }    /**     * Method isRoot     * @return     */    public boolean isRoot() {        return root;    }    /**     * Method getParent     * @return     */    public TreeNodeEntry getParent() {        if (state == STATE_JOINED) {            if ((root == false)                    && (parent.getDeletionTime()                        < LocalNode.myTimer.getCurrentTimeMS())) {                //J-                if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "GetParent Preparation Deleting Parent",new Object[]{"a",ns,"k",parent.getID(),"e",String.valueOf(parent.deletionTime)}, null));                //J+                this.parent = new TreeNodeEntry(rootID, null, 0,                                                downstreamLifetime);            }            return parent;        } else {            return null;        }    }    /**     * Method getChildren     * @return     */    public HashMap getChildren() {        if (state == STATE_JOINED) {            long currentTime = LocalNode.myTimer.getCurrentTimeMS();            Iterator entries = children.entrySet().iterator();            while (entries.hasNext()) {                TreeNodeEntry theChild =                    (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();                if (theChild.getDeletionTime() < currentTime) {                    //J-    		        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "GetChildren Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.deletionTime)}, null));	    	        //J+                    entries.remove();                }            }            return children;        } else {            return null;        }    }    /**     * Method sendToParent     *     * @param message     * @param deliverOnRedirect     */    public void sendToParent(Payload message, boolean deliverOnRedirect) {        if (state == STATE_JOINED) {            doUpstreamSend(message, deliverOnRedirect,                           StatVars.TREE_DATAUPSTREAM);        }    }    /**     * Method sentToChildren     *     * @param message     */    public void sendToChildren(Payload message) {        if (state == STATE_JOINED) {            long currentTime = LocalNode.myTimer.getCurrentTimeMS();            Iterator entries = children.entrySet().iterator();            while (entries.hasNext()) {                TreeNodeEntry theChild =                    (TreeNodeEntry) ((Map.Entry) entries.next()).getValue();                if (theChild.getDeletionTime() < currentTime) {                    //J-                    if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Downstream Preparation Deleting Child",new Object[]{"a",ns,"k",theChild.getID(),"e",String.valueOf(theChild.deletionTime)}, null));                    //J+                    entries.remove();                } else {                    doDownstreamSend(message, theChild,                                     StatVars.TREE_DATADOWNSTREAM);                }            }        }    }    /**     * Method sendToClients     *     * @param message     * @param upstream     * @param midway     */    public void sendToClients(Payload message, boolean upstream,                              boolean midway) {        if (state == STATE_JOINED) {            distributeToClients(upstream, selfEntry, message, midway);        }    }    /**     * Method doUpstreamSend     *     * @param message     * @param deliverOnRedirect     * @param statgroup     */    protected void doUpstreamSend(Payload message, boolean deliverOnRedirect,                                  int statgroup) {        int totalSpareSlots = getSpareSlots(TOTAL_SLOTS);        TreeMessageUpstream treeMessage =            new TreeMessageUpstream(selfSocketAddress, selfID, totalSpareSlots,                                    height, message, deliverOnRedirect, null);        int interiorNode = (children.size() == 0)                           ? 1                           : 0;        StatCollector.addSample(StatVars.NAMED, StatVars.TREE_PREFIX + ns,                                StatVars.TREE_INTERIORNODE, interiorNode,                                StatVars.STAT_DBL_VALUE);        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Upstream Send to Parent",new Object[]{"a",ns,"k",parent.getID()}, new Object[]{"w",message}));        //J+        if ((root == false)                && (parent.getDeletionTime()                    < LocalNode.myTimer.getCurrentTimeMS())) {            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Upstream Send Preparation Deleting Parent",new Object[]{"a",ns,"k",parent.getID(),"e",String.valueOf(parent.deletionTime)}, null));            //J+            this.parent = new TreeNodeEntry(rootID, null, 0,                                            downstreamLifetime);        }        doSend(treeMessage, parent, statgroup);    }    /**     * Method doDowstreamSend     *     * @param message     * @param child     * @param statgroup     */    protected void doDownstreamSend(Payload message, TreeNodeEntry child,                                    int statgroup) {        TreeMessageDownstream treeMessage =            new TreeMessageDownstream(selfSocketAddress, selfID, message,                                      pathToRoot);        //J-        if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this, "Downstream Send to Child",new Object[]{"a",ns,"k",child.getID()}, new Object[]{"w",message}));        //J+        doSend(treeMessage, child, statgroup);    }    /**     * Method doSend     *     * @param treeMessage     * @param node     * @param statgroup     */    protected void doSend(TreeMessage treeMessage, TreeNodeEntry node,                          int statgroup) {        InetSocketAddress nodeSocketAddress = node.getSocketAddress();        BitID nodeID = node.getID();        StatCollector.addSample(            StatVars.NAMED, StatVars.TREE_PREFIX + ns, statgroup,            SerializationManager.getPayloadSize(treeMessage));        if (nodeSocketAddress != null) {            theProvider.send(nodeSocketAddress, nodeID, ns, selfIDStr, 0,                             treeMessage, upstreamLifetime, false, false);        } else {            theProvider.send(nodeID, ns, selfIDStr, 0, treeMessage,                             upstreamLifetime, true, false);        }        node.updateLastSendTime();    }    /**     * Method processJoinMessage     *     * @param message     */    protected void processUpstreamMessage(TreeMessageUpstream message) {        boolean redirected = false;        BitID childID = message.getSourceID();        InetSocketAddress childSocketAddress = message.getSourceSocketAddress();        int childAdvertisedSlots = message.getTotalAdvertisedSlots();        TreeNodeEntry childEntry = null;        // Do not process message from self, this condition happens at root.        if (childID.equals(selfID)) {            //J-            if (Output.debuggingEnabled) logger.debug(new StructuredLogMessage(this,"Root Tree Insert Message",new Object[]{"a",ns,"k",childID,"h",childSocketAddress,"z",String.valueOf(childAdvertisedSlots)}, new Object[]{"w",message}));            //J+            childEntry = selfEntry;            if (root == false) {                root = true;                pathToRoot = new ArrayList();                pathToRoot.add(selfID);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?