📄 tuplerouterm.nc
字号:
//notify children (e.g. AGG_OPERATOR) that this query is complete
        signal QueryProcessor.queryComplete(&(**curq).q);
        call MemAlloc.free((Handle)curq);
        return err_NoError;
      } else {
        last = curq;
        curq = (QueryListHandle)(**curq).next;
      }
    }
    return err_NoError; //not an error if query doesn't exist
  }

  /** Fill in the per-query header fields that every fragment of a query
      message carries.

      @param qmsg The outgoing query message payload
      @param pq The parsed query being transmitted
  */
  void fillQueryMsgHeader(QueryMessage *qmsg, ParsedQuery *pq) {
    qmsg->msgType = ADD_MSG;
    qmsg->qid = pq->qid;
    qmsg->numFields = pq->numFields;
    qmsg->numExprs = pq->numExprs;
    qmsg->fromQid = pq->fromQid;
    qmsg->epochDuration = pq->epochDuration;
    qmsg->bufferType = pq->bufferType;
    qmsg->queryRoot = pq->queryRoot;
  }

  /** Hand mQmsg to the network layer unless a message send is already
      outstanding.  The sending-message flag is set before the send and
      cleared again if the send is refused.

      NOTE(review): when a send is already outstanding the fragment is
      silently not sent and sendQuery is not reposted from here -- confirm
      that the send-done handler covers this case.
  */
  void trySendQueryMsg() {
    if (!IS_SENDING_MESSAGE()) {
      SET_SENDING_MESSAGE();
      if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
        UNSET_SENDING_MESSAGE();
    }
  }

  /** Send mCurSendingQuery to a neighbor
      (we assume the query is in mCurSendingQuery because the sending
      must be done in multiple phases)
  */
  task void sendQuery() {
    //this task assembles the query one field / attribute at a time,
    //send each out in a separate radio message (just like they are delivered).
    //task is rescheduled after SEND_DONE_EVENT fires
    QueryListHandle curq = mCurSendingQuery;
    QueryMessage *qmsg = (QueryMessage *)mQmsg.data;

    if (curq == NULL) return;

    if (mCurSendingField < (**curq).q.numFields) {
      //phase 1: one message per (non-null) field
      char fieldId = call ParsedQueryIntf.getFieldId(&(**curq).q, (short)mCurSendingField);

      fillQueryMsgHeader(qmsg, &(**curq).q);
      mCurSendingField++;
      if (!(call ParsedQueryIntf.queryFieldIsNull(fieldId))) {
        AttrDescPtr attr = call AttrUse.getAttrById(fieldId);

        if (attr == NULL) {
          //no descriptor for this field id, so there is no name to copy;
          //treat it like a null field and move on to the next one
          post sendQuery();
          return;
        }
        qmsg->type = kFIELD;
        qmsg->idx = mCurSendingField-1;
        //NOTE(review): unbounded copy -- assumes attr->name fits in
        //u.field.name; confirm against the two declarations
        strcpy(qmsg->u.field.name, attr->name);
        call Leds.greenToggle();
        trySendQueryMsg();
      } else {
        //field is null (we don't know what its name should be) -- do the next one
        post sendQuery();
      }
    } else if (mCurSendingExpr < (**curq).q.numExprs) {
      //phase 2: one message per expression
      Expr e = call ParsedQueryIntf.getExpr(&(**curq).q, mCurSendingExpr);

      //refresh the header here too, rather than relying on whatever an
      //earlier field message left behind in mQmsg
      fillQueryMsgHeader(qmsg, &(**curq).q);
      mCurSendingExpr++;
      qmsg->type = kEXPR;
      qmsg->idx = mCurSendingExpr-1;
      qmsg->u.expr = e; //this could be bad! (extra bytes on end of expression might overwrite memory)
      call Leds.redToggle();
      trySendQueryMsg();
    } else if ((**curq).q.bufferType != kRADIO) {
      //phase 3: send the command that should be invoked in response to this query
      fillQueryMsgHeader(qmsg, &(**curq).q);
      qmsg->type = kBUF_MSG;
      qmsg->u.buf = (**curq).q.buf;
      call Leds.yellowToggle();
      trySendQueryMsg();
    } else {
      //nothing left to send for this query
      UNSET_SENDING_QUERY();
    }
  }

  /** Message indicating neighbor requested a query from us
      If we know about the query, and aren't otherwise occupied with
      sending / routing, send the query back

      @param msg The QueryRequestMessage from the neighbor
      @return msg, unchanged
  */
  event TOS_MsgPtr Network.queryRequestSub(TOS_MsgPtr msg) {
    QueryRequestMessage *qmsg = (QueryRequestMessage *)(msg->data);
    char qid = qmsg->qid;
    QueryListHandle curq;

    //if we're already sending this query, ignore duplicate
    //requests...
    if (IS_SENDING_QUERY() && mCurSendingQuery) {
      if (qid == (**mCurSendingQuery).q.qid) return msg;
    }

    //triedQueryRequest flag set to true when a neighbor requests a
    //query but we're sending another one
    //if we get another such request, we'll abort the current one (ick)
    if (!IS_SENDING_MESSAGE() && (!IS_SENDING_QUERY() || mTriedQueryRequest)) {
      mTriedQueryRequest = FALSE;
      mCurSendingQuery = NULL;
      SET_SENDING_QUERY();
    } else {
      mTriedQueryRequest = TRUE;
      return msg;
    }

    //look the requested query up in the list of installed queries
    curq = mQs;
    while (curq != NULL) {
      if ((**curq).q.qid == qid) {
        //the query we're supposed to send
        mCurSendingField = 0;
        mCurSendingExpr = 0;
        mCurSendingQuery = curq;
        post sendQuery();
        break;
      }
      curq = (QueryListHandle)(**curq).next;
    }
    //unknown query: release the sending-query flag we just took
    if (!mCurSendingQuery) UNSET_SENDING_QUERY();
    return msg;
  }

  /** A message not directly addressed to us that we overheard.
      Use this for time synchronization with our parent, and to snoop on queries.
@param msg The message
      @param amId The amid of the message
      @param isParent If the message is from our parent
      @return msg, unchanged
  */
  event TOS_MsgPtr Network.snoopedSub(TOS_MsgPtr msg, uint8_t amId, bool isParent) {
    ParsedQuery *q;
    char qid;
    DbMsgHdr *hdr = (DbMsgHdr *)msg->data;

    //check and see if it has information about a query we haven't heard before
    //don't snoop on queries from the root (it wont reply)!
    mMsgsThisInterval++;
    if (amId == kDATA_MESSAGE_ID && hdr->senderid != 0) {
      qid = call QueryResultIntf.queryIdFromMsg((char *)msg->data);
      //is this a query we've never heard of before?
      if (!getQuery(qid, &q)) {
        if (!IS_SENDING_MESSAGE()) {
          //only touch mMsg once we know no send is outstanding, so a
          //request that is still in flight is never overwritten
          QueryRequestMessage *qreq = (QueryRequestMessage *)mMsg.data;

          qreq->qid = qid;
          SET_SENDING_MESSAGE();
          if (call Network.sendQueryRequest(&mMsg, hdr->senderid) != err_NoError)
            UNSET_SENDING_MESSAGE();
        }
      }
    }

    //did this message come from our parent?
    //NOTE(review): this branch parses msg as a query result without
    //checking amId -- confirm it is only reached for data messages
    if (isParent) {
      QueryResult qr;

      //epoch sync with parent
      qid = call QueryResultIntf.queryIdFromMsg((char *)msg->data);
      if (getQuery(qid, &q)) {
        call QueryResultIntf.fromMsgBytes((char *)msg->data, &qr, q);
        if (qr.epoch > q->currentEpoch + 1) //make sure epoch is monotonically increasing; off by one OK?
          q->currentEpoch = qr.epoch;
        //0xFF is skipped here -- presumably "no timing information"; confirm
        if (hdr->timeRemaining != 0xFF) {
          //resync only when we differ from the parent by more than one tick
          if (q->clockCount > hdr->timeRemaining + 1 ||
              q->clockCount < hdr->timeRemaining - 1) {
            //keep the high byte of our counter, adopt the parent's low byte
            q->clockCount = (q->clockCount & 0xFF00) | hdr->timeRemaining;
          }
        }
      }
      // Each node now estimates its local neighborhood size (so we don't do the following)
      // mNumSenders = hdr->xmitSlots;
    }
    return msg;
  }

  /* --------------------------------- Tuple / Data Arrival ---------------------------------*/

  /** Continue processing a tuple after a selection operator
      Basically, if the tuple passed the selection, we continue routing it to
      additional operators.  Otherwise, we move on to the next query for routing.
@param t The tuple that has been processed by the operator,
      @param q The query that this tuple belongs to
      @param e The expression that processed the tuple
      @param passed Indicates whether the tuple passed the operator --
        if not, the tuple should not be output.
      @return err_NoError
  */
  event TinyDBError SelOperator.processedTuple(Tuple *t, ParsedQuery *q, Expr *e, bool passed) {
    if (!passed) {
      //selection rejected the tuple: record that and skip to the next query
      e->success = FALSE;
      mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
    }
    post routeTask();
    return err_NoError;
  }

  /** Selection operators do not process query results; this event does nothing.
      @return err_NoError
  */
  event TinyDBError SelOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    return err_NoError; //not implemented
  }

  /** Continue processing a tuple after an aggregation operator has been applied
      @param t The tuple passed into the operator
      @param q The query that the tuple belongs to
      @param e The expression that processed the tuple
      @param passed (Should be true for aggregates)
      @return err_NoError
  */
  event TinyDBError AggOperator.processedTuple(Tuple *t, ParsedQuery *q, Expr *e, bool passed) {
    post routeTask();
    return err_NoError;
  }

  /** Called every time we route a query result through an aggregate operator.
      Need to route to the next aggregation operator.
      @param qr The query result we processed
      @param q The query it belongs to
      @param e The expression that processed it
      @return err_NoError
  */
  event TinyDBError AggOperator.processedResult(QueryResult *qr, ParsedQuery *q, Expr *e) {
    //maybe unset a status variable?
    aggregateResult(q, qr, e->idx+1);
    return err_NoError;
  }

  /** Received a result from a neighbor -- need to either:<p>
      1) process it, if is an aggregate result<p>
      or<p>
      2) forward it, if it is a non-aggregate result<p>
      @param msg The message that was received
      @return msg, unchanged
  */
  event TOS_MsgPtr Network.dataSub(TOS_MsgPtr msg) {
    QueryResult qr;
    ParsedQuery *q;
    short i;
    bool gotAgg = FALSE;
    bool pending;

    call Leds.greenToggle();
    mMsgsThisInterval++;
    if (getQuery(call QueryResultIntf.queryIdFromMsg((char *)msg->data), &q)) {
      //if this query is going to be deleted, reset the counter until
      //deletion since we're still hearing neighbors results about it...
      if (q->markedForDeletion) {
        q->markedForDeletion = EPOCHS_TIL_DELETION;
      }
      call QueryResultIntf.fromMsgBytes((char *)msg->data, &qr, q);

      //now determine where to route this result to -- either an
      //aggregation operator or to our parent
      for (i = 0; i < q->numExprs; i++) {
        Expr e = call ParsedQueryIntf.getExpr(q, i);

        if (e.opType != kSEL) {
          gotAgg = TRUE;
          break;
        }
      }

      if (!gotAgg) {
        //didn't give to an aggregate, so just pass it on...
        TinyDBError err;

        mEnqResult = *(QueryResultPtr)(msg->data + sizeof(DbMsgHdr));
        err = call DBBuffer.enqueue(q->bufferId, &mEnqResult, &pending, q);
        //ignore result buffer busy items for now, since they just mean
        //we can't keep up with the sample rate the user is requested, but we
        //shouldn't abort
        if (err != err_ResultBufferBusy && err != err_NoError)
          call signalError(err);
      } else {
        //got an agg -- do all the aggregation expressions
        if (!IS_AGGREGATING_RESULT()) { //don't double aggregate!
          //copy into mResult only when no aggregation is in progress:
          //a running aggregation was handed &mResult and must not see it
          //change underneath it
          mResult = qr;
          aggregateResult(q, &mResult, 0);
        }
      }
    } else {
      //unknown query: treat the message as snooped so it can be requested
      //NOTE(review): snoopedSub increments mMsgsThisInterval as well, so
      //these messages are counted twice -- confirm that is intended
      signal Network.snoopedSub(msg,kDATA_MESSAGE_ID, FALSE);
      // statusMessage("unknown neighbor q!");
    }
    return msg;
  }

  /** Apply all aggregate operators to this result.
      Apply them one after another, starting with exprId.
<p>
      This is called from TUPLE_ROUTER_RESULT_MESSAGE and from AGGREGATED_RESULT_EVENT
      @param q The query that the result applies to
      @param qr The query result
      @param exprId The expression to apply to qr
  */
  void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId) {
    Expr *e;

    //walk forward from exprId with a loop rather than one recursive call
    //per expression, so stack use does not grow with the expression count
    for (;;) {
      if (exprId >= q->numExprs) { //no more aggregation expressions
        UNSET_AGGREGATING_RESULT();
        return;
      }
      e = call ParsedQueryIntf.getExprPtr(q,exprId);
      if (e->opType != kSEL) {
        SET_AGGREGATING_RESULT();
        if (call AggOperator.processPartialResult(qr, q, e) == err_NoError) {
          //accepted: stop here with the aggregating flag still set;
          //AggOperator.processedResult resumes at the following expression
          return;
        }
        UNSET_AGGREGATING_RESULT();
        //error, just do the next one
        //(errors may just mean the result doesn't apply to the agg)
      }
      exprId++; //move on to the next one
    }
  }

  /* --------------------------------- Timer Events ---------------------------------*/

  /** Adjust the rate that the main tuple router clock fires at based on
      EPOCH DURATION of all of the queries that are currently installed
  */
  void setSampleRate() {
    QueryListHandle qlh;
    short rate = -1;
    bool prevInt;

    //walk through queries, choose lowest sample rate
/*     qlh = mQs; */
/*     while (qlh != NULL) { */
/*       if (rate == -1) */
/*         rate = (**qlh).q.epochDuration; */
/*       else */
/*         rate = gcd((**qlh).q.epochDuration,rate); */
/*       qlh = (QueryListHandle)(**qlh).next; */
/*     } */

/*     //throttle rate to maximum */
/*     if (rate <= MIN_SAMPLE_RATE) { */
/*       //      rate = gcd(MIN_SAMPLE_RATE,rate); */
/*       rate = MIN_SAMPLE_RATE; */
/*     } */
/*     dbg(DBG_USR1,"rate = %d\n", rate); //fflush(stdout); */

    //HACK
    rate = 32; //hardcode!

    //now set the rate at which we have to deliver tuples to each query
    //as a multiple of this rate
    qlh = mQs;
    while (qlh != NULL) {
      if ((**qlh).q.epochDuration == kONE_SHOT) { //read it fast!
        (**qlh).q.clocksPerSample = 16;
        (**qlh).q.curResult = 0;
      } else
        (**qlh).q.clocksPerSample = (**qlh).q.epochDuration / rate;
      //update the counter with interrupts disabled, re-enabling them
      //only if disable() reported they were on before
      prevInt = call Interrupt.disable();
      (**qlh).q.clockCount = (**qlh).q.clocksPerSample; //reset counter
      if (prevInt) call Interrupt.enable();
      qlh = (QueryListHandle)(**qlh).next;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -