📄 aggoperator.nc
  }
  return NULL;
}

//scan the list of groups and remove one that is empty
//return true if successful, false if there are no
//empty groups
bool removeEmptyGroup(GroupDataHandle dh) {
  short i, lastEmpty;
  bool found = FALSE;
  GroupRecord *gr;

  //scan backwards, looking for an empty group
  for (lastEmpty = (**dh).numGroups - 1; lastEmpty >= 0; lastEmpty--) {
    gr = GET_GROUP_DATA(dh, lastEmpty);
    if (gr->u.empty) {
      found = TRUE;
      break;
    }
  }
  if (!found) return FALSE;

  dbg(DBG_USR1, "found empty = %d\n", lastEmpty);

  //now shift everything after that group up one
  for (i = lastEmpty + 1; i < (**dh).numGroups; i++) {
    gr = GET_GROUP_DATA(dh, i);
    SET_GROUP_DATA(dh, i - 1, (char *)&gr);
  }
  (**dh).numGroups--;
  return TRUE;
}

//add a group to a group data handle that has been realloced to be big enough to hold
//the group record (we assume the new space is at the end of the data block)
GroupRecord *addGroup(GroupDataHandle dh, short groupNum) {
  short i;
  bool shift = FALSE, first = FALSE;
  GroupRecord *gr, lastgr, newgr, tempgr, *ret = NULL;

  newgr.groupNo = groupNum;

  //do a simple insertion sort
  (**dh).numGroups++;
  for (i = 0; i < (**dh).numGroups; i++) {
    gr = GET_GROUP_DATA(dh, i);
    //did we find the place to insert?
    if ((!shift && gr->groupNo > groupNum) || (i + 1 == (**dh).numGroups)) {
      lastgr = newgr; //yup
      shift = TRUE;
      first = TRUE;
    }
    if (shift) { //have we already inserted?
      tempgr = *gr; //move up the current record
      SET_GROUP_DATA(dh, i, (char *)&lastgr);
      lastgr = tempgr;
      if (first) {
        first = FALSE;
        ret = GET_GROUP_DATA(dh, i);
      }
    }
  }
  if (ret == NULL) {
    dbg(DBG_USR1, "ERROR: Retval is NULL on addGroup!\n"); //fflush(stdout);
  }
  return ret;
}

//locate or allocate the group data for the group that t should update,
//and invoke the callback with that data.
void getGroupData(ParsedQuery *pq, short groupNo, Expr *e, GroupDataCallback callback) {
  GroupDataHandle dh = (GroupDataHandle)e->opState;

  mCallback = callback;
  mCurExpr = e;
  mCurQuery = pq;
  mCurGroup = groupNo;

  if (dh == NULL) {
    //we've got to allocate this baby
    mAlloced = (Handle)&e->opState; //ick
    call MemAlloc.allocate((HandlePtr)&e->opState, groupSize(e) + sizeof(GroupData));
  } else {
    GroupRecord *gr;

    //scan through it, looking to see if the needed group is there
    gr = findGroup(dh, groupNo);
    mGroupRecord = gr; //decouple so that we don't immediately return (yuck yuck!)
    if (gr != NULL)
      (*(callback))(gr);
    else {
      //group doesn't exist -- must realloc and continue
      mAlloced = (Handle)e->opState;
      if (call MemAlloc.reallocate((Handle)e->opState,
                                   groupSize(e) * ((**dh).numGroups + 1) + sizeof(GroupData)) == FAIL) {
        //failure
        signal MemAlloc.reallocComplete((Handle)e->opState, FALSE);
      }
    }
  }
}

task void fireCallback() {
  (*mCallback)(mGroupRecord);
}

short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e) {
  char *fieldBytes;
  short size;
  short fieldVal = 0;
  short i;

  if (e->ex.agg.groupingField == (short)kNO_GROUPING_FIELD)
    return 0; //we're not using a group!

  fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.groupingField);
  size = call TupleIntf.fieldSize(q, (char)e->ex.agg.groupingField);

  //assemble the grouping field value from its bytes, least significant byte first
  for (i = 0; i < size; i++) {
    unsigned char b = (*fieldBytes++);
    fieldVal += ((unsigned short)b) << (i * 8);
  }

  return (call ExprEval.evaluateGroupBy(e->ex.agg, fieldVal));
  //(fieldVal) >> e->ex.agg.attenuation); //group number is attenuated by some number of bits
}

//compare two aggregate records, determine if they are equal
bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e) {
  short size = groupSize(e);
  char *b1 = (char *)&r1->d;
  char *b2 = (char *)&r2->d;

  while (size--)
    if (*b1++ != *b2++) return FALSE;
  return TRUE;
}

/* -------------------------------------
   Aggregation Operator Specific Commands
   ------------------------------------- */

/* Return the amount of storage required for an aggregate of the specified group
   Note that this will not generalize to support variable size (e.g. holistic aggregates) */
short groupSize(Expr *e) {
  GroupRecord g;
  short base = sizeof(g) - sizeof(g.d);

  switch (e->ex.agg.op) {
    case SUM:
    case MIN:
    case MAX:
    case COUNT:
    case EXP_AVG:
    case WIN_MIN:
    case WIN_MAX:
    case WIN_SUM:
    case WIN_CNT:
      return base + sizeof(AlgebraicData);
      break;
    case WIN_AVG:
    case AVERAGE:
      return base + sizeof(AverageData);
      break;
    case MIN3:
      return base + sizeof(Min3Data);
  }
  return 0;
}

/* Given two aggregate records, merge them together into dest. */
void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e) {
  short i, j, k;

  dest->u.empty = FALSE;
  switch (e->ex.agg.op) {
    case SUM:
    case WIN_SUM:
      dest->d.algd.value += merge->d.algd.value;
      break;
    case MIN:
    case WIN_MIN:
      if (dest->d.algd.value > merge->d.algd.value) {
        dest->d.algd.value = merge->d.algd.value;
        dest->d.algd.id = merge->d.algd.id;
      }
      break;
    case MAX:
    case WIN_MAX:
      if (dest->d.algd.value < merge->d.algd.value) {
        dest->d.algd.value = merge->d.algd.value;
        dest->d.algd.id = merge->d.algd.id;
      }
      break;
    case COUNT:
    case WIN_CNT:
      dest->d.algd.value += merge->d.algd.value;
      break;
    case WIN_AVG:
    case AVERAGE:
      dest->d.avgd.sum += merge->d.avgd.sum;
      dest->d.avgd.count += merge->d.avgd.count;
      break;
    case MIN3:
      i = 0;
      j = 0;
      //loop through dest, filling in 3 slots with
      //top three values from merge or dest
      while (i < 3) {
        if (dest->d.min3d.mins[i] > merge->d.min3d.mins[j]) {
          //shift up dest (walk from the bottom so existing entries aren't overwritten)
          for (k = 2; k > i; k--) {
            dest->d.min3d.mins[k] = dest->d.min3d.mins[k-1];
            dest->d.min3d.ids[k] = dest->d.min3d.ids[k-1];
          }
          //copy top el of merge into top of dest
          dest->d.min3d.mins[i] = merge->d.min3d.mins[j];
          dest->d.min3d.ids[i] = merge->d.min3d.ids[j];
          j++; //move to next el of merge
        }
        //otherwise, top of dest is min
        i++; //always move to next el of dest
      }
      break;
    case EXP_AVG: {
      //fixed-point exponentially weighted moving average
      uint8_t newBits = 16 - e->ex.tagg.u.newBitsPerSample;
      dest->d.algd.value = (dest->d.algd.value - (dest->d.algd.value >> newBits)) +
                           (merge->d.algd.value >> newBits);
      break;
    }
  }
}

/* Given an aggregate value and a group, merge the value into the group */
void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue) {
  short i, j;

  fieldValue = call ExprEval.evaluate(e, fieldValue);
  //fieldValue /= 10;

  d->u.empty = FALSE;
  switch (e->ex.agg.op) {
    case SUM:
    case WIN_SUM:
      d->d.algd.value += fieldValue;
      break;
    case MIN:
    case WIN_MIN:
      if (d->d.algd.value > fieldValue) {
        d->d.algd.value = fieldValue;
        d->d.algd.id = TOS_LOCAL_ADDRESS;
      }
      break;
    case MAX:
    case WIN_MAX:
      if (d->d.algd.value < fieldValue) {
        d->d.algd.value = fieldValue;
        d->d.algd.id = TOS_LOCAL_ADDRESS;
      }
      break;
    case COUNT:
    case WIN_CNT:
      d->d.algd.value++;
      break;
    case AVERAGE:
    case WIN_AVG:
      d->d.avgd.sum += fieldValue;
      d->d.avgd.count++;
      break;
    case MIN3:
      for (i = 0; i < 3; i++) {
        if (d->d.min3d.mins[i] > fieldValue) {
          //shift up (walk from the bottom so existing entries aren't overwritten)
          for (j = 2; j > i; j--) {
            d->d.min3d.mins[j] = d->d.min3d.mins[j-1];
            d->d.min3d.ids[j] = d->d.min3d.ids[j-1];
          }
          d->d.min3d.mins[i] = fieldValue;
          d->d.min3d.ids[i] = TOS_LOCAL_ADDRESS;
          break; //once it's inserted, quit!
        }
      }
      break;
    case EXP_AVG: {
      //fixed-point exponentially weighted moving average
      uint8_t newBits = 16 - e->ex.tagg.u.newBitsPerSample;
      d->d.algd.value = (d->d.algd.value - (d->d.algd.value >> newBits)) +
                        (fieldValue >> newBits);
      break;
    }
  }
}

/* Initialize the value of the specified aggregate value. */
void initAggregateValue(GroupRecord *d, Expr *e) {
  short i;

  d->u.empty = TRUE;
  switch (e->ex.agg.op) {
    case SUM:
      d->d.algd.value = 0;
      break;
    case MIN:
      d->d.algd.value = kMAX_SHORT;
      d->d.algd.id = TOS_LOCAL_ADDRESS;
      break;
    case MAX:
      d->d.algd.value = kMIN_SHORT;
      d->d.algd.id = TOS_LOCAL_ADDRESS;
      break;
    case COUNT:
      d->d.algd.value = 0;
      break;
    case AVERAGE:
      d->d.avgd.sum = 0;
      d->d.avgd.count = 0;
      break;
    case MIN3:
      for (i = 0; i < 3; i++) {
        d->d.min3d.ids[i] = -1;
        d->d.min3d.mins[i] = kMAX_SHORT;
      }
      break;
    case EXP_AVG:
      //don't do anything (never reset value!)
      break;
    case WIN_MIN:
      if (e->ex.tagg.epochsLeft == 0) {
        d->d.algd.value = kMAX_SHORT;
        e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;
      } else
        e->ex.tagg.epochsLeft--;
      break;
    case WIN_MAX:
      if (e->ex.tagg.epochsLeft == 0) {
        d->d.algd.value = kMIN_SHORT;
        e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;
      } else
        e->ex.tagg.epochsLeft--;
      break;
    case WIN_SUM:
    case WIN_CNT:
      if (e->ex.tagg.epochsLeft == 0) {
        d->d.algd.value = 0;
        e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;
      } else
        e->ex.tagg.epochsLeft--;
      break;
    case WIN_AVG:
      if (e->ex.tagg.epochsLeft == 0) {
        d->d.avgd.sum = 0;
        d->d.avgd.count = 0;
        e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;
      } else
        e->ex.tagg.epochsLeft--;
      break;
  }
}

/* Return true if this aggregate has data ready right now -- some aggregates,
   such as windowed averages, only produce data at the end of several epochs. */
bool aggHasData(QueryResult *qr, ParsedQueryPtr q, Expr *e) {
  switch (e->ex.agg.op) {
    case WIN_MIN:
    case WIN_MAX:
    case WIN_SUM:
    case WIN_CNT:
    case WIN_AVG:
      if (e->ex.tagg.epochsLeft == 0 || TOS_LOCAL_ADDRESS == 0)
        return TRUE;
      else
        return FALSE;
      break;
    default:
      return TRUE;
  }
}

/** @return The group number from this query result */
command short getGroupNoFromQr(QueryResult *qr) {
  GroupRecord *gr = (GroupRecord *)qr->d.data;
  return gr->groupNo;
}

command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf) {
  GroupRecord *gr = (GroupRecord *)qr->d.data;

  switch (e->ex.agg.op) {
    case SUM:
    case MIN:
    case MAX:
    case COUNT:
    case EXP_AVG:
    case WIN_MIN:
    case WIN_MAX:
    case WIN_SUM:
    case WIN_CNT:
      *(short *)result_buf = gr->d.algd.value;
      break;
    case AVERAGE:
    case WIN_AVG:
      if (gr->d.avgd.count == 0)
        *(short *)result_buf = 0;
      else
        *(short *)result_buf = gr->d.avgd.sum / gr->d.avgd.count;
      break;
    case MIN3:
      break;
  }
  return err_NoError;
}
}
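For reference, here is a minimal standalone sketch (plain C, with a made-up newBits constant and made-up sample values, none of which come from TinyDB) of the fixed-point exponentially weighted moving average that the EXP_AVG branches above compute. Each update performs value = value - (value >> newBits) + (sample >> newBits), which approximates value += (sample - value) / 2^newBits using only shifts and adds.

#include <stdio.h>

int main(void) {
  unsigned char newBits = 4;   /* hypothetical smoothing factor: alpha = 1/2^4 = 1/16 */
  short value = 0;             /* running average, analogous to d.algd.value */
  short samples[] = { 320, 320, 320, 320, 480, 480, 480, 480 };  /* made-up readings */
  int i;

  for (i = 0; i < 8; i++) {
    /* same update rule as the EXP_AVG branch of updateAggregateValue */
    value = (value - (value >> newBits)) + (samples[i] >> newBits);
    printf("sample=%d  value=%d\n", samples[i], value);
  }
  return 0;
}

Expressing the average this way keeps the per-sample update to a couple of shifts and adds, which is cheap on mote-class hardware, at the cost of some truncation error from the integer right shifts.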