⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aggoperator.nc

📁 用于传感器网络的节点操作系统 TinyOS 结构设计非常有意思
💻 NC
📖 第 1 页 / 共 2 页
字号:
    }    return NULL;  }  //scan the list of groups and remove one that is empty  //return true if successful, false if there are no  //emtpy groups  bool removeEmptyGroup(GroupDataHandle dh) {    short i, lastEmpty;    bool found = FALSE;    GroupRecord *gr;    //scan backwards, looking for an empty group    for (lastEmpty = (**dh).numGroups - 1; lastEmpty >= 0; lastEmpty--) {      gr = GET_GROUP_DATA(dh,lastEmpty);      if (gr->u.empty) {	found = TRUE;	break;      }    }    if (!found) return FALSE;    dbg(DBG_USR1,"found empty = %d\n", lastEmpty);    //now shift everything after that group up one    for (i = lastEmpty + 1; i < (**dh).numGroups; i++) {      gr = GET_GROUP_DATA(dh,i);      SET_GROUP_DATA(dh,i-1,(char *)&gr);    }    (**dh).numGroups--;    return TRUE;  }  //add a group to a group data handle that has been realloced to be big enough to hold  //the group record (we assume the new space is at the end of the data block)  GroupRecord *addGroup(GroupDataHandle dh, short groupNum) {    short i;    bool shift = FALSE, first = FALSE;    GroupRecord *gr,lastgr,newgr,tempgr,*ret=NULL;      newgr.groupNo = groupNum;    //do a simple insertion sort    (**dh).numGroups++;      for (i = 0; i < (**dh).numGroups; i++) {      gr = GET_GROUP_DATA(dh,i);      //did we find the place to insert?      if ((!shift && gr->groupNo > groupNum) || (i+1 == (**dh).numGroups)) {	lastgr = newgr; //yup	shift = TRUE;	first=TRUE;      }          if (shift) {  //have we already inserted?	tempgr = *gr;  //move up the current record	SET_GROUP_DATA(dh,i,(char *)&lastgr);	lastgr = tempgr;	if (first) {	  first=FALSE;	  ret = GET_GROUP_DATA(dh,i);	}      }          }    if (ret == NULL) {      dbg(DBG_USR1,"ERROR: Retval is NULL on addGroup!\n");//fflush(stdout);    }    return ret;  }  //locate or allocate the group data for the group that t should update,  //and invoke the callback with that data.  void getGroupData(ParsedQuery *pq, short groupNo , Expr *e, GroupDataCallback callback) {    GroupDataHandle dh = (GroupDataHandle)e->opState;    mCallback = callback;      mCurExpr = e;    mCurQuery = pq;    mCurGroup = groupNo;      if (dh == NULL) {      //we've got to allocate this baby      mAlloced = (Handle) &e->opState; //ick      call MemAlloc.allocate((HandlePtr)&e->opState, groupSize(e) + sizeof(GroupData));    } else {      GroupRecord *gr;      //scan through it, looking to see if the needed group is there      gr = findGroup(dh, groupNo);      mGroupRecord = gr;      //decouple so that we don't immediately return (yuck yuck!)      if (gr != NULL) (*(callback))(gr);      else {	//group doesn't exist -- must realloc and continue	mAlloced = (Handle) e->opState;		if (call MemAlloc.reallocate((Handle)e->opState, groupSize(e) * ((**dh).numGroups + 1) + sizeof(GroupData)) == FAIL) //failure	  {	    signal MemAlloc.reallocComplete((Handle)e->opState, FALSE);	  }            }    }  }    task void fireCallback() {      (*mCallback)(mGroupRecord);    }  short getGroupNo(Tuple *t, ParsedQuery *q, Expr *e) {    char *fieldBytes;    short size;    short fieldVal = 0;    short i;    if (e->ex.agg.groupingField == (short)kNO_GROUPING_FIELD) return 0; //we're not using a group!        fieldBytes = call TupleIntf.getFieldPtr(q, t, (char)e->ex.agg.groupingField);    size = call TupleIntf.fieldSize(q, (char)e->ex.agg.groupingField);    for (i = 0; i < size; i++) {      unsigned char b = (*fieldBytes++);      fieldVal += ((unsigned short)b)<<(i*8);        }    return (call ExprEval.evaluateGroupBy(e->ex.agg, fieldVal));    //(fieldVal) >> e->ex.agg.attenuation); //group number is attenuated by some number of bits  }  //compare two aggregate records, determine if they are equal  bool aggEqual(GroupRecord *r1, GroupRecord *r2, Expr *e) {    short size = groupSize(e);    char *b1 = (char *)&r1->d;    char *b2 = (char *)&r2->d;    while (size--)       if (*b1++ != *b2++) return FALSE;    return TRUE;  }  /* ------------------------------------- Aggregation Operator Specific Commands ------------------------------------------- *//* Return the amount of storage required for an aggregate of the specified group   Note that this will not generalize to support variable size (e.g. holistic aggregates)*/  short groupSize(Expr *e) {    GroupRecord g;    short base = sizeof(g) - sizeof(g.d);    switch (e->ex.agg.op) {    case SUM:    case MIN:    case MAX:    case COUNT:    case EXP_AVG:    case WIN_MIN:    case WIN_MAX:    case WIN_SUM:    case WIN_CNT:      return base + sizeof(AlgebraicData);      break;    case WIN_AVG:    case AVERAGE:      return base + sizeof(AverageData);      break;    case MIN3:      return base + sizeof(Min3Data);    }    return 0;  }  /* Given two aggregate records, merge them together into dest. */  void mergeAggregateValues(GroupRecord *dest, GroupRecord *merge, Expr *e) {    short i,j, k;    dest->u.empty = FALSE;    switch (e->ex.agg.op) {    case SUM:    case WIN_SUM:      dest->d.algd.value += merge->d.algd.value;      break;    case MIN:    case WIN_MIN:      if (dest->d.algd.value > merge->d.algd.value) {	dest->d.algd.value = merge->d.algd.value;	dest->d.algd.id = merge->d.algd.id;      }      break;    case MAX:    case WIN_MAX:      if (dest->d.algd.value < merge->d.algd.value) {	dest->d.algd.value = merge->d.algd.value;	dest->d.algd.id = merge->d.algd.id;      }      break;    case COUNT:    case WIN_CNT:      dest->d.algd.value += merge->d.algd.value;      break;    case WIN_AVG:    case AVERAGE:      dest->d.avgd.sum += merge->d.avgd.sum;      dest->d.avgd.count += merge->d.avgd.count;      break;    case MIN3:      i = 0; j = 0;      //loop through dest, filling in 3 slots with      //top three values from merge or dest      while (i < 3) {	if (dest->d.min3d.mins[i] > merge->d.min3d.mins[j]) {	  //shift up dest	  for (k = i+1; k < 3; k++) {	    dest->d.min3d.mins[k] = dest->d.min3d.mins[k-1];	    dest->d.min3d.ids[k] = dest->d.min3d.ids[k-1];	  }	  //copy top el of merge into top of dest	  dest->d.min3d.mins[i] = merge->d.min3d.mins[j];	  dest->d.min3d.ids[i] = merge->d.min3d.ids[j];	  j++; //move to next el of merge	} //otherwise, top of dest is min	i++; //always move to next el of dest      }      break;    case EXP_AVG:       {	uint8_t newBits = 16 - e->ex.tagg.u.newBitsPerSample;	dest->d.algd.value = (dest->d.algd.value - (dest->d.algd.value >> newBits)) + (merge->d.algd.value >> newBits);	break;      }    }  }  /* Given an aggregate value and a group, merge the value into the group */  void updateAggregateValue(GroupRecord *d, Expr *e, short fieldValue) {    short i, j;        fieldValue = call ExprEval.evaluate(e, fieldValue);    //fieldValue /= 10;    d->u.empty = FALSE;    switch (e->ex.agg.op) {    case SUM:    case WIN_SUM:      d->d.algd.value += fieldValue;      break;    case MIN:    case WIN_MIN:      if (d->d.algd.value > fieldValue) {	d->d.algd.value = fieldValue;	d->d.algd.id = TOS_LOCAL_ADDRESS;      }      break;    case MAX:    case WIN_MAX:      if (d->d.algd.value < fieldValue) {	d->d.algd.value = fieldValue;	d->d.algd.id = TOS_LOCAL_ADDRESS;      }      break;    case COUNT:    case WIN_CNT:      d->d.algd.value++;      break;    case AVERAGE:    case WIN_AVG:      d->d.avgd.sum += fieldValue;      d->d.avgd.count ++;      break;    case MIN3:      for (i = 0; i < 3; i++) {	if (d->d.min3d.mins[i] > fieldValue) {	  //shift up	  for (j = i+1; j < 3; j++) {	    d->d.min3d.mins[j] = d->d.min3d.mins[j-1];	    d->d.min3d.ids[j] = d->d.min3d.ids[j-1];	  }	  d->d.min3d.mins[i] = fieldValue;	  d->d.min3d.ids[i] = TOS_LOCAL_ADDRESS;	  break; //once it's inserted, quit!	}      }      break;    case EXP_AVG:       {	uint8_t newBits = 16 - e->ex.tagg.u.newBitsPerSample;	d->d.algd.value = (d->d.algd.value - (d->d.algd.value >> newBits)) + (fieldValue >> newBits);	break;      }    }  }  /* Initialize the value of the specified aggregate value. */  void initAggregateValue(GroupRecord *d, Expr *e) {    short i;    d->u.empty = TRUE;    switch (e->ex.agg.op) {    case SUM:      d->d.algd.value = 0;      break;    case MIN:      d->d.algd.value = kMAX_SHORT;      d->d.algd.id = TOS_LOCAL_ADDRESS;      break;    case MAX:      d->d.algd.value = kMIN_SHORT;      d->d.algd.id = TOS_LOCAL_ADDRESS;      break;    case COUNT:      d->d.algd.value = 0;      break;    case AVERAGE:      d->d.avgd.sum = 0;      d->d.avgd.count = 0;      break;    case MIN3:      for (i = 0; i < 3; i++) {	d->d.min3d.ids[i] =  -1;	d->d.min3d.mins[i] = kMAX_SHORT;      }      break;    case EXP_AVG:      //don't do anything (never reset value!)      break;    case WIN_MIN:      if (e->ex.tagg.epochsLeft == 0) {	d->d.algd.value = kMAX_SHORT;	e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;      } else	e->ex.tagg.epochsLeft--;      break;      break;    case WIN_MAX:      if (e->ex.tagg.epochsLeft == 0) {	d->d.algd.value = kMIN_SHORT;	e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;      } else	e->ex.tagg.epochsLeft--;      break;    case WIN_SUM:    case WIN_CNT:      if (e->ex.tagg.epochsLeft == 0) {	d->d.algd.value = 0;	e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;      } else	e->ex.tagg.epochsLeft--;      break;    case WIN_AVG:      if (e->ex.tagg.epochsLeft == 0) {	d->d.avgd.sum = 0;	d->d.avgd.count = 0;	e->ex.tagg.epochsLeft = e->ex.tagg.u.epochsPerWindow;      } else	e->ex.tagg.epochsLeft--;      break;    }      }    /* Return true if this aggregate has data ready right now     -- some aggregates, such as windowed averages, only     produce data at the end of several epochs.  */  bool aggHasData(QueryResult *qr, ParsedQueryPtr q, Expr *e) {    switch (e->ex.agg.op) {    case WIN_MIN: case WIN_MAX: case WIN_SUM: case WIN_CNT: case WIN_AVG:      if (e->ex.tagg.epochsLeft == 0 || TOS_LOCAL_ADDRESS == 0)	return TRUE;      else 	return FALSE;      break;    default:      return TRUE;    }  }  /** @return The group number from this query result */  command short getGroupNoFromQr(QueryResult *qr) {    GroupRecord *gr = (GroupRecord *)qr->d.data;        return gr->groupNo;  }  command TinyDBError finalizeAggExpr(QueryResult *qr, ParsedQueryPtr q, Expr *e, char *result_buf) {    GroupRecord *gr = (GroupRecord *)qr->d.data;    switch (e->ex.agg.op) {    case SUM:    case MIN:    case MAX:    case COUNT:    case EXP_AVG:    case WIN_MIN: case WIN_MAX: case WIN_SUM: case WIN_CNT:      *(short *)result_buf = gr->d.algd.value;      break;    case AVERAGE:    case WIN_AVG:      if (gr->d.avgd.count == 0)	*(short *)result_buf = 0;      else	*(short *)result_buf = gr->d.avgd.sum / gr->d.avgd.count;      break;    case MIN3:      break;    }    return err_NoError;  }}    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -