📄 tuplerouterm.nc
字号:
// Tail of the query-removal routine (its header lies above this chunk):
// unlink the query whose id is mQidToRemove from the mQs list and free it.
    curq = mQs;
    while (curq != NULL) {
      if ((**curq).q.qid == mQidToRemove) {
        //this is the one to remove

        //best not remove if we're currently sending out this query!
        if (IS_SENDING_QUERY() && (**mCurSendingQuery).q.qid == (**curq).q.qid) {
          SIG_ERR(err_RemoveFailedRouterBusy);
          return;
        }

        if (last != NULL) {
          //not the first element
          (**last).next = (**curq).next;
        } else {
          //the first element
          atomic {
            mQs = (QueryListHandle)(**curq).next;
          }
        }

        if (mTail == curq) //was the last element
          mTail = last; //ok if this is also the first element, since this will now be NULL

        if (mQs == NULL) {
          //no more queries, stop the clock!
          mIsRunning = FALSE;
          UNSET_SNOOZING(); //new queries will require us to restart scheduling
          // try to stop the service scheduler
          call ServiceScheduler.remove(kTINYDB_SERVICE_ID);
          call AbsoluteTimer.cancel();
          //call Timer.stop();
          mOldRate = 0; //clear rate info...
          if (mStopped) {
            call StdControl.start();
          }
          mStopped = TRUE;
          call Leds.redOff();
          call Leds.yellowOff();
          call Leds.greenOff();
        }

#ifdef kSUPPORTS_EVENTS
        //delete correspondence between the command and event
        //NOTE: we don't delete the command here -- command interface will allow us to
        // redefine this command id later, so let's not worry about it
        if ((**curq).q.hasEvent) {
          EventDescPtr e = call EventUse.getEventById((**curq).q.eventId);
          CommandDescPtr c = call CommandUse.getCommandById((**curq).q.eventCmdId);

          if (e != NULL && c != NULL) {
            dbg(DBG_USR2, "Removing correspondance between event %s and command %s\n", e->name, c->name);
            call EventUse.deleteEventCallback(e->name, c->name);
          }
        }
#endif

        //notify children (e.g. AGG_OPERATOR) that this query is complete
        dbg(DBG_USR2,"ENDING QUERY : %d\n", (**curq).q.qid);
        signal QueryProcessor.queryComplete(&(**curq).q);
#ifdef HSN_ROUTING
        mHSNValue = 20; // reset to initial value, XXX again 20 is arbitrary
#endif
        // Drop the routing cursor before the handle is released so it cannot dangle
        if (curq == mCurRouteQuery)
          mCurRouteQuery = NULL;
        call MemAlloc.free((Handle)curq);
        return;
      } else {
        last = curq;
        curq = (QueryListHandle)(**curq).next;
      }
    }
  }

  /** Send mCurSendingQuery to a neighbor (we assume the query is in
      mCurSendingQuery because the sending must be done in multiple phases).

      Each invocation emits at most one query message: first the fields,
      then the expressions, then (for non-radio buffers) the buffer
      description.  Pieces whose bit is already set in mCurQMsgMask are
      skipped.  When nothing is left to send, the sending-query flag is
      cleared.
  */
  task void sendQuery() {
    //this task assembles the query one field / attribute at a time,
    //send each out in a separate radio message (just like they are delivered).
    //task is rescheduled after SEND_DONE_EVENT fires
    QueryListHandle curq = mCurSendingQuery;
    QueryMessage *qmsg = call Network.getQueryPayLoad(&mQmsg);

    if (curq == NULL) {
      UNSET_SENDING_QUERY();
      return;
    }

    //NOTE -- we don't currently share queries that aren't over the base
    //sensor data!
    if ((**curq).q.fromBuffer != kNO_QUERY) {
      UNSET_SENDING_QUERY();
      return;
    }

    // find the next requested query message
    // (the 1 is widened to 32 bits first: a plain int shift overflows
    //  on targets where int is 16 bits wide)
    while (mCurSendingField < (**curq).q.numFields &&
           (mCurQMsgMask & ((uint32_t)1 << mCurSendingField)))
      mCurSendingField++;

    if (mCurSendingField < (**curq).q.numFields) {
      char fieldId = call ParsedQueryIntf.getFieldId(&(**curq).q, (short)mCurSendingField);

      qmsg->msgType = ADD_MSG;
      qmsg->qid = (**curq).q.qid;
      qmsg->numFields = (**curq).q.numFields;
      qmsg->numExprs = (**curq).q.numExprs;
      qmsg->fromBuffer = (**curq).q.fromBuffer;
      qmsg->fromCatalogBuffer = (**curq).q.fromCatalogBuffer;
      qmsg->hasForClause = (**curq).q.hasForClause;
      qmsg->epochDuration = (**curq).q.epochDuration;
      qmsg->bufferType = (**curq).q.bufferType;
      qmsg->fwdNode = TOS_LOCAL_ADDRESS;
      mCurSendingField++;

      if (!(call ParsedQueryIntf.queryFieldIsNull(fieldId))) {
        AttrDescPtr attr = call AttrUse.getAttrById(fieldId);

        if (attr == NULL) {
          // No descriptor for this id, so there is no name to send.
          // Treat it like a null field and move on to the next one
          // instead of dereferencing a NULL pointer below.
          post sendQuery();
          return;
        }

        qmsg->type = kFIELD;
        qmsg->idx = mCurSendingField-1;
        // NOTE(review): unbounded copy -- assumes attr->name fits in
        // u.field.name; confirm against the two declarations.
        strcpy(qmsg->u.field.name, attr->name);
        call Leds.greenToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          atomic {
            mMustTimestamp = TS_QUERY_MESSAGE;
            mTimestampMsg = &mQmsg;
          }
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
            UNSET_SENDING_MESSAGE();
            atomic {
              mMustTimestamp = TS_NO;
            }
          }
        }
      } else {
        //field is null (we don't know what its name should be) -- do the next one
        post sendQuery();
      }
    } else {
      // find the next requested query message
      // (expression bits sit directly above the field bits in the mask)
      while (mCurSendingExpr < (**curq).q.numExprs &&
             (mCurQMsgMask & ((uint32_t)1 << ((**curq).q.numFields + mCurSendingExpr))))
        mCurSendingExpr++;

      if (mCurSendingExpr < (**curq).q.numExprs) {
        Expr e = call ParsedQueryIntf.getExpr(&(**curq).q, mCurSendingExpr);

        mCurSendingExpr++;
        qmsg->type = kEXPR;
        qmsg->idx = mCurSendingExpr-1;
        qmsg->u.expr = e; //this could be bad! (extra bytes on end of expression might overwrite memory)
        //call Leds.redToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          atomic {
            mMustTimestamp = TS_QUERY_MESSAGE;
            mTimestampMsg = &mQmsg;
          }
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
            UNSET_SENDING_MESSAGE();
            atomic {
              mMustTimestamp = TS_NO;
            }
          }
        }
        //send the command that should be invoked in response to this query
      } else if ((**curq).q.bufferType != kRADIO &&
                 mCurSendingExpr == (**curq).q.numExprs &&
                 !(mCurQMsgMask & ((uint32_t)1 << ((**curq).q.numFields + (**curq).q.numExprs)))) {
        qmsg->type = kBUF_MSG;
        qmsg->u.buf = (**curq).q.buf;
        mCurSendingExpr++;
        call Leds.yellowToggle();
        if (!IS_SENDING_MESSAGE()) {
          SET_SENDING_MESSAGE();
          if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      } else {
        UNSET_SENDING_QUERY();
      }
    }
  }

#ifdef kQUERY_SHARING
  /** Message indicating neighbor requested a query from us.
      If we know about the query, and aren't otherwise occupied with
      sending / routing, send the query back.

      @param qmsg The QueryRequestMessage from the neighbor
      @return SUCCESS in every case
  */
  event result_t Network.queryRequestSub(QueryRequestMessagePtr qmsg) {
    char qid = qmsg->qid;
    QueryListHandle curq;

    //if we're already sending this query, ignore duplicate
    //requests...
    if (IS_SENDING_QUERY() && mCurSendingQuery) {
      if (qid == (**mCurSendingQuery).q.qid)
        return SUCCESS;
    }

    //triedQueryRequest flag set to true when a neighbor requests a
    //query but we're sending another one
    //if we get another such request, we'll abort the current one (ick)
    if (!IS_SENDING_MESSAGE() && (!IS_SENDING_QUERY() || mTriedQueryRequest)) {
      mTriedQueryRequest = FALSE;
      mCurSendingQuery = NULL;
      SET_SENDING_QUERY();
    } else {
      mTriedQueryRequest = TRUE;
      return SUCCESS;
    }

    curq = mQs;
    while (curq != NULL) {
      if ((**curq).q.qid == qid) {
        //the query we're supposed to send
        mCurSendingField = 0;
        mCurSendingExpr = 0;
        mCurSendingQuery = curq;
        mCurQMsgMask = qmsg->qmsgMask;
        post sendQuery();
        break;
      }
      curq = (QueryListHandle)(**curq).next;
    }

    // Requested query is not in our list: release the flag taken above
    if (!mCurSendingQuery)
      UNSET_SENDING_QUERY();
    return SUCCESS;
  }
#endif

  /** Return a bit vector representing the query messages that have already
      been received for query sharing.  This avoids repeatedly sending all
      query messages for sharing.

      Layout (low bit first): one bit per field, then one bit per
      expression, then one bit for the buffer message.  All higher bits
      are returned as zero.

      @return The mask, or 0 when no query is currently being read
  */
  static uint32_t getQueryMsgMask() {
    uint32_t mask = 0;

    if (IS_READING_QUERY()) {
      QueryPtr q = *mCurQuery;
      // index of the buffer-message bit, just above the field and expression bits
      short bufBit = q->numFields + q->numExprs;

      mask = q->knownFields;
      // All shifts are done on 32-bit operands: shifting a plain int
      // overflows on targets where int is 16 bits wide.
      mask |= ((uint32_t)q->knownExprs << q->numFields);

      if (bufBit < 32) {
        if (q->bufferType != kRADIO && q->hasBuf)
          mask |= ((uint32_t)1 << bufBit);
        else
          mask &= ~((uint32_t)1 << bufBit);
      }

      // clear the rest of the bits (everything above bufBit)
      if (bufBit + 1 < 32)
        mask &= (((uint32_t)1 << (bufBit + 1)) - 1);
    }
    return mask;
  }

  /** Send the query message held in mMsg; on failure, clear the
      sending-message flag and the pending timestamp request.
  */
  task void queryMsgTask() {
    if (call Network.sendQueryMessage(&mMsg) != err_NoError) {
      UNSET_SENDING_MESSAGE();
      atomic {
        mMustTimestamp = TS_NO;
      }
    }
  }

  /** A message not directly addressed to us that we overheard.
      Use this for time synchronization with our parent, and to snoop
      on queries.

      @param qrMsg The overheard query result message
      @param isParent If the message is from our parent
      @param senderid The id of the node that sent the message
      @return SUCCESS in every case
  */
  event result_t Network.snoopedSub(QueryResultPtr qrMsg, bool isParent, uint16_t senderid) {
    ParsedQuery *q;
    uint8_t qid;
    QueryRequestMessage *qreq = call Network.getQueryRequestPayLoad(&mMsg);

    //check and see if it has information about a query we haven't heard before
    //don't snoop on queries from the root (it won't reply)!
#ifdef kQUERY_SHARING
    //if (senderid != 0) {
    call Leds.greenToggle();
    qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
    //is this a query we've never heard of before?
    if (!getQuery(qid, &q) && qid != mStoppedQid) {
      if (!IS_SENDING_MESSAGE()) {
        SET_SENDING_MESSAGE();
        qreq->qid = qid;
        qreq->qmsgMask = getQueryMsgMask();
        //post queryMsgTask();
        if (call Network.sendQueryRequest(&mMsg, senderid) != err_NoError)
          UNSET_SENDING_MESSAGE();
      }
    } else if (qid == mStoppedQid) {
      //send a cancel message
      QueryMessage *qm = call Network.getQueryPayLoad(&mMsg);

      //if (!IS_SENDING_MESSAGE()) {
      SET_SENDING_MESSAGE();
      qm->qid = qid;
      qm->msgType = DEL_MSG;
      call Leds.yellowToggle();
      post queryMsgTask();
      //if (call Network.sendQueryMessage(&mMsg) != err_NoError)
      //  UNSET_SENDING_MESSAGE();
    }
    //}
    //}
#endif

    //did this message come from our parent?
    if (isParent || senderid == 0) {
      QueryResult qr;

      //epoch sync with parent
      qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
      if (getQuery(qid, &q)) {
        call QueryResultIntf.fromBytes(qrMsg, &qr, q);
        if (qr.epoch > q->currentEpoch + 1) //make sure epoch is monotonically increasing; off by one OK?
          q->currentEpoch = qr.epoch;
      }

      //sync with parent
      doTimeSync(qrMsg->timeSyncData, qrMsg->clockCount);

      // Each node now estimates its local neighborhood size (so we don't do the following)
      // mNumSenders = hdr->xmitSlots;
      mLastHeard = 0;
    }
    return SUCCESS;
  }

  /* --------------------------------- Tuple / Data Arrival ---------------------------------*/

  /** Continue processing a tuple after a selection operator.
      Basically, if the tuple passed the selection, we continue routing it
      to additional operators.  Otherwise, we move on to the next query
      for routing.

      @param t The tuple that has been processed by the operator
      @param q The query that this tuple belongs to
      @param e The expression that processed the tuple
      @param passed Indicates whether the tuple passed the operator --
        if not, the tuple should not be output.
      @return err_NoError
  */
  event TinyDBError SelOperator.processedTuple(Tuple *t, ParsedQuery *q, Expr *e, bool passed) {
    if (!passed) {
      e->success = FALSE;
      mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
    }
    post routeTask();
    return err_NoError;
  }

  /** Selection operators do not process query results; this is a no-op.

      @return err_NoError
  */
  event TinyDBError SelOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    return err_NoError; //not implemented
  }

  /** Continue processing a tuple after an aggregation operator has been
      applied.

      @param t The tuple passed into the operator
      @param q The query that the tuple belongs to
      @param e The expression that processed the tuple
      @param passed (Should be true for aggregates)
      @return err_NoError
  */
  event TinyDBError AggOperator.processedTuple(Tuple *t, ParsedQuery *q, Expr *e, bool passed) {
    post routeTask();
    return err_NoError;
  }

  /** Called every time we route a query result through an aggregate
      operator.  Need to route to the next aggregation operator.

      @param qr The query result we processed
      @param q The query it belongs to
      @param e The expression that processed it
      @return err_NoError
  */
  event TinyDBError AggOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    //maybe unset a status variable?
    aggregateResult(q, qr, e->idx+1);
    return err_NoError;
  }

  /** Received a result from a neighbor -- need to either:<p>
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -