⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aggoperator.nc

📁 用于传感器网络的节点操作系统 TinyOS 结构设计非常有意思
💻 NC
📖 第 1 页 / 共 2 页
字号:
/*									tab:4 * * * "Copyright (c) 2000-2002 The Regents of the University  of California.   * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * *//*									tab:4 *  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.  By *  downloading, copying, installing or using the software you agree to *  this license.  If you do not agree to this license, do not download, *  install, copy or use the software. * *  Intel Open Source License  * *  Copyright (c) 2002 Intel Corporation  *  All rights reserved.  *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions are *  met: *  *	Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. *	Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in the *  documentation and/or other materials provided with the distribution. *      Neither the name of the Intel Corporation nor the names of its *  contributors may be used to endorse or promote products derived from *  this software without specific prior written permission. *   *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS *  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *  PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE INTEL OR ITS *  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, *  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, *  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR *  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  *  *//* * Authors:	Sam Madden *              Design by Sam Madden, Wei Hong, and Joe Hellerstein * Date last modified:  6/26/02 * * */module AggOperator {  provides {    interface Operator;    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);    command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf);    command short getGroupNoFromQr(QueryResult *qr);  }  uses {    interface MemAlloc;    interface QueryProcessor;    interface TupleIntf;    interface ParsedQueryIntf;    interface ExprEval;    interface Leds;    interface QueryResultIntf;        command void signalError(TinyDBError err);      }}implementation {  typedef struct {    int numGroups; //how many groups are there    int groupSize; //how many bytes per group    char groupData[1];  //data for groups -- depends on type of aggregate -- of size groupSize * numGroups  } GroupData, *GroupDataPtr, **GroupDataHandle;  typedef struct {    short value;    short id;  } AlgebraicData;  typedef struct {    short sum;    short count;  } AverageData;  typedef struct {    short ids[3];    short mins[3];  } Min3Data;  typedef struct {    short groupNo;    union {      bool empty;      char exprIdx; //idx of operator that owns us    } u;    union{      AlgebraicData algd;      AverageData avgd;      Min3Data min3d;    } d;  } GroupRecord;  typedef void (*GroupDataCallback)(GroupRecord *d);  GroupDataCallback mCallback;  ParsedQuery *mCurQuery;  Expr *mCurExpr;  QueryResult *mCurResult;  Tuple *mCurTuple;  short mCurGroup;  Handle mAlloced;  GroupRecord *mGroupRecord;  enum {    kMAX_SHORT = 0x7FFF,    kMIN_SHORT = 0x8000  };  /* -------------------------------- Local prototypes -------------------------------- */  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback);  void updateGroupForTuple(GroupRecord *d);  short groupSize(Expr *e);  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e);  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue);  void initAggregateValue(GroupRecord *d, Expr *);  void updateGroupForPartialResult(GroupRecord *d);  GroupRecord *findGroup(GroupDataHandle dh, short groupNum);  GroupRecord *addGroup(GroupDataHandle dh, short groupNum);  bool removeEmptyGroup(GroupDataHandle dh);  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e);  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e);  bool aggHasData(QueryResult *qr, ParsedQueryPtr q, Expr *e);  task void fireCallback();/* -------------------------------- Local Routines for getting / setting group records  -------------------------------- */  GroupRecord *GET_GROUP_DATA(GroupDataHandle dHan,uint8_t n) {    return (GroupRecord *)&((**dHan).groupData[(n) * (**dHan).groupSize]);  }  void SET_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dataptr) {    memcpy(GET_GROUP_DATA(dHan,n), (const char *)(dataptr), (**dHan).groupSize);  }    void COPY_GROUP_DATA(GroupDataHandle dHan,uint8_t n,char *dest) {    memcpy((char *)(dest),(const char *)GET_GROUP_DATA(dHan,n), (**dHan).groupSize);  }  /* -------------------------------- Functions -------------------------------- */    /* Reset the state that this operator stores for the specified expression        (Called every epoch)       Operator state is appended to the expression (ick!) , in the opState handle --         each expression is owned by exactly one operator, which may store state there.    */     //Given a query-result from a different node, add the result into     //the locally computed value for the query.  The locally computed    //value is stored with the expression.    command TinyDBError Operator.processPartialResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {      GroupRecord *gr = (GroupRecord *)qr->d.data;            if (qr->qrType != kAGG_SINGLE_FIELD) return err_InvalidAggregateRecord;      if (gr->u.exprIdx != e->idx) 	return err_InvalidAggregateRecord; //not for us      mCurExpr = e;      mCurQuery = qs;      mCurResult = qr;            getGroupData(qs,gr->groupNo, e, &updateGroupForPartialResult);      return err_NoError;    }    //check and see if this result represents the evaluation of this expression    command bool Operator.resultIsForExpr(QueryResultPtr qr, ExprPtr e) {      GroupRecord *gr = (GroupRecord *)qr->d.data;      if (qr->qrType != kAGG_SINGLE_FIELD) return FALSE;      if (gr->u.exprIdx != e->idx) return FALSE;      return TRUE;    }      void updateGroupForPartialResult(GroupRecord *d) {    GroupRecord *gr = (GroupRecord *)((QueryResult *)mCurResult)->d.data;    mergeAggregateValues(d,gr,mCurExpr);    signal Operator.processedResult(mCurResult, mCurQuery, mCurExpr);  }      /* Return the next result to this query.        If there are no more results, return err_NoMoreResults        qr->result_idx should be set to kFIRST_RESULT if the first result is desired,    or to the previous value of qr->result_idx returned by the last invocation    of AGG_OPERATOR_NEXT_RESULT      */  command TinyDBError Operator.nextResult(QueryResultPtr qr, ParsedQueryPtr qs, ExprPtr e) {    GroupDataHandle gdh = (GroupDataHandle)e->opState;    bool empty = TRUE;    GroupRecord *gr;    if (!aggHasData(qr,qs,e)) return err_NoMoreResults;    if (gdh != NULL) {      short idx = qr->result_idx;          do {  //loop til we find the next non-empty group	if (idx == kFIRST_RESULT) idx = 0;	else idx++;      	if (idx >= (**gdh).numGroups) return err_NoMoreResults;	qr->result_idx = idx;	qr->qid = qs->qid;	//just a single field result	qr->qrType = kAGG_SINGLE_FIELD;	//copy the data into the buffer	gr = GET_GROUP_DATA(gdh,idx);	if (!gr->u.empty) empty = FALSE; //don't output empty results      } while (empty);      COPY_GROUP_DATA(gdh,idx,qr->d.data);      gr = (GroupRecord *)qr->d.data;      gr->u.exprIdx = e->idx;    } else      return err_InvalidAggregateRecord;    return err_NoError;  }  /** Install all of the results for the last epoch of  expression e from pq into qr.   */  command TinyDBError addResults(QueryResult *qr, ParsedQuery *pq, ExprPtr e) {    int i = 0;    GroupRecord *gr;    GroupDataHandle gdh = (GroupDataHandle)e->opState;    TinyDBError err;    if (gdh == NULL) return err_NoError;    if (!aggHasData(qr,pq,e)) return err_NoError;    for (i = 0; i < (**gdh).numGroups; i++) {      gr = GET_GROUP_DATA(gdh, i);      if (!gr->u.empty) {	err = call QueryResultIntf.addAggResult(qr, gr->groupNo, (char *)gr, (**gdh).groupSize, pq, e->idx); 	if (err != err_NoError) {	  return err;	}	gr->u.exprIdx = e->idx;      }    }    return err_NoError;  }  //Given a tuple built locally, add the result into the locally computed  //value for the query  command TinyDBError Operator.processTuple(ParsedQueryPtr qs, TuplePtr t, ExprPtr e) {    mCurExpr = e;    mCurQuery = qs;    mCurTuple = t;    dbg(DBG_USR1,"in PROCESS_TUPLE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);    getGroupData(qs, getGroupNo(t,qs,e) , e, &updateGroupForTuple);    return err_NoError;  }  //epoch ended -- give aggregates a chance to reset themselves,  //if needed (some aggregates, like windowed and exponentially  //decaying averages won't reset on every epoch.)  command result_t Operator.endOfEpoch(ParsedQueryPtr q, ExprPtr e) {    GroupDataHandle gdh = (GroupDataHandle)e->opState;    GroupRecord *gr;    if (e->opType != kSEL && gdh != NULL) {      short i;      for (i = 0; i < (**gdh).numGroups; i++) {	gr = GET_GROUP_DATA(gdh,i);	initAggregateValue(gr, e);      }        }    return SUCCESS;  }  // finished a query  event result_t QueryProcessor.queryComplete(ParsedQuery *q) {    short i;      for (i = 0; i < q->numExprs; i++) {      Expr e = (call ParsedQueryIntf.getExpr(q, i));      if (e.opType != kSEL && (e.opState !=  NULL)) {	call MemAlloc.free((Handle)e.opState);      }    }     return SUCCESS;  }  //callback from getGroupData for PROCESS_TUPLE  //given the location of the aggregate record  //for the new tuple, update it  void updateGroupForTuple(GroupRecord *d) {    Tuple *t = mCurTuple;    Expr *e = mCurExpr;    ParsedQuery *q = mCurQuery;    char *fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.field);    short size = call TupleIntf.fieldSize(q, (char)e->ex.agg.field);    short fieldVal = 0;    short i;    for (i = 0; i < size; i++) {      unsigned char b = (*fieldBytes++);      fieldVal += ((unsigned short)b)<<(i*8);        }      updateAggregateValue(d,e,fieldVal);    signal Operator.processedTuple(t, q, mCurExpr,TRUE);      }  //allocate is used to create the operator state (which stores all the group  //records) the first time a value is stored  event result_t MemAlloc.allocComplete(HandlePtr h, uint8_t success) {    GroupDataHandle dh = (GroupDataHandle)*h;    GroupRecord *newGroup;    if (h != (Handle *)mAlloced) return SUCCESS; //not for us    if (!success) {      dbg(DBG_USR1,"Error! Couldn't allocate aggregate data!");      call signalError(err_OutOfMemory);    }    dbg(DBG_USR1,"in AGG_ALLOC_DONE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);    (**dh).groupSize = groupSize(mCurExpr);    (**dh).numGroups = 0;    newGroup = addGroup(dh, mCurGroup);    initAggregateValue(newGroup, mCurExpr);    (*mCallback)(newGroup);    return SUCCESS;  }  //reallocate is used when a new group is allocated in the existing operator  //state  event result_t MemAlloc.reallocComplete(Handle h, uint8_t success) {    GroupRecord *newGroup;      if (h != (Handle)mAlloced) return SUCCESS; //not for us    if (!success) {      if (!removeEmptyGroup((GroupDataHandle)h)) { //check for empty groups -- if there are any, reuse them      	//maybe try to evict -- may be not possible	dbg(DBG_USR1,"Error! Couldn't reallocate aggregate data!");	call signalError(err_OutOfMemory);      }    }    newGroup = addGroup((GroupDataHandle)h, mCurGroup);    initAggregateValue(newGroup, mCurExpr);    (*mCallback)(newGroup);    return SUCCESS;  }  event result_t MemAlloc.compactComplete() {    return SUCCESS;  }  //binary search to locate the group record for the specified  //group num  GroupRecord *findGroup(GroupDataHandle dh, short groupNum) {    short min = 0, max = (**dh).numGroups;    GroupRecord *gr;    if (max == 0) return NULL; // no groups    while (TRUE) {      gr = GET_GROUP_DATA(dh, min);      if (gr->groupNo == groupNum) return gr;      if (max == (min + 1)) break;        if (gr->groupNo > groupNum) 	max = max - ((max - min)  >> 1);      else	min = min + ((max - min) >> 1);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -