aggoperator.nc

来自「tinyos最新版」· NC 代码 · 共 714 行 · 第 1/2 页

NC
714
字号
/*									tab:4 * * * "Copyright (c) 2000-2002 The Regents of the University  of California. * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. * * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * *//*									tab:4 *  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.  By *  downloading, copying, installing or using the software you agree to *  this license.  If you do not agree to this license, do not download, *  install, copy or use the software. * *  Intel Open Source License * *  Copyright (c) 2002 Intel Corporation *  All rights reserved. *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions are *  met: * *	Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. *	Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in the *  documentation and/or other materials provided with the distribution. *      Neither the name of the Intel Corporation nor the names of its *  contributors may be used to endorse or promote products derived from *  this software without specific prior written permission. * *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS *  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *  PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE INTEL OR ITS *  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, *  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, *  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR *  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * *//* * Authors:	Sam Madden *              Design by Sam Madden, Wei Hong, and Joe Hellerstein * Date last modified:  6/26/02 * * */includes Aggregates;module AggOperator {  provides {    interface Operator;    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);    command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf);    command short getGroupNoFromQr(QueryResult *qr);  }  uses {    interface MemAlloc;    interface QueryProcessor;    interface TupleIntf;    interface ParsedQueryIntf;    interface ExprEval;    interface Leds;    interface QueryResultIntf;    interface AggregateUse;    command void signalError(TinyDBError err, int lineNo);  }}implementation {    #define SIG_ERR(errNo) call signalError((errNo), __LINE__)    typedef struct {    int numGroups; //how many groups are there    int groupSize; //how many bytes per group    char groupData[1];  //data for groups -- depends on type of aggregate -- of size groupSize * numGroups  } GroupData, *GroupDataPtr, **GroupDataHandle;    typedef struct {    short groupNo;//2    union {      bool empty;      char exprIdx; //idx of operator that owns us    } u;//3    char aggdata[1];//aggregate-dependent data of variable size    //size of this space returned as part of groupSize() value  } __attribute__((packed)) GroupRecord;  typedef void (*GroupDataCallback)(GroupRecord *d);  GroupDataCallback mCallback;  ParsedQuery *mCurQuery;  Expr *mCurExpr;  QueryResult *mCurResult;  Tuple *mCurTuple;  short mCurGroup;  Handle mAlloced;  GroupRecord *mGroupRecord;  enum {    QUERY_DONE_STATE = 0x1010, //magic code indicating query is done!  };  /* -------------------------------- Local prototypes -------------------------------- */  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback);  void updateGroupForTuple(GroupRecord *d);  short groupSize(Expr *e);  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e);  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue);  void initAggregateValue(GroupRecord *d, Expr * e, bool isFirstTime);  void updateGroupForPartialResult(GroupRecord *d);  bool aggHasData(GroupRecord *gr, Expr *e);  GroupRecord *findGroup(GroupDataHandle dh, short groupNum);  GroupRecord *addGroup(GroupDataHandle dh, short groupNum);  bool removeEmptyGroup(GroupDataHandle dh);  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e);  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e);  //void addValToTemporalAggState(TemporalAggState *tas, short value);  //short	addAdpDeltaDataToTemporalAggState(TemporalAggState *tas, short lastVal, short newVal, uint16_t epoch);  //void addAvgdataToTemporalAggState(TemporalAggState *tas, short sum, short count);  task void fireCallback();/* -------------------------------- Local Routines for getting / setting group records  -------------------------------- */  GroupRecord *GET_GROUP_DATA(GroupDataHandle dHan,uint8_t n) {    return (GroupRecord *)&((**dHan).groupData[(n) * (**dHan).groupSize]);  }  void SET_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dataptr) {    memcpy(GET_GROUP_DATA(dHan,n), (const char *)(dataptr), (**dHan).groupSize);  }    void COPY_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dest) {    memcpy((char *)(dest),(const char *)GET_GROUP_DATA(dHan,n), (**dHan).groupSize);  }  /* --gr->aggdata------------ Functions -------------------------------- */    /* Reset the state that this operator stores for the specified expression       (Called every epoch)       Operator state is appended to the expression (ick!) , in the opState handle --         each expression is owned by exactly one operator, which may store state there.    */    //Given a query-result from a different node, add the result into    //the locally computed value for the query.  The locally computed    //value is stored with the expression.    command TinyDBError Operator.processPartialResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {      GroupRecord *gr = (GroupRecord *)qr->d.data;            if (qr->qrType != kAGG_SINGLE_FIELD) return err_InvalidAggregateRecord;      if (gr->u.exprIdx != e->idx)	return err_InvalidAggregateRecord; //not for us      if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) return err_NoError;            mCurExpr = e;      mCurQuery = qs;      mCurResult = qr;      dbg(DBG_USR3, "AggOperator.processPartialResult: Calling getGroupData\n");            getGroupData(qs,gr->groupNo, e, &updateGroupForPartialResult);      return err_NoError;    }    //check and see if this result represents the evaluation of this expression    command bool Operator.resultIsForExpr(QueryResultPtr qr, ExprPtr e) {      GroupRecord *gr = (GroupRecord *)qr->d.data;      bool result = TRUE;      if (qr->qrType != kAGG_SINGLE_FIELD) result = FALSE;      if (gr->u.exprIdx != e->idx) result = FALSE;      //dbg(DBG_USR3, "resultIsForExpr: %d\n", result);      return result;    }      void updateGroupForPartialResult(GroupRecord *d) {    GroupRecord *gr = (GroupRecord *)((QueryResult *)mCurResult)->d.data;        dbg(DBG_USR3, "updateGroupForPartialResult called\n");        mergeAggregateValues(d,gr,mCurExpr);    signal Operator.processedResult(mCurResult, mCurQuery, mCurExpr);  }      /* Return the next result to this query.        If there are no more results, return err_NoMoreResults        qr->result_idx should be set to kFIRST_RESULT if the first result is desired,    or to the previous value of qr->result_idx returned by the last invocation    of AGG_OPERATOR_NEXT_RESULT      */  command TinyDBError Operator.nextResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {    GroupDataHandle gdh = (GroupDataHandle)e->opState;    bool empty = TRUE;    GroupRecord *gr;    //if (!aggHasData(qr,qs,e)) return err_NoMoreResults;    if (gdh != NULL) {      short idx = qr->result_idx;          do {  //loop til we find the next non-empty group			if (idx == kFIRST_RESULT) idx = 0;			else idx++;			  			if (idx >= (**gdh).numGroups) return err_NoMoreResults;			qr->result_idx = idx;			qr->qid = qs->qid;			//just a single field result			qr->qrType = kAGG_SINGLE_FIELD;			//copy the data into the buffer			gr = GET_GROUP_DATA(gdh,idx);			//TODO: following hasData call is the SOURCE OF INEFFICIENCY			if (!gr->u.empty && aggHasData(gr,e)) empty = FALSE; //don't output empty results      } while (empty);      COPY_GROUP_DATA(gdh,idx,qr->d.data);      gr = (GroupRecord *)qr->d.data;      gr->u.exprIdx = e->idx;    } else      return err_InvalidAggregateRecord;    return err_NoError;  }  /** Install all of the results for the last epoch of  expression e from pq into qr.   */  command TinyDBError addResults(QueryResult *qr, ParsedQuery *pq, ExprPtr e) {    int i = 0;    GroupRecord *gr;    GroupDataHandle gdh = (GroupDataHandle)e->opState;    TinyDBError err;    if (gdh == NULL) return err_NoError;    //if (!aggHasData(qr,pq,e)) return err_NoError;    for (i = 0; i < (**gdh).numGroups; i++) {      gr = GET_GROUP_DATA(gdh, i);      if (!gr->u.empty && aggHasData(gr,e)) {		// XXX hack! modify epoch number here for temporal aggregates		// that may deliver tuples out of order		if (e->ex.agg.op == kADP_DELTA)			qr->epoch = ((TemporalAdpDeltaData *)gr->aggdata)->lastResult.epoch;					err = call QueryResultIntf.addAggResult(qr, gr->groupNo, (char *)gr, (**gdh).groupSize, pq, e->idx);			if (err != err_NoError) {		  return err;		}		gr->u.exprIdx = e->idx;      }    }    return err_NoError;  }  //Given a tuple built locally, add the result into the locally computed  //value for the query  command TinyDBError Operator.processTuple(ParsedQueryPtr qs, TuplePtr t, ExprPtr e) {    mCurExpr = e;    mCurQuery = qs;    mCurTuple = t;    dbg(DBG_USR1,"in PROCESS_TUPLE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) return err_NoError;    getGroupData(qs, getGroupNo(t,qs,e) , e, &updateGroupForTuple);    return err_NoError;  }  //epoch ended -- give aggregates a chance to reset themselves,  //if needed (some aggregates, like windowed and exponentially  //decaying averages won't reset on every epoch.)  command result_t Operator.endOfEpoch(ParsedQueryPtr q, ExprPtr e) {    GroupDataHandle gdh = (GroupDataHandle)e->opState;    GroupRecord *gr;    if (e->opType != kSEL && gdh != NULL) {      short i;      for (i = 0; i < (**gdh).numGroups; i++) {	gr = GET_GROUP_DATA(gdh,i);	initAggregateValue(gr, e, FALSE);      }        }    return SUCCESS;  }  // finished a query  event result_t QueryProcessor.queryComplete(ParsedQuery *q) {    short i;      for (i = 0; i < q->numExprs; i++) {      Expr e = (call ParsedQueryIntf.getExpr(q, i));      if (e.opType != kSEL && (e.opState !=  NULL)) {	call MemAlloc.free((Handle)e.opState);	dbg(DBG_USR1, "Agg: cleaning up.\n");	e.opState = (OperatorStateHandle)QUERY_DONE_STATE; //mark it as empty!      }    }    return SUCCESS;  }  //callback from getGroupData for PROCESS_TUPLE  //given the location of the aggregate record  //for the new tuple, update it  void updateGroupForTuple(GroupRecord *d) {    Tuple *t = mCurTuple;    Expr *e = mCurExpr;    ParsedQuery *q = mCurQuery;    char *fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.field);    short size = call TupleIntf.fieldSize(q, (char)e->ex.agg.field);    short fieldVal = 0;    short i;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?