// tuplerouterm.nc
 *  1) process it, if it is an aggregate result<p>
 *  or<p>
 *  2) forward it, if it is a non-aggregate result<p>
 *  @param qrMsg The message that was received
 */
event result_t Network.dataSub(QueryResultPtr qrMsg) {
  QueryResult qr;
  ParsedQuery *q;
  bool gotAgg = FALSE;
  bool pending;

  // call Leds.greenToggle();

  if (getQuery(call QueryResultIntf.queryIdFromMsg(qrMsg), &q)) {
    // If this query is going to be deleted, reset the counter until
    // deletion, since we're still hearing neighbors' results about it.
    dbg(DBG_USR2, "Got query result for query %d\n", q->qid);
    if (q->markedForDeletion) {
      q->markedForDeletion = EPOCHS_TIL_DELETION;
    }

    call QueryResultIntf.fromBytes(qrMsg, &qr, q);

    // Now determine where to route this result: either to an
    // aggregation operator or to our parent.
    gotAgg = q->hasAgg;

    if (!gotAgg) {
      // Didn't give it to an aggregate, so just pass it on...
      TinyDBError err;

      dbg(DBG_USR2, "forwarding result for query %d to buffer %d\n",
          q->qid, q->bufferId);
      mEnqResult = *(QueryResultPtr)qrMsg;
      err = call DBBuffer.enqueue(q->bufferId, &mEnqResult, &pending, q);
      // mSendingResult = FALSE; // If this is true after enqueue, we'll
      // post the sendTuplesTask when the send finishes, which we may
      // not want to do.

      // Ignore result-buffer-busy errors for now, since they just mean
      // we can't keep up with the sample rate the user requested; we
      // shouldn't abort.
      if (err != err_ResultBufferBusy && err != err_NoError) SIG_ERR(err);
    } else {
      // Got an agg -- apply all the aggregation expressions.
      mResult = qr;
#ifdef HSN_ROUTING
      mNumMerges++;
#endif
      if (!IS_AGGREGATING_RESULT()) // don't double aggregate!
        aggregateResult(q, &mResult, 0);
    }
  }
  return SUCCESS;
}

/** Apply all aggregate operators to this result, one after another,
 *  starting with exprId.
 *  <p>
 *  This is called from TUPLE_ROUTER_RESULT_MESSAGE and from
 *  AGGREGATED_RESULT_EVENT.
 *  @param q The query that the result applies to
 *  @param qr The query result
 *  @param exprId The expression to apply to qr
 */
void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId) {
  Expr *e;

  if (exprId >= q->numExprs) { // no more aggregation expressions
    UNSET_AGGREGATING_RESULT();
    return;
  }

  e = call ParsedQueryIntf.getExprPtr(q, exprId);
  if (e->opType != kSEL) {
    SET_AGGREGATING_RESULT();
    if (call AggOperator.processPartialResult(qr, q, e) != err_NoError) {
      UNSET_AGGREGATING_RESULT();
      // Error: just do the next one (errors may simply mean the result
      // doesn't apply to this agg).
      aggregateResult(q, qr, exprId + 1);
    }
  } else
    aggregateResult(q, qr, exprId + 1); // move on to the next one
}
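/* Illustrative trace (not part of the original source): how
 * aggregateResult() walks a query's expression list, assuming a
 * hypothetical query whose expressions are [0] = kSEL filter,
 * [1] = MAX agg, [2] = AVG agg. A result arriving in Network.dataSub()
 * with q->hasAgg set produces:
 *
 *   aggregateResult(q, qr, 0)  // expr 0 is kSEL: recurse to expr 1
 *   aggregateResult(q, qr, 1)  // expr 1 is an agg: SET_AGGREGATING_RESULT()
 *                              // and AggOperator.processPartialResult()
 *
 * On success, expr 2 is handled later, when the aggregation operator
 * signals completion (the AGGREGATED_RESULT_EVENT path mentioned in the
 * comment above); on error the recursion moves straight on to expr 2
 * here. Once exprId reaches q->numExprs, the aggregating flag is
 * cleared and the walk ends.
 */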
/* ---------------------------------
   Timer Events
   --------------------------------- */

/** Adjust the rate at which the main tuple-router clock fires, based on
 *  the EPOCH DURATION of all the queries that are currently installed.
 */
void setSampleRate() {
  QueryListHandle qlh;
  short rate = -1;

  // Walk through the queries, choosing the lowest sample rate.
/*   qlh = mQs; */
/*   while (qlh != NULL) { */
/*     if (rate == -1) */
/*       rate = (**qlh).q.epochDuration; */
/*     else */
/*       rate = gcd((**qlh).q.epochDuration, rate); */
/*     qlh = (QueryListHandle)(**qlh).next; */
/*   } */

/*   // Throttle rate to the maximum. */
/*   if (rate <= MIN_SAMPLE_RATE) { */
/*     // rate = gcd(MIN_SAMPLE_RATE, rate); */
/*     rate = MIN_SAMPLE_RATE; */
/*   } */
/*   dbg(DBG_USR3, "rate = %d\n", rate); // fflush(stdout); */

  // HACK
  rate = kMS_PER_CLOCK_EVENT; // hardcode!

  // Now set the rate at which we have to deliver tuples to each query,
  // as a multiple of this rate.
  qlh = mQs;
#ifdef qSNOOZE
  // Check whether the new query is the only query, or runs at exactly
  // the same rate as the previous queries.
  if ((qlh != NULL && (**qlh).next == NULL) ||
      (qlh != NULL && mAllQueriesSameRate &&
       (**((QueryListHandle)(**qlh).next)).q.epochDuration ==
       (**qlh).q.epochDuration)) {
    mAllQueriesSameRate = TRUE;
  } else
    mAllQueriesSameRate = FALSE;
#endif
  while (qlh != NULL) {
    if ((**qlh).q.epochDuration == kONE_SHOT) { // read it fast!
      (**qlh).q.clocksPerSample = 16;
      (**qlh).q.curResult = 0;
    } else
      (**qlh).q.clocksPerSample =
        (uint16_t)(((uint32_t)(**qlh).q.epochDuration * kBASE_EPOCH_RATE) /
                   (uint32_t)rate);
    atomic {
      // if (mQueryClockCount != 0) {
      //   (**qlh).q.clockCount = mQueryClockCount;
      //   mQueryClockCount = 0;
      // } else {
#ifdef qSNOOZE
      if ((**qlh).q.clocksPerSample > WAKING_CLOCKS) {
        (**qlh).q.clockCount = WAKING_CLOCKS;
      } else {
#endif
        (**qlh).q.clockCount = (**qlh).q.clocksPerSample; // reset counter
#ifdef qSNOOZE
      }
#endif
      // }
    }
    qlh = (QueryListHandle)(**qlh).next;
  }

  if (rate != mOldRate) { // restart the clock if the rate changed
    // tos_time_t cur_time = call Time.get();
    // uint32_t distFromPrev;
    mOldRate = rate;
    mIsRunning = TRUE;
    signal AbsoluteTimer.fired();
  }
}

/** Find the GCD of two non-negative integers.
 *  @param a The first integer
 *  @param b The second integer
 *  @return The GCD of a and b
 */
short gcd(short a, short b) {
  short r = -1, temp;

  if (a > b) {
    temp = a;
    a = b;
    b = temp;
  }
  while (TRUE) {
    r = b % a;
    if (r == 0) break;
    b = a;
    a = r;
  }
  return a;
}

/** Set the EPOCH DURATION of the specified query to the specified value.
 *  @param qid The query that needs its sample rate adjusted
 *  @param epochDur The new epoch duration for qid
 */
void setRate(uint8_t qid, uint16_t epochDur) {
  ParsedQuery *q;

  if (getQuery(qid, &q)) {
    q->clocksPerSample =
      (uint16_t)(((uint32_t)epochDur * kBASE_EPOCH_RATE) / (uint32_t)mOldRate);
  }
}

/** Make all queries sample less frequently.
 *  Note: we guarantee that slowDownSampling(); speedUpSampling();
 *  will leave the sample rate unchanged.
 */
void slowDownSampling() {
  QueryListHandle qlh = mQs;

  while (qlh != NULL) {
    (**qlh).q.clocksPerSample += kBASE_EPOCH_RATE;
    qlh = (QueryListHandle)(**qlh).next;
  }
}

/** Make all queries sample more frequently. */
void speedUpSampling() {
  QueryListHandle qlh = mQs;

  while (qlh != NULL) {
    if ((**qlh).q.clocksPerSample > 1)
      (**qlh).q.clocksPerSample -= kBASE_EPOCH_RATE;
    qlh = (QueryListHandle)(**qlh).next;
  }
}
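/* Illustrative example (not part of the original source): the disabled
 * loop in setSampleRate() derives one shared clock period as the GCD of
 * all installed epoch durations, so every query fires on an integer
 * number of clock ticks. Assuming two queries with epoch durations of
 * 1000 ms and 1500 ms, and taking kBASE_EPOCH_RATE = 1 purely for
 * illustration:
 *
 *   rate = gcd(1000, 1500) = 500      // one clock event every 500 ms
 *   clocksPerSample(q1) = 1000 / 500 = 2
 *   clocksPerSample(q2) = 1500 / 500 = 3
 *
 * i.e., q1 is sampled every 2nd tick and q2 every 3rd tick of the same
 * clock. The hardcoded rate = kMS_PER_CLOCK_EVENT path bypasses this
 * computation entirely.
 */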
/** Compute the appropriate sample rate ..
 */
enum {
  msXmit = 32,
  mahCapacity = 5800,
  maxVReading = 985,
  minVReading = 370,
  Vdraw = 3,
  sPerSample = 1
};

uint32_t uaActive, uaXmit, uaSleep;

void initConsts() {
  uaActive = 16900;
  uaXmit = 17320;
  uaSleep = 220;
}

void computeRates(uint32_t lifetimeHoursRem,
                  uint32_t curVReading,
                  uint32_t ujSampleCost,
                  uint32_t ujAggCost,
                  uint32_t numMsgs,
                  uint32_t numSamples,
                  uint32_t *epochDur,   // on exit, the appropriate epoch duration
                  uint16_t *aggWinSize, // on exit, the aggregate window size
                  uint16_t *mvPerDay)   // on exit, the number of raw voltage
                                        // units we expect to consume per day
{
  uint32_t ujXmitCost = (uaXmit * msXmit * Vdraw) / 1000;
  uint32_t mahRemaining = ((curVReading - minVReading) * mahCapacity) /
                          (maxVReading - minVReading);
  uint32_t uaAvg = (mahRemaining * 1000) / lifetimeHoursRem;
  uint32_t uaAvgActive = (ujSampleCost * numSamples + ujXmitCost * numMsgs +
                          ujAggCost * numSamples) / (Vdraw * sPerSample) +
                         uaActive;
  uint32_t dutyCycle = ((uaAvg - uaSleep) * 100000) / (uaAvgActive - uaSleep);

  if (uaAvg < uaSleep)
    *epochDur = -1;
  else {
    *epochDur = (sPerSample * 100000 * 1000) / dutyCycle;
    if (*epochDur < sPerSample * 1000) *epochDur = sPerSample * 1000;
  }
  dbg(DBG_USR2, "epoch dur set to %d", *epochDur);

  *aggWinSize = 1;
  // Note that this assumes we don't do any reduction in communication
  // via aggregation.
  *mvPerDay = ((curVReading - minVReading) * 24 * 1000) / lifetimeHoursRem;
}

void updateStatistics(uint16_t deltaV, uint16_t elapsedHours, uint16_t mvPerDay) {
  uint16_t actualVPerDay = (deltaV * 24) / elapsedHours;
  uint16_t ratio = (mvPerDay * 100) / (actualVPerDay * 1000);

  uaActive = (uaActive * 100) / ratio;
  uaXmit = (uaXmit * 100) / ratio;
  uaSleep = (uaSleep * 100) / ratio;
  dbg(DBG_USR2, "new uaActive = %d, uaXmit = %d, uaSleep = %d\n",
      uaActive, uaXmit, uaSleep);
}

SchemaErrorNo setSampleRateForLifetime(uint8_t qid, uint16_t lifetimeHours) {
  SchemaErrorNo errNo;

  errNo = SCHEMA_SUCCESS;
  if (IS_SETTING_SAMPLE_RATE()) {
    errNo = SCHEMA_ERROR;
    goto done;
  }
  SET_SETTING_SAMPLE_RATE();
  mLifetime = lifetimeHours;
  mCurSampleRateQuery = qid;
#if !defined(PLATFORM_PC)
  if (call AttrUse.getAttrValue("voltage", (char *)&mVoltage, &errNo) == FAIL) {
    errNo = SCHEMA_ERROR;
    goto fail;
  }
  if (errNo == SCHEMA_SUCCESS)
    continueSetSampleRate();
  else if (errNo != SCHEMA_RESULT_PENDING)
    goto fail;
#else
  mVoltage = 900;
  continueSetSampleRate();
#endif
  goto done;

#if !defined(PLATFORM_PC)
 fail:
  UNSET_SETTING_SAMPLE_RATE();
#endif
 done:
  return errNo;
}

void continueSetSampleRate() {
  uint32_t epochDur;
  uint16_t aggWinSize;
  uint16_t mvPerDay;

  computeRates(mLifetime, mVoltage, 0, 0, 1, 1,
               &epochDur, &aggWinSize, &mvPerDay);
  dbg(DBG_USR2, "computed epoch dur for query %d = %d\n",
      mCurSampleRateQuery, epochDur);
  setRate(mCurSampleRateQuery, (short)(epochDur & 0x00007FFF));
  UNSET_SETTING_SAMPLE_RATE();
#ifdef kLIFE_CMD
  if (mLifetimeCommandPending) {
    call SetLifetimeCommand.commandDone("life", NULL, SCHEMA_SUCCESS);
    mLifetimeCommandPending = FALSE;
  }
#endif
}
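/* Illustrative walk-through (not part of the original source) of the
 * energy model in computeRates(), using the values that
 * continueSetSampleRate() passes on PLATFORM_PC (mVoltage = 900,
 * ujSampleCost = ujAggCost = 0, numMsgs = numSamples = 1) and an
 * assumed lifetimeHoursRem of 1000:
 *
 *   ujXmitCost   = (17320 * 32 * 3) / 1000                 = 1662
 *   mahRemaining = ((900 - 370) * 5800) / (985 - 370)      = 4998
 *   uaAvg        = (4998 * 1000) / 1000                    = 4998
 *   uaAvgActive  = (0 + 1662*1 + 0) / (3*1) + 16900        = 17454
 *   dutyCycle    = ((4998 - 220) * 100000) / (17454 - 220) = 27724
 *                  (about a 27.7% duty cycle; percent scaled by 1000)
 *   epochDur     = (1 * 100000 * 1000) / 27724             = 3606 ms
 *
 * So with about 86% of the battery remaining and 1000 hours of
 * required lifetime, the node would sample roughly once every
 * 3.6 seconds; the final clamp keeps epochDur >= 1000 ms.
 */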
/** Clock fired event --<br>
 *  Works as follows:<br>
 *  1) Output tuples from previous epochs<br>
 *  2) Determine which queries fire in this epoch<br>
 *  3) Collect samples for those queries<br>
 *  4) Fill in the tuples in those queries<br>
 *  5) Apply operators to those tuples<br>
 *  <p>
 *  While this is happening, results may arrive from other sensor nodes
 *  carrying results from the last epoch. Those results need to be
 *  forwarded (if we're doing just selection) or stored (if we're
 *  aggregating).
 *  <p>
 *  Question: what do we do if the next timer event fires before this
 *  one is complete? Right now we simply ignore later events if this one
 *  hasn't finished processing.
 *  <p>
 *  As of 10/5/2002, the main body was moved into a task.
 */
event result_t AbsoluteTimer.fired() {
  if (mIsRunning) {
#ifdef qSNOOZE
    // Set up the service scheduler on our first time through here.
    if (mAllQueriesSameRate && !IS_SNOOZING() && mQs != NULL) {
      result_t r;
      ParsedQueryPtr pq = &(**mQs).q;
      tos_time_t t = call Time.get();
      tos_service_schedule sched;
      short sleepClocks = pq->clocksPerSample;
      // long sleepUs = (sleepClocks * mOldRate);

      // Don't sleep if the clock rate is fast.
      if (pq->clocksPerSample <= WAKING_CLOCKS) {
        mStopped = FALSE;
        goto dontSleep;
      }

      // sched.start_time = call TimeUtil.addUint32(t, sleepUs);
      // Start will be called at the beginning of the epoch...
      t.low32 += ((int32_t)pq->clocksPerSample * kMS_PER_CLOCK_EVENT) -
                 (t.low32 % ((int32_t)pq->clocksPerSample * kMS_PER_CLOCK_EVENT));
      sched.start_time = t;
      // call TimeUtil.addUint32(t, 1); // start now -- ??? changed to 1, added + 1

      // ...and stop will be called after WAKING_CLOCKS.
      sched.on_time = ((uint32_t)WAKING_CLOCKS + 1) * (uint32_t)mOldRate;
      sched.off_time = ((uint32_t)sleepClocks - ((uint32_t)WAKING_CLOCKS + 1)) *
                       (uint32_t)mOldRate;
      SET_SNOOZING();
      r = call ServiceScheduler.reschedule(kTINYDB_SERVICE_ID, sched);
      // Wait until this fires before we start...

      // Synchronize with whoever sent us this query.
      /* TS
      if (mCurSchedTime != 0) {
        call TimeUtil.addint32(t, mCurSchedTime);
        call ServiceScheduler.setNextEventTime(t);
        mCurSchedTime = 0;
      }
      */
    } else {
#endif
    dontSleep:
      if (!mStopped) {
        tos_time_t cur_time = call Time.get();
        uint32_t rateUs = (uint32_t)mOldRate; // convert to microsecs

        // call Leds.redToggle();
        cur_time.low32 -= (cur_time.low32 % rateUs);
        cur_time.low32 += rateUs; // schedule for this time in the future
        atomic {
          call AbsoluteTimer.set(cur_time);
        }
        decrementQueryCounter();
        po
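/* Illustrative example (not part of the original source; the function
 * above is truncated in this copy): the rescheduling arithmetic in the
 * dontSleep path pins each timer firing to a multiple of the clock
 * period, so jitter in event delivery does not accumulate across
 * epochs. Assuming mOldRate = 100 and cur_time.low32 = 10237:
 *
 *   cur_time.low32 -= 10237 % 100;  // -> 10200, the previous boundary
 *   cur_time.low32 += 100;          // -> 10300, the next boundary
 *
 * AbsoluteTimer.set(cur_time) then fires at the next exact multiple of
 * the period regardless of how late this handler ran.
 */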