// File: tuplerouterm.nc
// [code-viewer residue, not part of the source: "Font size:" control]
/* NOTE(review): this chunk begins inside a function body whose header is above
   the visible source (presumably StdControl.init() -- confirm). The statements
   below reset router state and initialise the child components. */
  mOldRate = 0;
  mOutputCount = 0;
  mFetchTries = 0; //hangs in fetch sometimes -- retry count
  mTriedAllocWaiting = FALSE;
  mTriedQueryRequest = FALSE;
  mFixedComm = FALSE;
  mNumSenders = 16; //something fairly long
  mSendQueryNextClock = FALSE;
  mTicksThisInterval = NUM_TICKS_PER_INTERVAL;
  mMsgsThisInterval = 0;
  mSending = FALSE;
  mAllocState = STATE_NOT_ALLOCING;
  mLastQuery = NULL;

  call ChildControl.init();
  call TimerControl.init();
  call Random.init();

  return SUCCESS;
}

/** Start the router by starting the child components.
    @return SUCCESS always (the result of ChildControl.start() is not inspected)
*/
command result_t StdControl.start() {
  call ChildControl.start();
  return SUCCESS;
}

/** Stop the router by stopping the child components.
    @return SUCCESS always (the result of ChildControl.stop() is not inspected)
*/
command result_t StdControl.stop() {
  call ChildControl.stop();
  return SUCCESS;
}

/* --------------------------------- Query Handling ---------------------------------*/

/** Message indicating the arrival of (part of) a query.

    A DEL_MSG removes the named query and is re-broadcast if this node knew the
    query or is its root. Any other message is treated as an ADD_MSG carrying one
    field / expression / buffer description of a query that is assembled piece by
    piece in mCurQuery.

    @param msg The received query message
    @return msg, always (the buffer is handed straight back; a copy is kept in mMsg
    whenever the message is needed later)
*/
event TOS_MsgPtr Network.querySub(TOS_MsgPtr msg) {
  QueryMessage *qmsg = (QueryMessage *)msg->data;
  ParsedQuery *q;
  short i;
  bool success;
  bool oldField = TRUE;

  //is a request to delete an existing query
  if (qmsg->msgType == DEL_MSG) {
    TinyDBError err;
    ParsedQuery *pq;
    bool isRoot;

    if (IS_IN_QUERY_MSG()) return msg; //ignore this!
    SET_IS_IN_QUERY_MSG();

    if (getQuery(qmsg->qid, &pq))
      isRoot = TOS_LOCAL_ADDRESS == pq->queryRoot;
    else
      isRoot = FALSE;

    call Leds.redOn();
    call Leds.greenOn();
    call Leds.yellowOn();

    err = removeQuery(qmsg->qid, &success);
    if (err != err_NoError) call signalError(err);

    if (success || isRoot) { //only forward if we know about the query, or if we're the root
      if (!IS_SENDING_MESSAGE()) {
        SET_SENDING_MESSAGE();
        mMsg = *msg;
        if (call Network.sendQueryMessage(&mMsg) != err_NoError) {
          UNSET_SENDING_MESSAGE();
          call signalError(err_MSF_DelMsg);
        }
      } else
        call signalError(err_MSF_DelMsgBusy);
    }

    UNSET_IS_IN_QUERY_MSG();
    return msg;
  }

  //otherwise, assume its an ADD_MSG, for now
  if (!IS_IN_QUERY_MSG()) {
    if (!getQuery(qmsg->qid, &q)) { //ignore if we already know about this query
      SET_IS_IN_QUERY_MSG();

      if (IS_READING_QUERY()) {
        // NOTE(review): mCurQuery is dereferenced here even when space has not
        // been allocated yet (READING_QUERY set, SPACE_ALLOCED clear) -- confirm
        // that the handle is valid in that window.
        if (qmsg->qid != (**mCurQuery).qid) {
          if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) {
            //query is alloced, but heard about a new one
            //forget old one
            if (IS_SPACE_ALLOCED()) call MemAlloc.free((Handle)mCurQuery);
            UNSET_SPACE_ALLOCED();
            UNSET_READING_QUERY();
          } else {
            mTriedAllocWaiting = TRUE;
            UNSET_IS_IN_QUERY_MSG();
            return msg; //waiting for query to be alloced -- dont interrupt
          }
        } else if (! IS_SPACE_ALLOCED()) {
          UNSET_IS_IN_QUERY_MSG();
          return msg; //failure -- space not alloced for this query yet!
        } else {
          oldField = addQueryField(msg);
        }

        //go ahead and forward this guy on, as long as it's new, or we're the root
        if (!oldField || TOS_LOCAL_ADDRESS == qmsg->queryRoot)
          forwardQuery(msg);
      }

      //note that we can fall through from previous clause
      if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) {
        Query pq;

        SET_READING_QUERY();
        UNSET_SPACE_ALLOCED();
        mTriedAllocWaiting = FALSE;

        pq.qid = qmsg->qid;
        pq.numFields=qmsg->numFields;
        pq.numExprs=qmsg->numExprs;
        pq.epochDuration = qmsg->epochDuration;
        pq.fromQid = qmsg->fromQid;
        pq.bufferType = qmsg->bufferType;
        pq.knownFields = 0;
        pq.knownExprs = 0;
        pq.queryRoot = qmsg->queryRoot;

        // slots beyond the query's field/expr count are marked known up front
        for (i = qmsg->numFields; i < MAX_FIELDS; i++)
          pq.knownFields |= (1 << i);
        for (i = qmsg->numExprs; i < MAX_EXPRS; i++)
          pq.knownExprs |= (1 << i);

        dbg(DBG_USR1,"completeMask = %x, %x\n",pq.knownFields, pq.knownExprs);//fflush(stdout);

        mMsg = *msg; //save a copy

        //allocate space for query
        allocPendingQuery(&continueQuery, &pq);
      }
    } else if (TOS_LOCAL_ADDRESS == qmsg->queryRoot) //forward on if we're the root
      forwardQuery(msg);

    UNSET_IS_IN_QUERY_MSG();
  }

  return msg;
}

/** Forward out a query message, setting errors as appropriate if the radio is
    already busy. Note, this uses the mMsg module variable.

    @param msg The message to send (a copy is made into mMsg, so the application
    can overwrite after this call)
    @return err_MSF_ForwardKnownQuery if message send failed,
    err_MSF_ForwardKnownQueryBusy if radio was busy, err_NoError otherwise.
    Any error returned has also been reported through signalError.
*/
TinyDBError forwardQuery(TOS_MsgPtr msg) {
  TinyDBError err = err_NoError;

  if (IS_SENDING_MESSAGE()) {
    err = err_MSF_ForwardKnownQueryBusy;
  } else {
    SET_SENDING_MESSAGE();
    mMsg = *msg;
    if (call Network.sendQueryMessage(&mMsg) != err_NoError) {
      UNSET_SENDING_MESSAGE();
      err = err_MSF_ForwardKnownQuery;
    }
  }

  if (err != err_NoError) call signalError(err);
  return err;
}

/** Continuation after query is successfully alloc'ed.
    Fills the freshly allocated Query from the message saved in mMsg, marks the
    space as allocated, records the field carried by that message and forwards it.

    @param memory The newly allocated handle (must be non-null)
*/
void continueQuery(Handle *memory) {
  QueryMessage *qmsg = (QueryMessage *)(mMsg.data);
  short i;

  mCurQuery = (Query **)*memory;
  (**mCurQuery).qid = qmsg->qid;
  (**mCurQuery).numFields=qmsg->numFields;
  (**mCurQuery).numExprs=qmsg->numExprs;
  (**mCurQuery).epochDuration=qmsg->epochDuration;
  (**mCurQuery).fromQid = qmsg->fromQid;
  (**mCurQuery).bufferType = qmsg->bufferType;
  (**mCurQuery).queryRoot = qmsg->queryRoot;
  (**mCurQuery).knownFields = 0;
  (**mCurQuery).knownExprs = 0;

  if ((**mCurQuery).bufferType != kRADIO) { //special buffer info
    (**mCurQuery).hasBuf = FALSE;
    (**mCurQuery).buf.cmd.hasParam = FALSE;
  }

  dbg (DBG_USR1, "num fields = %d\n", qmsg->numFields);

  // slots beyond the query's field/expr count are marked known up front
  for (i = qmsg->numFields; i < MAX_FIELDS; i++)
    (**mCurQuery).knownFields |= (1 << i);
  for (i = qmsg->numExprs; i < MAX_EXPRS; i++)
    (**mCurQuery).knownExprs |= (1 << i);

  dbg (DBG_USR1, "completeMask = %x, %x\n",(**mCurQuery).knownFields, (**mCurQuery).knownExprs);

  SET_SPACE_ALLOCED();
  addQueryField(&mMsg);

  //now forward the message on
  forwardQuery(&mMsg);
}

/** Given a query message, add the corresponding field or expression to a
    partially completed query (mCurQuery). When the query becomes complete,
    mCurQuery is locked and allocation of a parsed query is started.

    @param msg The query message
    @return true iff we already knew about this field
*/
bool addQueryField(TOS_MsgPtr msg) {
  QueryMessage *qmsg = (QueryMessage *)msg->data;
  bool knewAbout = FALSE;

  // NOTE(review): qmsg->idx comes straight from the message and is not range
  // checked against numFields / numExprs here -- confirm callers guarantee it.
  if (qmsg->type == kFIELD) {
    call QueryIntf.setField(*mCurQuery, (short)qmsg->idx, qmsg->u.field);
    // compare against 0: assigning the raw mask bit to a narrow bool would
    // lose bits 8 and above and report a known field as new
    knewAbout = ((**mCurQuery).knownFields & (1 << qmsg->idx)) != 0;
    (**mCurQuery).knownFields |= (1 << qmsg->idx);
    dbg (DBG_USR1,"Setting field idx %d\n",qmsg->idx); //fflush(stdout);
  } else if (qmsg->type == kEXPR) {
    qmsg->u.expr.opState = NULL; //make sure we clear this out
    call QueryIntf.setExpr(*mCurQuery, qmsg->idx, qmsg->u.expr);
    //call statusMessage(((char *)*mCurQuery) + 24);
    dbg (DBG_USR1, "Setting expr idx %d\n",qmsg->idx); //fflush(stdout);
    knewAbout = ((**mCurQuery).knownExprs & (1 << qmsg->idx)) != 0;
    (**mCurQuery).knownExprs |= (1 << qmsg->idx);
  } else if (qmsg->type == kBUF_MSG) {
    knewAbout = (**mCurQuery).hasBuf;
    (**mCurQuery).hasBuf = TRUE;
    (**mCurQuery).buf = qmsg->u.buf;
  }

  if (queryComplete(**mCurQuery)) {
    SET_PARSING_QUERY();
    //allocate a parsed query for this query, initialize it
    dbg(DBG_USR1,"Query is complete!\n");//fflush(stdout);
    //lock this down, since we'll be using it for awhile
    call MemAlloc.lock((Handle)mCurQuery);
    allocQuery(&parsedCallback, *mCurQuery);
  }

  return knewAbout;
}

/** Called when the buffer has been allocated */
void finishedBufferSetup() {
  setSampleRate(); //adjust clock rate to be gcd of rate of all queries

  //all done
  UNSET_READING_QUERY();
  UNSET_PARSING_QUERY();
}

/** Continuation after parsed query is successfully alloc'ed.
    NOTE: after we setup query, need to resize for tuple at end of query...

    On parse failure the half-built query is removed, mCurQuery is released and
    the reading/parsing state is reset so that a later query can be accepted.

    @param memory Newly allocated handle for the parsed query
*/
void parsedCallback(Handle *memory) {
  QueryListHandle h = (QueryListHandle)*memory; //this has already been allocated
  bool success;
  Expr e;
  TinyDBError err;

  dbg(DBG_USR1,"in parsed callback \n");//fflush(stdout);
  e = call QueryIntf.getExpr(*mCurQuery, 0); // result is not read in this function

  call MemAlloc.lock((Handle)h);
  dbg(DBG_USR1,"parsing \n");//fflush(stdout);

  if (!parseQuery(*mCurQuery, &((**h).q))) { //failure?
    call MemAlloc.unlock((Handle)h);
    (**h).q.qid = (**mCurQuery).qid;

    //cleanup
    err = removeQuery((**h).q.qid, &success);

    // mCurQuery was locked in addQueryField; release it the same way the
    // success path does before freeing it
    call MemAlloc.unlock((Handle)mCurQuery);
    call MemAlloc.free((Handle)mCurQuery);

    // mCurQuery is gone: reset the state flags so that the next query message
    // neither dereferences nor re-frees the stale handle
    UNSET_SPACE_ALLOCED();
    UNSET_READING_QUERY();
    UNSET_PARSING_QUERY();

    if (err != err_NoError) call signalError(err);
    return;
  }

  dbg(DBG_USR1,"unlocking \n");//fflush(stdout);
  call MemAlloc.unlock((Handle)h);

  //locked this earlier
  call MemAlloc.unlock((Handle)mCurQuery);
  call MemAlloc.free((Handle)mCurQuery);
  // keep SPACE_ALLOCED in step with the lifetime of mCurQuery (as querySub does
  // when it frees the handle)
  UNSET_SPACE_ALLOCED();

  dbg(DBG_USR1,"finished, now resizing\n");//fflush(stdout);
  reallocQueryForTuple(&resizedCallback, (QueryListHandle)h);
}

/** Continuation after the query is realloced to include space for a tuple.
    Sets up the output buffer selected by the query's bufferType; on any failure
    the query is removed and the reading/parsing state is reset.

    @param memory Resized parsed query
*/
void resizedCallback(Handle *memory) {
  QueryListHandle h = (QueryListHandle)*memory; //this has already been allocated
  ParsedQuery *pq = &((**h).q);
  bool pending = FALSE, success;
  TinyDBError err;

  dbg(DBG_USR1,"finished with resizing\n");//fflush(stdout);

  //set up the output buffer for this query
  switch (pq->bufferType) {
  case kRADIO:
    pq->bufferId = kRADIO_BUFFER;
    break;

  case kRAM:
    {
      RamBufInfo ram = pq->buf.ram;

      err = call DBBuffer.nextUnusedBuffer(&pq->bufferId);
      if (err == err_NoError) {
        err = call DBBuffer.alloc(pq->bufferId, kRAM, ram.numRows, ram.policy, pq , &pending, 0);
      }
      if (err != err_NoError) {
        call signalError(err);
        goto fail;
      }
    }
    break;

  case kCOMMAND:
    {
      long cmd_param = 0; // zeroed so no uninitialised bytes are handed to alloc
      uint8_t *cmd_buf = (uint8_t *)&cmd_param;
      short param;
      uint8_t *param_bytes = (uint8_t *)&param;
      uint8_t k;
      CommandDescPtr cmd = call CommandUse.getCommand(pq->buf.cmd.name);

      if (cmd == NULL) {
        call signalError(err_UnknownCommand);
        goto fail;
      }

      // pack: byte 0 = command index, following bytes = parameter in native
      // byte order; copied bytewise to avoid a misaligned short store at offset 1
      param = pq->buf.cmd.param;
      cmd_buf[0] = cmd->idx;
      for (k = 0; k < sizeof(short); k++)
        cmd_buf[1 + k] = param_bytes[k];

      err = call DBBuffer.nextUnusedBuffer(&pq->bufferId);
      if (err == err_NoError)
        err = call DBBuffer.alloc(pq->bufferId, kCOMMAND, 0, 0, NULL, &pending, cmd_param);
      if (err != err_NoError) {
        call signalError(err);
        goto fail;
      }
    }
    break;

  default:
    call signalError(err_UnknownError);
    goto fail;
  }

  if (!pending) finishedBufferSetup();
  return;

 fail:
  err = removeQuery(pq->qid, &success);
  // finishedBufferSetup() will never run for this query, so leave the
  // reading/parsing state here; otherwise no further query could be accepted
  UNSET_READING_QUERY();
  UNSET_PARSING_QUERY();
  if (err != err_NoError) call signalError(err);
}

/** Remove a query from the tuple router
    @param qid The query to remove
    @param success Set TRUE if the query was successfully removed, FALSE if the query
    couldn't be found or an error occurred.
    @return err_RemoveFailedRouterBusy if router is in a state such that the query
    may be in use, err_NoError otherwise.
*/
TinyDBError removeQuery(uint8_t qid, BoolPtr success) {
  //remove information about the specified query id
  QueryListHandle curq;
  QueryListHandle last = NULL;

  *success = FALSE;
  if (IS_FETCHING_ATTRIBUTE() || IS_ROUTING_TUPLES() || IS_DELIVERING_TUPLES() || IS_SENDING_MESSAGE())
    return err_RemoveFailedRouterBusy;

  curq = mQs;
  while (curq != NULL) {
    if ((**curq).q.qid == qid) { //this is the one to remove
      *success = TRUE;
      if (last != NULL) { //not the first element
        (**last).next = (**curq).next;
      } else { //the first element
        mQs = (QueryListHandle)(**curq).next;
      }

      if (mTail == curq) //was the last element
        mTail = last; //ok if this is also the first element, since this will now be NULL

      if (mQs == NULL) { //no more queries, stop the clock!
        call Timer.stop();
        mOldRate = 0; //clear rate info...
        call Leds.redOff();
        call Leds.yellowOff();
        call Leds.greenOff();
      } else
        computeOutputRate(); //adjust number of comm slots
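// [code-viewer residue, not part of the source] Keyboard shortcuts:
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -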