⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tuplerouterm.nc

📁 nesC写的heed算法
💻 NC
📖 第 1 页 / 共 5 页
字号:
      1) process it, if it is an aggregate result<p>
         or<p>
      2) forward it, if it is a non-aggregate result<p>
      @param qrMsg The query result message that was received
  */
  event result_t Network.dataSub(QueryResultPtr qrMsg) {
    QueryResult parsedResult;
    ParsedQuery *query;
    bool enqueuePending;

    // call Leds.greenToggle();

    // Nothing to do when getQuery cannot resolve the query id carried by the
    // message.
    if (!getQuery(call QueryResultIntf.queryIdFromMsg(qrMsg), &query))
      return SUCCESS;

    dbg(DBG_USR2, "Got query result for query %d\n", query->qid);

    // If this query is going to be deleted, reset the counter until deletion,
    // since we are still hearing neighbors' results about it.
    if (query->markedForDeletion) {
      query->markedForDeletion = EPOCHS_TIL_DELETION;
    }

    call QueryResultIntf.fromBytes(qrMsg, &parsedResult, query);

    // Route the result: either into the aggregation operators or onward
    // through the query's result buffer.
    if (query->hasAgg) {
      // Aggregate query -- do all the aggregation expressions.
      mResult = parsedResult;
#ifdef HSN_ROUTING
      mNumMerges++;
#endif
      if (!IS_AGGREGATING_RESULT()) //don't double aggregate!
        aggregateResult(query, &mResult, 0);
    } else {
      // Not aggregated, so just pass it on.
      TinyDBError err;

      dbg(DBG_USR2, "forwarding result for query %d to buffer %d\n", query->qid, query->bufferId);
      mEnqResult = *(QueryResultPtr)qrMsg;
      err = call DBBuffer.enqueue(query->bufferId, &mEnqResult, &enqueuePending, query);
      // mSendingResult = FALSE; // if this is true after enqueue, we'll
      //                         // post the sendTuplesTask when send finishes,
      //                         // which we may not want to do

      // Ignore "result buffer busy" for now: it only means we cannot keep up
      // with the sample rate the user requested, which is no reason to abort.
      if (err != err_ResultBufferBusy && err != err_NoError) SIG_ERR(err);
    }

    return SUCCESS;
  }

  /** Apply all aggregate operators to this result.
      Apply them one after another, starting with exprId.
<p>
      This is called from TUPLE_ROUTER_RESULT_MESSAGE and from
      AGGREGATED_RESULT_EVENT
      @param q The query that the result applies to
      @param qr The query result
      @param exprId Index of the first expression to apply to qr
  */
  void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId) {
    Expr *expr;

    // Step through the expressions from exprId onward. Selection (kSEL)
    // expressions are skipped, and so is any aggregate whose
    // processPartialResult call reports an error.
    while (exprId < q->numExprs) {
      expr = call ParsedQueryIntf.getExprPtr(q, exprId);
      if (expr->opType != kSEL) {
        SET_AGGREGATING_RESULT();
        if (call AggOperator.processPartialResult(qr, q, expr) == err_NoError)
          return; // accepted; the aggregating flag is left set here
        // error, just do the next one
        // (errors may just mean the result doesn't apply to the agg)
        UNSET_AGGREGATING_RESULT();
      }
      exprId++; // move on to the next one
    }

    // no more aggregation expressions
    UNSET_AGGREGATING_RESULT();
  }

  /* --------------------------------- Timer Events ---------------------------------*/

  /** Adjust the rate that the main tuple router clock fires at based
      on EPOCH DURATION of all of the queries that are currently installed
  */
  void setSampleRate() {
    QueryListHandle qlh;
    short rate = -1;

    //walk through queries, choose lowest sample rate
/*      qlh = mQs; */
/*      while (qlh != NULL) { */
/*        if (rate == -1)  */
/*  	rate = (**qlh).q.epochDuration; */
/*        else  */
/*  	rate = gcd((**qlh).q.epochDuration,rate); */
/*        qlh = (QueryListHandle)(**qlh).next; */
/*      } */

/*      //throttle rate to maximum */
/*      if (rate <= MIN_SAMPLE_RATE) { */
/*        //    rate = gcd(MIN_SAMPLE_RATE,rate); */
/*        rate = MIN_SAMPLE_RATE; */
/*      } */
/*      dbg(DBG_USR3,"rate = %d\n", rate); //fflush(stdout); */

    //HACK
    rate = kMS_PER_CLOCK_EVENT; //hardcode!
//now set the rate at which we have to deliver tuples to each query
    //as a multiple of this rate
    qlh = mQs;
#ifdef qSNOOZE
    //check to see if the new query is the only query or is exactly
    //the same rate as the previous queries
    if (qlh != NULL && (**qlh).next == NULL ||
        (qlh != NULL && mAllQueriesSameRate && (**((QueryListHandle)(**qlh).next)).q.epochDuration == (**qlh).q.epochDuration)) {
      mAllQueriesSameRate = TRUE;
    } else
      mAllQueriesSameRate = FALSE;
#endif
    while (qlh != NULL) {
      if ((**qlh).q.epochDuration == kONE_SHOT) { //read it fast!
        (**qlh).q.clocksPerSample = 16;
        (**qlh).q.curResult = 0;
      } else
        (**qlh).q.clocksPerSample = (uint16_t)((((uint32_t)(**qlh).q.epochDuration * kBASE_EPOCH_RATE) / (uint32_t)rate));

      atomic {
        //if (mQueryClockCount != 0) {
        //(**qlh).q.clockCount = mQueryClockCount;
        //mQueryClockCount = 0;
        //} else {
#ifdef qSNOOZE
        // With snoozing enabled the countdown is capped at WAKING_CLOCKS.
        if ((**qlh).q.clocksPerSample > WAKING_CLOCKS) {
          (**qlh).q.clockCount = WAKING_CLOCKS;
        } else {
#endif
          (**qlh).q.clockCount = (**qlh).q.clocksPerSample; //reset counter
#ifdef qSNOOZE
        }
#endif
        //}
      }
      qlh = (QueryListHandle)(**qlh).next;
    }

    if (rate != mOldRate) { //restart the clock if rate changed
      // tos_time_t cur_time = call Time.get();
      // uint32_t distFromPrev;
      mOldRate = rate;
      mIsRunning = TRUE;
      signal AbsoluteTimer.fired();
    }
  }

  /** Find the GCD of two non-negative integers
      @param a The first integer
      @param b The second integer
      @return the GCD of a and b; when one argument is 0 the other is
              returned (so gcd(0, 0) is 0)
  */
  short gcd(short a, short b) {
    short r;

    // Euclid's algorithm, looping until the divisor becomes zero. Testing the
    // divisor before each modulo means a zero argument is never divided by;
    // the earlier form evaluated b % a unconditionally with the smaller
    // argument as divisor, which is a division by zero when that was 0.
    // No explicit ordering of a and b is needed: if a > b the first iteration
    // simply swaps them.
    while (a != 0) {
      r = b % a;
      b = a;
      a = r;
    }
    return b;
  }

  /** Set the EPOCH DURATION of the specified query to the specified value.
@param qid The query that needs its sample rate adjusted
      @param epochDur The new epoch duration for qid
      <p>
      Nothing is changed if getQuery does not resolve qid, or if the clock
      rate mOldRate is zero (the conversion divides by it).
  */
  void setRate(uint8_t qid, uint16_t epochDur) {
    ParsedQuery *q;
    if (getQuery(qid, &q) && mOldRate != 0) {
      q->clocksPerSample = (uint16_t)((((uint32_t)epochDur * kBASE_EPOCH_RATE) / (uint32_t)mOldRate));
    }
  }

  /** Make all queries sample less frequently
      Note: We guarantee that slowDownSampling(); speedUpSampling(); will leave
      the sample rate unchanged.
  */
  void slowDownSampling() {
    QueryListHandle qlh = mQs;
    while (qlh != NULL) {
      (**qlh).q.clocksPerSample += kBASE_EPOCH_RATE;
      qlh = (QueryListHandle)(**qlh).next;
    }
  }

  /** Make all queries sample more frequently.
      A query is only sped up while its clocksPerSample is larger than the
      step size, so the subtraction can never reach zero or wrap around.
      When kBASE_EPOCH_RATE is 1 this is the same test as the previous "> 1".
  */
  void speedUpSampling() {
    QueryListHandle qlh = mQs;
    while (qlh != NULL) {
      if ((**qlh).q.clocksPerSample > kBASE_EPOCH_RATE)
        (**qlh).q.clocksPerSample -= kBASE_EPOCH_RATE;
      qlh = (QueryListHandle)(**qlh).next;
    }
  }

  /** Compute the appropriate sample rate .. */

  /* Fixed inputs to the lifetime model below. NOTE(review): the units are
     only implied by the names (ms, mAh, raw voltage readings) -- confirm. */
  enum {
    msXmit = 32,
    mahCapacity = 5800,
    maxVReading = 985,
    minVReading = 370,
    Vdraw = 3,
    sPerSample = 1
  };

  /* Current-draw estimates used by computeRates; rescaled by
     updateStatistics. */
  uint32_t uaActive, uaXmit, uaSleep;

  /** Load the initial current-draw estimates. */
  void initConsts() {
    uaActive = 16900;
    uaXmit = 17320;
    uaSleep = 220;
  }

  /** Derive an epoch duration that should let the remaining battery charge
      last for the requested lifetime.
      @param lifetimeHoursRem Desired remaining lifetime; 0 is treated as 1
      @param curVReading Current voltage reading; readings at or below
             minVReading are treated as "no charge left"
      @param ujSampleCost Cost of one sample
      @param ujAggCost Aggregation cost per sample
      @param numMsgs Messages sent per sample period
      @param numSamples Samples taken per sample period
      @param epochDur On exit, the appropriate epoch duration. Never below
             sPerSample * 1000; 0xFFFFFFFF when the lifetime cannot be met
      @param aggWinSize On exit, the aggregate window size (always 1)
      @param mvPerDay On exit, the number of raw voltage units that we expect
             to consume per day
  */
  void computeRates(uint32_t lifetimeHoursRem,
                    uint32_t curVReading,
                    uint32_t ujSampleCost,
                    uint32_t ujAggCost,
                    uint32_t numMsgs,
                    uint32_t numSamples,
                    uint32_t *epochDur, //on exit, appropriate epoch duration
                    uint16_t *aggWinSize, //on exit, aggregate window size
                    uint16_t *mvPerDay  // on exit, number of raw voltage units that we expect to consume per day
                    ) {
    const uint32_t minEpochDur = (uint32_t)sPerSample * 1000UL;
    uint32_t vAboveMin;
    uint32_t ujXmitCost;
    uint32_t mahRemaining;
    uint32_t uaAvg;
    uint32_t uaAvgActive;

    // Both quotients below divide by the lifetime; avoid dividing by zero.
    if (lifetimeHoursRem == 0)
      lifetimeHoursRem = 1;

    // Unsigned arithmetic: a reading below the minimum would otherwise wrap
    // around to a huge "remaining" voltage.
    vAboveMin = (curVReading > minVReading) ? (curVReading - minVReading) : 0;

    ujXmitCost = ((uaXmit * msXmit * Vdraw)) / (1000);
    mahRemaining = (vAboveMin * mahCapacity) / (maxVReading - minVReading);
    uaAvg = ((mahRemaining * 1000) / lifetimeHoursRem);
    uaAvgActive = (ujSampleCost*numSamples + ujXmitCost*numMsgs + ujAggCost*numSamples) / (Vdraw * sPerSample) + uaActive;

    if (uaAvg < uaSleep) {
      // Even sleeping all the time draws more than the budget allows.
      // (Same value the previous "*epochDur = -1" stored.)
      *epochDur = 0xFFFFFFFFUL;
    } else if (uaAvg >= uaAvgActive) {
      // The budget covers being active all the time: the duty cycle would be
      // 100% or more, which always clamps to the minimum epoch duration.
      // This branch also keeps the divisor below strictly positive.
      *epochDur = minEpochDur;
    } else {
      // Here uaSleep <= uaAvg < uaAvgActive, so the divisor is non-zero and
      // the result is below 100000. The product can exceed 32 bits, hence the
      // 64-bit intermediate.
      uint32_t dutyCycle = (uint32_t)(((uint64_t)(uaAvg - uaSleep) * 100000UL) / (uaAvgActive - uaSleep));

      if (dutyCycle == 0) {
        // Budget is indistinguishable from the sleep current.
        *epochDur = 0xFFFFFFFFUL;
      } else {
        *epochDur = ((uint32_t)sPerSample * 100000UL * 1000UL) / (dutyCycle);
        if (*epochDur < minEpochDur)
          *epochDur = minEpochDur;
      }
    }

    dbg(DBG_USR2, "epoch dur set to %d", *epochDur);
    *aggWinSize = 1;
    // NOTE(review): the quotient is narrowed to 16 bits on assignment, as
    // before; large values wrap rather than saturate.
    *mvPerDay = (vAboveMin*24*1000) / (lifetimeHoursRem);
    //note that this assumes we don't do any reduction in communication via aggregation
  }

  /** Rescale the current-draw estimates from an observed voltage drop.
      The estimates are left untouched when there is nothing usable to
      calibrate against: no elapsed time, no measurable drop, or a ratio
      that rounds to zero. Each of those cases used to divide by zero.
      @param deltaV Observed voltage drop
      @param elapsedHours Hours over which deltaV was observed
      @param mvPerDay Predicted consumption, as produced by computeRates
  */
  void updateStatistics(uint16_t deltaV, uint16_t elapsedHours, uint16_t mvPerDay) {
    uint32_t actualVPerDay;
    uint32_t ratio;

    if (elapsedHours == 0)
      return;

    // 32-bit arithmetic throughout: these products do not fit a 16-bit int.
    actualVPerDay = ((uint32_t)deltaV * 24) / (elapsedHours);
    if (actualVPerDay == 0)
      return;

    ratio = ((uint32_t)mvPerDay * 100) / (actualVPerDay * 1000);
    if (ratio == 0)
      return;

    uaActive = (uaActive * 100)/ratio;
    uaXmit  = (uaXmit * 100)/ratio;
    uaSleep = (uaSleep * 100)/ratio;
    dbg(DBG_USR2, "new uaActive = %d, uaXmit = %d, uaSleep = %d\n", uaActive, uaXmit, uaSleep);
  }

  /** Begin choosing a sample rate for query qid so that the node lasts
      lifetimeHours.
      @param qid The query whose rate is to be set
      @param lifetimeHours The desired lifetime
      @return SCHEMA_ERROR if a request is already in progress or the voltage
              read fails outright; otherwise the status left in errNo by the
              voltage read (SCHEMA_SUCCESS when the rate was computed
              immediately). On SCHEMA_RESULT_PENDING the "setting sample
              rate" flag stays set; presumably continueSetSampleRate is
              invoked later from the attribute completion path -- not
              visible here.
  */
  SchemaErrorNo setSampleRateForLifetime(uint8_t qid, uint16_t lifetimeHours) {
    SchemaErrorNo errNo;

    errNo = SCHEMA_SUCCESS;
    if (IS_SETTING_SAMPLE_RATE()) {
      errNo = SCHEMA_ERROR;
      goto done;
    }
    SET_SETTING_SAMPLE_RATE();
    mLifetime = lifetimeHours;
    mCurSampleRateQuery = qid;
#if !defined(PLATFORM_PC)
    if (call AttrUse.getAttrValue("voltage", (char *)&mVoltage, &errNo) == FAIL) {
      errNo = SCHEMA_ERROR;
      goto fail;
    }
    if (errNo == SCHEMA_SUCCESS)
      continueSetSampleRate();
    else if (errNo != SCHEMA_RESULT_PENDING)
      goto fail;
#else
    // PC builds use a fixed voltage reading instead of the attribute.
    mVoltage = 900;
    continueSetSampleRate();
#endif
    goto done;
#if !defined(PLATFORM_PC)
  fail:
    UNSET_SETTING_SAMPLE_RATE();
#endif
  done:
    return errNo;
  }

  void continueSetSampleRate() {
    uint32_t epochDur;
    uint16_t aggWinSize;
    uint16_t mvPerDay;
    computeRates(mLifetime, mVoltage, 0, 0, 1, 1, &epochDur, &aggWinSize, &mvPerDay);
    dbg(DBG_USR2, "computed epoch dur for query %d = %d\n", mCurSampleRateQuery, epochDur);
    setRate(mCurSampleRateQuery, (short)(epochDur & 0x00007FFF));
UNSET_SETTING_SAMPLE_RATE();#ifdef kLIFE_CMD    if (mLifetimeCommandPending) {      call SetLifetimeCommand.commandDone("life", NULL, SCHEMA_SUCCESS);      mLifetimeCommandPending = FALSE;    }#endif  }  /** Clock fired event --<br>   Works as follows:<br>   1) Output tuples from previous epochs<br>   2) Deterimine what queries fire in this epoch<br>   3) Collect samples from those queries<br>   4) Fill in the tuples in those queries<br>   5) Apply operators to those tuples<br>   <p>   While this is happening, results may arrive from other sensors   nodes representing results from the last epoch.  Those results need   to be forwarded (if we're just selection), or stored (if we're aggregating)   <p>   Question:  What to do if next time event goes off before this one is   complete?  Right now, this we'll simply ignore later events if this   one hasn't finished processing   <p>   As of 10/5/2002, moved main body into a task.*/event result_t AbsoluteTimer.fired() {    if (mIsRunning) {#ifdef qSNOOZE      //set up the service scheduler on our first time through here      if (mAllQueriesSameRate && 	  !IS_SNOOZING() &&	  mQs != NULL) {	result_t r;		ParsedQueryPtr pq = &(**mQs).q;	tos_time_t t = call Time.get();	tos_service_schedule sched;	short sleepClocks = (pq->clocksPerSample);	// long sleepUs = (sleepClocks * mOldRate);	//don't sleep if the clock rate is fast	if (pq->clocksPerSample <= WAKING_CLOCKS) {	  mStopped = FALSE;	  goto dontSleep;	}		//sched.start_time = call TimeUtil.addUint32(t,sleepUs); //start will be called at the beginning of the epoch	t.low32 += ((int32_t)pq->clocksPerSample * kMS_PER_CLOCK_EVENT) - (t.low32 % ((int32_t)pq->clocksPerSample* kMS_PER_CLOCK_EVENT));	sched.start_time = t; //call TimeUtil.addUint32(t,1); //start now -- ??? 
changed to 1, added + 1	sched.on_time = (((uint32_t)WAKING_CLOCKS + 1)* (uint32_t)mOldRate); //and stop will be called after WAKING_CLOCKS	sched.off_time =(((uint32_t)sleepClocks - ((uint32_t)WAKING_CLOCKS + 1)) * (uint32_t)mOldRate);		SET_SNOOZING();		r = call ServiceScheduler.reschedule(kTINYDB_SERVICE_ID, sched); //wait til this fires before we start...	//synchronize with whoever sent us this query	/* TS	if (mCurSchedTime != 0) {	  call TimeUtil.addint32(t, mCurSchedTime);	  call ServiceScheduler.setNextEventTime(t);	  mCurSchedTime = 0;	}	*/      } else {#endif      dontSleep:	if (!mStopped) {	  	  tos_time_t cur_time = call Time.get();	  uint32_t rateUs = ((uint32_t)mOldRate); //convert to microsecs	  	  // call Leds.redToggle(); 	  cur_time.low32 -= (cur_time.low32 % rateUs);	  cur_time.low32 += rateUs; //schedule for this time in the future	  atomic {	    call AbsoluteTimer.set(cur_time);	  }	  decrementQueryCounter();	  po

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -