📄 tuplerouterm.nc
字号:
// NOTE(review): the statements down to the first closing brace in column 0 are the tail of a
// query-message handler whose beginning lies above the visible source. They are reproduced
// unchanged and only re-flowed onto separate lines.
              UNSET_SPACE_ALLOCED();
              dbg(DBG_USR3, "freeing query in READING_QUERY\n");
              call MemAlloc.free((Handle)mCurQuery); //mCurQuery
            }
            UNSET_READING_QUERY();
          } else {
            mTriedAllocWaiting = TRUE;
            UNSET_IS_IN_QUERY_MSG();
            return SUCCESS; //waiting for query to be alloced -- dont interrupt
          }
        } else if (! IS_SPACE_ALLOCED()) {
          UNSET_IS_IN_QUERY_MSG();
          return SUCCESS; //failure -- space not alloced for this query yet!
        } else {
          oldField = addQueryField(qmsg);
        }
        //go ahead and forward this guy on, as long as it's new, or we're the root
        if (!oldField || qmsg->fwdNode == TOS_UART_ADDR)
          forwardQuery(qmsg);
      }
      //note that we can fall through from previous clause
      if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) {
        Query pq;
        QueryMessagePtr qmsgCopy = call Network.getQueryPayLoad(&mMsg);

        SET_READING_QUERY();
        UNSET_SPACE_ALLOCED();
        mTriedAllocWaiting = FALSE;
        pq.qid = qmsg->qid;
        pq.numFields=qmsg->numFields;
        pq.numExprs=qmsg->numExprs;
        *qmsgCopy = *qmsg; //save a copy
        //allocate space for query
        if (!allocPendingQuery(&continueQuery, &pq)) {
          UNSET_READING_QUERY(); //barf!
        }
      }
    } else if (qmsg->fwdNode == TOS_UART_ADDR) //forward on if it comes from UART
      forwardQuery(qmsg);
    UNSET_IS_IN_QUERY_MSG();
  }
  return SUCCESS;
}

/** Forward out a query message, setting errors as appropriate if the
    radio is already busy. Note, this uses the mMsg module variable.

    The message is copied into mMsg, stamped with this node's address as
    the forwarding node, and queryMsgTask is posted to perform the send.

    @param qmsg The message to send (a copy is made into mMsg, so the
      application can overwrite after this call)
    @return err_NoError if the send task was posted,
      err_MSF_ForwardKnownQueryBusy if a message send was already in progress
*/
TinyDBError forwardQuery(QueryMessagePtr qmsg) {
  TinyDBError err = err_NoError;

  if (!IS_SENDING_MESSAGE()) {
    QueryMessagePtr qmsgCopy;

    SET_SENDING_MESSAGE();
    qmsgCopy = call Network.getQueryPayLoad(&mMsg);
    *qmsgCopy = *qmsg;
    qmsgCopy->fwdNode = TOS_LOCAL_ADDRESS;
    mMustTimestamp = TS_QUERY_MESSAGE;
    mTimestampMsg = &mMsg;
    // NOTE(review): the result of post is not checked here -- confirm whether a
    // failed post can leave the sending flag set.
    post queryMsgTask();
    //if (call Network.sendQueryMessage(&mMsg) != err_NoError) {
    //atomic {
    //mMustTimestamp = TS_NO;
    //}
    //UNSET_SENDING_MESSAGE();
    //err = err_MSF_ForwardKnownQuery;
    //goto done;
    // }
  } else
    err = err_MSF_ForwardKnownQueryBusy;

  if (err != err_NoError)
    SIG_ERR(err);
  return err;
}

/** Continuation after query is successfully alloc'ed.

    Copies the query header out of the message stored in mMsg into the new
    Query, marks every field / expression slot at or beyond numFields /
    numExprs as already known, records the element carried by this message,
    and forwards the message on.

    @param memory The newly allocated handle (must be non-null)
*/
void continueQuery(Handle *memory) {
  QueryMessage *qmsg = call Network.getQueryPayLoad(&mMsg);
  short i;

  mCurQuery = (Query **)*memory;
  (**mCurQuery).qid = qmsg->qid;
  (**mCurQuery).numFields=qmsg->numFields;
  (**mCurQuery).numExprs=qmsg->numExprs;
  (**mCurQuery).epochDuration=qmsg->epochDuration;
  (**mCurQuery).fromBuffer = qmsg->fromBuffer;
  (**mCurQuery).fromCatalogBuffer = qmsg->fromCatalogBuffer;
  (**mCurQuery).bufferType = qmsg->bufferType;
  (**mCurQuery).queryRoot = 0; // obsoleted
  (**mCurQuery).knownFields = 0;
  (**mCurQuery).knownExprs = 0;
  (**mCurQuery).needsEvent = qmsg->hasEvent;
  dbg(DBG_USR2, "needsEvent = %d, qms.hasEvent = %d", (**mCurQuery).needsEvent,qmsg->hasEvent);
  (**mCurQuery).hasForClause = qmsg->hasForClause;

  if ((**mCurQuery).bufferType != kRADIO) { //special buffer info
    (**mCurQuery).hasBuf = FALSE;
    (**mCurQuery).buf.cmd.hasParam = FALSE;
  }
  (**mCurQuery).hasEvent = FALSE;

  dbg(DBG_USR3, "num fields = %d\n", qmsg->numFields);

  // Unused slots are pre-marked as known. The shift uses an unsigned 1 so
  // that reaching the top bit of a 16-bit int is well defined.
  for (i = qmsg->numFields; i < MAX_FIELDS; i++)
    (**mCurQuery).knownFields |= (1U << i);
  for (i = qmsg->numExprs; i < MAX_EXPRS; i++)
    (**mCurQuery).knownExprs |= (1U << i);

  dbg(DBG_USR3, "completeMask = %x, %x\n",(**mCurQuery).knownFields, (**mCurQuery).knownExprs);

  SET_SPACE_ALLOCED();
  addQueryField(qmsg);

  //now forward the message on
  forwardQuery(qmsg);
}

/** Given a query message, add the corresponding field or expression
    to a partially completed query. Once queryComplete() reports the query
    as complete, the query handle is locked and allocation of the parsed
    query is started.

    @param qmsg The query message
    @return TRUE iff we already knew about this field
*/
bool addQueryField(QueryMessagePtr qmsg) {
  bool knewAbout = FALSE;

  if (qmsg->type == kFIELD) {
    call QueryIntf.setField(*mCurQuery, (short)qmsg->idx, qmsg->u.field);
    // Compare against zero explicitly: storing the raw mask in a bool that
    // is a narrow integer typedef would drop the bits of high indices and
    // report an already-known field as new.
    knewAbout = ((**mCurQuery).knownFields & (1U << qmsg->idx)) != 0;
    (**mCurQuery).knownFields |= (1U << qmsg->idx);
    //dbg (DBG_USR3,"Setting field idx %d\n",qmsg->idx); //fflush(stdout);
  } else if (qmsg->type == kEXPR) {
    qmsg->u.expr.opState = NULL; //make sure we clear this out
    call QueryIntf.setExpr(*mCurQuery, qmsg->idx, qmsg->u.expr);
    //call statusMessage(((char *)*mCurQuery) + 24);
    //dbg (DBG_USR3, "Setting expr idx %d\n",qmsg->idx); //fflush(stdout);
    knewAbout = ((**mCurQuery).knownExprs & (1U << qmsg->idx)) != 0;
    (**mCurQuery).knownExprs |= (1U << qmsg->idx);
  } else if (qmsg->type == kBUF_MSG) {
    knewAbout = (**mCurQuery).hasBuf;
    (**mCurQuery).hasBuf = TRUE;
    (**mCurQuery).buf = qmsg->u.buf;
  } else if (qmsg->type == kEVENT_MSG) {
    knewAbout = (**mCurQuery).hasEvent;
    (**mCurQuery).hasEvent = TRUE;
    // NOTE(review): unbounded copy of a name taken from the message --
    // confirm the sender guarantees termination within eventName's size.
    strcpy((**mCurQuery).eventName, qmsg->u.eventName);
    dbg(DBG_USR2, "GOT EVENT: %s\n", (**mCurQuery).eventName);
  } else if (qmsg->type == kN_EPOCHS_MSG) {
    knewAbout = (**mCurQuery).hasForClause;
    (**mCurQuery).hasForClause = TRUE;
    (**mCurQuery).numEpochs = qmsg->u.numEpochs;
    dbg(DBG_USR2, "GOT NumEpochs: %d\n", (**mCurQuery).numEpochs);
  }

  if (queryComplete(**mCurQuery)) {
    SET_PARSING_QUERY();
    //allocate a parsed query for this query, initialize it
    dbg(DBG_USR3,"Query is complete!\n");//fflush(stdout);
    //lock this down, since we'll be using it for awhile
    call MemAlloc.lock((Handle)mCurQuery);
    allocQuery(&parsedCallback, *mCurQuery);
  }

  return knewAbout;
}

/** Called when the buffer has been allocated.
    Starts the query's attributes, recomputes the sample rate, and clears
    the reading / parsing state flags.

    @param pq The parsed query whose output buffer is ready
*/
void finishedBufferSetup(ParsedQuery *pq) {
  SET_STARTING_ATTRIBUTE();
  mNumAttrs += pq->numFields;
  startQueryAttrs(pq);
  setSampleRate(); //adjust clock rate to be gcd of rate of all queries
  //all done
  UNSET_READING_QUERY();
  UNSET_PARSING_QUERY();
}

/** Continuation after parsed query is successfully alloc'ed
    NOTE: after we setup query, need to resize for tuple at end of query...

    On a parse failure the query is scheduled for forced removal, both
    handles are unlocked, the unparsed query is freed, and the reading /
    parsing flags are cleared.

    @param memory Newly allocated handle for the parsed query
*/
void parsedCallback(Handle *memory) {
  QueryListHandle h = (QueryListHandle)*memory; //this has already been allocated
  Expr e; // assigned below but not otherwise used in this function

  //dbg(DBG_USR3,"in parsed callback \n");//fflush(stdout);
  e = call QueryIntf.getExpr(*mCurQuery, 0);
  call MemAlloc.lock((Handle)h);
  dbg(DBG_USR3,"parsing \n");//fflush(stdout);
  mTmpHandle = (Handle)h;

  if (!parseQuery(*mCurQuery, &((**h).q))) { //failure?
    (**h).q.qid = (**mCurQuery).qid; //cleanup
    //we know it's idle, so force
    mQidToRemove = (**h).q.qid;
    mForceRemove = TRUE;
    post removeQueryTask();
    call MemAlloc.unlock((Handle)h);
    call MemAlloc.unlock((Handle)mCurQuery);
    call MemAlloc.free((Handle)mCurQuery);
    UNSET_READING_QUERY();
    UNSET_PARSING_QUERY();
    return;
  }
}

/** Callback routine that indicates parsing is complete.
    Unlocks both handles, frees the unparsed query, and on success starts
    resizing the parsed query to make room for a tuple. On failure the
    query is scheduled for forced removal.

    @param success TRUE if parsing succeeded
*/
void parsedQuery(bool success) {
  QueryListHandle h = (QueryListHandle)mTmpHandle;
  int qid = (**mCurQuery).qid; // saved before mCurQuery is freed below
  TinyDBError err = err_NoError;

  //dbg(DBG_USR3,"unlocking \n");//fflush(stdout);
  call MemAlloc.unlock((Handle)h); //locked this earlier
  call MemAlloc.unlock((Handle)mCurQuery);
  call MemAlloc.free((Handle)mCurQuery);
  mCurQuery = NULL;

  if (success) {
    dbg(DBG_USR3,"finished, now resizing\n");//fflush(stdout);
    if (reallocQueryForTuple(&resizedCallback, (QueryListHandle)h) == FALSE) {
      err = err_OutOfMemory;
      goto fail;
    }
  } else {
    err = err_UnknownError;
    goto fail;
  }
  return;

 fail:
  // A parse failure (err_UnknownError) is not signalled here.
  if (err != err_UnknownError)
    SIG_ERR(err);
  (**h).q.qid = qid; //cleanup
  //we know it's idle, so force
  mQidToRemove = qid;
  mForceRemove = TRUE;
  post removeQueryTask();
  UNSET_READING_QUERY();
  UNSET_PARSING_QUERY();
}

/** Continuation after the query is realloced to include space for a tuple.
    Sets up the output buffer selected by the query's bufferType and, unless
    an operation is still pending, completes query setup.

    @param memory Resized parsed query
*/
void resizedCallback(Handle *memory) {
  QueryListHandle h = (QueryListHandle)*memory; //this has already been allocated
  ParsedQuery *pq = &((**h).q);
  bool pending = FALSE;
  TinyDBError err = err_NoError;

  dbg(DBG_USR3,"finished with resizing\n");//fflush(stdout);

  //set up the output buffer for this query
  switch (pq->bufferType) {
  case kRADIO:
    pq->bufferId = kRADIO_BUFFER;
    break;
  case kRAM:
  case kEEPROM:
    {
      RamBufInfo *ram = &pq->buf.ram;

      //look to see if we're logging to an already created (named) buffer
      if (ram->hasOutput) {
        mTempPQ = pq;
        if (strlen(ram->outBufName) != 0 && !ram->create ) {
          //we'll get an error from this call if the buffer doesn't exist
          err = call DBBuffer.getBufferIdFromName(ram->outBufName,&pq->bufferId);
          if (err != err_NoError) { //if it doesn't exist, look for it in the header
            SET_OPENING_WRITE_BUF();
#ifdef kMATCHBOX
            if (pq->bufferType == kEEPROM)
              err = call DBBuffer.loadEEPROMBuffer(ram->outBufName);
#else
            err = err_NoMatchbox;
#endif
            if (err != err_NoError) {
              SIG_ERR(err);
              goto fail;
            } else
              pending = TRUE;
          }
          if (!pending) {
            call DBBuffer.openBuffer(pq->bufferId, &pending);
          }
        } else {
          err = call DBBuffer.nextUnusedBuffer(&pq->bufferId);
          if (err == err_NoError) {
            //call UartDebugger.writeLine("CREATE BUF", 10);
            err = call DBBuffer.alloc(pq->bufferId, pq->bufferType, ram->numRows, ram->policy, pq , ram->outBufName, &pending, 0);
          }
          // Do not go on to typecheck a buffer that could not be
          // allocated; handle the error the same way the kCOMMAND case does.
          if (err != err_NoError) {
            SIG_ERR(err);
            goto fail;
          }
        }
        mTempPQ = pq;
        //--SRM--
        // err = call DBBuffer.openBufferForWriting(pq->bufferId, &pending);
        //next state -- call finishedOpeningBuffer
        if (!pending) {
          //call UartDebugger.writeLine("nopending", 9);
          finishedOpeningWriteBuffer(pq);
        }
        return;
      } else {
        pq->bufferId = kRADIO_BUFFER;
      }
    }
    break;
  case kCOMMAND:
    {
      // Zero-initialised so the bytes not written below have a defined
      // value when cmd_param is passed by value to DBBuffer.alloc.
      long cmd_param = 0;
      uint8_t *cmd_buf = (uint8_t *)&cmd_param;
      CommandDescPtr cmd = call CommandUse.getCommand(pq->buf.cmd.name);

      if (cmd == NULL) {
        SIG_ERR(err_UnknownCommand);
        goto fail;
      }
      // Pack the command index into byte 0 and the parameter into the
      // following bytes of cmd_param.
      cmd_buf[0] = cmd->idx;
      *(short *)(&cmd_buf[1]) = pq->buf.cmd.param;
      err = call DBBuffer.nextUnusedBuffer(&pq->bufferId);
      if (err == err_NoError)
        err = call DBBuffer.alloc(pq->bufferId, kCOMMAND, 0, 0, NULL, NULL, &pending, cmd_param);
      if (err != err_NoError) {
        SIG_ERR(err);
        goto fail;
      }
    }
    break;
  default:
    SIG_ERR(err_UnknownError);
    goto fail;
  }

  if (!pending)
    finishedBufferSetup(pq);
  return;

 fail:
  failedOpeningWriteBuffer(pq);
  return;
}

/* Called when the write buffer fails to open properly.
   Schedules forced removal of the query and clears the reading / parsing
   state flags.
*/
void failedOpeningWriteBuffer(ParsedQuery *pq) {
  //we know it's idle, so force
  mQidToRemove = pq->qid;
  mForceRemove = TRUE;
  post removeQueryTask();
  UNSET_READING_QUERY();
  UNSET_PARSING_QUERY();
}

/* Should be called after a RAM or EEPROM buffer has been
   successfully opened for writing. Typechecks the buffer
   and completes query allocation; a typecheck failure is routed to
   failedOpeningWriteBuffer.
*/
void finishedOpeningWriteBuffer(ParsedQuery *pq) {
  dbg(DBG_USR2,"TYPECHECKING\n");
  //now, we need to type check the buffer with our schema
  if (!(call ParsedQueryIntf.typeCheck(*(call DBBuffer.getSchema(pq->bufferId)), pq))) {
    //SIG_ERR(err_Typecheck);
    goto fail;
  } else {
    //call UartDebugger.writeLine("typechecked", 11);
  }
  dbg(DBG_USR2,"TYPECHECKED\n");
  finishedBufferSetup(pq);
  return;

 fail:
  failedOpeningWriteBuffer(pq);
  return;
}

/** Remove a query from the tuple router
    @param qid The query to remove
    @param success Set TRUE if the query was successfully removed, FALSE if the query
      couldn't be found or an error occurred.
    @param force Remove this query even if it may be in flight
    @return err_RemoveRouterFailed if router is in a state such that the query may be
      use, err_NoError otherwise.

    NOTE(review): the task itself takes no arguments and returns nothing; the
    visible code reads mForceRemove (and callers set mQidToRemove) instead.
    The remainder of this task lies beyond the visible source.
*/
task void removeQueryTask() {
  //remove information about the specified query id
  QueryListHandle curq;
  QueryListHandle last = NULL;

  // NOTE(review): && binds tighter than ||, so IS_STARTING_ATTRIBUTE() signals
  // the error even when mForceRemove is set -- confirm whether it was meant
  // to sit inside the parenthesised group.
  if (!mForceRemove && (IS_FETCHING_ATTRIBUTE() || IS_ROUTING_TUPLES() || IS_DELIVERING_TUPLES() || IS_SENDING_MESSAGE()) || IS_STARTING_ATTRIBUTE()) {
    SIG_ERR(err_RemoveFailedRouterBusy);
    //return;
  }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -