📄 tuplerouterm.nc
字号:
bool IS_SENDING_QUERY() { return (mPendingMask & SENDING_QUERY_BIT) != 0; } void UNSET_SENDING_QUERY() { (mPendingMask &= ( SENDING_QUERY_BIT ^ 0xFFFF)); } void SET_SENDING_QUERY() { (mPendingMask |= SENDING_QUERY_BIT); } bool IS_IN_QUERY_MSG() { return (mPendingMask & IN_QUERY_MSG_BIT) != 0; } void UNSET_IS_IN_QUERY_MSG() { (mPendingMask &= ( IN_QUERY_MSG_BIT ^ 0xFFFF)); } void SET_IS_IN_QUERY_MSG() { (mPendingMask |= IN_QUERY_MSG_BIT); } bool IS_SETTING_SAMPLE_RATE() { return (mPendingMask & SETTING_SAMPLE_RATE_BIT) != 0; } void UNSET_SETTING_SAMPLE_RATE() { (mPendingMask &= ( SETTING_SAMPLE_RATE_BIT ^ 0xFFFF)); } void SET_SETTING_SAMPLE_RATE() { (mPendingMask |= SETTING_SAMPLE_RATE_BIT); } bool IS_SNOOZING() { return (mPendingMask & SNOOZING_BIT) != 0; } void UNSET_SNOOZING() { (mPendingMask &= ( SNOOZING_BIT ^ 0xFFFF)); } void SET_SNOOZING() { (mPendingMask |= SNOOZING_BIT); } bool IS_OPENING_WRITE_BUF() { return (mPendingMask & OPENING_WRITE_BUF_BIT) != 0; } void UNSET_OPENING_WRITE_BUF() { (mPendingMask &= ( OPENING_WRITE_BUF_BIT ^ 0xFFFF)); } void SET_OPENING_WRITE_BUF() { (mPendingMask |= OPENING_WRITE_BUF_BIT); } /* ----------------------------- Prototypes for Internal Routines ------------------------------ */ void continueQuery(Handle *memory); bool addQueryField(QueryMessagePtr qmsg); bool allocPendingQuery(MemoryCallback callback, Query *q); bool allocQuery(MemoryCallback callback, Query *q); void parsedCallback(Handle *memory); bool parseQuery(Query *q, ParsedQuery *pq); void parsedQuery(bool success); void continueParsing(result_t success); bool queryComplete(Query q); bool reallocQueryForTuple(MemoryCallback callback, QueryListHandle qlh); void resizedCallback(Handle *memory); void setSampleRate(); void speedUpSampling(); void slowDownSampling(); void finishedOpeningWriteBuffer(ParsedQuery *pq); void finishedOpeningReadBuffer(ParsedQuery *pq, uint8_t bufferId); void continueFromBufferFetch(TinyDBError err); void setRate(uint8_t qid, uint16_t epochDur); short gcd(short a, short b); bool fetchNextAttr(); TupleFieldDesc getNextQueryField(ParsedQuery **q); QueryListHandle nextQueryToRoute(QueryListHandle curQuery); bool routeToQuery(ParsedQuery *q, Tuple *t); Expr *nextExpr(ParsedQuery *q); bool getQuery(uint8_t qid, ParsedQuery **q); void startFetchingTuples(); void resetTupleState(ParsedQuery *q); void fillInAttrVal(SchemaErrorNo errorNo); void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId); TinyDBError dequeueMessage(TOS_Msg *msg); int chooseQueueVictim(const char *data, int len); task void removeQueryTask(); TinyDBError forwardQuery(QueryMessagePtr qmsg); void finishedBufferSetup(ParsedQuery *pq); void keepRouting(); void initConsts(); void decrementQueryCounter(); void failedOpeningWriteBuffer(ParsedQuery *pq); /* Routines to adjust the sample rate based on power consumption. */ void computeRates(uint32_t lifetimeHoursRem, uint32_t curVReading, uint32_t ujSampleCost, uint32_t ujAggCost, uint32_t numMsgs, uint32_t numSamples, uint32_t *epochDur, uint16_t *aggWinSize, uint16_t *vPerDay); void updateStatistics(uint16_t deltaV, uint16_t elapsedHours, uint16_t vPerDay); void continueSetSampleRate(); SchemaErrorNo setSampleRateForLifetime(uint8_t qid, uint16_t lifetimeHours); void doTimeSync(uint8_t timeSyncData[5], uint16_t clockCount); void checkTime(); void sendDummyQueryResult(uint8_t qid, uint8_t numFields, uint16_t curEpoch); TinyDBError sendTuple(ParsedQuery *pq, QueryResultPtr qr, bool *pending); result_t outputDone(TOS_MsgPtr msg); uint16_t max(uint16_t a, uint16_t b) { return a<b?b:a; } void startQueryAttrs(ParsedQuery *pq) { short i; for (i = 0; i < pq->numFields; i++) { if (call AttrUse.startAttr(pq->queryToSchemaFieldMap[i]) == FAIL) mNumAttrs--; // won't get a startAttrDone in this case } } void startAllQueryAttrs() { QueryListHandle curq; SET_STARTING_ATTRIBUTE(); // first add up total number of attributes for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next) { if ((**curq).q.clocksPerSample > 0) //this query is ready to go mNumAttrs += (**curq).q.numFields; } for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next) { if ((**curq).q.clocksPerSample > 0) //this query is ready to go startQueryAttrs(&((**curq).q)); } } // void statusMessage(char *m); /* Tasks */ task void deliverTuplesTask(); task void routeTask(); task void sendQuery(); task void fillInTask(); task void mainTask(); task void queryMsgTask();/* -----------------------------------------------------------------------------*//* --------------------------------- Functions ---------------------------------*//* -----------------------------------------------------------------------------*/ /** Intialize the tuple router */ command result_t StdControl.init() {#ifdef kLIFE_CMD ParamList params;#endif // TOSH_MAKE_BOOST_ENABLE_OUTPUT(); // port E pin 4 // TOSH_CLR_BOOST_ENABLE_PIN(); // set low mPendingMask = 0; mCycleToSend = 0; atomic { mQs = NULL; } mTail = NULL; mCurQuery = NULL; mNumBlocked = 0; // mOldInterval = 0; mOldRate = 0; mFetchTries = 0; //hangs in fetch sometimes -- retry count mTriedAllocWaiting = FALSE;#ifdef kQUERY_SHARING mTriedQueryRequest = FALSE;#endif mSendQueryNextClock = FALSE; mClockCount = 0; mAllocState = STATE_NOT_ALLOCING; mLastQuery = NULL; mIsRunning = FALSE; mRadioWaiting = FALSE; mSendingResult = FALSE; atomic { mMustTimestamp = TS_NO; } initConsts(); //set up constants for sample rate based on lifetime call ChildControl.init(); call NetControl.init(); call TimerControl.init();#ifdef kUART_DEBUGGER if (TOS_LOCAL_ADDRESS != 0) call UartDebuggerControl.init();#endif#ifdef kLIFE_CMD mLifetimeCommandPending = FALSE; setParamList(¶ms, 2, UINT8, UINT16); call SetLifetimeCommand.registerCommand("life", VOID, 0, ¶ms);#endif mStopped = TRUE; mSendFailed = 0; mCurSchedTime = 0; mLastDiff = 0; mLastHeard = 0; mStoppedQid = 0xFF;#ifdef HSN_ROUTING mHSNValue = 20; // XXX 20 is an arbitrary initial value#endif return SUCCESS; } command result_t StdControl.start() { mNumAttrs = 0;#ifdef HSN_ROUTING mNumMerges = 0;#endif mDeliverWait = 0; //reset xmission wait cycles every time through mWaitIsDummy = FALSE; UNSET_FETCHING_ATTRIBUTE(); UNSET_STARTING_ATTRIBUTE(); UNSET_ROUTING_TUPLES(); UNSET_AGGREGATING_RESULT(); //send networking update event if (!isRoot() || !IS_SNOOZING()) { call ChildControl.start(); call NetControl.start(); //if (mOldInterval != 0) //call setSimpleTimeInterval(mOldInterval);#if defined(PLATFORM_MICA2) || defined(PLATFORM_MICA2DOT) call PowerMgmtDisable();#endif startAllQueryAttrs(); }#ifdef HAS_ROUTECONTROL else call RouteControl.manualUpdate();#endif#ifdef qSNOOZE if (IS_SNOOZING()) { // UNSET_SNOOZING(); mStopped = FALSE; signal AbsoluteTimer.fired(); } else {#else mStopped = FALSE;#endif call TimerControl.start(); if (kMS_PER_CLOCK_EVENT > 64) call setSimpleTimeInterval(kMS_PER_CLOCK_EVENT/2); // mStopped = FALSE;#ifdef kUART_DEBUGGER if (TOS_LOCAL_ADDRESS != 0) call UartDebuggerControl.start();#endif //don't reinitialize this after snooze //call Leds.greenOn(); //call Leds.redOn(); // Mica2Dots only have red#ifdef qSNOOZE }#endif return SUCCESS; } command result_t StdControl.stop() { QueryListHandle qlh = mQs; mStopped = TRUE; call AbsoluteTimer.cancel(); //??? checkTime(); if (!isRoot()) { //mOldInterval = call getSimpleTimeInterval(); //call setSimpleTimeInterval(kSIMPLE_TIME_SLEEP_INTERVAL); #if defined(PLATFORM_MICA2) || defined(PLATFORM_MICA2DOT) call PowerMgmtEnable();#endif call Leds.set(0); //all off //call UartDebugger.writeLine("stop",4); call ChildControl.stop(); // must leave radio on if there is no query running, or if //we haven't heard from our parent for awhile if (mQs != NULL && !IS_SENDING_MESSAGE() /* ??? */ && mLastHeard <= kHEARD_THRESH) { call NetControl.stop(); } if (IS_SENDING_MESSAGE()) { //??? //UNSET_SENDING_MESSAGE(); outputDone(&mMsg); } if (IS_DELIVERING_TUPLES()) UNSET_DELIVERING_TUPLES(); } if (mLastHeard > kHEARD_THRESH) mLastHeard -= kHEARD_DEC; //call TimerControl.stop(); //don't stop the clock! //otherwise we'll stop when send is done // else // mRadioWaiting = TRUE;#ifdef qSNOOZE // make it so that all queries are at the same point when // we wake up while (qlh != NULL) { (**qlh).q.clockCount = WAKING_CLOCKS; qlh = (QueryListHandle)(**qlh).next; }#endif return SUCCESS; } /* --------------------------------- Query Handling ---------------------------------*/ /** Message indicating the arrival of (part of) a query */ event result_t Network.querySub(QueryMessagePtr qmsg) { ParsedQuery *q; bool oldField = TRUE; if (qmsg->msgType == DROP_TABLE_MSG) { if (IS_IN_QUERY_MSG()) return FAIL; SET_IS_IN_QUERY_MSG(); //call UartDebugger.writeLine("cleaning",8);#ifdef kMATCHBOX call DBBuffer.cleanupEEPROM();#endif if (qmsg->u.ttl-- > 0) forwardQuery(qmsg); UNSET_IS_IN_QUERY_MSG(); return FAIL; } else if (qmsg->msgType == RATE_MSG) { //is a request to change the rate of an existing query if (IS_IN_QUERY_MSG()) return SUCCESS; //ignore this! SET_IS_IN_QUERY_MSG(); //can't change the rate of a query we know nothing about! if (!getQuery(qmsg->qid, &q)) { goto done_rate; } if (qmsg->fwdNode == TOS_UART_ADDR) { forwardQuery(qmsg); } dbg(DBG_USR2, "changing rate to %d\n", qmsg->epochDuration); setRate(qmsg->qid, qmsg->epochDuration); done_rate: UNSET_IS_IN_QUERY_MSG(); return SUCCESS; } else if (qmsg->msgType == DEL_MSG) { //is a request to delete an existing query ParsedQuery *pq; bool isKnown; call Leds.yellowToggle(); if (IS_IN_QUERY_MSG() /*|| IS_REMOVING()*/) return SUCCESS; //ignore this! SET_IS_IN_QUERY_MSG(); //SET_REMOVING(); if (getQuery(qmsg->qid, &pq)) isKnown = TRUE; else isKnown = FALSE; //call Leds.redOn(); call Leds.greenOn(); call Leds.yellowOn(); //can't force -- might be in flight mQidToRemove = qmsg->qid; mForceRemove = FALSE; post removeQueryTask(); //only forward if we know about the query, or if we're the root //qmsg->u.ttl = (qmsg->u.ttl) - 1; if (isKnown) { forwardQuery(qmsg); } mStoppedQid = qmsg->qid; UNSET_IS_IN_QUERY_MSG(); return SUCCESS; } //otherwise, assume its an ADD_MSG, for now if (!IS_IN_QUERY_MSG()) { if (!getQuery(qmsg->qid, &q)) { //ignore if we already know about this query SET_IS_IN_QUERY_MSG(); mStoppedQid = 0xFF; //a new query, not one we just stopped //go ahead and time synchronize with the sender of this message doTimeSync(qmsg->timeSyncData, qmsg->clockCount); if (IS_READING_QUERY()) { if (qmsg->qid != (**mCurQuery).qid) { if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) { //query is alloced, but heard about a new one //forget old one if (IS_SPACE_ALLOCED()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -