tuplerouterm.nc
来自「tinyos最新版」· NC 代码 · 共 2,130 行 · 第 1/5 页
NC
2,130 行
bool IS_AGGREGATING_RESULT() { return (mPendingMask & AGGREGATING_BIT) != 0; } void UNSET_AGGREGATING_RESULT() { (mPendingMask &= ( AGGREGATING_BIT ^ 0xFFFF)); } void SET_AGGREGATING_RESULT() { (mPendingMask |= AGGREGATING_BIT); } bool IS_SENDING_QUERY() { return (mPendingMask & SENDING_QUERY_BIT) != 0; } void UNSET_SENDING_QUERY() { (mPendingMask &= ( SENDING_QUERY_BIT ^ 0xFFFF)); } void SET_SENDING_QUERY() { (mPendingMask |= SENDING_QUERY_BIT); } bool IS_IN_QUERY_MSG() { return (mPendingMask & IN_QUERY_MSG_BIT) != 0; } void UNSET_IS_IN_QUERY_MSG() { (mPendingMask &= ( IN_QUERY_MSG_BIT ^ 0xFFFF)); } void SET_IS_IN_QUERY_MSG() { (mPendingMask |= IN_QUERY_MSG_BIT); } bool IS_SETTING_SAMPLE_RATE() { return (mPendingMask & SETTING_SAMPLE_RATE_BIT) != 0; } void UNSET_SETTING_SAMPLE_RATE() { (mPendingMask &= ( SETTING_SAMPLE_RATE_BIT ^ 0xFFFF)); } void SET_SETTING_SAMPLE_RATE() { (mPendingMask |= SETTING_SAMPLE_RATE_BIT); } bool IS_SNOOZING() { return (mPendingMask & SNOOZING_BIT) != 0; } void UNSET_SNOOZING() { (mPendingMask &= ( SNOOZING_BIT ^ 0xFFFF)); } void SET_SNOOZING() { (mPendingMask |= SNOOZING_BIT); } bool IS_OPENING_WRITE_BUF() { return (mPendingMask & OPENING_WRITE_BUF_BIT) != 0; } void UNSET_OPENING_WRITE_BUF() { (mPendingMask &= ( OPENING_WRITE_BUF_BIT ^ 0xFFFF)); } void SET_OPENING_WRITE_BUF() { (mPendingMask |= OPENING_WRITE_BUF_BIT); } /* ----------------------------- Prototypes for Internal Routines ------------------------------ */ void continueQuery(Handle *memory); bool addQueryField(QueryMessagePtr qmsg); bool allocPendingQuery(MemoryCallback callback, Query *q); bool allocQuery(MemoryCallback callback, Query *q); void parsedCallback(Handle *memory); bool parseQuery(Query *q, ParsedQuery *pq); void parsedQuery(bool success); void continueParsing(result_t success); bool queryComplete(Query q); bool reallocQueryForTuple(MemoryCallback callback, QueryListHandle qlh); void resizedCallback(Handle *memory); void setSampleRate(); void speedUpSampling(); void slowDownSampling(); void finishedOpeningWriteBuffer(ParsedQuery *pq); void finishedOpeningReadBuffer(ParsedQuery *pq, uint8_t bufferId); void continueFromBufferFetch(TinyDBError err); void setRate(uint8_t qid, uint16_t epochDur); short gcd(short a, short b); bool fetchNextAttr(); TupleFieldDesc getNextQueryField(ParsedQuery **q); QueryListHandle nextQueryToRoute(QueryListHandle curQuery); bool routeToQuery(ParsedQuery *q, Tuple *t); Expr *nextExpr(ParsedQuery *q); bool getQuery(uint8_t qid, ParsedQuery **q); void startFetchingTuples(); void resetTupleState(ParsedQuery *q); void fillInAttrVal(SchemaErrorNo errorNo); void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId); TinyDBError dequeueMessage(TOS_Msg *msg); int chooseQueueVictim(const char *data, int len); void sendWaitingMessages(); task void removeQueryTask(); TinyDBError forwardQuery(QueryMessagePtr qmsg); void finishedBufferSetup(ParsedQuery *pq); void keepRouting(); void initConsts(); void decrementQueryCounter(); void failedOpeningWriteBuffer(ParsedQuery *pq); /* Routines to adjust the sample rate based on power consumption. */ void computeRates(uint32_t lifetimeHoursRem, uint32_t curVReading, uint32_t ujSampleCost, uint32_t ujAggCost, uint32_t numMsgs, uint32_t numSamples, uint32_t *epochDur, uint16_t *aggWinSize, uint16_t *vPerDay); void updateStatistics(uint16_t deltaV, uint16_t elapsedHours, uint16_t vPerDay); void continueSetSampleRate(); SchemaErrorNo setSampleRateForLifetime(uint8_t qid, uint16_t lifetimeHours); uint16_t max(uint16_t a, uint16_t b) { return a<b?b:a; } void startQueryAttrs(ParsedQuery *pq) { short i; for (i = 0; i < pq->numFields; i++) { if (call AttrUse.startAttr(pq->queryToSchemaFieldMap[i]) == FAIL) mNumAttrs--; // won't get a startAttrDone in this case } } #define isRoot() (TOS_LOCAL_ADDRESS == 0) void startAllQueryAttrs() { QueryListHandle curq; SET_STARTING_ATTRIBUTE(); // first add up total number of attributes for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next) { if ((**curq).q.clocksPerSample > 0) //this query is ready to go mNumAttrs += (**curq).q.numFields; } for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next) { if ((**curq).q.clocksPerSample > 0) //this query is ready to go startQueryAttrs(&((**curq).q)); } } // void statusMessage(char *m); /* Tasks */ task void deliverTuplesTask(); task void routeTask(); task void sendQuery(); task void fillInTask(); task void mainTask();/* -----------------------------------------------------------------------------*//* --------------------------------- Functions ---------------------------------*//* -----------------------------------------------------------------------------*/ /** Intialize the tuple router */ command result_t StdControl.init() {#ifdef kLIFE_CMD ParamList params;#endif // TOSH_MAKE_BOOST_ENABLE_OUTPUT(); // port E pin 4 // TOSH_CLR_BOOST_ENABLE_PIN(); // set low mPendingMask = 0; mCycleToSend = 0; mQs = NULL; mTail = NULL; mCurQuery = NULL; mNumBlocked = 0; mOldRate = 0; mFetchTries = 0; //hangs in fetch sometimes -- retry count mTriedAllocWaiting = FALSE;#ifdef kQUERY_SHARING mTriedQueryRequest = FALSE;#endif mFixedComm = FALSE; mSendQueryNextClock = FALSE; mSending = FALSE; mClockCount = 0; mAllocState = STATE_NOT_ALLOCING; mLastQuery = NULL; mIsRunning = FALSE; mRadioWaiting = FALSE; mSendingResult = FALSE; mMustTimestamp = TS_NO; initConsts(); //set up constants for sample rate based on lifetime call ChildControl.init(); call NetControl.init(); call TimerControl.init();#ifdef kUART_DEBUGGER if (TOS_LOCAL_ADDRESS != 0) call UartDebuggerControl.init();#endif#ifdef kLIFE_CMD mLifetimeCommandPending = FALSE; setParamList(¶ms, 2, UINT8, UINT16); call SetLifetimeCommand.registerCommand("life", VOID, 0, ¶ms);#endif mStopped = TRUE; mSendFailed = 0; mCurSchedTime = 0; return SUCCESS; } command result_t StdControl.start() { mNumAttrs = 0; call ChildControl.start(); call NetControl.start(); startAllQueryAttrs(); #ifdef qSNOOZE if (IS_SNOOZING() && !isRoot()) { // UNSET_SNOOZING(); mStopped = FALSE; signal AbsoluteTimer.fired(); } else {#else mStopped = FALSE;#endif call TimerControl.start(); // mStopped = FALSE;#ifdef kUART_DEBUGGER if (TOS_LOCAL_ADDRESS != 0) call UartDebuggerControl.start();#endif //don't reinitialize this after snooze //call Leds.greenOn(); call Leds.redOn(); // Mica2Dots only have red#ifdef qSNOOZE }#endif return SUCCESS; } command result_t StdControl.stop() { QueryListHandle qlh = mQs; mStopped = TRUE; call AbsoluteTimer.cancel(); //??? //call Leds.set(0); //all off //call UartDebugger.writeLine("stop",4); call ChildControl.stop(); // must leave radio on if there is no query running if (mQs != NULL && !IS_SENDING_MESSAGE() /* ??? */) { call NetControl.stop(); } if (IS_SENDING_MESSAGE()) //??? UNSET_SENDING_MESSAGE(); if (IS_DELIVERING_TUPLES()) UNSET_DELIVERING_TUPLES(); //call TimerControl.stop(); //don't stop the clock! //otherwise we'll stop when send is done // else // mRadioWaiting = TRUE;#ifdef qSNOOZE // make it so that all queries are at the same point when // we wake up while (qlh != NULL && !isRoot()) { (**qlh).q.clockCount = WAKING_CLOCKS; qlh = (QueryListHandle)(**qlh).next; }#endif return SUCCESS; } /* --------------------------------- Query Handling ---------------------------------*/ /** Message indicating the arrival of (part of) a query */ event result_t Network.querySub(QueryMessagePtr qmsg) { ParsedQuery *q; bool oldField = TRUE; bool isRoot; if (qmsg->msgType == DROP_TABLE_MSG) { if (IS_IN_QUERY_MSG()) return FAIL; SET_IS_IN_QUERY_MSG(); //call UartDebugger.writeLine("cleaning",8);#ifdef kMATCHBOX call DBBuffer.cleanupEEPROM();#endif if (qmsg->u.ttl-- > 0) forwardQuery(qmsg); UNSET_IS_IN_QUERY_MSG(); return FAIL; } else if (qmsg->msgType == RATE_MSG) { //is a request to change the rate of an existing query if (IS_IN_QUERY_MSG()) return SUCCESS; //ignore this! SET_IS_IN_QUERY_MSG(); //can't change the rate of a query we know nothing about! if (!getQuery(qmsg->qid, &q)) { goto done_rate; } isRoot = TOS_LOCAL_ADDRESS == q->queryRoot; if (isRoot) { forwardQuery(qmsg); } dbg(DBG_USR2, "changing rate to %d\n", qmsg->epochDuration); setRate(qmsg->qid, qmsg->epochDuration); done_rate: UNSET_IS_IN_QUERY_MSG(); return SUCCESS; } else if (qmsg->msgType == DEL_MSG) { //is a request to delete an existing query ParsedQuery *pq; bool isKnown; call Leds.yellowToggle(); if (IS_IN_QUERY_MSG()) return SUCCESS; //ignore this! SET_IS_IN_QUERY_MSG(); if (getQuery(qmsg->qid, &pq)) isKnown = TRUE; else isKnown = FALSE; call Leds.redOn(); call Leds.greenOn(); call Leds.yellowOn(); //can't force -- might be in flight mQidToRemove = qmsg->qid; mForceRemove = FALSE; post removeQueryTask(); //only forward if we know about the query, or if we're the root //qmsg->u.ttl = (qmsg->u.ttl) - 1; if (isKnown) { forwardQuery(qmsg); } UNSET_IS_IN_QUERY_MSG(); return SUCCESS; } //otherwise, assume its an ADD_MSG, for now if (!IS_IN_QUERY_MSG()) { if (!getQuery(qmsg->qid, &q)) { //ignore if we already know about this query SET_IS_IN_QUERY_MSG(); //go ahead and time synchronize with the sender of this message //doTimeSync(qmsg->timeSyncData); if (IS_READING_QUERY()) { if (qmsg->qid != (**mCurQuery).qid) { if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) { //query is alloced, but heard about a new one //forget old one if (IS_SPACE_ALLOCED()) call MemAlloc.free((Handle)mCurQuery); UNSET_SPACE_ALLOCED(); UNSET_READING_QUERY(); } else { mTriedAllocWaiting = TRUE; UNSET_IS_IN_QUERY_MSG(); return SUCCESS; //waiting for query to be alloced -- dont interrupt } } else if (! IS_SPACE_ALLOCED()) { UNSET_IS_IN_QUERY_MSG(); return SUCCESS; //failure -- space not alloced for this query yet! } else { oldField = addQueryField(qmsg); } //go ahead and forward this guy on, as long as it's new, or we're the root if (!oldField || TOS_LOCAL_ADDRESS == qmsg->queryRoot) forwardQuery(qmsg); } //note that we can fall through from previous clause if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) { Query pq; QueryMessagePtr qmsgCopy = call Network.getQueryPayLoad(&mMsg); SET_READING_QUERY(); UNSET_SPACE_ALLOCED(); mTriedAllocWaiting = FALSE; pq.qid = qmsg->qid; pq.numFields=qmsg->numFields; pq.numExprs=qmsg->numExprs; *qmsgCopy = *qmsg; //save a copy //allocate space for query if (!allocPendingQuery(&continueQuery, &pq)) { UNSET_READING_QUERY(); //barf! } } } else if (TOS_LOCAL_ADDRESS == qmsg->queryRoot) //forward on if we're the root forwardQuery(qmsg); UNSET_IS_IN_QUERY_MSG(); } return SUCCESS; }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?