📄 tuplerouterm.nc
字号:
/* --------------------------------- Tuple Building Routines ---------------------------------*/

/** Fetch the next needed attribute.

    Asks getNextQueryField() for a query that has fired and still has an
    unfilled field, then fills that field in:
    <ul>
    <li>for a query that reads from the schema (fromQid == kNO_QUERY) the
        attribute value is requested through AttrUse.getAttrValue();</li>
    <li>for a query that reads from another query's buffer the field is
        extracted from mResult via ParsedQueryIntf.getResultField().</li>
    </ul>

    Uses mLastQuery to track the last query that results were fetched for,
    and mFetchingFieldId to track the last field that was filled in.

    @return TRUE if an attribute was found and (possibly) more attributes
    exist, or if the result buffer was busy. Note that a return value of TRUE
    doesn't mean the recent attribute was actually found. FALSE if there was
    nothing to fetch or a lookup failed.
*/
bool fetchNextAttr() {
  AttrDescPtr queryField;
  TupleFieldDesc fieldDesc;
  SchemaErrorNo errorNo;
  TinyDBError err;
  char *resultBuf;
  ParsedQuery *q;
  short i, fieldId = -1;
  bool pending;
  TinyDBError tdberr;
  uint8_t bufId;

  dbg(DBG_USR1,"in fetchNextAttr\n"); //fflush(stdout);

  //at least one query needs samples -- but which ones?
  fieldDesc = getNextQueryField(&q);
  if (fieldDesc.isNull == FALSE) {
    //get the result that we'll disassemble if this query fetches its
    //result from a buffer
    if ((mLastQuery == NULL || mLastQuery != q) && q->fromQid != kNO_QUERY) {
      mLastQuery = q;
      tdberr = call DBBuffer.qidToBuffer(q->fromQid, &bufId);
      if (tdberr != err_NoError) return FALSE;
      if (q->epochDuration == kONE_SHOT) {
        tdberr = call DBBuffer.getResult(bufId, q->curResult, &mResult, &pending);
      } else
        tdberr = call DBBuffer.peek(bufId, &mResult, &pending);
      if (tdberr == err_ResultBufferBusy) return TRUE; //try again?
      if (tdberr != err_NoError) return FALSE;
    }

    if (q->fromQid == kNO_QUERY) {
      queryField = fieldDesc.u.attr;
      mFetchingFieldId = queryField->idx;
      //figure out this field's local query index

      //CAREFUL: Invoke command can return very quickly, such that
      //we best have set this value before we call it, since if we do it
      //afterwards, it may complete before we have a chance to set the flag
      //So, we have to make sure to unset the flag below, when needed.
      SET_FETCHING_ATTRIBUTE();

      //no break: if several query fields map to this attribute, the last
      //matching index is the one used
      for (i = 0; i < q->numFields;i++) {
        if (q->queryToSchemaFieldMap[i] == queryField->idx)
          fieldId = i;
      }

      if (fieldId != -1) {
        // use pre-allocated tuple space
        resultBuf = call TupleIntf.getFieldPtr(q, call ParsedQueryIntf.getTuplePtr(q), fieldId);
        if (call AttrUse.getAttrValue(queryField->name, resultBuf, &errorNo) == SUCCESS) {
          //anything other than "pending" is completed via fillInTask
          if (errorNo != SCHEMA_RESULT_PENDING) {
            mResultBuf = resultBuf;
            post fillInTask();
          }
          if (errorNo != SCHEMA_ERROR)
            return TRUE;
        }
      }
    } else if (q->fromQid != kNO_QUERY) {
      ParsedQuery **fromPq = NULL;
      Tuple *t = call ParsedQueryIntf.getTuplePtr(q);

      resultBuf = call TupleIntf.getFieldPtr(q,t, fieldDesc.u.tupleIdx);
      err = call DBBuffer.qidToBuffer(q->fromQid, &bufId);
      if (err != err_NoError) return FALSE;
      fromPq = call DBBuffer.getSchema(bufId);
      //parseQuery treats a NULL schema handle as a failure; do the same
      //here instead of dereferencing it
      if (fromPq == NULL) return FALSE;
      //NOTE(review): the error code stored in err is not examined; the
      //field is marked as set below regardless -- confirm this is intended
      err = call ParsedQueryIntf.getResultField(*fromPq,
                                                &mResult,
                                                q->queryToSchemaFieldMap[fieldDesc.u.tupleIdx],
                                                resultBuf);
      //already wrote the data there, but need to twiddle bits

      //these two lines instead of fillInAttrVal() like above above...
      call TupleIntf.setField(q, t, fieldDesc.u.tupleIdx, resultBuf);
      //NOTE(review): after keepRouting() returns, control falls through to
      //"return FALSE" below, so a calling keepRouting() will run its own
      //"start routing" path as well -- confirm this is intended
      keepRouting();
    }
  }
  mLastQuery = NULL;
  return FALSE;
}

/** Scan queries, looking for fields that haven't been defined yet.

    Only queries with clocksPerSample > 0 and clockCount <= 0 are considered.

    @param pq Set to the query owning the returned field, or to NULL when no
    field was found.
    @return A descriptor whose isNull flag is TRUE when nothing was found.
    Otherwise isAttr tells whether u.attr (schema attribute) or u.tupleIdx
    (index of an empty tuple field) is valid.
*/
TupleFieldDesc getNextQueryField(ParsedQuery **pq) {
  QueryListHandle qlh = mQs;
  AttrDescPtr attr = NULL;
  TupleFieldDesc d;
  TinyDBError err;
  uint8_t idx;

  d.isNull = TRUE;
  while (qlh != NULL) {
    if ((**qlh).q.clocksPerSample > 0 && (**qlh).q.clockCount <= 0) {
      //is this query's time up?
      Tuple *t = call ParsedQueryIntf.getTuplePtr(&(**qlh).q);
      ParsedQuery *q = &(**qlh).q;

      dbg(DBG_USR1,"q->qid = %d, t->qid = %d, t->numFields = %d\n", q->qid, t->qid, t->numFields);
      dbg(DBG_USR1,"calling GET_NEXT_QUERY_FIELD\n"); //fflush(stdout);
      if (q->fromQid == kNO_QUERY) {
        attr = call TupleIntf.getNextQueryField(q,t);
        if (attr != NULL) {
          d.isAttr = TRUE;
          d.u.attr = attr;
          d.isNull = FALSE;
          break;
        }
      } else {
        err = call TupleIntf.getNextEmptyFieldIdx(q,t, &idx);
        if (err == err_NoError) {
          d.isAttr = FALSE;
          d.u.tupleIdx = idx;
          d.isNull = FALSE;
          break;
        }
      }
    }
    qlh = (QueryListHandle)(**qlh).next;
  }
  //qlh is non-NULL only when the loop was left through one of the breaks
  if (qlh == NULL)
    *pq = NULL;
  else
    *pq = &(**qlh).q;
  return d;
}

/** Continue filling in tuple attributes and (if done with that)
    routing completed tuples to operators
*/
void keepRouting() {
  if (! fetchNextAttr()) {
    UNSET_FETCHING_ATTRIBUTE(); //clear, and try again
    SET_ROUTING_TUPLES();
    //no more attributes to fetch, start processing tuples....
    mCurRouteQuery = nextQueryToRoute(NULL);
    mCurExpr = -1;
    post routeTask();
  }
}

/** Set the value of field mFetchingFieldId (which is the id of an
    attribute in the schema) in all queries that need the field to the
    data contained in resultBuf, then call keepRouting().

    @param resultBuf The data to be placed in the field
    @param errorNo An error (if any) returned by the schema in response
    to the getAttr command that generated resultBuf. It is only written to
    the debug log here; the field is set regardless of its value.
*/
void fillInAttrVal(char *resultBuf, SchemaErrorNo errorNo) {
  short id = mFetchingFieldId; //the mote-specific field this command has data for
  short i;
  QueryListHandle qlh = mQs;

  dbg(DBG_USR1,"GOT DATA, COMMAND data = %d, errorNo = %d\n", *(short *)resultBuf, errorNo);
  while (qlh != NULL) {
    if ((**qlh).q.clocksPerSample > 0 && (**qlh).q.clockCount <= 0) {
      //this query needs data
      ParsedQuery *q = &(**qlh).q;
      Tuple *t = call ParsedQueryIntf.getTuplePtr(&(**qlh).q);

      for (i = 0; i < q->numFields; i++) {
        if (q->queryToSchemaFieldMap[i] == id) {
          //the correct field in this query
          call TupleIntf.setField(q, t, i, resultBuf);
          dbg(DBG_USR1,"SET QUERY FIELD : %d\n", i);
        }
      }
    }
    qlh = (QueryListHandle)(**qlh).next;
  }
  keepRouting();
}

/** Used to make attribute setting split phase even when it's not:
    hands mResultBuf to fillInAttrVal() with SCHEMA_RESULT_READY.
*/
task void fillInTask() {
  fillInAttrVal(mResultBuf, SCHEMA_RESULT_READY);
}

/** Completion event after some data was fetched.
    Params should be filled out with the result of the command.

    @param name The name of the attribute that was fetched (not used here)
    @param resultBuf The value of the attribute
    @param errorNo Errors that occurred while fetching the result
    @return Always SUCCESS
*/
event result_t AttrUse.getAttrDone(char *name, char *resultBuf, SchemaErrorNo errorNo) {
  fillInAttrVal(resultBuf, errorNo);
  return SUCCESS;
}

/* --------------------------------- Tuple Routing Routines ---------------------------------*/

/** routeTask does the heavy lifting of routing tuples to queries.
    It assumes the tuples stored in each query that needs to be routed
    during this epoch have been initialized. It then iterates through the
    operators in each query, routing tuples to them in succession.

    Tuples are routed through a query at a time, and always in a fixed order.
    mCurRouteQuery is set to the query for which tuples are currently being
    routed.
*/
task void routeTask() {
  if (mCurRouteQuery != NULL) {
    ParsedQuery *q = &(**mCurRouteQuery).q;

    if (!routeToQuery(q, mCurTuple)) {
      //false here means move on to the next query
      mCurRouteQuery = nextQueryToRoute(mCurRouteQuery);
      post routeTask();
    }
  } else {
    UNSET_ROUTING_TUPLES(); //all done routing
  }
}

/** @return the next query in the query list that needs to be output<br>
    Assumes that all attributes have already been filled out
    (e.g. fetchNextAttr() returned false)

    mCurTuple is changed to point at the tuple corresponding to
    the returned query.
*/
QueryListHandle nextQueryToRoute(QueryListHandle curQuery) {
  //start at the head of the list when no current query is given,
  //otherwise at the entry following it
  QueryListHandle candidate =
    (curQuery == NULL) ? mQs : (QueryListHandle)(**curQuery).next;

  mCurTuple = NULL;
  for (; candidate != NULL; candidate = (QueryListHandle)(**candidate).next) {
    ParsedQuery *pq = &(**candidate).q;

    if (pq->clocksPerSample > 0 && pq->clockCount <= 0) {
      //this query is ready to go
      mCurTuple = call ParsedQueryIntf.getTuplePtr(pq);
      return candidate;
    }
  }
  return NULL;
}

/** Route the specified tuple to the next operator of the specified query,
    as selected by nextExpr(). Any operator that is not a selection (kSEL)
    is handed to the aggregate operator.

    @param q The query that t should be routed to
    @param t The tuple to route
    @return TRUE if the tuple was routed, FALSE otherwise
*/
bool routeToQuery(ParsedQuery *q, Tuple *t) {
  Expr *e = nextExpr(q);

  if (e == NULL)
    return FALSE; //routing all done

  //assume expressions are listed in the order
  //they should be executed! (e.g. selections before aggs)
  e->success = TRUE;
  if (e->opType == kSEL) {
    call SelOperator.processTuple(q,t,e);
  } else {
    call AggOperator.processTuple(q,t,e);
  }
  return TRUE; //more routing to be done
}

/** Uses mCurExpr to track the current expression in q that is being
    applied. <br>
    mCurExpr should be set to -1 to get the first expression in q. <br>
    The expression id is not an explicit parameter since expression routing
    needs to be resumed after the previous split phase expression
    application. <br>

    @return the next expression in q, or null if no such expression exists
    (in which case mCurExpr is reset to -1).
*/
Expr *nextExpr(ParsedQuery *q) {
  mCurExpr++;
  if (mCurExpr < q->numExprs)
    return call ParsedQueryIntf.getExprPtr(q,mCurExpr);

  //ran off the end of the expression list
  mCurExpr = -1;
  return NULL;
}

/* --------------------------------- Query Utility Routines ---------------------------------*/

/** @return TRUE if the query exists.
    @param qid The query to fetch
    @param q Will point to the query pointer upon return if the return
    value is TRUE.
*/
bool getQuery(uint8_t qid, ParsedQuery **q) {
  QueryListHandle node;

  //linear scan of the query list; *q is left untouched when nothing matches
  for (node = mQs; node != NULL; node = (QueryListHandle)(**node).next) {
    if ((**node).q.qid == qid) {
      *q = &(**node).q;
      return TRUE;
    }
  }
  return FALSE;
}

/** Look up a query by id.
    @param qid The id of the query to fetch
    @return The parsed query with that id, or NULL if getQuery() finds none
*/
command ParsedQueryPtr QueryProcessor.getQueryCmd(uint8_t qid) {
  ParsedQueryPtr found;

  if (!getQuery(qid,&found))
    return NULL;
  return found;
}

/** Given a processor message return the owner (originating node)
    of the query, or -1 if the query is unknown or the message is not one of
    the data / query / query-request message types.

    @param msg The message for which the query root is sought
*/
command short QueryProcessor.msgToQueryRoot(TOS_MsgPtr msg) {
  uint8_t msgType = msg->type;
  uint8_t qid;
  ParsedQueryPtr pq;

  if (!(msgType == kDATA_MESSAGE_ID ||
        msgType == kQUERY_MESSAGE_ID ||
        msgType == kQUERY_REQUEST_MESSAGE_ID))
    return -1;

  //hack : assume first byte after header is query id!
  qid = (uint8_t)msg->data[sizeof(DbMsgHdr)];
  pq = call QueryProcessor.getQueryCmd(qid);
  if (pq == NULL)
    return -1;
  return pq->queryRoot;
}

/** Given a query, parse it into pq
    @param q The query to convert
    @param pq The parsed query to fill in. Assumes that pq has been
    allocated with ParsedQueryIntf.pqSize(q) bytes.
@return TRUE if successful */ bool parseQuery(Query *q, ParsedQuery *pq) { AttrDesc *attr; int i; uint8_t bufferId; ParsedQuery **fromPq = NULL; TinyDBError err; pq->qid = q->qid; pq->numFields = q->numFields; pq->numExprs = q->numExprs; pq->epochDuration = q->epochDuration; pq->fromQid = q->fromQid; pq->bufferType = q->bufferType; pq->bufferId = q->bufferId; pq->markedForDeletion = 0; pq->currentEpoch = 0; pq->buf = q->buf; pq->queryRoot = q->queryRoot; if ((uint8_t)pq->fromQid != kNO_QUERY) { err = call DBBuffer.qidToBuffer(pq->fromQid, &bufferId); if (err != err_NoError) return FALSE; fromPq = call DBBuffer.getSchema(bufferId); if (fromPq == NULL) return FALSE; } for (i = 0; i < q->numFields; i++) { Field f = call QueryIntf.getField(q,i); dbg(DBG_USR1,"Setting field %d (%s)\n", i, f.name);//fflush(stdout); if (pq->fromQid == kNO_QUERY) { attr = call AttrUse.getAttr(f.name); if (attr != N
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -