⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tuplerouterm.nc

📁 nesC写的heed算法
💻 NC
📖 第 1 页 / 共 5 页
字号:
            curq = mQs;
    // Walk the query list looking for the query whose id is mQidToRemove.
    while (curq != NULL) {
      if ((**curq).q.qid == mQidToRemove) {       //this is the one to remove
        //best not remove if we're currently sending out this query!
        if (IS_SENDING_QUERY() && (**mCurSendingQuery).q.qid == (**curq).q.qid) {
          SIG_ERR(err_RemoveFailedRouterBusy);
          return;
        }
        if (last != NULL) {       //not the first element
          (**last).next = (**curq).next;
        } else {  //the first element
          atomic {
            mQs = (QueryListHandle)(**curq).next;
          }
        }
        if (mTail == curq) //was the last element
          mTail = last; //ok if this is also the first element, since this will now be NULL
        if (mQs == NULL) { //no more queries, stop the clock!
          mIsRunning = FALSE;
          UNSET_SNOOZING(); //new queries will require us to restart scheduling
          // try to stop the service scheduler
          call ServiceScheduler.remove(kTINYDB_SERVICE_ID);
          call AbsoluteTimer.cancel();
          //call Timer.stop();
          mOldRate = 0; //clear rate info...

          if (mStopped) {
            call StdControl.start();
          }
          mStopped = TRUE;

          call Leds.redOff();
          call Leds.yellowOff();
          call Leds.greenOff();
        }
#ifdef kSUPPORTS_EVENTS
        //delete correspondence between the command and event
        //NOTE: we don't delete the command here -- command interface will allow us to
        // redefine this command id later, so let's not worry about it
        if ((**curq).q.hasEvent) {
          EventDescPtr e = call EventUse.getEventById((**curq).q.eventId);
          CommandDescPtr c = call CommandUse.getCommandById((**curq).q.eventCmdId);
          if (e != NULL && c != NULL) {
            dbg(DBG_USR2, "Removing correspondance between event %s and command %s\n", e->name, c->name);
            call EventUse.deleteEventCallback(e->name, c->name);
          }
        }
#endif
        //notify children (e.g. AGG_OPERATOR) that this query is complete
        dbg(DBG_USR2,"ENDING QUERY : %d\n", (**curq).q.qid);
        signal QueryProcessor.queryComplete(&(**curq).q);
#ifdef HSN_ROUTING
        mHSNValue = 20; // reset to initial value, XXX again 20 is arbitrary
#endif
        if (curq == mCurRouteQuery)
          mCurRouteQuery=NULL;
        call MemAlloc.free((Handle)curq);
        return;
      } else {
        last = curq;
        curq = (QueryListHandle)(**curq).next;
      }
    }
  }

  /** Send mCurSendingQuery to a neighbor (we assume the query is in
      mCurSendingQuery because the sending must be done in multiple phases).

      Each invocation sends at most one query message: the next field that
      the requester has not flagged in mCurQMsgMask, then the next such
      expression, then (for non-radio buffers) the buffer message. When
      nothing is left to send the "sending query" state is cleared.
  */
  task void sendQuery() {
    //this task assembles the query one field / attribute at a time,
    //send each out in a separate radio message (just like they are delivered).
    //task is rescheduled after SEND_DONE_EVENT fires
    QueryListHandle curq = mCurSendingQuery;
    QueryMessage *qmsg = call Network.getQueryPayLoad(&mQmsg);

    if (curq == NULL) {
      UNSET_SENDING_QUERY();
      return;
    }

    //NOTE -- we don't currently share queries that aren't over the base
    //sensor data!
    if ((**curq).q.fromBuffer != kNO_QUERY) {
      UNSET_SENDING_QUERY();
      return;
    }

    // find the next requested query message
    // The mask bit is built from a uint32_t one: shifting a plain int is
    // undefined once the shift count reaches the width of int, and the mask
    // is produced as a uint32_t by getQueryMsgMask().
    while (mCurSendingField < (**curq).q.numFields &&
           (mCurQMsgMask & ((uint32_t)1 << mCurSendingField)))
      mCurSendingField++;

    if (mCurSendingField < (**curq).q.numFields) {
      char fieldId = call ParsedQueryIntf.getFieldId(&(**curq).q, (short)mCurSendingField);

      qmsg->msgType = ADD_MSG;
      qmsg->qid = (**curq).q.qid;
      qmsg->numFields = (**curq).q.numFields;
      qmsg->numExprs = (**curq).q.numExprs;
      qmsg->fromBuffer = (**curq).q.fromBuffer;
      qmsg->fromCatalogBuffer = (**curq).q.fromCatalogBuffer;
      qmsg->hasForClause = (**curq).q.hasForClause;
      qmsg->epochDuration = (**curq).q.epochDuration;
      qmsg->bufferType = (**curq).q.bufferType;
      qmsg->fwdNode = TOS_LOCAL_ADDRESS;
      mCurSendingField++;

      if (!(call ParsedQueryIntf.queryFieldIsNull(fieldId))) {
        AttrDescPtr attr = call AttrUse.getAttrById(fieldId);

        if (attr == NULL) {
          //no descriptor for this id (presumably getAttrById returns NULL
          //for an unknown attribute -- confirm against AttrUse); we cannot
          //name the field, so skip it exactly like a null field
          post sendQuery();
          return;
        }
        qmsg->type = kFIELD;
        qmsg->idx = mCurSendingField-1;
        // NOTE(review): unbounded copy; assumes attr->name fits in
        // qmsg->u.field.name -- confirm the two buffers have the same size.
        strcpy(qmsg->u.field.name, attr->name);
        call Leds.greenToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          atomic {
            mMustTimestamp = TS_QUERY_MESSAGE;
            mTimestampMsg = &mQmsg;
          }
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
            UNSET_SENDING_MESSAGE();
            atomic {
              mMustTimestamp = TS_NO;
            }
          }
        }
      } else {
        //field is null (we don't know what its name should be) -- do the next one
        post sendQuery();
      }
    } else {
      // find the next requested query message
      // expression bits follow the field bits in the mask
      while (mCurSendingExpr < (**curq).q.numExprs &&
             (mCurQMsgMask & ((uint32_t)1 << ((**curq).q.numFields + mCurSendingExpr))))
        mCurSendingExpr++;

      if (mCurSendingExpr < (**curq).q.numExprs) {
        Expr e = call ParsedQueryIntf.getExpr(&(**curq).q, mCurSendingExpr);

        mCurSendingExpr++;
        qmsg->type = kEXPR;
        qmsg->idx = mCurSendingExpr-1;
        qmsg->u.expr = e; //this could be bad! (extra bytes on end of expression might overwrite memory)
        //call Leds.redToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          atomic {
            mMustTimestamp = TS_QUERY_MESSAGE;
            mTimestampMsg = &mQmsg;
          }
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
            UNSET_SENDING_MESSAGE();
            atomic {
              mMustTimestamp = TS_NO;
            }
          }
        }
      //send the command that should be invoked in response to this query
      } else if ((**curq).q.bufferType != kRADIO &&
                 mCurSendingExpr == (**curq).q.numExprs &&
                 !(mCurQMsgMask & ((uint32_t)1 << ((**curq).q.numFields + (**curq).q.numExprs)))) {
        qmsg->type =  kBUF_MSG;
        qmsg->u.buf = (**curq).q.buf;
        mCurSendingExpr++;
        call Leds.yellowToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      }
      else {
        UNSET_SENDING_QUERY();
      }
    }
  }

#ifdef kQUERY_SHARING
  /** Message indicating neighbor requested a query from us.
      If we know about the query, and aren't otherwise
      occupied with sending / routing, send the query back.
      @param qmsg The QueryRequestMessage from the neighbor
      @return SUCCESS in every case
   */
  event result_t Network.queryRequestSub(QueryRequestMessagePtr qmsg) {
    char qid = qmsg->qid;
    QueryListHandle curq;

    //if we're already sending this query, ignore duplicate
    //requests...
    if (IS_SENDING_QUERY() && mCurSendingQuery) {
      if (qid == (**mCurSendingQuery).q.qid)
        return SUCCESS;
    }

    //triedQueryRequest flag set to true when a neighbor requests a
    //query but we're sending another one
    //if we get another such request, we'll abort the current one (ick)
    if (!IS_SENDING_MESSAGE() && (!IS_SENDING_QUERY() || mTriedQueryRequest)) {
      mTriedQueryRequest = FALSE;
      mCurSendingQuery = NULL;
      SET_SENDING_QUERY();
    } else {
      mTriedQueryRequest = TRUE;
      return SUCCESS;
    }

    curq = mQs;
    while (curq != NULL) {
      if ((**curq).q.qid == qid) {
        //the query we're supposed to send
        mCurSendingField = 0;
        mCurSendingExpr = 0;
        mCurSendingQuery = curq;
        mCurQMsgMask = qmsg->qmsgMask;
        post sendQuery();
        break;
      }
      curq = (QueryListHandle)(**curq).next;
    }
    //query not found in our list: nothing to send, so release the state
    if (!mCurSendingQuery) UNSET_SENDING_QUERY();

    return SUCCESS;
  }
#endif

  /** Return a bit vector representing the query messages that have already
      been received for query sharing. This avoids repeatedly sending all
      query messages for sharing.

      Bit layout as built below: the known-field bits come first, the
      known-expression bits are shifted up by numFields, and the bit at
      index numFields + numExprs marks the buffer message. All higher bits
      are cleared.

      All shifts are performed on uint32_t operands: shifting a plain int is
      undefined once the count reaches the width of int, and truncates the
      result where int is narrower than 32 bits.

      @return the mask, or 0 when no query is currently being read
  */
  static uint32_t getQueryMsgMask()
  {
    uint32_t mask = 0;

    if (IS_READING_QUERY()) {
      QueryPtr q = *mCurQuery;
      short bufBit = q->numFields + q->numExprs; // index of the buffer-message bit
      short i;

      mask = q->knownFields;
      mask |= ((uint32_t)q->knownExprs << q->numFields);
      if (bufBit < 32) { // a shift by 32 or more would itself be undefined
        if (q->bufferType != kRADIO && q->hasBuf)
          mask |= ((uint32_t)1 << bufBit);
        else
          mask &= ~((uint32_t)1 << bufBit);
      }
      // clear the rest of the bits
      for (i = bufBit + 1; i < 32; i++)
        mask &= ~((uint32_t)1 << i);
    }
    return mask;
  }

  /** Send the message held in mMsg as a query message; on failure clear the
      "sending message" state and the pending timestamp request.
  */
  task void queryMsgTask() {
    if (call Network.sendQueryMessage(&mMsg) != err_NoError) {
      UNSET_SENDING_MESSAGE();
      atomic {
        mMustTimestamp = TS_NO;
      }
    }
  }

  /** A message not directly addressed to us that we overheard.
      Use this for time synchronization with our parent, and
      to snoop on queries.
      @param qrMsg The overheard query result message
      @param isParent If the message is from our parent
      @param senderid Id of the node that sent the message
      @return SUCCESS in every case
  */
  event result_t Network.snoopedSub(QueryResultPtr qrMsg, bool isParent, uint16_t senderid) {
    ParsedQuery *q;
    uint8_t qid;
    QueryRequestMessage *qreq = call Network.getQueryRequestPayLoad(&mMsg);

    //check and see if it has information about a query we haven't heard before
    //don't snoop on queries from the root (it won't reply)!
#ifdef kQUERY_SHARING
    //if (senderid != 0) {
    call Leds.greenToggle();
      qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
      //is this a query we've never heard of before?
      if (!getQuery(qid, &q) && qid != mStoppedQid) {
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          qreq->qid = qid;
          qreq->qmsgMask = getQueryMsgMask();
          //post queryMsgTask();
          if (call Network.sendQueryRequest(&mMsg, senderid) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      } else if (qid == mStoppedQid) {
        //send a cancel message
        QueryMessage *qm = call Network.getQueryPayLoad(&mMsg);
        // NOTE(review): the busy guard below is commented out, so this branch
        // writes into mMsg even when a message is already marked as being
        // sent -- confirm that this is intentional.
        //if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          qm->qid = qid;
          qm->msgType = DEL_MSG;
          call Leds.yellowToggle();
          post queryMsgTask();
          //if (call Network.sendQueryMessage(&mMsg) != err_NoError)
          //  UNSET_SENDING_MESSAGE();
        }
      //}
      //}
#endif

    //did this message come from our parent?
    if (isParent || senderid == 0) {
      QueryResult qr;

      //epoch sync with parent
      qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
      if (getQuery(qid, &q)) {
        call QueryResultIntf.fromBytes(qrMsg, &qr, q);
        if (qr.epoch > q->currentEpoch + 1) //make sure epoch is monotonically increasing;  off by one OK?
          q->currentEpoch = qr.epoch;
      }
      //sync with parent
      doTimeSync(qrMsg->timeSyncData, qrMsg->clockCount);
      // Each node now estimates its local neighborhood size (so we don't do the following)
      //      mNumSenders = hdr->xmitSlots;
      mLastHeard = 0;
    }

    return SUCCESS;
  }

  /* --------------------------------- Tuple / Data Arrival ---------------------------------*/

  /** Continue processing a tuple after a selection operator.
      Basically, if the tuple passed the selection, we continue routing it to
      additional operators.  Otherwise, we move on to the next query for routing.
      @param t The tuple that has been processed by the operator
      @param q The query that this tuple belongs to
      @param e The expression that processed the tuple
      @param passed Indicates whether the tuple passed the operator --
      if not, the tuple should not be output.
      @return err_NoError
  */
  event TinyDBError SelOperator.processedTuple(Tuple *t,
                                               ParsedQuery *q,
                                               Expr *e,
                                               bool passed)
    {
      if (!passed) {
        e->success = FALSE;
        mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
      }
      post routeTask();
      return err_NoError;
    }

  event TinyDBError SelOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
      return err_NoError; //not implemented
  }

  /** Continue processing a tuple after an aggregation operator has been applied
      @param t The tuple passed into the operator
      @param q The query that the tuple belongs to
      @param e The expression that processed the tuple
      @param passed (Should be true for aggregates)
      @return err_NoError
  */
  event TinyDBError AggOperator.processedTuple(Tuple *t, ParsedQuery *q,
                                               Expr *e, bool passed)
    {
      post routeTask();
      return err_NoError;
    }

  /** Called every time we route a query result through an aggregate operator.
      @param qr The query result we processed
      @param q The query it belongs to
      @param e The expression that processed it

      Need to route to the next aggregation operator.
      @return err_NoError
  */
  event TinyDBError AggOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    //maybe unset a status variable?
    aggregateResult(q, qr, e->idx+1);
    return err_NoError;
  }

  /** Received a result from a neighbor -- need to
      either:<p>

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -