aggoperator.nc
来自「nesC写的heed算法」· NC 代码 · 共 695 行 · 第 1/2 页
NC
695 行
// $Id: AggOperator.nc,v 1.17.4.5 2003/08/26 09:08:14 cssharp Exp $/* tab:4 * "Copyright (c) 2000-2003 The Regents of the University of California. * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. * * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * * Copyright (c) 2002-2003 Intel Corporation * All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE * file. If you do not find these files, copies can be found by writing to * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, Berkeley, CA, * 94704. Attention: Intel License Inquiry. 
*/
/*
 * Authors:             Sam Madden
 *                      Design by Sam Madden, Wei Hong, and Joe Hellerstein
 * Date last modified:  6/26/02
 */

/**
 * @author Sam Madden
 * @author Design by Sam Madden
 * @author Wei Hong
 * @author and Joe Hellerstein
 */

includes Aggregates;

module AggOperator {
  provides {
    interface Operator;
    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);
    command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf);
    command short getGroupNoFromQr(QueryResult *qr);
  }
  uses {
    interface MemAlloc;
    interface QueryProcessor;
    interface TupleIntf;
    interface ParsedQueryIntf;
    interface ExprEval;
    interface Leds;
    interface QueryResultIntf;
    interface AggregateUse;
    command void signalError(TinyDBError err, int lineNo);
  }
}

implementation {
  /* Report an error through signalError, tagged with the reporting source line. */
  #define SIG_ERR(errNo) call signalError((errNo), __LINE__)

  /* Operator state kept behind an expression's opState handle:
     a header followed by numGroups records of groupSize bytes each. */
  typedef struct {
    int numGroups;      //how many groups are there
    int groupSize;      //how many bytes per group
    char groupData[1];  //data for groups -- depends on type of aggregate
                        // -- of size groupSize * numGroups
  } GroupData, *GroupDataPtr, **GroupDataHandle;

  /* One group's record.  'u' is a union: the same byte is read as the
     'empty' flag or as the owning expression's index. */
  typedef struct {
    short groupNo;      //2  (presumably a running byte count -- confirm)
    union {
      bool empty;
      char exprIdx;     //idx of operator that owns us
    } u;                //3  (presumably a running byte count -- confirm)
    char aggdata[1];    //aggregate-dependent data of variable size
                        //size of this space returned as part of groupSize() value
  } __attribute__((packed)) GroupRecord;

  /* Callback type handed to getGroupData; it receives a group record. */
  typedef void (*GroupDataCallback)(GroupRecord *d);

  /* Module state.  mCurQuery / mCurExpr / mCurResult / mCurTuple are set by
     processPartialResult and processTuple before they call getGroupData and
     are read back by the updateGroupFor* callbacks. */
  GroupDataCallback mCallback;
  ParsedQuery *mCurQuery;
  Expr *mCurExpr;
  QueryResult *mCurResult;
  Tuple *mCurTuple;
  short mCurGroup;
  Handle mAlloced;
  GroupRecord *mGroupRecord;

  enum {
    QUERY_DONE_STATE = 0x1010,  //magic code indicating query is done!
  };

  /* --------------------------------
     Local prototypes
     -------------------------------- */
  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback);
  void updateGroupForTuple(GroupRecord *d);
  short groupSize(Expr *e);
  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e);
  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue);
  void initAggregateValue(GroupRecord *d, Expr * e, bool isFirstTime);
  void updateGroupForPartialResult(GroupRecord *d);
  bool aggHasData(GroupRecord *gr, Expr *e);
  GroupRecord *findGroup(GroupDataHandle dh, short groupNum);
  GroupRecord *addGroup(GroupDataHandle dh, short groupNum);
  bool removeEmptyGroup(GroupDataHandle dh);
  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e);
  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e);
  //void addValToTemporalAggState(TemporalAggState *tas, short value);
  //short addAdpDeltaDataToTemporalAggState(TemporalAggState *tas, short lastVal, short newVal, uint16_t epoch);
  //void addAvgdataToTemporalAggState(TemporalAggState *tas, short sum, short count);

  task void fireCallback();

  /* --------------------------------
     Local Routines for getting / setting group records
     -------------------------------- */

  /* Return the address of the n-th group record in the state behind dHan.
     Records are groupSize bytes apart, starting at groupData. */
  GroupRecord *GET_GROUP_DATA(GroupDataHandle dHan,uint8_t n) {
    GroupData *state = *dHan;
    char *slot = &state->groupData[n * state->groupSize];

    return (GroupRecord *)slot;
  }

  /* Overwrite the n-th group record with groupSize bytes read from dataptr. */
  void SET_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dataptr) {
    GroupRecord *slot = GET_GROUP_DATA(dHan, n);

    memcpy(slot, (const char *)dataptr, (*dHan)->groupSize);
  }

  /* Copy the n-th group record (groupSize bytes) out into dest. */
  void COPY_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dest) {
    const char *slot = (const char *)GET_GROUP_DATA(dHan, n);

    memcpy((char *)dest, slot, (*dHan)->groupSize);
  }

  /* --------------------------------
     Functions
     -------------------------------- */

  /* Reset the state that this operator stores for the specified expression
     (Called every epoch)

     Operator state is appended to the expression (ick!), in the opState
     handle -- each expression is owned by exactly one operator, which
     may store state there.
*/

  /* Given a query-result from a different node, add the result into
     the locally computed value for the query.  The locally computed
     value is stored with the expression.

     Returns err_InvalidAggregateRecord when qr is not a single-field
     aggregate record or belongs to another expression; err_NoError
     otherwise (including when the expression is already marked done,
     in which case nothing is merged). */
  command TinyDBError Operator.processPartialResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {
    GroupRecord *incoming = (GroupRecord *)qr->d.data;

    // wrong record type, or a record owned by another expression: not for us
    if (qr->qrType != kAGG_SINGLE_FIELD || incoming->u.exprIdx != e->idx) {
      return err_InvalidAggregateRecord;
    }

    // query already finished -- silently ignore late partial results
    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    // stash the context; updateGroupForPartialResult reads it back
    mCurExpr = e;
    mCurQuery = qs;
    mCurResult = qr;

    dbg(DBG_USR3, "AggOperator.processPartialResult: Calling getGroupData\n");
    getGroupData(qs, incoming->groupNo, e, &updateGroupForPartialResult);

    return err_NoError;
  }

  /* Check and see if this result represents the evaluation of this
     expression: it must be a single-field aggregate record whose
     exprIdx equals e->idx. */
  command bool Operator.resultIsForExpr(QueryResultPtr qr, ExprPtr e) {
    GroupRecord *rec = (GroupRecord *)qr->d.data;

    if (qr->qrType == kAGG_SINGLE_FIELD && rec->u.exprIdx == e->idx) {
      return TRUE;
    }
    return FALSE;
  }

  /* Callback passed to getGroupData by processPartialResult.
     Merges the group record carried in mCurResult into the local record d,
     then signals Operator.processedResult for the stashed result/query/expr. */
  void updateGroupForPartialResult(GroupRecord *d) {
    QueryResult *remote = (QueryResult *)mCurResult;
    GroupRecord *partial = (GroupRecord *)remote->d.data;

    dbg(DBG_USR3, "updateGroupForPartialResult called\n");
    mergeAggregateValues(d, partial, mCurExpr);

    signal Operator.processedResult(mCurResult, mCurQuery, mCurExpr);
  }

  /* Return the next result to this query.
If there are no more results, return err_NoMoreResults.

     qr->result_idx should be set to kFIRST_RESULT if the first result is
     desired, or to the previous value of qr->result_idx returned by the
     last invocation of AGG_OPERATOR_NEXT_RESULT.

     Groups that are flagged empty, or for which aggHasData() is false,
     are skipped.  On success the group record is copied into qr->d.data
     and the copy is tagged with e->idx.

     Returns err_InvalidAggregateRecord when the expression has no
     operator state, and err_NoMoreResults when the query has already
     been completed (opState == QUERY_DONE_STATE).
  */
  command TinyDBError Operator.nextResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    bool empty = TRUE;
    GroupRecord *gr;
    short idx;

    if (gdh == NULL) {
      return err_InvalidAggregateRecord;
    }
    // The done marker is a magic value, not a handle -- never dereference it.
    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoMoreResults;
    }

    //if (!aggHasData(qr,qs,e)) return err_NoMoreResults;
    idx = qr->result_idx;
    do { //loop til we find the next non-empty group
      if (idx == kFIRST_RESULT) {
        idx = 0;
      } else {
        idx++;
      }
      if (idx >= (**gdh).numGroups) {
        return err_NoMoreResults;
      }

      qr->result_idx = idx;
      qr->qid = qs->qid;
      //just a single field result
      qr->qrType = kAGG_SINGLE_FIELD;

      gr = GET_GROUP_DATA(gdh, idx);
      //TODO: following hasData call is the SOURCE OF INEFFICIENCY
      if (!gr->u.empty && aggHasData(gr, e)) {
        empty = FALSE; //don't output empty results
      }
    } while (empty);

    //copy the data into the buffer, then tag the copy with its owner
    COPY_GROUP_DATA(gdh, idx, qr->d.data);
    gr = (GroupRecord *)qr->d.data;
    gr->u.exprIdx = e->idx;

    return err_NoError;
  }

  /** Install all of the results for the last epoch of expression e from pq
      into qr.

      Every non-empty group that has data is handed to
      QueryResultIntf.addAggResult; the first error it reports is returned.
      Returns err_NoError when there is no operator state (never allocated,
      or already released by queryComplete).
  */
  command TinyDBError addResults(QueryResult *qr, ParsedQuery *pq, ExprPtr e) {
    int i = 0;
    GroupRecord *gr;
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    TinyDBError err;

    // No state, or only the done marker (a magic value, not a handle).
    if (gdh == NULL || e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    //if (!aggHasData(qr,pq,e)) return err_NoError;
    for (i = 0; i < (**gdh).numGroups; i++) {
      gr = GET_GROUP_DATA(gdh, i);
      if (!gr->u.empty && aggHasData(gr, e)) {
        // XXX hack! modify epoch number here for temporal aggregates
        // that may deliver tuples out of order
        if (e->ex.agg.op == kADP_DELTA) {
          qr->epoch = ((TemporalAdpDeltaData *)gr->aggdata)->lastResult.epoch;
        }

        err = call QueryResultIntf.addAggResult(qr, gr->groupNo, (char *)gr, (**gdh).groupSize, pq, e->idx);
        if (err != err_NoError) {
          return err;
        }

        // NOTE(review): gr points into the stored group data and u.exprIdx
        // shares storage with u.empty, so a non-zero e->idx also sets the
        // stored 'empty' flag.  Kept as in the original -- confirm intent.
        gr->u.exprIdx = e->idx;
      }
    }
    return err_NoError;
  }

  //Given a tuple built locally, add the result into the locally computed
  //value for the query.  Does nothing (and returns err_NoError) once the
  //expression has been marked done.
  command TinyDBError Operator.processTuple(ParsedQueryPtr qs, TuplePtr t, ExprPtr e) {
    // stash the context; updateGroupForTuple reads it back
    mCurExpr = e;
    mCurQuery = qs;
    mCurTuple = t;

    dbg(DBG_USR1,"in PROCESS_TUPLE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);
    if (e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return err_NoError;
    }

    getGroupData(qs, getGroupNo(t, qs, e), e, &updateGroupForTuple);
    return err_NoError;
  }

  //epoch ended -- give aggregates a chance to reset themselves,
  //if needed (some aggregates, like windowed and exponentially
  //decaying averages won't reset on every epoch.)
  command result_t Operator.endOfEpoch(ParsedQueryPtr q, ExprPtr e) {
    GroupDataHandle gdh = (GroupDataHandle)e->opState;
    GroupRecord *gr;
    short i;

    // Only aggregate expressions with live state are touched; the done
    // marker is a magic value and must not be dereferenced.
    if (e->opType == kSEL || gdh == NULL ||
        e->opState == (OperatorStateHandle)QUERY_DONE_STATE) {
      return SUCCESS;
    }

    for (i = 0; i < (**gdh).numGroups; i++) {
      gr = GET_GROUP_DATA(gdh, i);
      initAggregateValue(gr, e, FALSE);
    }
    return SUCCESS;
  }

  // finished a query: release the operator state of every non-selection
  // expression and mark the expression as done.
  //
  // The marker has to be written into the expression stored in the query,
  // not into a by-value copy; otherwise opState keeps pointing at freed
  // memory and the QUERY_DONE_STATE checks elsewhere in this module can
  // never succeed.
  // NOTE(review): relies on ParsedQueryIntf.getExprPtr, the
  // pointer-returning counterpart of getExpr -- confirm it is declared in
  // this tree's ParsedQueryIntf.
  event result_t QueryProcessor.queryComplete(ParsedQuery *q) {
    short i;

    for (i = 0; i < q->numExprs; i++) {
      ExprPtr e = call ParsedQueryIntf.getExprPtr(q, i);

      // skip selections, expressions without state, and expressions that
      // were already released (freeing the magic marker would be invalid)
      if (e->opType != kSEL &&
          e->opState != NULL &&
          e->opState != (OperatorStateHandle)QUERY_DONE_STATE) {
        call MemAlloc.free((Handle)e->opState);
        dbg(DBG_USR1, "Agg: cleaning up.\n");
        e->opState = (OperatorStateHandle)QUERY_DONE_STATE; //mark it as empty!
      }
    }
    return SUCCESS;
  }

  //callback from getGroupData for PROCESS_TUPLE
  //given the location of the aggregate record
  //for the new tuple, update it
  void updateGroupForTuple(GroupRecord *d) {
    Tuple *t = mCurTuple;
    Expr *e = mCurExpr;
    ParsedQuery *q = mCurQuery;
    char *fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.field);
    short size = call TupleIntf.fieldSize(q, (char)e->ex.agg.field);
    uint16_t raw = 0;
    short i;

    // The aggregate value is 16 bits wide; bytes past that cannot be
    // represented, and shifting them in would shift by >= the width of int
    // on 16-bit targets (undefined behaviour).  Use the low-order bytes only.
    if (size > (short)sizeof(raw)) {
      size = (short)sizeof(raw);
    }

    // assemble the value least-significant byte first
    for (i = 0; i < size; i++) {
      unsigned char b = (unsigned char)(*fieldBytes++);
      raw |= (uint16_t)(((uint16_t)b) << (i * 8));
    }

    updateAggregateValue(d, e, (short)raw);

    signal Operator.processedTuple(t, q, mCurExpr, TRUE);
  }

  //allocate is used to create the operator state (which stores all the group
  //records) the first time a value is stored
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?