
📄 tuplerouterm.nc

📁 TinyOS, a node operating system for sensor networks -- its architecture design is very interesting
💻 NC
📖 Page 1 of 5
    //rate is now the number of milliseconds between clock ticks
    //need to set the clock appropriately
    /*    if (rate < 255) { */
    /*      TOS_CALL_COMMAND(CLOCK_INIT)(rate, ONE_MS_CLOCK_CONST); */
    /*    } else { */
    /*      rate >>= 3; */
    /*      rate += 1; */
    /*      rate &= 0x00FF; */
    /*      TOS_CALL_COMMAND(CLOCK_INIT)(rate, EIGHT_MS_CLOCK_CONST); */
    /*    } */
    if (rate != mOldRate) { //restart the clock if rate changed
      mOldRate = rate;
      //WARNING: is this needed?
      //      call Timer.init();
      //stop timer 0
      //make this a critical section too -- if a timer event goes off while this is happening, who knows what that means?
      prevInt = call Interrupt.disable();
      call Timer.stop();
      if (prevInt) call Interrupt.enable();
      //restart it at the new rate
      //TOS_CALL_COMMAND(TUPLE_ROUTER_TIMER_START)(0,0,32); //timer for outputting results
      //TOS_CALL_COMMAND(TUPLE_ROUTER_TIMER_START)(1,0,rate); //timer for outputting results
      call Timer.start(TIMER_REPEAT, rate);
    }
    computeOutputRate();
  }

  /** Determine how many communication slots there are in the
      shortest duration query -- this will determine how long we can
      afford to maximally wait before sending a result.
  */
  void computeOutputRate() {
    QueryListHandle qlh = mQs;
    short minSlots = 0x7FFF;

    while (qlh != NULL) {
      if (minSlots > (**qlh).q.clocksPerSample)
        minSlots = (**qlh).q.clocksPerSample;
      qlh = (QueryListHandle)(**qlh).next;
    }

    mXmitSlots = minSlots;
  }

  /** Find the GCD of two non-negative integers
      @param a The first integer
      @param b The second integer
      @return the GCD of a and b
  */
  short gcd(short a, short b) {
    short r = -1, temp;
    if (a > b) {
      temp = a;
      a = b;
      b = temp;
    }
    while (TRUE) {
      r = b % a;
      if (r == 0) break;
      b = a;
      a = r;
    }
    return a;
  }

  /** Clock fired event --<br>
      Works as follows:<br>
      1) Output tuples from previous epochs<br>
      2) Determine what queries fire in this epoch<br>
      3) Collect samples from those queries<br>
      4) Fill in the tuples in those queries<br>
      5) Apply operators to those tuples<br>
      <p>
      While this is happening, results may arrive from other sensor
      nodes representing results from the last epoch.  Those results need
      to be forwarded (if we're just doing selection), or stored (if we're aggregating).
      <p>
      Question:  What to do if the next timer event goes off before this one is
      complete?  Right now, we'll simply ignore later events if this
      one hasn't finished processing.
      <p>
      As of 10/5/2002, moved main body into a task.
  */
  event result_t Timer.fired() {
    post mainTask();
    return SUCCESS;
  }

  task void mainTask() {
    dbg(DBG_USR1,"IN CLOCK \n"); //fflush(stdout);
    mTicksThisInterval--;
    if (mTicksThisInterval <= 0) {
      //numSenders is used to determine the backoff period between message sends
      //xmitSlots tracks the epoch duration of the shortest query in clock ticks
      //msgsThisInterval is the number of messages heard during the last NUM_TICKS_PER_INTERVAL clock ticks
      //idea here is that we want to back off more if:
      // 1) there is more network traffic
      // 2) there is more time between epochs
      mNumSenders = ((((mMsgsThisInterval + 1) * 2) * mXmitSlots) >> 7); //(2^7 == 128 == NUM_TICKS_PER_INTERVAL)
      mMsgsThisInterval = 0;
      mTicksThisInterval = NUM_TICKS_PER_INTERVAL;
    }
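    // Worked example (added comment, not from the original source): with
    // NUM_TICKS_PER_INTERVAL == 128, hearing 15 messages in the last interval
    // while the shortest query has mXmitSlots == 64 clock ticks per epoch gives
    //   mNumSenders = (((15 + 1) * 2) * 64) >> 7 = 2048 >> 7 = 16,
    // so sendWaitingMessages() below will pick a random even backoff of up to
    // 16 ticks (but never less than 4) before sending the next result.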
    //don't do anything if we're currently sending (shouldn't need this, but...)
    if (IS_SENDING_MESSAGE()) {
      call statusMessage("sending");
      return;
    }

    sendWaitingMessages();

    if (mSendQueryNextClock) {
      mSendQueryNextClock = FALSE;
      post sendQuery();
    }

    //test to see if we're already sampling, in which case we better
    //not reinvoke sampling!
    if (IS_FETCHING_ATTRIBUTE()) {
      call statusMessage("fetching");
      mFetchTries++;
      //so we can escape a blocked fetch
      if (mFetchTries < UDF_WAIT_LOOP)
        return;
      else
        UNSET_FETCHING_ATTRIBUTE();
    } else if (IS_ROUTING_TUPLES()) {
      call statusMessage("routing");
      return;
    } else if (IS_DELIVERING_TUPLES()) {
      call statusMessage("delivering");
      return;
    } else if (IS_AGGREGATING_RESULT()) {
      call statusMessage("aggregating");
      return;
    }

    mFetchTries = 0;

    //  TOS_SIGNAL_EVENT(TUPLE_ROUTER_NEW_EPOCH)();
    mCurRouteQuery = NULL; //find the first query we need to deliver results for
    mCurExpr = -1;
    dbg(DBG_USR1,"POSTING TASK."); //fflush(stdout);
    post deliverTuplesTask();
  }

  /* --------------------------------- Tuple Output Routines --------------------------------- */

  /** Walk through queries, finding ones that have gone off (timer reached 0), and
      where the tuples are complete.  Output said tuples to the appropriate
      output buffers.
      <p>
      mCurRouteQuery contains the last query routed, or NULL if starting at
      the first query (it's not a parameter, since this task needs to be rescheduled
      as successive tuples are delivered)
  */
  task void deliverTuplesTask() {
    bool success;
    bool didAgg = FALSE;
    bool pending = FALSE;

    // if (IS_SENDING_MESSAGE()) return; //wait til networking send is done...
    dbg(DBG_USR1,"IN DELIVER TUPLES TASK.\n"); //fflush(stdout);
    SET_DELIVERING_TUPLES();

    mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
    if (mCurRouteQuery != NULL) {
      ParsedQuery *pq = &(**mCurRouteQuery).q;
      Expr *e = nextExpr(pq);
      TinyDBError err = err_NoError;
      QueryResult qr;
      uint16_t size;

      //init success
      success = (TOS_LOCAL_ADDRESS == pq->queryRoot) ?
        FALSE : TRUE; //don't deliver tuples for root

      call QueryResultIntf.initQueryResult(&qr);
      // stamp current epoch number
      qr.epoch = pq->currentEpoch;
      qr.qid = pq->qid;

      //scan the query, looking for an aggregate operator --
      //if we find it, output all the tuples it knows about --
      //otherwise, just output the tuple associated with the query
      while (e != NULL) {
        if (e->opType != kSEL) {
          //add all of the aggregate results to the query result data structure
          err = call addResults(&qr, pq, e);
          didAgg = TRUE;
          //break;
        } else {
          if (!e->success) success = FALSE;
        }
        e = nextExpr(pq);
      }

      //then output the query result
      if (didAgg && err == err_NoError && call QueryResultIntf.numRecords(&qr, pq) > 0) {
        //enqueue all the results from this aggregate
        //err = call RadioQueue.enqueue((const char *)&mResult, DATA_LENGTH-sizeof(DbMsgHdr));
        mEnqResult = qr;
        err = call DBBuffer.enqueue(pq->bufferId, &mEnqResult, &pending, pq);
        //ignore result-buffer-busy errors for now, since they just mean
        //we can't keep up with the sample rate the user requested, but we
        //shouldn't abort
        if (err != err_ResultBufferBusy && err != err_NoError)
          call signalError(err);
      }

      //just a selection query -- enqueue appropriate results
      if (success && !didAgg) {
        mEnqResult = qr;
        call QueryResultIntf.fromTuple(&mEnqResult, pq, call ParsedQueryIntf.getTuplePtr(pq));
        //err = call RadioQueue.enqueue((const char *)&mResult, DATA_LENGTH-sizeof(DbMsgHdr));
        err = call DBBuffer.enqueue(pq->bufferId, &mEnqResult, &pending, pq);
        if (err != err_ResultBufferBusy && err != err_NoError) {
          call signalError(err);
        }
      }

      pq->clockCount = pq->clocksPerSample; //reschedule this query

      //one shot queries may have finished scanning their buffer
      if (pq->fromQid != kNO_QUERY && pq->epochDuration == kONE_SHOT) {
        uint8_t fromBuf;
        err = call DBBuffer.qidToBuffer(pq->fromQid, &fromBuf);
        //stop queries that have scanned the entire buffer
        err = call DBBuffer.size(fromBuf, &size);
        if (err != err_NoError || pq->curResult++ >= size) {
          //bool ok,prev;
          //prev = call Interrupt.disable();
          //UNSET_DELIVERING_TUPLES();
          pq->markedForDeletion = EPOCHS_TIL_DELETION; //we need to destroy this query asap
          //removeQuery(pq->qid, &ok);
          //SET_DELIVERING_TUPLES();
          //if (prev) call Interrupt.enable();
        }
      }

      //send tuples for next query
      if (!pending) post deliverTuplesTask();
      mCurExpr = -1; //reset for next query
    } else {
      UNSET_DELIVERING_TUPLES(); //now that tuples from last epoch are delivered,
                                 //start fetching new tuples
      dbg(DBG_USR1,"FETCHING TUPLES\n"); //fflush(stdout);
      startFetchingTuples();
    }
  }

  /** Called from the main timer loop -- task that drains
      the internal message queue.
      Uses mOutputCount to track the time until the next
      message should be sent.
  */
  void sendWaitingMessages() {
    if (mOutputCount > 0) {
      mOutputCount--;
    }

    if (mOutputCount <= 0) {
      TinyDBError err;
      DbMsgHdr *hdr;
      ParsedQuery *q;

      if (IS_SENDING_MESSAGE()) { //don't dequeue if we're already sending ...
        call signalError(err_MSF_SendWaitingBusy);
        return;
      }
      SET_SENDING_MESSAGE();
      err = dequeueMessage(&mMsg);

      if (err == err_NoError) {
        getQuery(call QueryResultIntf.queryIdFromMsg((char *)mMsg.data), &q);
        call Leds.redToggle();

        hdr = (DbMsgHdr *)mMsg.data;
        //hdr->xmitSlots = mNumSenders;
        if (q != NULL)
          hdr->timeRemaining = (unsigned char)(q->clockCount & 0x00FF);
        else
          hdr->timeRemaining = 0xFF;

        if (call Network.sendDataMessage(&mMsg) != err_NoError) {
          UNSET_SENDING_MESSAGE();
          call signalError(err_MSF_SendWaiting);
        }

        //schedule the next result to deliver
        if (TOS_LOCAL_ADDRESS == q->queryRoot) {
          mOutputCount = 4;
        } else {
          if (mFixedComm) {
            mOutputCount = TOS_LOCAL_ADDRESS * 2;
          } else {
            mOutputCount = max(4, (((call Random.rand() & 0x7FFF) %
                                    ((mNumSenders >> 1) + 1)) << 1));
          }
          //mOutputCount = 16;
        }
      } else {
        UNSET_SENDING_MESSAGE();
        if (err != err_NoMoreResults)
          call signalError(err);
      }
    }
  }

  /** Event that's signalled when a send is complete */
  event result_t Network.outputDone(TOS_MsgPtr msg, uint8_t amId) {
    if (IS_SENDING_MESSAGE()) {
      UNSET_SENDING_MESSAGE();
      if (/*msg == &mQmsg &&*/ IS_SENDING_QUERY()) {
        mSendQueryNextClock = TRUE;
      }
    }
    return SUCCESS;
  }

  void startFetchingTuples() {
    QueryListHandle qlh = mQs;
    bool mustSample = FALSE;

    //update queries, determine if any needs to sample data this epoch
    while (qlh != NULL) {
      //reset queries that just restarted
      if ((**qlh).q.clocksPerSample > 0 && (**qlh).q.clockCount == (**qlh).q.clocksPerSample) {
        resetTupleState(&(**qlh).q); //clear it out
        call Network.endOfEpoch();
      }
      if ((**qlh).q.clocksPerSample > 0 && --(**qlh).q.clockCount <= 0) {
        call Leds.yellowToggle();
        if ((**qlh).q.markedForDeletion > 0) {
          if (--(**qlh).q.markedForDeletion == 0) { //delete the query
            bool success;
            removeQuery((**qlh).q.qid, &success);
          }
          (**qlh).q.clockCount = (**qlh).q.clocksPerSample; // just reschedule this query...
        } else {
          //only actually process local tuples if we're not the root.
          if ((**qlh).q.queryRoot != TOS_LOCAL_ADDRESS) mustSample = TRUE;
          (**qlh).q.currentEpoch++;
        }
        break;
      }
      qlh = (QueryListHandle)(**qlh).next;
    }

    if (mustSample) {
      fetchNextAttr();
    }
  }

  void resetTupleState(ParsedQuery *q) {
    short i;
    Expr *e;

    //clear out this tuple
    call TupleIntf.tupleInit(q, call ParsedQueryIntf.getTuplePtr(q));
    for (i = 0; i < q->numExprs; i++) {
      e = call ParsedQueryIntf.getExprPtr(q, i);
      call AggOperator.endOfEpoch(q, e);
    }
  }
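The adaptive backoff in this listing is split between mainTask(), which re-estimates mNumSenders once every NUM_TICKS_PER_INTERVAL clock ticks, and sendWaitingMessages(), which picks mOutputCount, the delay before the next message is dequeued. Below is a minimal, self-contained C sketch of just that arithmetic, for illustration only: it is not part of tuplerouterm.nc, the helper names estimate_num_senders and pick_output_count are invented here, and the C standard library rand() stands in for call Random.rand().

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_TICKS_PER_INTERVAL 128   /* matches the ">> 7" in mainTask() */

/* Estimate of contending senders: grows with the traffic heard in the last
   interval and with the shortest query's epoch length (in clock ticks). */
static int16_t estimate_num_senders(int16_t msgsThisInterval, int16_t xmitSlots) {
  return (int16_t)((((msgsThisInterval + 1) * 2) * xmitSlots) >> 7);
}

/* Random, even-valued backoff in clock ticks, never less than 4 -- the same
   expression sendWaitingMessages() uses for non-root nodes when mFixedComm is off. */
static int16_t pick_output_count(int16_t numSenders, uint16_t rnd) {
  int16_t slot = (int16_t)(((rnd & 0x7FFF) % ((numSenders >> 1) + 1)) << 1);
  return slot > 4 ? slot : 4;
}

int main(void) {
  /* 15 messages heard, shortest epoch of 64 ticks -> numSenders == 16 */
  int16_t numSenders = estimate_num_senders(15, 64);
  printf("numSenders  = %d\n", numSenders);
  printf("outputCount = %d\n", pick_output_count(numSenders, (uint16_t)rand()));
  return 0;
}

With those sample inputs the sketch prints numSenders = 16 and an even outputCount between 4 and 16, mirroring how a busier channel or a longer epoch spreads result transmissions over more clock ticks in the nesC code above.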
