qlroutingpolicy.java

来自「High performance DB query」· Java 代码 · 共 311 行

JAVA
311
字号
/* * @(#)$Id: QLRoutingPolicy.java,v 1.7 2004/07/02 23:59:21 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package pier.helpers.routingpolicy;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.log4j.Logger;import pier.data.Tuple;import pier.executors.PutEddy;import pier.operators.Eddy;import services.Output;import util.BitID;import util.logging.LogMessage;/** * Class QLRoutingPolicy * *  the QL policy wants to favor puts that have smaller outstanding puts. *  Thus, every X tuples, we reorder the nextOperator map to put the smallest outstanding tuples up front *  To do this, we need to know the outstanding puts for each operator num.  We have an array of the putEddies *  (and thier associated operator num) this eddy is handling.  Each PutEddy has a getOutstandingTuples method to help *  When we reoptimize, we call getOutstandingTuples on all of our PutEddies and sort the list of PutEddies according to *  the result * */public class QLRoutingPolicy extends RoutingPolicy {    private static Logger logger = Logger.getLogger(QLRoutingPolicy.class);    /*     * map of done bits -> next operator list.  The first operator in the     *  list for a given signature will be the operator chosen     *  Note that there is another way to do this - can also have 'next ready'     *  stored and then compute the next op from that.  TBD how I will actually     *  do this.     */    protected HashMap nextOperatorMap;    /*     * map of op num-> puteddy     */    HashMap putEddies;    // stat collector (opnum->numsends)    HashMap operatorStats;    int tupleCount = 0;    int reorderFrequency = 1;    // TODO: Make this a param    /**     * Constructor QLRoutingPolicy     *     *     * @param sourceInitMap     * @param doneReadyMap     */    public QLRoutingPolicy(HashMap sourceInitMap, HashMap doneReadyMap) {        super(sourceInitMap, doneReadyMap);        operatorStats = new HashMap();        // build the next operator map based on the two maps        nextOperatorMap = buildOperatorMap(sourceInitMap, doneReadyMap);        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"nextOperatorMap: ",                                                     nextOperatorMap}));        }    }    /**     * Method setPutEddies     *     * @param putEddies     */    public void setPutEddies(HashMap putEddies) {        this.putEddies = putEddies;    }    /**     * Method buildOperatorMap     * We build this map on eddy initialization so we dont have to mess with bit operations at     * runtime.     *     * @param sourceInitMap     * @param doneReadyMap     * @return     */    protected HashMap buildOperatorMap(HashMap sourceInitMap,                                       HashMap doneReadyMap) {        HashMap nextOperatorMap = new HashMap();        // first get the ready bits from the source init map        // get an iterator to the ready bits        Iterator sourceIterator = sourceInitMap.values().iterator();        // iterate by key (done bits)        while (sourceIterator.hasNext()) {            // pull out the ready bits            BitID readyBits = (BitID) (sourceIterator.next());            // the array of possible operators            ArrayList possibleOperators =                new ArrayList(readyBits.cardinality());            // find the bits that are on and thier corresponding position.  Fill in the operator array            int searchIndex = 0;            int operatorIndex = readyBits.nextSetBit(searchIndex);            while (operatorIndex != -1) {                // add this position to the arraylist                possibleOperators.add(new Integer(operatorIndex));                operatorStats.put(new Integer(operatorIndex), new Integer(0));                searchIndex = operatorIndex + 1;                operatorIndex = readyBits.nextSetBit(searchIndex);            }            nextOperatorMap.put(readyBits, possibleOperators);        }        // now make operators from the doneready map        Iterator readyIterator = doneReadyMap.values().iterator();        while (readyIterator.hasNext()) {            BitID readyBits = (BitID) (readyIterator.next());            ArrayList possibleOperators =                new ArrayList(readyBits.cardinality());            // find the bits that are on and thier corresponding position.  Fill in the operator array            int searchIndex = 0;            int operatorIndex = readyBits.nextSetBit(searchIndex);            while (operatorIndex != -1) {                // add this position to the arraylist                possibleOperators.add(new Integer(operatorIndex));                operatorStats.put(new Integer(operatorIndex), new Integer(0));                searchIndex = operatorIndex + 1;                operatorIndex = readyBits.nextSetBit(searchIndex);            }            nextOperatorMap.put(readyBits, possibleOperators);        }        return nextOperatorMap;    }    /**     * method reorderPutEddyList()     * TODO: could implement this more efficiently     */    private void reorderNextOperatorMap() {        // iterate through nextOperatorMap        Iterator values = nextOperatorMap.entrySet().iterator();        while (values.hasNext()) {            Map.Entry entry = (Map.Entry) values.next();            ArrayList ops = (ArrayList) entry.getValue();            // if an entry has more than 1 operator, then it shoudl be a PUT choice            if (ops.size() > 1) {                // for this entry, get the put eddy from the puteddymap and call getoutstandingtuples on it                int minOutstanding = Integer.MAX_VALUE;                for (int x = 0; x < ops.size(); x++) {                    Integer opNum = (Integer) ops.get(x);                    PutEddy pe = (PutEddy) putEddies.get(opNum);                    if (pe == null) {                        if (Output.debuggingEnabled) {                            logger.debug(new LogMessage(new Object[]{                                "Operator ",                                "" + opNum.intValue(),                                " is not a PutEddy!"}));                        }                        return;                    }                    int outstanding = pe.getOutstandingPuts();                    if (outstanding < minOutstanding) {                        // move this op to the front                        if (Output.debuggingEnabled) {                            logger.debug(new LogMessage(new Object[]{"PutEddy ",                                                                     "" + opNum.intValue(),                                                                     " with ",                                                                     "" + outstanding,                                                                     " outstanding tuples being moved to front"}));                        }                        if (x != 0) {    // dont need to move to front if already in front                            Integer tmp = (Integer) ops.get(0);                            ops.set(0, opNum);                            ops.set(x, tmp);                        }                        minOutstanding = outstanding;                    }                }            }        }    }    /**     * Method printOperatorStats     */    public void printOperatorStats() {        Iterator values = operatorStats.entrySet().iterator();        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"Operator Stats: "}));        }        while (values.hasNext()) {            Map.Entry entry = (Map.Entry) values.next();            Integer op = (Integer) entry.getKey();            Integer num = (Integer) entry.getValue();            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{"" + op.intValue(),                                                         ", "                                                         + num.intValue()}));            }        }    }    /**     * Method getNextOperator     *     *     * Based on the metadata in the tuple (ready, done bits), find     * the next operator to route to according to the routing policy     * Basic just pulls the first operator off the possible list     *     * @param item the tuple to determine the next op for     * @return     */    public int getNextOperator(Tuple item) {        // reoptimize if necessary        tupleCount++;        if (tupleCount == reorderFrequency) {            tupleCount = 0;            reorderNextOperatorMap();        }        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"Readybits: ",                                                     (BitID) item.getMetadata(                                                         Eddy.READY_BITS)}));        }        // simply look up the next operator in the next op table and return it        ArrayList possibleOps = (ArrayList) nextOperatorMap.get(                                    (BitID) item.getMetadata(Eddy.READY_BITS));        if ((possibleOps == null) || (possibleOps.size() < 1)) {            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{                    "Error finding next op!", }));            }            return -1;        }        // grab the first operator        Integer nextOp = (Integer) possibleOps.get(0);        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"Next op: ",                                                     nextOp.intValue() + ""}));        }        // if (possibleOps.size() > 1){        // Integer stat = (Integer)operatorStats.get(nextOp);        // operatorStats.put(nextOp, new Integer(((Integer)operatorStats.get(nextOp)).intValue() +1));        // }        return nextOp.intValue();    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?