aggoperator.nc

来自「nesC写的heed算法」· NC 代码 · 共 695 行 · 第 1/2 页

NC
695
字号
// $Id: AggOperator.nc,v 1.17.4.5 2003/08/26 09:08:14 cssharp Exp $

/*									tab:4
 * "Copyright (c) 2000-2003 The Regents of the University  of California.
 * All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice, the following
 * two paragraphs and the author appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS."
 *
 * Copyright (c) 2002-2003 Intel Corporation
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached INTEL-LICENSE
 * file. If you do not find these files, copies can be found by writing to
 * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, Berkeley, CA,
 * 94704.  Attention:  Intel License Inquiry.
*/
/*
 * Authors:	Sam Madden
 *              Design by Sam Madden, Wei Hong, and Joe Hellerstein
 * Date last modified:  6/26/02
 *
 *
 */

/**
 * @author Sam Madden
 * @author Design by Sam Madden
 * @author Wei Hong
 * @author and Joe Hellerstein
 */

includes Aggregates;

module AggOperator {
  provides {
    interface Operator;
    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);
    command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf);
    command short getGroupNoFromQr(QueryResult *qr);
  }

  uses {
    interface MemAlloc;
    interface QueryProcessor;
    interface TupleIntf;
    interface ParsedQueryIntf;
    interface ExprEval;
    interface Leds;
    interface QueryResultIntf;
    interface AggregateUse;
    command void signalError(TinyDBError err, int lineNo);
  }
}

implementation {

  /* Report an error together with the source line that raised it. */
  #define SIG_ERR(errNo) call signalError((errNo), __LINE__)

  /* Operator state attached to an expression: a header followed by
     numGroups fixed-size group records stored back to back. */
  typedef struct {
    int numGroups;      //how many groups are there
    int groupSize;      //how many bytes per group
    char groupData[1];  //data for groups -- depends on type of aggregate --
                        //of size groupSize * numGroups
  } GroupData, *GroupDataPtr, **GroupDataHandle;

  /* One group's record.  The struct is packed and is copied verbatim into
     query results (see Operator.nextResult). */
  typedef struct {
    short groupNo; //2 (presumably the running byte count)
    union {
      bool empty;
      char exprIdx; //idx of operator that owns us
    } u; //3 (presumably the running byte count)
    char aggdata[1]; //aggregate-dependent data of variable size
                     //size of this space returned as part of groupSize() value
  } __attribute__((packed)) GroupRecord;

  /* Callback invoked by getGroupData with the record of the requested group. */
  typedef void (*GroupDataCallback)(GroupRecord *d);

  /* Module state.  The mCur* variables are written by the Operator entry
     points just before they call getGroupData and are read back by the
     callbacks (updateGroupForTuple / updateGroupForPartialResult). */
  GroupDataCallback mCallback;
  ParsedQuery *mCurQuery;
  Expr *mCurExpr;
  QueryResult *mCurResult;
  Tuple *mCurTuple;
  short mCurGroup;
  Handle mAlloced;
  GroupRecord *mGroupRecord;

  enum {
    QUERY_DONE_STATE = 0x1010, //magic code indicating query is done!
  };

  /* -------------------------------- Local prototypes -------------------------------- */
  void getGroupData(ParsedQuery *pq, short groupNo, Expr *e, GroupDataCallback callback);
  void updateGroupForTuple(GroupRecord *d);
  short groupSize(Expr *e);
  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e);
  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue);
  void initAggregateValue(GroupRecord *d, Expr *e, bool isFirstTime);
  void updateGroupForPartialResult(GroupRecord *d);
  bool aggHasData(GroupRecord *gr, Expr *e);
  GroupRecord *findGroup(GroupDataHandle dh, short groupNum);
  GroupRecord *addGroup(GroupDataHandle dh, short groupNum);
  bool removeEmptyGroup(GroupDataHandle dh);
  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e);
  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e);
  //void addValToTemporalAggState(TemporalAggState *tas, short value);
  //short	addAdpDeltaDataToTemporalAggState(TemporalAggState *tas, short lastVal, short newVal, uint16_t epoch);
  //void addAvgdataToTemporalAggState(TemporalAggState *tas, short sum, short count);
  task void fireCallback();

  /* -------------------------------- Local Routines for getting / setting group records  -------------------------------- */

  /* Return a pointer to the n-th group record inside the state behind dHan.
     NOTE(review): n is a uint8_t while callers index with short/int, so an
     index above 255 would be truncated here -- confirm the group count
     can never get that large. */
  GroupRecord *GET_GROUP_DATA(GroupDataHandle dHan, uint8_t n) {
    return (GroupRecord *)&((**dHan).groupData[(n) * (**dHan).groupSize]);
  }

  /* Overwrite the n-th group record with groupSize bytes read from dataptr. */
  void SET_GROUP_DATA(GroupDataHandle dHan, uint8_t n, char *dataptr) {
    memcpy(GET_GROUP_DATA(dHan, n), (const char *)(dataptr), (**dHan).groupSize);
  }

  /* Copy the n-th group record (groupSize bytes) out into dest. */
  void COPY_GROUP_DATA(GroupDataHandle dHan, uint8_t n, char *dest) {
    memcpy((char *)(dest), (const char *)GET_GROUP_DATA(dHan, n), (**dHan).groupSize);
  }

  /* -------------------------------- Functions -------------------------------- */

  /* Reset the state that this operator stores for the specified expression
       (Called every epoch)
       Operator state is appended to the expression (ick!)
, in the opState handle --
       each expression is owned by exactly one operator, which may store state there.
  */

  /* Given a query-result from a different node, add the result into
     the locally computed value for the query.  The locally computed
     value is stored with the expression.

     Returns err_InvalidAggregateRecord when qr is not a single-field
     aggregate result or is tagged for a different expression, and
     err_NoError otherwise (including when the query is already marked done,
     in which case the result is ignored).  The merge itself happens in
     updateGroupForPartialResult, which is handed to getGroupData as the
     callback. */
  command TinyDBError Operator.processPartialResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {
    GroupRecord *incoming = (GroupRecord *)qr->d.data;

    if (qr->qrType != kAGG_SINGLE_FIELD) {
      return err_InvalidAggregateRecord;
    }
    if (incoming->u.exprIdx != e->idx) {
      return err_InvalidAggregateRecord; //not for us
    }
    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    // Stash the context for the callback before asking for the group record.
    mCurExpr = e;
    mCurQuery = qs;
    mCurResult = qr;
    dbg(DBG_USR3, "AggOperator.processPartialResult: Calling getGroupData\n");

    getGroupData(qs, incoming->groupNo, e, &updateGroupForPartialResult);
    return err_NoError;
  }

  /* Check and see if this result represents the evaluation of this
     expression: it must be a single-field aggregate result whose embedded
     record carries e's index. */
  command bool Operator.resultIsForExpr(QueryResultPtr qr, ExprPtr e) {
    GroupRecord *rec = (GroupRecord *)qr->d.data;

    return (qr->qrType == kAGG_SINGLE_FIELD) && (rec->u.exprIdx == e->idx);
  }

  /* Callback from getGroupData for processPartialResult: merge the record
     carried by mCurResult into the local record d, then signal that the
     result has been processed. */
  void updateGroupForPartialResult(GroupRecord *d) {
    GroupRecord *incoming = (GroupRecord *)mCurResult->d.data;

    dbg(DBG_USR3, "updateGroupForPartialResult called\n");

    mergeAggregateValues(d, incoming, mCurExpr);
    signal Operator.processedResult(mCurResult, mCurQuery, mCurExpr);
  }

  /* Return the next result to this query.
If there are no more results, return err_NoMoreResults

     qr->result_idx should be set to kFIRST_RESULT if the first result is desired,
     or to the previous value of qr->result_idx returned by the last invocation
     of AGG_OPERATOR_NEXT_RESULT

     Returns err_InvalidAggregateRecord when e carries no usable operator state.
  */
  command TinyDBError Operator.nextResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    GroupRecord *gr;
    short idx;

    // No state, or state marked as released: there is nothing valid to
    // dereference.  The done-state check mirrors the one in processTuple.
    if (gdh == NULL || e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_InvalidAggregateRecord;
    }

    idx = qr->result_idx;
    for (;;) { //loop til we find the next non-empty group
      if (idx == kFIRST_RESULT) {
        idx = 0;
      } else {
        idx++;
      }
      if (idx >= (**gdh).numGroups) {
        return err_NoMoreResults;
      }

      qr->result_idx = idx;
      qr->qid = qs->qid;
      qr->qrType = kAGG_SINGLE_FIELD; //just a single field result

      gr = GET_GROUP_DATA(gdh, idx);
      //TODO: following hasData call is the SOURCE OF INEFFICIENCY
      if (!gr->u.empty && aggHasData(gr, e)) {
        break; //don't output empty results
      }
    }

    //copy the data into the buffer, then tag the copy with the owning expression
    COPY_GROUP_DATA(gdh, idx, qr->d.data);
    gr = (GroupRecord *)qr->d.data;
    gr->u.exprIdx = e->idx;
    return err_NoError;
  }

  /** Install all of the results for the last epoch of expression e from pq
      into qr.  Groups that are flagged empty or that hold no data are
      skipped.  Returns the first error reported by
      QueryResultIntf.addAggResult, or err_NoError.
  */
  command TinyDBError addResults(QueryResult *qr, ParsedQuery *pq, ExprPtr e) {
    int i;
    GroupRecord *gr;
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    TinyDBError err;

    // Nothing stored (or state marked as released): nothing to add.
    if (gdh == NULL || e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    for (i = 0; i < (**gdh).numGroups; i++) {
      gr = GET_GROUP_DATA(gdh, i);
      if (gr->u.empty || !aggHasData(gr, e)) {
        continue;
      }

      // XXX hack! modify epoch number here for temporal aggregates
      // that may deliver tuples out of order
      if (e->ex.agg.op == kADP_DELTA) {
        qr->epoch = ((TemporalAdpDeltaData *)gr->aggdata)->lastResult.epoch;
      }

      err = call QueryResultIntf.addAggResult(qr, gr->groupNo, (char *)gr, (**gdh).groupSize, pq, e->idx);
      if (err != err_NoError) {
        return err;
      }

      // NOTE(review): gr points at the stored record and u is a union, so
      // this store also overwrites u.empty -- confirm that is intended.
      gr->u.exprIdx = e->idx;
    }
    return err_NoError;
  }

  /* Given a tuple built locally, add the result into the locally computed
     value for the query.  The update itself happens in updateGroupForTuple,
     which is handed to getGroupData as the callback.  Always returns
     err_NoError; tuples for a query already marked done are ignored. */
  command TinyDBError Operator.processTuple(ParsedQueryPtr qs, TuplePtr t, ExprPtr e) {
    mCurExpr = e;
    mCurQuery = qs;
    mCurTuple = t;
    dbg(DBG_USR1,"in PROCESS_TUPLE, expr = %x\n", (unsigned int)mCurExpr);

    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    getGroupData(qs, getGroupNo(t, qs, e), e, &updateGroupForTuple);
    return err_NoError;
  }

  /* epoch ended -- give aggregates a chance to reset themselves,
     if needed (some aggregates, like windowed and exponentially
     decaying averages won't reset on every epoch.)
     Selection expressions and expressions without usable state are left
     alone.  Always returns SUCCESS. */
  command result_t Operator.endOfEpoch(ParsedQueryPtr q, ExprPtr e) {
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    short i;

    if (e->opType == kSEL || gdh == NULL
        || e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return SUCCESS;
    }

    for (i = 0; i < (**gdh).numGroups; i++) {
      initAggregateValue(GET_GROUP_DATA(gdh, i), e, FALSE);
    }
    return SUCCESS;
  }

  /* finished a query: release the operator state of every non-selection
     expression that has some.  Always returns SUCCESS. */
  event result_t QueryProcessor.queryComplete(ParsedQuery *q) {
    short i;

    for (i = 0; i < q->numExprs; i++) {
      Expr e = (call ParsedQueryIntf.getExpr(q, i));

      if (e.opType == kSEL || e.opState == NULL) {
        continue;
      }
      // Never hand the sentinel value to the allocator.
      if (e.opState == (OperatorStateHandle)QUERY_DONE_STATE) {
        continue;
      }

      call MemAlloc.free((Handle)e.opState);
      dbg(DBG_USR1, "Agg: cleaning up.\n");

      // NOTE(review): e is a local copy obtained by value, so this store
      // does not reach the expression kept in q; the expression still holds
      // the freed handle.  Making the marker stick needs a pointer to the
      // stored Expr -- confirm which ParsedQueryIntf accessor provides one.
      e.opState = (OperatorStateHandle)QUERY_DONE_STATE; //mark it as empty!
    }
    return SUCCESS;
  }

  /* callback from getGroupData for PROCESS_TUPLE
     given the location of the aggregate record
     for the new tuple, update it

     The aggregated field is assembled from its bytes, least-significant
     byte first, into a short, passed to updateAggregateValue, and the tuple
     is then reported as processed. */
  void updateGroupForTuple(GroupRecord *d) {
    Tuple *t = mCurTuple;
    Expr *e = mCurExpr;
    ParsedQuery *q = mCurQuery;
    char *fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.field);
    short size = call TupleIntf.fieldSize(q, (char)e->ex.agg.field);
    unsigned short raw = 0;
    short fieldVal;
    short i;

    // Only the first sizeof(short) bytes can land in a short.  Shifting
    // further bytes by i*8 is undefined once the shift count reaches the
    // width of the promoted type, so wider fields are clamped here.
    if (size > (short)sizeof(fieldVal)) {
      size = (short)sizeof(fieldVal);
    }

    for (i = 0; i < size; i++) {
      unsigned char b = (unsigned char)(*fieldBytes++);
      raw |= (unsigned short)(((unsigned short)b) << (i * 8));
    }
    fieldVal = (short)raw;

    updateAggregateValue(d, e, fieldVal);
    signal Operator.processedTuple(t, q, mCurExpr, TRUE);
  }

  //allocate is used to create the operator state (which stores all the group
  //records) the first time a value is stored

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?