⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tuplerouterm.nc

📁 用于传感器网络的节点操作系统 TinyOS 结构设计非常有意思
💻 NC
📖 第 1 页 / 共 5 页
字号:
	//notify children (e.g. AGG_OPERATOR) that this query is complete
        signal QueryProcessor.queryComplete(&(**curq).q);
        call MemAlloc.free((Handle)curq);
        return err_NoError;
      } else {
        last = curq;
        curq = (QueryListHandle)(**curq).next;
      }
    }
    return err_NoError; //not an error if query doesn't exist
  }

  /** Send mCurSendingQuery to a neighbor (we assume the query is in
      mCurSendingQuery because the sending must be done in multiple phases).
      <p>
      Each invocation emits at most one QueryMessage through mQmsg:
      first one message per non-null field, then one per expression, and
      finally (for queries whose bufferType is not kRADIO) one buffer
      message.  When nothing is left to send, the "sending query" state
      is cleared.
      <p>
      NOTE(review): the field / expression counters are advanced even when
      IS_SENDING_MESSAGE() prevents the message from going out, so that
      piece of the query is not retried -- confirm this is intended.
  */
  task void sendQuery() {
    //this task assembles the query one field / attribute at a time,
    //send each out in a separate radio message (just like they are delivered).
    //task is rescheduled after SEND_DONE_EVENT fires
    QueryListHandle curq = mCurSendingQuery;
    QueryMessage *qmsg = (QueryMessage *)mQmsg.data;

    if (curq == NULL) return;

    if (mCurSendingField < (**curq).q.numFields) {
      char fieldId = call ParsedQueryIntf.getFieldId(&(**curq).q, (short)mCurSendingField);

      //query-wide header values; only this branch writes them into mQmsg
      qmsg->msgType = ADD_MSG;
      qmsg->qid = (**curq).q.qid;
      qmsg->numFields = (**curq).q.numFields;
      qmsg->numExprs = (**curq).q.numExprs;
      qmsg->fromQid = (**curq).q.fromQid;
      qmsg->epochDuration = (**curq).q.epochDuration;
      qmsg->bufferType = (**curq).q.bufferType;
      qmsg->queryRoot = (**curq).q.queryRoot;
      mCurSendingField++;

      if (!(call ParsedQueryIntf.queryFieldIsNull(fieldId))) {
        AttrDescPtr attr = call AttrUse.getAttrById(fieldId);

        if (attr == NULL) {
          //no descriptor for this attribute id, so there is no name to
          //send -- skip it the same way a null field is skipped instead
          //of dereferencing a NULL pointer
          post sendQuery();
          return;
        }

        qmsg->type = kFIELD;
        qmsg->idx = mCurSendingField-1;
        //bounded copy: an over-long attribute name must not run past the
        //end of the message field (assumes u.field.name is a fixed-size
        //char array inside the message -- TODO confirm)
        strncpy(qmsg->u.field.name, attr->name, sizeof(qmsg->u.field.name) - 1);
        qmsg->u.field.name[sizeof(qmsg->u.field.name) - 1] = '\0';

        call Leds.greenToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      } else {
        //field is null (we don't know what its name should be) -- do the next one
        post sendQuery();
      }
    } else if (mCurSendingExpr < (**curq).q.numExprs) {
      Expr e = call ParsedQueryIntf.getExpr(&(**curq).q, mCurSendingExpr);

      mCurSendingExpr++;
      qmsg->type = kEXPR;
      qmsg->idx = mCurSendingExpr-1;
      qmsg->u.expr = e; //this could be bad! (extra bytes on end of expression might overwrite memory)
      call Leds.redToggle();
      if (!IS_SENDING_MESSAGE()) {
        SET_SENDING_MESSAGE();
        if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
          UNSET_SENDING_MESSAGE();
      }
    } else if ((**curq).q.bufferType != kRADIO) {
      //send the command that should be invoked in response to this query
      qmsg->type =  kBUF_MSG;
      qmsg->u.buf = (**curq).q.buf;
      call Leds.yellowToggle();
      if (!IS_SENDING_MESSAGE()) {
        SET_SENDING_MESSAGE();
        if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
          UNSET_SENDING_MESSAGE();
      }
    }
    else {
      //every field, expression and buffer message has been handled
      UNSET_SENDING_QUERY();
    }
  }

  /** Message indicating neighbor requested a query from us.
      If we know about the query, and aren't otherwise
      occupied with sending / routing, send the query back.
      @param msg The message carrying the QueryRequestMessage from the neighbor
      @return msg, unchanged, in every case
   */
  event TOS_MsgPtr Network.queryRequestSub(TOS_MsgPtr msg) {
    QueryRequestMessage *qmsg = (QueryRequestMessage *)(msg->data);
    char qid = qmsg->qid;
    QueryListHandle curq;

    //if we're already sending this query, ignore duplicate
    //requests...
    if (IS_SENDING_QUERY() && mCurSendingQuery) {
      if (qid == (**mCurSendingQuery).q.qid)
        return msg;
    }

    //triedQueryRequest flag set to true when a neighbor requests a
    //query but we're sending another one
    //if we get another such request, we'll abort the current one (ick)
    if (!IS_SENDING_MESSAGE() && (!IS_SENDING_QUERY() || mTriedQueryRequest)) {
      mTriedQueryRequest = FALSE;
      mCurSendingQuery = NULL;
      SET_SENDING_QUERY();
    } else {
      mTriedQueryRequest = TRUE;
      return msg;
    }

    //look the requested query up in the list of installed queries
    curq = mQs;
    while (curq != NULL) {
      if ((**curq).q.qid == qid) {
        //the query we're supposed to send
        mCurSendingField = 0;
        mCurSendingExpr = 0;
        mCurSendingQuery = curq;
        post sendQuery();
        break;
      }
      curq = (QueryListHandle)(**curq).next;
    }

    //unknown query: nothing to send, so release the "sending query" state
    if (!mCurSendingQuery) UNSET_SENDING_QUERY();

    return msg;
  }

  /** A message not directly addressed to us that we overheard.
      Use this for time synchronization with our parent, and
      to snoop on queries.
      @param msg The message
      @param amId The amid of the message
      @param isParent If the message is from our parent
      @return msg, unchanged
   */
  event TOS_MsgPtr Network.snoopedSub(TOS_MsgPtr msg, uint8_t amId, bool isParent) {
    ParsedQuery *q;
    char qid;
    DbMsgHdr *hdr = (DbMsgHdr *)msg->data;

    //check and see if it has information about a query we haven't heard before
    //don't snoop on queries from the root (it wont reply)!
    mMsgsThisInterval++;
    if (amId == kDATA_MESSAGE_ID && hdr->senderid != 0) {
      QueryRequestMessage *qreq = (QueryRequestMessage *)mMsg.data;

      qid = call QueryResultIntf.queryIdFromMsg((char *)msg->data);
      //is this a query we've never heard of before?
      if (!getQuery(qid, &q)) {
        if (!IS_SENDING_MESSAGE()) {
          //only write into mMsg once we know no send is in progress, so
          //a buffer that may still be in flight is never modified
          qreq->qid = qid;
          SET_SENDING_MESSAGE();
          if (call Network.sendQueryRequest(&mMsg, hdr->senderid) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      }
    }

    //did this message come from our parent?
    if (isParent) {
      QueryResult qr;

      //epoch sync with parent
      //NOTE(review): amId is not checked here, so every message overheard
      //from the parent is parsed as a query result -- confirm
      qid = call QueryResultIntf.queryIdFromMsg((char *)msg->data);
      if (getQuery(qid, &q)) {
        call QueryResultIntf.fromMsgBytes((char *)msg->data, &qr, q);
        if (qr.epoch > q->currentEpoch + 1) //make sure epoch is monotonically increasing;  off by one OK?
          q->currentEpoch = qr.epoch;
        //0xFF is skipped; any other value that differs from our counter
        //by more than one replaces the low byte of clockCount
        if (hdr->timeRemaining != 0xFF) {
          if (q->clockCount > hdr->timeRemaining + 1 ||
              q->clockCount < hdr->timeRemaining - 1)
            {
              q->clockCount = (q->clockCount & 0xFF00) | hdr->timeRemaining;
            }
        }
      }
      // Each node now estimates its local neighborhood size (so we don't do the following)
      //      mNumSenders = hdr->xmitSlots;
    }

    return msg;
  }

  /* --------------------------------- Tuple / Data Arrival ---------------------------------*/

  /** Continue processing a tuple after a selection operator.
      Basically, if the tuple passed the selection, we continue routing it to
      additional operators.  Otherwise, we move on to the next query for routing.
      @param t The tuple that has been processed by the operator
      @param q The query that this tuple belongs to
      @param e The expression that processed the tuple
      @param passed Indicates whether the tuple passed the operator --
             if not, the tuple should not be output.
      @return err_NoError
  */
  event TinyDBError SelOperator.processedTuple(Tuple *t,
                                               ParsedQuery *q,
                                               Expr *e,
                                               bool passed)
    {
      if (!passed) {
        //record the failure on the expression and advance to the next query
        e->success = FALSE;
        mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
      }
      post routeTask();
      return err_NoError;
    }

  /** Result-side callback of the selection operator; nothing is done here.
      @param qr The query result (unused)
      @param q The query it belongs to (unused)
      @param e The expression that processed it (unused)
      @return err_NoError
  */
  event TinyDBError SelOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
      return err_NoError; //not implemented
  }

  /** Continue processing a tuple after an aggregation operator has been applied.
      Simply reposts routeTask.
      @param t The tuple passed into the operator
      @param q The query that the tuple belongs to
      @param e The expression that processed the tuple
      @param passed (Should be true for aggregates)
      @return err_NoError
  */
  event TinyDBError AggOperator.processedTuple(Tuple *t, ParsedQuery *q,
                                               Expr *e, bool passed)
    {
      post routeTask();
      return err_NoError;
    }

  /** Called every time we route a query result through an aggregate operator.
      Need to route to the next aggregation operator, i.e. the expression
      following e.
      @param qr The query result we processed
      @param q The query it belongs to
      @param e The expression that processed it
      @return err_NoError
  */
  event TinyDBError AggOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    //maybe unset a status variable?
    aggregateResult(q, qr, e->idx+1);
    return err_NoError;
  }

  /** Received a result from a neighbor -- need to
      either:<p>
      1) process it, if it is an aggregate result<p>
      or<p>
      2) forward it, if it is a non-aggregate result<p>
      Results for a query we do not know are handed to Network.snoopedSub.
      @param msg The message that was received
      @return msg, unchanged
  */
  event TOS_MsgPtr Network.dataSub(TOS_MsgPtr msg) {
    QueryResult qr;
    ParsedQuery *q;
    short i;
    bool gotAgg = FALSE;
    bool pending;

    call Leds.greenToggle();
    mMsgsThisInterval++;
    if (getQuery(call QueryResultIntf.queryIdFromMsg((char *)msg->data), &q)) {
      //if this query is going to be deleted, reset the counter until
      //deletion since we're still hearing neighbors results about it...
      if (q->markedForDeletion) {
        q->markedForDeletion = EPOCHS_TIL_DELETION;
      }
      call QueryResultIntf.fromMsgBytes((char *)msg->data, &qr, q);

      //now determine where to route this result to -- either an
      //aggregation operator or to our parent
      for (i = 0; i < q->numExprs; i++) {
        Expr e = call ParsedQueryIntf.getExpr(q, i);

        if (e.opType != kSEL) {
          gotAgg = TRUE;
          break;
        }
      }

      if (!gotAgg) { //didn't give to an aggregate, so just pass it on...
        TinyDBError err;

        //enqueue the raw result bytes that follow the message header
        mEnqResult = *(QueryResultPtr)(msg->data + sizeof(DbMsgHdr));
        err = call DBBuffer.enqueue(q->bufferId, &mEnqResult, &pending, q);

        //ignore result buffer busy items for now, since they just mean
        //we can't keep up with the sample rate the user is requested , but we
        //shouldn't abort
        if (err != err_ResultBufferBusy && err != err_NoError) call signalError(err);
      } else { //got an agg -- do all the aggregation expressions
        if (!IS_AGGREGATING_RESULT()) { //don't double aggregate!
          //copy into mResult only when no aggregation is in flight, so the
          //result an in-progress aggregation points at is not overwritten
          mResult = qr;
          aggregateResult(q, &mResult, 0);
        }
      }
    } else {
      //NOTE(review): snoopedSub increments mMsgsThisInterval again, so a
      //message for an unknown query is counted twice -- confirm intended
      signal Network.snoopedSub(msg,kDATA_MESSAGE_ID, FALSE);
      //    statusMessage("unknown neighbor q!");
    }
    return msg;
  }

  /** Apply all aggregate operators to this result.
      Apply them one after another, starting with exprId.
      <p>
      This is called from Network.dataSub (first expression) and from
      AggOperator.processedResult (next expression after the one that just
      finished).
      <p>
      Selection expressions (kSEL) are skipped.  The result is handed to the
      first remaining aggregate that accepts it; the "aggregating" state then
      stays set and this function returns.  If an aggregate reports an error
      the next expression is tried.  When no expression is left the
      "aggregating" state is cleared.
      @param q The query that the result applies to
      @param qr The query result
      @param exprId The index of the first expression to apply to qr
  */
  void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId) {
    Expr *e;

    //iterate instead of recursing so stack use does not grow with the
    //number of expressions
    while (exprId < q->numExprs) {
      e = call ParsedQueryIntf.getExprPtr(q,exprId);
      if (e->opType != kSEL) {
        SET_AGGREGATING_RESULT();
        if (call AggOperator.processPartialResult(qr, q, e) == err_NoError)
          return; //accepted; stay in the aggregating state
        UNSET_AGGREGATING_RESULT(); //error, just do the next one
        //(errors may just mean the result doesn't apply to the agg)
      }
      exprId++; //move on to the next one
    }

    //no more aggregation expressions
    UNSET_AGGREGATING_RESULT();
  }

  /* --------------------------------- Timer Events ---------------------------------*/

  /** Adjust the rate that the main tuple router clock fires at based
      on EPOCH DURATION of all of the queries that are currently installed.
      <p>
      In the portion shown here the base rate is hardcoded to 32 and every
      installed query gets its clocksPerSample / clockCount reset from it.
  */
  void setSampleRate() {
    QueryListHandle qlh;
    short rate = -1;
    bool prevInt;

    //walk through queries, choose lowest sample rate
/*      qlh = mQs; */
/*      while (qlh != NULL) { */
/*        if (rate == -1)  */
/*  	rate = (**qlh).q.epochDuration; */
/*        else  */
/*  	rate = gcd((**qlh).q.epochDuration,rate); */
/*        qlh = (QueryListHandle)(**qlh).next; */
/*      } */

/*      //throttle rate to maximum */
/*      if (rate <= MIN_SAMPLE_RATE) { */
/*        //    rate = gcd(MIN_SAMPLE_RATE,rate); */
/*        rate = MIN_SAMPLE_RATE; */
/*      } */
/*      dbg(DBG_USR1,"rate = %d\n", rate); //fflush(stdout); */

    //HACK
    rate = 32; //hardcode!

    //now set the rate at which we have to deliver tuples to each query
    //as a multiple of this rate
    qlh = mQs;
    while (qlh != NULL) {
      if ((**qlh).q.epochDuration == kONE_SHOT) { //read it fast!
        (**qlh).q.clocksPerSample = 16;
        (**qlh).q.curResult = 0;
      } else
        (**qlh).q.clocksPerSample = (**qlh).q.epochDuration / rate;

      //clockCount is updated with interrupts disabled; they are only
      //re-enabled if disable() reported that they were on before
      prevInt = call Interrupt.disable();
      (**qlh).q.clockCount = (**qlh).q.clocksPerSample; //reset counter
      if (prevInt) call Interrupt.enable();
      qlh = (QueryListHandle)(**qlh).next;
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -