📄 tuplerouterm.nc
字号:
//rate is now the number of milliseconds between clock ticks //need to set the clock appropriately /* if (rate < 255) { */ /* TOS_CALL_COMMAND(CLOCK_INIT)(rate, ONE_MS_CLOCK_CONST); */ /* } else { */ /* rate >>= 3; */ /* rate += 1; */ /* rate &= 0x00FF; */ /* TOS_CALL_COMMAND(CLOCK_INIT)(rate, EIGHT_MS_CLOCK_CONST); */ /* } */ if (rate != mOldRate) { //restart the clock if rate changed mOldRate = rate; //WARNING: is this needed? // call Timer.init(); //stop timer 0 //make this a critical section too -- if a timer event goes off while this is happening, who knows what that means? prevInt = call Interrupt.disable(); call Timer.stop(); if (prevInt) call Interrupt.enable(); //restart it at the new rate //TOS_CALL_COMMAND(TUPLE_ROUTER_TIMER_START)(0,0,32); //timer for outputting results //TOS_CALL_COMMAND(TUPLE_ROUTER_TIMER_START)(1,0,rate); //timer for outputting results call Timer.start(TIMER_REPEAT, rate); } computeOutputRate(); } /** Determine how many communication slots there are in the shortest duration query -- this will determine how long we can afford to maximally wait before sending a result. */ void computeOutputRate() { QueryListHandle qlh = mQs; short minSlots = 0x7FFF; while (qlh != NULL) { if (minSlots > (**qlh).q.clocksPerSample) minSlots = (**qlh).q.clocksPerSample; qlh = (QueryListHandle)(**qlh).next; } mXmitSlots = minSlots; } /** Find the GCD of two non-negative integers @param a The first integer @param b The secnd integer @return the GCD of a and b */ short gcd(short a, short b) { short r = -1, temp; if (a > b) { temp = a; a = b; b = temp; } while (TRUE) { r = b % a; if (r == 0) break; b = a; a = r; } return a; } /** Clock fired event --<br> Works as follows:<br> 1) Output tuples from previous epochs<br> 2) Deterimine what queries fire in this epoch<br> 3) Collect samples from those queries<br> 4) Fill in the tuples in those queries<br> 5) Apply operators to those tuples<br> <p> While this is happening, results may arrive from other sensors nodes representing results from the last epoch. Those results need to be forwarded (if we're just selection), or stored (if we're aggregating) <p> Question: What to do if next time event goes off before this one is complete? Right now, this we'll simply ignore later events if this one hasn't finished processing <p> As of 10/5/2002, moved main body into a task.*/ event result_t Timer.fired() { post mainTask(); return SUCCESS; } task void mainTask() { dbg(DBG_USR1,"IN CLOCK \n"); //fflush(stdout); mTicksThisInterval--; if (mTicksThisInterval <= 0) { //numSenders is used to determine the backoff period between message sends //xmitSlots tracks the epoch duration of the shortest query in clock ticks //msgsThisInterval is the number of messages heard during the last NUM_TICKS_PER_INTERVAL clock ticks //idea here is that we want to backoff more if : // 1) there is more network traffic // 2) there is more time between epochs mNumSenders = ((((mMsgsThisInterval + 1) * 2) * mXmitSlots) >> 7); //(2^7 == 128 == NUM_TICKS_THIS_INTERVAL) mMsgsThisInterval = 0; mTicksThisInterval = NUM_TICKS_PER_INTERVAL; } //don't do anything if we're currently sending (shouldn't need this, but...) if(IS_SENDING_MESSAGE()) { call statusMessage("sending"); return; } sendWaitingMessages(); if (mSendQueryNextClock) { mSendQueryNextClock = FALSE; post sendQuery(); } //test to see if we're already sampling, in which case we better //not reinvoke sampling! if (IS_FETCHING_ATTRIBUTE()) { call statusMessage("fetching"); mFetchTries++; //so we can escape a blocked fetch if (mFetchTries < UDF_WAIT_LOOP) return; else UNSET_FETCHING_ATTRIBUTE(); } else if (IS_ROUTING_TUPLES()) { call statusMessage("routing"); return; } else if ( IS_DELIVERING_TUPLES()) { call statusMessage("delivering"); return; } else if (IS_AGGREGATING_RESULT()) { call statusMessage("aggregating"); return; } mFetchTries = 0; // TOS_SIGNAL_EVENT(TUPLE_ROUTER_NEW_EPOCH)(); mCurRouteQuery = NULL; //find the first query we need to deliver results for mCurExpr = -1; dbg(DBG_USR1,"POSTING TASK.");//fflush(stdout); post deliverTuplesTask(); } /* --------------------------------- Tuple Output Routines ---------------------------------*/ /** Walk through queries, finding ones that have gone off (timer reached 0), and where the tuples are complete. Output said tuples to the appropriate output buffers. <p> mCurRouteQuery contains the last query routed, or NULL if starting at the first query (it's not a parameter, since this task needs to be rescheduled as successive tuples are delivered) */ task void deliverTuplesTask() { bool success; bool didAgg = FALSE; bool pending = FALSE; // if (IS_SENDING_MESSAGE()) return; //wait til networking send is done... dbg(DBG_USR1,"IN DELIVER TUPLES TASK.\n");//fflush(stdout); SET_DELIVERING_TUPLES(); mCurRouteQuery = nextQueryToRoute(mCurRouteQuery); if (mCurRouteQuery != NULL) { ParsedQuery *pq = &(**mCurRouteQuery).q; Expr *e = nextExpr(pq); TinyDBError err = err_NoError; QueryResult qr; uint16_t size; //init success success = (TOS_LOCAL_ADDRESS == pq->queryRoot)? FALSE : TRUE; //don't deliver tuples for root call QueryResultIntf.initQueryResult(&qr); // stamp current epoch number qr.epoch = pq->currentEpoch; qr.qid = pq->qid; //scan the query, looking for an aggregate operator -- //if we find it, output all the tuples it knows about -- //otherwise, just output the tuple associated with the query while (e != NULL) { if (e->opType != kSEL) { //add all of the aggregate results to the query result data structure err = call addResults(&qr, pq, e); didAgg = TRUE; //break; } else { if (!e->success) success = FALSE; } e = nextExpr(pq); } //then output the query result if (didAgg && err == err_NoError && call QueryResultIntf.numRecords(&qr,pq) > 0) { //enqueue all the results from this aggregate //err = call RadioQueue.enqueue((const char *)&mResult, DATA_LENGTH-sizeof(DbMsgHdr)); mEnqResult = qr; err = call DBBuffer.enqueue(pq->bufferId, &mEnqResult, &pending, pq); //ignore result buffer busy items for now, since they just mean //we can't keep up with the sample rate the user is requested , but we //shouldn't abort if (err != err_ResultBufferBusy && err != err_NoError) call signalError(err); } //just a selection query -- enqueue appropriate results if (success && !didAgg) { mEnqResult = qr; call QueryResultIntf.fromTuple( &mEnqResult, pq , call ParsedQueryIntf.getTuplePtr(pq)); //err = call RadioQueue.enqueue((const char *)&mResult, DATA_LENGTH-sizeof(DbMsgHdr)); err = call DBBuffer.enqueue(pq->bufferId, &mEnqResult, &pending, pq); if (err != err_ResultBufferBusy && err != err_NoError) { call signalError(err); } } pq->clockCount = pq->clocksPerSample; //reschedule this query //one shot queries may have finished scanning their buffer if (pq->fromQid != kNO_QUERY && pq->epochDuration == kONE_SHOT) { uint8_t fromBuf; err = call DBBuffer.qidToBuffer(pq->fromQid, &fromBuf); //stop queries that have scanned the entire buffer err = call DBBuffer.size(fromBuf, &size); if (err != err_NoError || pq->curResult++ >= size) { //bool ok,prev; //prev = call Interrupt.disable(); //UNSET_DELIVERING_TUPLES(); pq->markedForDeletion = EPOCHS_TIL_DELETION; //we need to destroy this query asap //removeQuery(pq->qid, &ok); //SET_DELIVERING_TUPLES(); //if (prev) call Interrupt.enable(); } } //send tuples for next query if (!pending) post deliverTuplesTask(); mCurExpr = -1; //reset for next query } else { UNSET_DELIVERING_TUPLES(); //now that tuples from last epoch are delivered, start fetching //new tuples dbg(DBG_USR1,"FETCTHING TUPLES\n"); //fflush(stdout); startFetchingTuples(); } } /** Called from the main timer loop -- task that drains the internal message queue. Uses mOutputCount to track the time until the next message should be sent. */ void sendWaitingMessages() { if (mOutputCount > 0) { mOutputCount--; } if (mOutputCount <= 0) { TinyDBError err; DbMsgHdr *hdr; ParsedQuery *q; if (IS_SENDING_MESSAGE()) { //don't dequeue if we're already sending ... call signalError(err_MSF_SendWaitingBusy); return; } SET_SENDING_MESSAGE(); err = dequeueMessage(&mMsg); if (err == err_NoError) { getQuery(call QueryResultIntf.queryIdFromMsg((char *)mMsg.data), &q); call Leds.redToggle(); hdr = (DbMsgHdr *)mMsg.data; //hdr->xmitSlots = mNumSenders; if (q != NULL) hdr->timeRemaining = (unsigned char)(q->clockCount & 0x00FF); else hdr->timeRemaining = 0xFF; if (call Network.sendDataMessage(&mMsg) != err_NoError) { UNSET_SENDING_MESSAGE(); call signalError(err_MSF_SendWaiting); } //schedule the next result to deliver if (TOS_LOCAL_ADDRESS == q->queryRoot) { mOutputCount = 4; } else { if (mFixedComm) { mOutputCount = TOS_LOCAL_ADDRESS * 2; } else { mOutputCount = max(4,(((call Random.rand() & 0x7FFF) % ((mNumSenders >> 1)+1)) << 1)); } //mOutputCount = 16; } } else { UNSET_SENDING_MESSAGE(); if (err != err_NoMoreResults) call signalError(err); } } } /** Event that's signalled when a send is complete */ event result_t Network.outputDone(TOS_MsgPtr msg, uint8_t amId) { if (IS_SENDING_MESSAGE() ) { UNSET_SENDING_MESSAGE(); if (/*msg == &mQmsg &&*/ IS_SENDING_QUERY()) { mSendQueryNextClock = TRUE; } } return SUCCESS; } void startFetchingTuples() { QueryListHandle qlh = mQs; bool mustSample = FALSE; //update queries, determine if any needs to sample data this epoch while (qlh != NULL) { //reset queries that just restarted if ((**qlh).q.clocksPerSample > 0 && (**qlh).q.clockCount == (**qlh).q.clocksPerSample) { resetTupleState(&(**qlh).q); //clear it out call Network.endOfEpoch(); } if ((**qlh).q.clocksPerSample > 0 && --(**qlh).q.clockCount <= 0) { call Leds.yellowToggle(); if ((**qlh).q.markedForDeletion > 0) { if (--(**qlh).q.markedForDeletion == 0) { //delete the query bool success; removeQuery((**qlh).q.qid, &success); } (**qlh).q.clockCount = (**qlh).q.clocksPerSample; // just reschedule this query... } else { //only actually process local tuples if we're not the root. if ((**qlh).q.queryRoot != TOS_LOCAL_ADDRESS) mustSample = TRUE; (**qlh).q.currentEpoch++; } break; } qlh = (QueryListHandle)(**qlh).next; } if (mustSample) { fetchNextAttr(); } } void resetTupleState(ParsedQuery *q) { short i; Expr *e; //clear out this tuple call TupleIntf.tupleInit(q,call ParsedQueryIntf.getTuplePtr(q)); for (i = 0; i < q->numExprs; i++) { e = call ParsedQueryIntf.getExprPtr(q, i); call AggOperator.endOfEpoch(q, e); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -