tuplerouterm.nc

来自「tinyos最新版」· NC 代码 · 共 2,130 行 · 第 1/5 页

NC
2,130
字号
    /* ------------------------------ Pending-state flag helpers ------------------------------- */
    /* Each trio below tests, clears, and sets one bit of mPendingMask.  Clearing
       uses ~BIT rather than (BIT ^ 0xFFFF) so the helpers stay correct whatever
       the width of mPendingMask is. */

    /** AGGREGATING_BIT accessors. */
    bool IS_AGGREGATING_RESULT() { return (mPendingMask & AGGREGATING_BIT) != 0; }
    void UNSET_AGGREGATING_RESULT() { mPendingMask &= ~AGGREGATING_BIT; }
    void SET_AGGREGATING_RESULT() { mPendingMask |= AGGREGATING_BIT; }

    /** SENDING_QUERY_BIT accessors. */
    bool IS_SENDING_QUERY() { return (mPendingMask & SENDING_QUERY_BIT) != 0; }
    void UNSET_SENDING_QUERY() { mPendingMask &= ~SENDING_QUERY_BIT; }
    void SET_SENDING_QUERY() { mPendingMask |= SENDING_QUERY_BIT; }

    /** IN_QUERY_MSG_BIT accessors; Network.querySub uses this bit as its busy guard. */
    bool IS_IN_QUERY_MSG() { return (mPendingMask & IN_QUERY_MSG_BIT) != 0; }
    void UNSET_IS_IN_QUERY_MSG() { mPendingMask &= ~IN_QUERY_MSG_BIT; }
    void SET_IS_IN_QUERY_MSG() { mPendingMask |= IN_QUERY_MSG_BIT; }

    /** SETTING_SAMPLE_RATE_BIT accessors. */
    bool IS_SETTING_SAMPLE_RATE() { return (mPendingMask & SETTING_SAMPLE_RATE_BIT) != 0; }
    void UNSET_SETTING_SAMPLE_RATE() { mPendingMask &= ~SETTING_SAMPLE_RATE_BIT; }
    void SET_SETTING_SAMPLE_RATE() { mPendingMask |= SETTING_SAMPLE_RATE_BIT; }

    /** SNOOZING_BIT accessors. */
    bool IS_SNOOZING() { return (mPendingMask & SNOOZING_BIT) != 0; }
    void UNSET_SNOOZING() { mPendingMask &= ~SNOOZING_BIT; }
    void SET_SNOOZING() { mPendingMask |= SNOOZING_BIT; }

    /** OPENING_WRITE_BUF_BIT accessors. */
    bool IS_OPENING_WRITE_BUF() { return (mPendingMask & OPENING_WRITE_BUF_BIT) != 0; }
    void UNSET_OPENING_WRITE_BUF() { mPendingMask &= ~OPENING_WRITE_BUF_BIT; }
    void SET_OPENING_WRITE_BUF() { mPendingMask |= OPENING_WRITE_BUF_BIT; }

    /* ----------------------------- Prototypes for Internal Routines ------------------------------ */

    void continueQuery(Handle *memory);
    bool addQueryField(QueryMessagePtr qmsg);
    bool allocPendingQuery(MemoryCallback callback, Query *q);
    bool allocQuery(MemoryCallback callback, Query *q);
    void parsedCallback(Handle *memory);
    bool parseQuery(Query *q, ParsedQuery *pq);
    void parsedQuery(bool success);
    void continueParsing(result_t success);
    bool queryComplete(Query q);
    bool reallocQueryForTuple(MemoryCallback callback, QueryListHandle qlh);
    void resizedCallback(Handle *memory);
    void setSampleRate();
    void speedUpSampling();
    void slowDownSampling();
    void finishedOpeningWriteBuffer(ParsedQuery *pq);
    void finishedOpeningReadBuffer(ParsedQuery *pq, uint8_t bufferId);
    void continueFromBufferFetch(TinyDBError err);
    void setRate(uint8_t qid, uint16_t epochDur);
    short gcd(short a, short b);
    bool fetchNextAttr();
    TupleFieldDesc getNextQueryField(ParsedQuery **q);
    QueryListHandle nextQueryToRoute(QueryListHandle curQuery);
    bool routeToQuery(ParsedQuery *q, Tuple *t);
    Expr *nextExpr(ParsedQuery *q);
    bool getQuery(uint8_t qid, ParsedQuery **q);
    void startFetchingTuples();
    void resetTupleState(ParsedQuery *q);
    void fillInAttrVal(SchemaErrorNo errorNo);
    void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId);
    TinyDBError dequeueMessage(TOS_Msg *msg);
    int chooseQueueVictim(const char *data, int len);
    void sendWaitingMessages();
    task void removeQueryTask();
    TinyDBError forwardQuery(QueryMessagePtr qmsg);
    void finishedBufferSetup(ParsedQuery *pq);
    void keepRouting();
    void initConsts();
    void decrementQueryCounter();
    void failedOpeningWriteBuffer(ParsedQuery *pq);

    /* Routines to adjust the sample rate based on
       power consumption.
    */
    void computeRates(uint32_t lifetimeHoursRem,
                      uint32_t curVReading,
                      uint32_t ujSampleCost,
                      uint32_t ujAggCost,
                      uint32_t numMsgs,
                      uint32_t numSamples,
                      uint32_t *epochDur,
                      uint16_t *aggWinSize,
                      uint16_t *vPerDay);
    void updateStatistics(uint16_t deltaV, uint16_t elapsedHours, uint16_t vPerDay);
    void continueSetSampleRate();
    SchemaErrorNo setSampleRateForLifetime(uint8_t qid, uint16_t lifetimeHours);

    /** Return the larger of a and b (b when they are equal or a < b, otherwise a). */
    uint16_t max(uint16_t a, uint16_t b) {
        return a < b ? b : a;
    }

    /**
     * Ask AttrUse to start every attribute referenced by a query.
     *
     * @param pq  query whose queryToSchemaFieldMap entries [0, numFields) are started.
     *
     * Each start that returns FAIL decrements mNumAttrs, because no
     * startAttrDone will be signalled for that attribute.
     */
    void startQueryAttrs(ParsedQuery *pq)
    {
        short i;

        for (i = 0; i < pq->numFields; i++)
        {
            if (call AttrUse.startAttr(pq->queryToSchemaFieldMap[i]) == FAIL)
                mNumAttrs--; // won't get a startAttrDone in this case
        }
    }

/* True when this mote has local address 0. */
#define isRoot() (TOS_LOCAL_ADDRESS == 0)

    /**
     * Start the attributes of every query in the mQs list whose
     * clocksPerSample is greater than zero.
     *
     * Sets the starting-attribute flag, then makes two passes over the list:
     * the first adds each ready query's numFields to mNumAttrs, the second
     * calls startQueryAttrs() on each ready query.
     * NOTE(review): presumably the total is computed first so mNumAttrs is
     * complete before any completion event can decrement it -- confirm
     * against the startAttrDone handler.
     */
    void startAllQueryAttrs()
    {
        QueryListHandle curq;

        SET_STARTING_ATTRIBUTE();

        // first add up total number of attributes
        for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next)
        {
            if ((**curq).q.clocksPerSample > 0) // this query is ready to go
                mNumAttrs += (**curq).q.numFields;
        }

        // then actually start them
        for (curq = mQs; curq != NULL; curq = (QueryListHandle)(**curq).next)
        {
            if ((**curq).q.clocksPerSample > 0) // this query is ready to go
                startQueryAttrs(&((**curq).q));
        }
    }

    //  void statusMessage(char *m);

    /* Tasks */
    task void deliverTuplesTask();
    task void routeTask();
    task void sendQuery();
    task void fillInTask();
    task void mainTask();

    /* -----------------------------------------------------------------------------*/
    /* --------------------------------- Functions ---------------------------------*/
    /* -----------------------------------------------------------------------------*/

    /**
     * Initialize the tuple router.
     *
     * Resets all module state visible here (flags, query list pointers,
     * counters), calls initConsts(), initializes the child, network and timer
     * components, and (when compiled in) the UART debugger and the "life"
     * command.  The router is left marked as stopped.
     *
     * @return SUCCESS always.
     */
    command result_t StdControl.init() {
#ifdef kLIFE_CMD
        ParamList params;
#endif

        // TOSH_MAKE_BOOST_ENABLE_OUTPUT();      // port E pin 4
        // TOSH_CLR_BOOST_ENABLE_PIN();  // set low

        mPendingMask = 0;
        mCycleToSend = 0;
        mQs = NULL;
        mTail = NULL;
        mCurQuery = NULL;
        mNumBlocked = 0;
        mOldRate = 0;
        mFetchTries = 0; // hangs in fetch sometimes -- retry count
        mTriedAllocWaiting = FALSE;
#ifdef kQUERY_SHARING
        mTriedQueryRequest = FALSE;
#endif
        mFixedComm = FALSE;
        mSendQueryNextClock = FALSE;
        mSending = FALSE;
        mClockCount = 0;
        mAllocState = STATE_NOT_ALLOCING;
        mLastQuery = NULL;
        mIsRunning = FALSE;
        mRadioWaiting = FALSE;
        mSendingResult = FALSE;
        mMustTimestamp = TS_NO;

        initConsts(); // set up constants for sample rate based on lifetime

        call ChildControl.init();
        call NetControl.init();
        call TimerControl.init();
#ifdef kUART_DEBUGGER
        if (TOS_LOCAL_ADDRESS != 0)
            call UartDebuggerControl.init();
#endif
#ifdef kLIFE_CMD
        mLifetimeCommandPending = FALSE;
        setParamList(&params, 2, UINT8, UINT16);
        call SetLifetimeCommand.registerCommand("life", VOID, 0, &params);
#endif

        mStopped = TRUE;
        mSendFailed = 0;
        mCurSchedTime = 0;

        return SUCCESS;
    }

    /**
     * Start the tuple router.
     *
     * Zeroes mNumAttrs, starts the child and network components and kicks off
     * attribute start-up for all ready queries.  With qSNOOZE compiled in, a
     * non-root mote that is snoozing clears mStopped and signals
     * AbsoluteTimer.fired() directly instead of starting the timer component;
     * otherwise the timer (and optional UART debugger) is started and the red
     * LED is turned on.
     *
     * NOTE(review): in a qSNOOZE build the non-snoozing path does not clear
     * mStopped here (the assignment is commented out) -- confirm it is cleared
     * elsewhere.
     *
     * @return SUCCESS always.
     */
    command result_t StdControl.start() {
        mNumAttrs = 0;
        call ChildControl.start();
        call NetControl.start();
        startAllQueryAttrs();

#ifdef qSNOOZE
        if (IS_SNOOZING() && !isRoot()) {
            //      UNSET_SNOOZING();
            mStopped = FALSE;
            signal AbsoluteTimer.fired();
        } else {
#else
            mStopped = FALSE;
#endif
            call TimerControl.start();
            // mStopped = FALSE;
#ifdef kUART_DEBUGGER
            if (TOS_LOCAL_ADDRESS != 0)
                call UartDebuggerControl.start();
#endif
            // don't reinitialize this after snooze
            // call Leds.greenOn();
            call Leds.redOn(); // Mica2Dots only have red
#ifdef qSNOOZE
        }
#endif

        return SUCCESS;
    }

    /**
     * Stop the tuple router.
     *
     * Marks the router stopped, cancels the absolute timer and stops the child
     * component.  The network component is stopped only when at least one
     * query exists and no message is being sent; the sending and
     * delivering-tuples flags are then cleared.  The timer component is
     * deliberately left running.  With qSNOOZE compiled in, a non-root mote
     * sets every query's clockCount to WAKING_CLOCKS so that all queries are
     * at the same point on wake-up.
     *
     * @return SUCCESS always.
     */
    command result_t StdControl.stop() {
        QueryListHandle qlh = mQs;

        mStopped = TRUE;
        call AbsoluteTimer.cancel(); // ???

        // call Leds.set(0); // all off
        // call UartDebugger.writeLine("stop",4);
        call ChildControl.stop();

        // must leave radio on if there is no query running
        if (mQs != NULL && !IS_SENDING_MESSAGE() /* ??? */) {
            call NetControl.stop();
        }

        if (IS_SENDING_MESSAGE()) // ???
            UNSET_SENDING_MESSAGE();
        if (IS_DELIVERING_TUPLES())
            UNSET_DELIVERING_TUPLES();

        // call TimerControl.stop(); // don't stop the clock!
        // otherwise we'll stop when send is done
        //    else
        //      mRadioWaiting = TRUE;

#ifdef qSNOOZE
        // make it so that all queries are at the same point when
        // we wake up
        while (qlh != NULL && !isRoot()) {
            (**qlh).q.clockCount = WAKING_CLOCKS;
            qlh = (QueryListHandle)(**qlh).next;
        }
#endif

        return SUCCESS;
    }

    /* --------------------------------- Query Handling ---------------------------------*/

    /**
     * Message indicating the arrival of (part of) a query.
     *
     * Dispatches on qmsg->msgType:
     *  - DROP_TABLE_MSG: optionally cleans up the EEPROM (kMATCHBOX builds),
     *    forwards the message while its ttl is positive, and returns FAIL.
     *  - RATE_MSG: if the query is known, forwards the message when this mote
     *    is the query's root and applies the new epoch duration.
     *  - DEL_MSG: schedules removeQueryTask for the query id and forwards the
     *    message if the query is known.
     *  - anything else is treated as an ADD_MSG: a piece of a new query is
     *    either added to the query currently being read or starts the
     *    allocation of a new pending query.
     *
     * The IN_QUERY_MSG flag guards against re-entry; a message that arrives
     * while the flag is set is ignored.
     *
     * @param qmsg  the received query message.
     * @return FAIL for DROP_TABLE_MSG, SUCCESS otherwise.
     */
    event result_t Network.querySub(QueryMessagePtr qmsg) {
        ParsedQuery *q;
        bool oldField = TRUE;

        if (qmsg->msgType == DROP_TABLE_MSG) {
            if (IS_IN_QUERY_MSG())
                return FAIL;
            SET_IS_IN_QUERY_MSG();

            // call UartDebugger.writeLine("cleaning",8);
#ifdef kMATCHBOX
            call DBBuffer.cleanupEEPROM();
#endif
            // NOTE(review): the post-decrement also runs when ttl is already 0;
            // if ttl is unsigned it wraps -- harmless only as long as nobody
            // reads qmsg after we return.  Confirm.
            if (qmsg->u.ttl-- > 0)
                forwardQuery(qmsg);

            UNSET_IS_IN_QUERY_MSG();
            return FAIL;
        } else if (qmsg->msgType == RATE_MSG) {
            // is a request to change the rate of an existing query
            if (IS_IN_QUERY_MSG())
                return SUCCESS; // ignore this!
            SET_IS_IN_QUERY_MSG();

            // can't change the rate of a query we know nothing about!
            if (getQuery(qmsg->qid, &q)) {
                // only the root of this query re-broadcasts the rate change
                if (TOS_LOCAL_ADDRESS == q->queryRoot) {
                    forwardQuery(qmsg);
                }
                dbg(DBG_USR2, "changing rate to %d\n", qmsg->epochDuration);
                setRate(qmsg->qid, qmsg->epochDuration);
            }

            UNSET_IS_IN_QUERY_MSG();
            return SUCCESS;
        } else if (qmsg->msgType == DEL_MSG) {
            // is a request to delete an existing query
            ParsedQuery *pq;
            bool isKnown;

            call Leds.yellowToggle();

            if (IS_IN_QUERY_MSG())
                return SUCCESS; // ignore this!
            SET_IS_IN_QUERY_MSG();

            isKnown = getQuery(qmsg->qid, &pq);

            call Leds.redOn();
            call Leds.greenOn();
            call Leds.yellowOn();

            // can't force -- might be in flight
            mQidToRemove = qmsg->qid;
            mForceRemove = FALSE;
            post removeQueryTask();

            // only forward if we know about the query
            // NOTE(review): the original comment also said "or if we're the
            // root", but the code has never checked for the root here.
            // qmsg->u.ttl = (qmsg->u.ttl) - 1;
            if (isKnown) {
                forwardQuery(qmsg);
            }

            UNSET_IS_IN_QUERY_MSG();
            return SUCCESS;
        }

        // otherwise, assume its an ADD_MSG, for now
        if (!IS_IN_QUERY_MSG()) {
            if (!getQuery(qmsg->qid, &q)) { // ignore if we already know about this query
                SET_IS_IN_QUERY_MSG();

                // go ahead and time synchronize with the sender of this message
                // doTimeSync(qmsg->timeSyncData);

                if (IS_READING_QUERY()) {
                    // NOTE(review): assumes mCurQuery is valid whenever the
                    // reading flag is set -- confirm for the window where the
                    // allocation is still pending.
                    if (qmsg->qid != (**mCurQuery).qid) {
                        if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) {
                            // query is alloced, but heard about a new one
                            // forget old one
                            if (IS_SPACE_ALLOCED())
                                call MemAlloc.free((Handle)mCurQuery);
                            UNSET_SPACE_ALLOCED();
                            UNSET_READING_QUERY();
                        } else {
                            mTriedAllocWaiting = TRUE;
                            UNSET_IS_IN_QUERY_MSG();
                            return SUCCESS; // waiting for query to be alloced -- dont interrupt
                        }
                    } else if (!IS_SPACE_ALLOCED()) {
                        UNSET_IS_IN_QUERY_MSG();
                        return SUCCESS; // failure -- space not alloced for this query yet!
                    } else {
                        oldField = addQueryField(qmsg);
                    }

                    // go ahead and forward this guy on, as long as it's new, or we're the root
                    if (!oldField || TOS_LOCAL_ADDRESS == qmsg->queryRoot)
                        forwardQuery(qmsg);
                }

                // note that we can fall through from previous clause
                if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) {
                    Query pq;
                    QueryMessagePtr qmsgCopy = call Network.getQueryPayLoad(&mMsg);

                    SET_READING_QUERY();
                    UNSET_SPACE_ALLOCED();
                    mTriedAllocWaiting = FALSE;

                    pq.qid = qmsg->qid;
                    pq.numFields = qmsg->numFields;
                    pq.numExprs = qmsg->numExprs;

                    *qmsgCopy = *qmsg; // save a copy

                    // allocate space for query
                    if (!allocPendingQuery(&continueQuery, &pq)) {
                        UNSET_READING_QUERY(); // barf!
                    }
                }
            } else if (TOS_LOCAL_ADDRESS == qmsg->queryRoot) { // forward on if we're the root
                forwardQuery(qmsg);
            }

            UNSET_IS_IN_QUERY_MSG();
        }

        return SUCCESS;
    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?