aggoperator.nc

来自「tinyos最新版」· NC 代码 · 共 714 行 · 第 1/2 页

NC
714
字号
    for (i = 0; i < size; i++) {      unsigned char b = (*fieldBytes++);      fieldVal += ((unsigned short)b)<<(i*8);    }      updateAggregateValue(d,e,fieldVal);    signal Operator.processedTuple(t, q, mCurExpr,TRUE);  }  //allocate is used to create the operator state (which stores all the group  //records) the first time a value is stored  event result_t MemAlloc.allocComplete(HandlePtr h, uint8_t success) {    GroupDataHandle dh = (GroupDataHandle)*h;    GroupRecord *newGroup;    if (h != (Handle *)mAlloced) return SUCCESS; //not for us    mAlloced = NULL;    dbg(DBG_USR1, "In allocComplete\n");    if (!success) {      dbg(DBG_USR1,"Error! Couldn't allocate aggregate data!");      SIG_ERR(err_OutOfMemory);	  return FAIL;    }    dbg(DBG_USR1,"in AGG_ALLOC_DONE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);    (**dh).groupSize = groupSize(mCurExpr);    (**dh).numGroups = 0;    newGroup = addGroup(dh, mCurGroup);    initAggregateValue(newGroup, mCurExpr, TRUE);    (*mCallback)(newGroup);    return SUCCESS;  }  //reallocate is used when a new group is allocated in the existing operator  //state  event result_t MemAlloc.reallocComplete(Handle h, uint8_t success) {    GroupRecord *newGroup;      if (h != (Handle)mAlloced) return SUCCESS; //not for us    mAlloced = NULL;    dbg(DBG_USR1, "In reallocComplete\n");    if (!success) {      if (!removeEmptyGroup((GroupDataHandle)h)) { //check for empty groups -- if there are any, reuse them	//maybe try to evict -- may be not possible	dbg(DBG_USR1,"Error! Couldn't reallocate aggregate data!");	SIG_ERR(err_OutOfMemory);      }    }    newGroup = addGroup((GroupDataHandle)h, mCurGroup);    initAggregateValue(newGroup, mCurExpr, TRUE);    (*mCallback)(newGroup);    return SUCCESS;  }  event result_t MemAlloc.compactComplete() {    return SUCCESS;  }  //binary search to locate the group record for the specified  //group num  GroupRecord *findGroup(GroupDataHandle dh, short groupNum) {    short min = 0, max = (**dh).numGroups;    GroupRecord *gr;    if (max == 0) return NULL; // no groups    while (TRUE) {      gr = GET_GROUP_DATA(dh, min);      if (gr->groupNo == groupNum) return gr;      if (max == (min + 1)) break;      if (gr->groupNo > groupNum)	max = max - ((max - min)  >> 1);      else	min = min + ((max - min) >> 1);    }    return NULL;  }  //scan the list of groups and remove one that is empty  //return true if successful, false if there are no  //emtpy groups  bool removeEmptyGroup(GroupDataHandle dh) {    short i, lastEmpty;    bool found = FALSE;    GroupRecord *gr;    //scan backwards, looking for an empty group    for (lastEmpty = (**dh).numGroups - 1; lastEmpty >= 0; lastEmpty--) {      gr = GET_GROUP_DATA(dh,lastEmpty);      if (gr->u.empty) {	found = TRUE;	break;      }    }    if (!found) return FALSE;    dbg(DBG_USR1,"found empty = %d\n", lastEmpty);    //now shift everything after that group up one    for (i = lastEmpty + 1; i < (**dh).numGroups; i++) {      gr = GET_GROUP_DATA(dh,i);      SET_GROUP_DATA(dh,i-1,(char *)&gr);    }    (**dh).numGroups--;    return TRUE;  }  //add a group to a group data handle that has been realloced to be big enough to hold  //the group record (we assume the new space is at the end of the data block)  GroupRecord *addGroup(GroupDataHandle dh, short groupNum) {    short i;    bool shift = FALSE, first = FALSE;    GroupRecord *gr,lastgr,newgr,tempgr,*ret=NULL;      newgr.groupNo = groupNum;    //do a simple insertion sort    (**dh).numGroups++;      for (i = 0; i < (**dh).numGroups; i++) {      gr = GET_GROUP_DATA(dh,i);      //did we find the place to insert?      if ((!shift && gr->groupNo > groupNum) || (i+1 == (**dh).numGroups)) {	lastgr = newgr; //yup	shift = TRUE;	first=TRUE;      }          if (shift) {  //have we already inserted?	tempgr = *gr;  //move up the current record	SET_GROUP_DATA(dh,i,(char *)&lastgr);	lastgr = tempgr;	if (first) {	  first=FALSE;	  ret = GET_GROUP_DATA(dh,i);	}      }          }    if (ret == NULL) {      dbg(DBG_USR1,"ERROR: Retval is NULL on addGroup!\n");//fflush(stdout);    }    return ret;  }  //locate or allocate the group data for the group that t should update,  //and invoke the callback with that data.  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback) {    GroupDataHandle dh = (GroupDataHandle)e->opState;    mCallback = callback;      mCurExpr = e;    mCurQuery = pq;    mCurGroup = groupNo;    dbg(DBG_USR1, "In getGroupData, groupNo = %d, dh = %x\n", groupNo, dh);    if (dh == NULL) {      //we've got to allocate this baby      mAlloced = (Handle) &e->opState; //ick      if (call MemAlloc.allocate((HandlePtr)&e->opState, groupSize(e) + sizeof(GroupData)) == FAIL)	{	  signal MemAlloc.allocComplete((Handle *)&e->opState, FALSE);	}    } else {      GroupRecord *gr;      //scan through it, looking to see if the needed group is there      gr = findGroup(dh, groupNo);      mGroupRecord = gr;      //decouple so that we don't immediately return (yuck yuck!)      if (gr != NULL) (*(callback))(gr);      else {	//group doesn't exist -- must realloc and continue	mAlloced = (Handle) e->opState;		if (call MemAlloc.reallocate((Handle)e->opState, groupSize(e) * ((**dh).numGroups + 1) + sizeof(GroupData)) == FAIL) //failure	  {	    signal MemAlloc.reallocComplete((Handle)e->opState, FALSE);	  }            }    }  }    task void fireCallback() {      (*mCallback)(mGroupRecord);    }  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e) {    char *fieldBytes;    short size;    short fieldVal = 0;    short i;    if (e->ex.agg.groupingField == (short)kNO_GROUPING_FIELD) return 0; //we're not using a group!        fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.groupingField);    size = call TupleIntf.fieldSize(q, (char)e->ex.agg.groupingField);    for (i = 0; i < size; i++) {      unsigned char b = (*fieldBytes++);      fieldVal += ((unsigned short)b)<<(i*8);    }    return (call ExprEval.evaluateGroupBy(e->ex.agg, fieldVal));    //(fieldVal) >> e->ex.agg.attenuation); //group number is attenuated by some number of bits  }     /** @return The group number from this query result */  command short getGroupNoFromQr(QueryResult *qr) {    GroupRecord *gr = (GroupRecord *)qr->d.data;        return gr->groupNo;  }  //compare two aggregate records, determine if they are equal  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e) {    short size = groupSize(e);    char *b1 = r1->aggdata;    char *b2 = r2->aggdata;    while (size--)      if (*b1++ != *b2++) return FALSE;    return TRUE;  }  /* ------------------------------------- Aggregation Operator Specific Commands ------------------------------------------- *//* Return the amount of storage required for an aggregate of the specified group   Note that this will not generalize to support variable size (e.g. holistic aggregates)*/  short groupSize(Expr *e) {    GroupRecord g;    short base;        ParamVals paramVals;    setParamValues(&paramVals,e);        base = sizeof(g) - sizeof(g.aggdata[0]);    return base + call AggregateUse.stateSize(e->ex.agg.op, NULL, &paramVals);  }  /* Given two aggregate records, merge them together into dest. */  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e) {	ParamVals paramVals;    setParamValues(&paramVals,e);        dest->u.empty = FALSE;    // the following is ugly    if (e->ex.agg.op == kADP_DELTA) { //set up current epoch as an arg		setCurrentEpoch(&paramVals, mCurQuery->currentEpoch);	}	    call AggregateUse.merge(e->ex.agg.op, dest->aggdata, merge->aggdata, NULL, &paramVals);  }  /* Given an aggregate value and a group, merge the value into the group */  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue) {    ParamVals paramVals;    setParamValues(&paramVals,e);     //dbg(DBG_USR3, "Update called.\n");        fieldValue = call ExprEval.evaluate(e, fieldValue);    d->u.empty = FALSE;        // the following is ugly    if (e->ex.agg.op == kADP_DELTA) { //set up current epoch as an arg		setCurrentEpoch(&paramVals, mCurQuery->currentEpoch);	}    call AggregateUse.update(e->ex.agg.op, d->aggdata, (char *)&fieldValue, NULL, &paramVals);  }  /* Initialize the value of the specified aggregate value. */  void initAggregateValue(GroupRecord *d, Expr *e, bool isFirstTime) {    ParamVals paramVals;    setParamValues(&paramVals,e);        //dbg(DBG_USR3, "Init called.\n");    d->u.empty = TRUE;        call AggregateUse.init(e->ex.agg.op, d->aggdata, NULL, &paramVals, isFirstTime);  }    /* Return true if this aggregate has data ready right now     -- some aggregates, such as windowed averages, only     produce data at the end of several epochs.  */  bool aggHasData(GroupRecord *gr, Expr *e) {  	ParamVals paramVals;  	setParamValues(&paramVals, e);  	  	return call AggregateUse.hasData(e->ex.agg.op, gr->aggdata, NULL, &paramVals);  }  command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf) {    GroupRecord *gr = (GroupRecord *)qr->d.data;        ParamVals paramVals;    setParamValues(&paramVals,e);    //dbg(DBG_USR3, "Finalize Aggr Expr called.\n");    return call AggregateUse.finalize(e->ex.agg.op, gr->aggdata, result_buf, NULL, &paramVals);  }    /*****************************************************************************   * Functions for optimizing aggregates result routing   ****************************************************************************/    /*   * Returns TRUE if local result can affect final result given snooped result   */  bool localDataAffectsResult(GroupRecord *local, GroupRecord *snooped, Expr *e) {  	ParamVals paramVals;  	AggregateProperties properties;  	int16_t resultBefore, resultAfter;  	    setParamValues(&paramVals,e);        properties = call AggregateUse.getProperties(e->ex.agg.op);        if (isMonotonic(properties) && isExemplary(properties)) {// MIN, MAX-like aggregates    	//finalize snooped    	// using ints instead of buffers is a dirty hack. OK FOR NOW?    	call AggregateUse.finalize(e->ex.agg.op, snooped->aggdata, (char *) &resultBefore, NULL, &paramVals);    	//aplly merge(snooped, local). Notice snooped is destination    	call AggregateUse.merge(e->ex.agg.op, snooped->aggdata, local->aggdata, NULL, &paramVals);    	//finalize snooped again    	call AggregateUse.finalize(e->ex.agg.op, snooped->aggdata, (char *) &resultAfter, NULL, &paramVals);    	//if result changed, local data affects it!    	return (resultBefore != resultAfter);    }    	return TRUE;  }    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?