aggoperator.nc
来自「tinyos最新版」· NC 代码 · 共 714 行 · 第 1/2 页
NC
714 行
// NOTE(review): the statements up to the first closing brace are the tail of a
// definition that begins before this excerpt; they are reproduced unchanged.
    for (i = 0; i < size; i++) {
      unsigned char b = (*fieldBytes++);
      fieldVal += ((unsigned short)b)<<(i*8);
    }
    updateAggregateValue(d,e,fieldVal);
    signal Operator.processedTuple(t, q, mCurExpr,TRUE);
  }

  /* Completion of the allocation requested by getGroupData() when the
     expression had no operator state yet (the state stores all the group
     records).

     Events whose handle is not the one recorded in mAlloced are ignored.
     On success the freshly allocated block is initialized with zero groups,
     the pending group (mCurGroup) is added and initialized, and the pending
     callback is invoked with it.

     @param h       pointer to the handle that was allocated
     @param success non-zero if the allocation succeeded
     @return FAIL if the allocation failed, SUCCESS otherwise
  */
  event result_t MemAlloc.allocComplete(HandlePtr h, uint8_t success) {
    GroupDataHandle dh = (GroupDataHandle)*h;
    GroupRecord *newGroup;

    if (h != (Handle *)mAlloced) return SUCCESS; //not for us
    mAlloced = NULL;

    dbg(DBG_USR1, "In allocComplete\n");
    if (!success) {
      dbg(DBG_USR1,"Error! Couldn't allocate aggregate data!");
      SIG_ERR(err_OutOfMemory);
      return FAIL;
    }

    dbg(DBG_USR1,"in AGG_ALLOC_DONE, expr = %x\n", (unsigned int)mCurExpr);//fflush(stdout);
    (**dh).groupSize = groupSize(mCurExpr);
    (**dh).numGroups = 0;
    newGroup = addGroup(dh, mCurGroup);
    initAggregateValue(newGroup, mCurExpr, TRUE);
    (*mCallback)(newGroup);
    return SUCCESS;
  }

  /* Completion of the reallocation requested by getGroupData() when a new
     group has to be added to existing operator state.

     Events whose handle is not the one recorded in mAlloced are ignored.
     If the reallocation failed, an empty group is reclaimed (if there is
     one) so that the new group fits in the existing block.  If no empty
     group exists either, the block has no room for another record, so the
     error is signalled and the event returns FAIL without touching the
     group data (previously execution fell through to addGroup(), which
     would then write one record past the end of the block).

     @param h       the handle that was reallocated
     @param success non-zero if the reallocation succeeded
     @return FAIL if no room could be made for the new group, else SUCCESS
  */
  event result_t MemAlloc.reallocComplete(Handle h, uint8_t success) {
    GroupRecord *newGroup;

    if (h != (Handle)mAlloced) return SUCCESS; //not for us
    mAlloced = NULL;

    dbg(DBG_USR1, "In reallocComplete\n");
    if (!success) {
      //check for empty groups -- if there are any, reuse them
      if (!removeEmptyGroup((GroupDataHandle)h)) {
        //maybe try to evict -- may be not possible
        dbg(DBG_USR1,"Error! Couldn't reallocate aggregate data!");
        SIG_ERR(err_OutOfMemory);
        return FAIL;
      }
    }

    newGroup = addGroup((GroupDataHandle)h, mCurGroup);
    initAggregateValue(newGroup, mCurExpr, TRUE);
    (*mCallback)(newGroup);
    return SUCCESS;
  }

  /* Compaction notifications need no handling here. */
  event result_t MemAlloc.compactComplete() {
    return SUCCESS;
  }

  /* Binary search to locate the group record for the specified group number.
     Relies on the records being ordered by ascending groupNo, which is the
     order addGroup() inserts them in.

     The search window is the half-open index range [lo, hi) and the record
     in the middle of the window is probed on every step.  (The previous
     version probed the low end of the window and then advanced it past
     records it had never examined, so e.g. group 2 in {1,2,3,4} was never
     found.)

     @param dh       handle to the group data to search
     @param groupNum group number to look for
     @return the matching record, or NULL if there is none
  */
  GroupRecord *findGroup(GroupDataHandle dh, short groupNum) {
    short lo = 0, hi = (**dh).numGroups;
    GroupRecord *gr;

    while (lo < hi) {
      short mid = lo + ((hi - lo) >> 1);

      gr = GET_GROUP_DATA(dh, mid);
      if (gr->groupNo == groupNum) return gr;
      if (gr->groupNo > groupNum)
        hi = mid;       // match, if any, lies below mid
      else
        lo = mid + 1;   // match, if any, lies above mid
    }
    return NULL;
  }

  /* Scan the list of groups and remove one that is empty.

     @param dh handle to the group data
     @return TRUE if a group was removed, FALSE if there are no empty groups
  */
  bool removeEmptyGroup(GroupDataHandle dh) {
    short i, lastEmpty;
    bool found = FALSE;
    GroupRecord *gr;

    //scan backwards, looking for an empty group
    for (lastEmpty = (**dh).numGroups - 1; lastEmpty >= 0; lastEmpty--) {
      gr = GET_GROUP_DATA(dh,lastEmpty);
      if (gr->u.empty) {
        found = TRUE;
        break;
      }
    }
    if (!found) return FALSE;

    dbg(DBG_USR1,"found empty = %d\n", lastEmpty);

    //now shift everything after that group up one
    for (i = lastEmpty + 1; i < (**dh).numGroups; i++) {
      gr = GET_GROUP_DATA(dh,i);
      // gr already points at the record to move; pass it directly, the same
      // way addGroup() passes the address of a record.  (Passing &gr handed
      // over the address of this local pointer variable instead.)
      // NOTE(review): assumes SET_GROUP_DATA copies record bytes from its
      // third argument, as its use in addGroup() suggests -- confirm against
      // the macro definition.
      SET_GROUP_DATA(dh,i-1,(char *)gr);
    }
    (**dh).numGroups--;
    return TRUE;
  }

  /* Add a group to a group data handle that has been realloced to be big
     enough to hold the group record (we assume the new space is at the end
     of the data block).

     The record is inserted so that groups stay ordered by ascending groupNo:
     the new record is written at the first slot holding a larger group
     number (or at the last slot if there is none), and every record from
     there on is moved one slot towards the end.  Only groupNo of the new
     record is set here; the callers in this file run initAggregateValue()
     on the returned record.

     NOTE(review): records are carried through GroupRecord-sized temporaries;
     if SET_GROUP_DATA copies (**dh).groupSize bytes and that is larger than
     sizeof(GroupRecord), the copy would read past those temporaries --
     confirm against the macro definition.

     @param dh       handle to the group data, with room for one more record
     @param groupNum number of the group to add
     @return pointer to the newly inserted record
  */
  GroupRecord *addGroup(GroupDataHandle dh, short groupNum) {
    short i;
    bool shift = FALSE, first = FALSE;
    GroupRecord *gr,lastgr,newgr,tempgr,*ret=NULL;

    newgr.groupNo = groupNum;

    //do a simple insertion sort
    (**dh).numGroups++;
    for (i = 0; i < (**dh).numGroups; i++) {
      gr = GET_GROUP_DATA(dh,i);

      //did we find the place to insert?  Only decide this once: without the
      //!shift guard on the last-slot test, an insert in the middle was
      //followed by a second insert into the last slot, which dropped the
      //record being carried there and returned the duplicate.
      if (!shift && (gr->groupNo > groupNum || (i+1 == (**dh).numGroups))) {
        lastgr = newgr; //yup
        shift = TRUE;
        first=TRUE;
      }

      if (shift) { //have we already inserted?
        tempgr = *gr; //move up the current record
        SET_GROUP_DATA(dh,i,(char *)&lastgr);
        lastgr = tempgr;
        if (first) {
          first=FALSE;
          ret = GET_GROUP_DATA(dh,i);
        }
      }
    }

    if (ret == NULL) {
      dbg(DBG_USR1,"ERROR: Retval is NULL on addGroup!\n");//fflush(stdout);
    }
    return ret;
  }

  /* Locate or allocate the group data for the group that t should update,
     and invoke the callback with that data.

     The request is recorded in mCallback / mCurExpr / mCurQuery / mCurGroup.
     If the group record already exists the callback is invoked before this
     function returns; otherwise memory is requested and the callback is
     invoked from the matching MemAlloc completion event.  A request that
     fails immediately is reported by signalling that completion event with
     success == FALSE.

     @param pq       query the expression belongs to
     @param groupNo  number of the group whose record is wanted
     @param e        aggregate expression whose opState holds the group data
     @param callback invoked with the group record once it is available
  */
  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback) {
    GroupDataHandle dh = (GroupDataHandle)e->opState;

    mCallback = callback;
    mCurExpr = e;
    mCurQuery = pq;
    mCurGroup = groupNo;

    dbg(DBG_USR1, "In getGroupData, groupNo = %d, dh = %x\n", groupNo, dh);
    if (dh == NULL) {
      //we've got to allocate this baby
      mAlloced = (Handle) &e->opState; //ick
      if (call MemAlloc.allocate((HandlePtr)&e->opState, groupSize(e) + sizeof(GroupData)) == FAIL) {
        signal MemAlloc.allocComplete((Handle *)&e->opState, FALSE);
      }
    } else {
      GroupRecord *gr;

      //scan through it, looking to see if the needed group is there
      gr = findGroup(dh, groupNo);
      mGroupRecord = gr;
      //decouple so that we don't immediately return (yuck yuck!)
      if (gr != NULL)
        (*(callback))(gr);
      else {
        //group doesn't exist -- must realloc and continue
        mAlloced = (Handle) e->opState;
        if (call MemAlloc.reallocate((Handle)e->opState, groupSize(e) * ((**dh).numGroups + 1) + sizeof(GroupData)) == FAIL) //failure
        {
          signal MemAlloc.reallocComplete((Handle)e->opState, FALSE);
        }
      }
    }
  }

  /* Invoke the pending callback with the group record saved by
     getGroupData(). */
  task void fireCallback() {
    (*mCallback)(mGroupRecord);
  }

  /* Compute the group number of tuple t for aggregate expression e.

     The bytes of the grouping field are assembled, least significant byte
     first, into a short which is then passed to ExprEval.evaluateGroupBy.

     @param t tuple to read the grouping field from
     @param q query the tuple belongs to
     @param e aggregate expression naming the grouping field
     @return 0 if the expression has no grouping field, otherwise the value
             returned by ExprEval.evaluateGroupBy
  */
  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e) {
    char *fieldBytes;
    short size;
    short fieldVal = 0;
    short i;

    if (e->ex.agg.groupingField == (short)kNO_GROUPING_FIELD) return 0; //we're not using a group!

    fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.groupingField);
    size = call TupleIntf.fieldSize(q, (char)e->ex.agg.groupingField);
    for (i = 0; i < size; i++) {
      unsigned char b = (*fieldBytes++);
      fieldVal += ((unsigned short)b)<<(i*8);
    }
    return (call ExprEval.evaluateGroupBy(e->ex.agg, fieldVal)); //(fieldVal) >> e->ex.agg.attenuation); //group number is attenuated by some number of bits
  }

  /** @return The group number from this query result */
  command short getGroupNoFromQr(QueryResult *qr) {
    GroupRecord *gr = (GroupRecord *)qr->d.data;
    return gr->groupNo;
  }

  /* Compare two aggregate records, determine if they are equal.

     Compares groupSize(e) bytes starting at the aggdata of each record.

     NOTE(review): groupSize(e) also counts the record header, so this looks
     like it compares more bytes than aggdata holds -- confirm the intended
     length.

     @return TRUE if all compared bytes match, FALSE otherwise
  */
  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e) {
    short size = groupSize(e);
    char *b1 = r1->aggdata;
    char *b2 = r2->aggdata;

    while (size--)
      if (*b1++ != *b2++) return FALSE;
    return TRUE;
  }

  /* -------------------------------------
     Aggregation Operator Specific Commands
     -------------------------------------------
  */

  /* Return the amount of storage required for an aggregate of the specified
     group: the GroupRecord header (everything but the first aggdata
     element) plus the state size reported by AggregateUse for the
     expression's operator.
     Note that this will not generalize to support variable size (e.g.
     holistic aggregates)
  */
  short groupSize(Expr *e) {
    GroupRecord g;
    short base;
    ParamVals paramVals;

    setParamValues(&paramVals,e);
    base = sizeof(g) - sizeof(g.aggdata[0]);
    return base + call AggregateUse.stateSize(e->ex.agg.op, NULL, &paramVals);
  }

  /* Given two aggregate records, merge them together into dest.
     dest is marked non-empty.
  */
  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e) {
    ParamVals paramVals;

    setParamValues(&paramVals,e);
    dest->u.empty = FALSE;

    // the following is ugly
    if (e->ex.agg.op == kADP_DELTA) {
      //set up current epoch as an arg
      setCurrentEpoch(&paramVals, mCurQuery->currentEpoch);
    }
    call AggregateUse.merge(e->ex.agg.op, dest->aggdata, merge->aggdata, NULL, &paramVals);
  }

  /* Given an aggregate value and a group, merge the value into the group.
     The value is first passed through ExprEval.evaluate; the group is marked
     non-empty.
  */
  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue) {
    ParamVals paramVals;

    setParamValues(&paramVals,e);
    //dbg(DBG_USR3, "Update called.\n");
    fieldValue = call ExprEval.evaluate(e, fieldValue);
    d->u.empty = FALSE;

    // the following is ugly
    if (e->ex.agg.op == kADP_DELTA) {
      //set up current epoch as an arg
      setCurrentEpoch(&paramVals, mCurQuery->currentEpoch);
    }
    call AggregateUse.update(e->ex.agg.op, d->aggdata, (char *)&fieldValue, NULL, &paramVals);
  }

  /* Initialize the value of the specified aggregate value and mark the
     group empty.
  */
  void initAggregateValue(GroupRecord *d, Expr *e, bool isFirstTime) {
    ParamVals paramVals;

    setParamValues(&paramVals,e);
    //dbg(DBG_USR3, "Init called.\n");
    d->u.empty = TRUE;
    call AggregateUse.init(e->ex.agg.op, d->aggdata, NULL, &paramVals, isFirstTime);
  }

  /* Return true if this aggregate has data ready right now -- some
     aggregates, such as windowed averages, only produce data at the end of
     several epochs.
  */
  bool aggHasData(GroupRecord *gr, Expr *e) {
    ParamVals paramVals;

    setParamValues(&paramVals, e);
    return call AggregateUse.hasData(e->ex.agg.op, gr->aggdata, NULL, &paramVals);
  }

  /* Finalize the aggregate state carried in query result qr into result_buf.

     @return the error code returned by AggregateUse.finalize
  */
  command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf) {
    GroupRecord *gr = (GroupRecord *)qr->d.data;
    ParamVals paramVals;

    setParamValues(&paramVals,e);
    //dbg(DBG_USR3, "Finalize Aggr Expr called.\n");
    return call AggregateUse.finalize(e->ex.agg.op, gr->aggdata, result_buf, NULL, &paramVals);
  }

  /*****************************************************************************
   * Functions for optimizing aggregates result routing
   ****************************************************************************/

  /*
   * Returns TRUE if local result can affect final result given snooped result.
   *
   * For aggregates that are both monotonic and exemplary the snooped record
   * is finalized, the local record is merged into it, and it is finalized
   * again; the answer is whether the two finalized values differ.  Note that
   * this modifies snooped->aggdata.  For every other aggregate TRUE is
   * returned.
   */
  bool localDataAffectsResult(GroupRecord *local, GroupRecord *snooped, Expr *e) {
    ParamVals paramVals;
    AggregateProperties properties;
    int16_t resultBefore, resultAfter;

    setParamValues(&paramVals,e);
    properties = call AggregateUse.getProperties(e->ex.agg.op);

    if (isMonotonic(properties) && isExemplary(properties)) {// MIN, MAX-like aggregates
      //finalize snooped
      // using ints instead of buffers is a dirty hack. OK FOR NOW?
      call AggregateUse.finalize(e->ex.agg.op, snooped->aggdata, (char *) &resultBefore, NULL, &paramVals);

      //apply merge(snooped, local). Notice snooped is destination
      call AggregateUse.merge(e->ex.agg.op, snooped->aggdata, local->aggdata, NULL, &paramVals);

      //finalize snooped again
      call AggregateUse.finalize(e->ex.agg.op, snooped->aggdata, (char *) &resultAfter, NULL, &paramVals);

      //if result changed, local data affects it!
      return (resultBefore != resultAfter);
    }
    return TRUE;
  }

} // closes a scope opened before this excerpt
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?