qlroutingpolicy.java
来自「High performance DB query」· Java 代码 · 共 311 行
JAVA
311 行
/* * @(#)$Id: QLRoutingPolicy.java,v 1.7 2004/07/02 23:59:21 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package pier.helpers.routingpolicy;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.log4j.Logger;import pier.data.Tuple;import pier.executors.PutEddy;import pier.operators.Eddy;import services.Output;import util.BitID;import util.logging.LogMessage;/** * Class QLRoutingPolicy * * the QL policy wants to favor puts that have smaller outstanding puts. * Thus, every X tuples, we reorder the nextOperator map to put the smallest outstanding tuples up front * To do this, we need to know the outstanding puts for each operator num. We have an array of the putEddies * (and thier associated operator num) this eddy is handling. Each PutEddy has a getOutstandingTuples method to help * When we reoptimize, we call getOutstandingTuples on all of our PutEddies and sort the list of PutEddies according to * the result * */public class QLRoutingPolicy extends RoutingPolicy { private static Logger logger = Logger.getLogger(QLRoutingPolicy.class); /* * map of done bits -> next operator list. The first operator in the * list for a given signature will be the operator chosen * Note that there is another way to do this - can also have 'next ready' * stored and then compute the next op from that. TBD how I will actually * do this. */ protected HashMap nextOperatorMap; /* * map of op num-> puteddy */ HashMap putEddies; // stat collector (opnum->numsends) HashMap operatorStats; int tupleCount = 0; int reorderFrequency = 1; // TODO: Make this a param /** * Constructor QLRoutingPolicy * * * @param sourceInitMap * @param doneReadyMap */ public QLRoutingPolicy(HashMap sourceInitMap, HashMap doneReadyMap) { super(sourceInitMap, doneReadyMap); operatorStats = new HashMap(); // build the next operator map based on the two maps nextOperatorMap = buildOperatorMap(sourceInitMap, doneReadyMap); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"nextOperatorMap: ", nextOperatorMap})); } } /** * Method setPutEddies * * @param putEddies */ public void setPutEddies(HashMap putEddies) { this.putEddies = putEddies; } /** * Method buildOperatorMap * We build this map on eddy initialization so we dont have to mess with bit operations at * runtime. * * @param sourceInitMap * @param doneReadyMap * @return */ protected HashMap buildOperatorMap(HashMap sourceInitMap, HashMap doneReadyMap) { HashMap nextOperatorMap = new HashMap(); // first get the ready bits from the source init map // get an iterator to the ready bits Iterator sourceIterator = sourceInitMap.values().iterator(); // iterate by key (done bits) while (sourceIterator.hasNext()) { // pull out the ready bits BitID readyBits = (BitID) (sourceIterator.next()); // the array of possible operators ArrayList possibleOperators = new ArrayList(readyBits.cardinality()); // find the bits that are on and thier corresponding position. Fill in the operator array int searchIndex = 0; int operatorIndex = readyBits.nextSetBit(searchIndex); while (operatorIndex != -1) { // add this position to the arraylist possibleOperators.add(new Integer(operatorIndex)); operatorStats.put(new Integer(operatorIndex), new Integer(0)); searchIndex = operatorIndex + 1; operatorIndex = readyBits.nextSetBit(searchIndex); } nextOperatorMap.put(readyBits, possibleOperators); } // now make operators from the doneready map Iterator readyIterator = doneReadyMap.values().iterator(); while (readyIterator.hasNext()) { BitID readyBits = (BitID) (readyIterator.next()); ArrayList possibleOperators = new ArrayList(readyBits.cardinality()); // find the bits that are on and thier corresponding position. Fill in the operator array int searchIndex = 0; int operatorIndex = readyBits.nextSetBit(searchIndex); while (operatorIndex != -1) { // add this position to the arraylist possibleOperators.add(new Integer(operatorIndex)); operatorStats.put(new Integer(operatorIndex), new Integer(0)); searchIndex = operatorIndex + 1; operatorIndex = readyBits.nextSetBit(searchIndex); } nextOperatorMap.put(readyBits, possibleOperators); } return nextOperatorMap; } /** * method reorderPutEddyList() * TODO: could implement this more efficiently */ private void reorderNextOperatorMap() { // iterate through nextOperatorMap Iterator values = nextOperatorMap.entrySet().iterator(); while (values.hasNext()) { Map.Entry entry = (Map.Entry) values.next(); ArrayList ops = (ArrayList) entry.getValue(); // if an entry has more than 1 operator, then it shoudl be a PUT choice if (ops.size() > 1) { // for this entry, get the put eddy from the puteddymap and call getoutstandingtuples on it int minOutstanding = Integer.MAX_VALUE; for (int x = 0; x < ops.size(); x++) { Integer opNum = (Integer) ops.get(x); PutEddy pe = (PutEddy) putEddies.get(opNum); if (pe == null) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Operator ", "" + opNum.intValue(), " is not a PutEddy!"})); } return; } int outstanding = pe.getOutstandingPuts(); if (outstanding < minOutstanding) { // move this op to the front if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"PutEddy ", "" + opNum.intValue(), " with ", "" + outstanding, " outstanding tuples being moved to front"})); } if (x != 0) { // dont need to move to front if already in front Integer tmp = (Integer) ops.get(0); ops.set(0, opNum); ops.set(x, tmp); } minOutstanding = outstanding; } } } } } /** * Method printOperatorStats */ public void printOperatorStats() { Iterator values = operatorStats.entrySet().iterator(); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Operator Stats: "})); } while (values.hasNext()) { Map.Entry entry = (Map.Entry) values.next(); Integer op = (Integer) entry.getKey(); Integer num = (Integer) entry.getValue(); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"" + op.intValue(), ", " + num.intValue()})); } } } /** * Method getNextOperator * * * Based on the metadata in the tuple (ready, done bits), find * the next operator to route to according to the routing policy * Basic just pulls the first operator off the possible list * * @param item the tuple to determine the next op for * @return */ public int getNextOperator(Tuple item) { // reoptimize if necessary tupleCount++; if (tupleCount == reorderFrequency) { tupleCount = 0; reorderNextOperatorMap(); } if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Readybits: ", (BitID) item.getMetadata( Eddy.READY_BITS)})); } // simply look up the next operator in the next op table and return it ArrayList possibleOps = (ArrayList) nextOperatorMap.get( (BitID) item.getMetadata(Eddy.READY_BITS)); if ((possibleOps == null) || (possibleOps.size() < 1)) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Error finding next op!", })); } return -1; } // grab the first operator Integer nextOp = (Integer) possibleOps.get(0); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Next op: ", nextOp.intValue() + ""})); } // if (possibleOps.size() > 1){ // Integer stat = (Integer)operatorStats.get(nextOp); // operatorStats.put(nextOp, new Integer(((Integer)operatorStats.get(nextOp)).intValue() +1)); // } return nextOp.intValue(); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?