📄 tuplerouterm.nc
字号:
// $Id: TupleRouterM.nc,v 1.33.4.26 2003/09/18 06:21:21 whong Exp $/* tab:4 * "Copyright (c) 2000-2003 The Regents of the University of California. * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. * * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * * Copyright (c) 2002-2003 Intel Corporation * All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE * file. If you do not find these files, copies can be found by writing to * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, Berkeley, CA, * 94704. Attention: Intel License Inquiry. */includes Attr;includes MemAlloc;includes TosTime;includes TosServiceSchedule;/** The TupleRouter is the core of the TinyDB system -- it receives queries from the network, creates local state for them (converts them from Queries to ParsedQueries), and then collects results from local sensors and neighboring nodes and feeds them through local queries.<p> Queries consist of selections and aggregates. Results from queries without aggregates are simply forwarded to the root of the tree to be handled by the query processor.<p> Queries with aggregates are processed according to the TAG approach: each node collects partial aggregates from its children, combines those aggregates with its own sensor readings, and forwards a partial aggregate on to its parents.<p> There are three main execution paths within TUPLE_ROUTER; one for accepting new queries, one for accepting results from neighboring nodes, and one for generating local results and deliver data to parent nodes.<p> QUERY ARRIVAL<p> ------------<p><p> 1) New queries arrive in a TUPLE_ROUTER_QUERY_MESSAGE. Each query is assumed to be identified by a globally unique ID. Query messages contain a part of a query: either a single field (attribute) to retrieve, a single selection predicate to apply, or a single aggregation predicate to apply. All the QUERY_MESSAGEs describing a single query must arrive before the router will begin routing tuples for that query.<p> 2) Once all the QUERY_MESSAGESs have arrived, the router calls parseQuery() to generate a compact representation of the query in which field names have been replaced with field ids that can be used as offsets into the sensors local catalog (SCHEMA). <p> 3) Given a parsedQuery, the tuple router allocates space at the end of the query to hold a single, "in-flight" tuple for that query -- this tuple will be filled in with the appropriate data fields as the query executes. <p> 4) TupleRouter then calls setSampleRate() to start (or restart) the mote's 32khz clock to fire at the appropriate data-delivery rate for all of the queries currently in the system. If there is only one query, it will fire once per "epoch" -- if there are multiple queries, it will fire at the GCD of the delivery intervals of all the queries.<p> TUPLE DELIVERY<p> --------------<p><p> 1) Whenever a clock event occurs (TUPLE_ROUTER_TIMER_EVENT), the router must perform four actions:<p> a) Deliver tuples which were completed on the previous clock event (deliverTuplesTask). If the query contains an aggregate, deliver the aggregate data from the aggregate operator; if not, deliver the tuple filled out during the last iteration. Reset the counters that indicate when these queries should be fired again. <p> b) Decrement the counters for all queries. Any queries who's counters reach 0 need to have data delivered. Reset the expression specific state for these queries (this is specific to the expressions in the queries -- MAX aggregates, for instances, will want to reset the current maximum aggregate to some large negative number.)<p> c) Fetch data fields for each query firing this epoch. Loop through all fields of all queries, fetch them (using the SCHEMA interface), and fill in the appropriate values in the tuples on the appropriate queries. <p> d) Route filled in tuples to query operators. First route to selections, then the aggregate (if it exists). If any selection rejects a tuple, stop routing it. <p> NEIGHBOR RESULT ARRIVAL<p> -----------------------<p> <p> When a result arrives from a neighbor (TUPLE_ROUTER_RESULT_MESSAGE), it needs to be integrated into the aggregate values being computed locally. If the result corresponds to an aggregate query, that result is forwarded into the AGG_OPERATOR component, otherwise it is simply forwarded up the routing tree towards the root. @author Sam Madden (madden@cs.berkeley.edu)*/includes Attr;includes MemAlloc;includes TinyDB;module TupleRouterM { uses { interface Network; interface AttrUse; interface TupleIntf; interface QueryIntf; interface ParsedQueryIntf; interface Operator as AggOperator; interface Operator as SelOperator; interface QueryResultIntf; interface MemAlloc; interface Leds; interface AbsoluteTimer; interface TimeUtil; interface Time; interface TimeSet; interface StdControl as ChildControl; interface StdControl as NetControl; interface StdControl as TimerControl; interface Random;#ifdef kUART_DEBUGGER interface StdControl as UartDebuggerControl;#endif interface DBBuffer; interface CommandUse;#ifdef kSUPPORTS_EVENTS interface EventUse;#endif#ifdef kSUPPORTS_EVENTS interface CommandRegister as EventFiredCommand;#endif#ifdef kLIFE_CMD interface CommandRegister as SetLifetimeCommand;#endif interface NetworkMonitor; interface Table;#ifdef kUART_DEBUGGER interface Debugger as UartDebugger;#endif interface ServiceScheduler; command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);#if defined(PLATFORM_MICA2) || defined(PLATFORM_MICA2DOT) interface RadioCoordinator as RadioSendCoordinator; interface RadioCoordinator as RadioReceiveCoordinator; command result_t PowerMgmtEnable(); command result_t PowerMgmtDisable();#endif#ifdef HAS_ROUTECONTROL interface RouteControl;#endif async command void setSimpleTimeInterval(uint16_t new_interval); async command uint16_t getSimpleTimeInterval(); } provides { interface QueryProcessor; interface StdControl; interface RadioQueue;#ifdef HSN_ROUTING interface HSNValue;#endif command void signalError(TinyDBError err, int lineNo); command void statusMessage(CharPtr m); }}implementation { #define SIG_ERR(errNo) call signalError((errNo), __LINE__) /* ----------------------------- Type definitions ------------------------------ */ /** number of clocks events that dictate size of interval -- every 3 seconds */ enum {kMS_PER_CLOCK_EVENT = 64}; enum {UDF_WAIT_LOOP = 100}; //number of times we pass through main timer loop before giving up on a fetch... enum {EPOCHS_TIL_DELETION = 5}; //number of epochs we wait before deleting a "ONCE" query /* AllocState is used to track what we're currently allocing */ typedef enum { STATE_ALLOC_PARSED_QUERY = 0, STATE_ALLOC_IN_FLIGHT_QUERY, STATE_RESIZE_QUERY, STATE_NOT_ALLOCING, STATE_ALLOC_QUERY_RESULT } AllocState; /** Linked list to track queries currently being processed */ typedef struct { void **next; ParsedQuery q; } *QueryListPtr, **QueryListHandle, QueryListEl; /** Completion routine for memory allocation complete */ typedef void (*MemoryCallback)(Handle *memory); /** A data structure for tracking the next tuple field to fill needed since some fields come from base sensors (attrs), and some come from nested queries */ typedef struct { bool isAttr; bool isNull; union { AttrDescPtr attr; uint8_t tupleIdx; } u; } TupleFieldDesc; /* ------------------- Bits used in mPendingMask to determine current state ----------------- */ enum { READING_BIT = 0x0001, // reading fields for Query from network PARSING_BIT = 0x0002, //parsing the query ALLOCED_BIT = 0x0004, //reading fields, space is alloced FETCHING_BIT = 0x0008, //fetching the value of an attribute via the schema api ROUTING_BIT = 0x0010, //routing tuples to queries DELIVERING_BIT = 0x0020, //deliver tuples to parents SENDING_BIT = 0x0040, //are sending a message buffer AGGREGATING_BIT = 0x0080, //are computing an aggregate result SENDING_QUERY_BIT = 0x0100, //are we sending a query IN_QUERY_MSG_BIT = 0x0200, //are we in the query message handler? SETTING_SAMPLE_RATE_BIT = 0x0400, //are we setting the sample rate SNOOZING_BIT = 0x0800, //are we snoozing ATTR_STARTING_BIT = 0x1000, // are we starting attributes? OPENING_WRITE_BUF_BIT = 0x2000, //are we opening the write buffer? REMOVING_BIT = 0x4000 //are we removing a query? };#define qSNOOZE //using snoozing for long epoch durations#undef qADAPTIVE_RATE //adapt sample rate based on contention //can't snooze when running in simulator!#if defined(PLATFORM_PC) || defined(PLATFORM_MICA) # undef qSNOOZE#endif #ifdef qSNOOZE enum {WAKING_CLOCKS = 4096/kMS_PER_CLOCK_EVENT}; //time we're awake between sleeps#endif /* Minimum number of clock ticks per epoch */ enum {kBASE_EPOCH_RATE = 10}; enum {kMAX_WAIT_CLOCKS = 16}; // maximum number of clocks to wait before data is sent out... enum {kMIN_SLEEP_CLOCKS_PER_SAMPLE = 1024/kMS_PER_CLOCK_EVENT}; //enum {kSIMPLE_TIME_SLEEP_INTERVAL = 512}; /* ----------------------------- Module Variables ------------------------------- */#ifdef kUART_DEBUGGER char mDbgMsg[20];#endif TOS_Msg mMsg; uint16_t mPendingMask; uint8_t mCycleToSend; //cycle number on which we send uint32_t mQMsgMask; // bit mask for query msgs that have been received QueryListHandle mQs; QueryListHandle mTail; Query **mCurQuery; //dynamically allocated query handle Query *mCurQueryPtr; Handle mTmpHandle; MemoryCallback mAllocCallback; //function to call after allocation uint8_t mFetchingFieldId; //the field we are currently fetching char mCurExpr; //the last operator in curRouteQuery we routed to Tuple *mCurTuple; /* The tuple currently being routed (not the same as the tuple in the query, since operators may allocated new tuples!) */ QueryListHandle mCurRouteQuery; //the query we are currently routing tuples for QueryResult mResult, mEnqResult; //result we are currently delivering or enqueueing short mFetchTries; AllocState mAllocState; short mOldRate; //previous clock rate QueryListHandle mCurSendingQuery; char mCurSendingField; char mCurSendingExpr; uint32_t mCurQMsgMask; TOS_Msg mQmsg; bool mTriedAllocWaiting; //tried to create a new query, but allocation flag was true#ifdef kQUERY_SHARING bool mTriedQueryRequest; //received a request for query from a neighbor, but was buys#endif#ifdef qSNOOZE bool mAllQueriesSameRate; // just one query running?#endif bool mSendQueryNextClock; char *mResultBuf; ParsedQuery *mLastQuery; //last query we fetched an attribute for ParsedQuery *mTempPQ; uint8_t mCurField; //constants for split-phase voltage reading //before setting the sample rate based on a lifetime //goal uint16_t mLifetime; uint8_t mCurSampleRateQuery; uint16_t mVoltage; #ifdef kLIFE_CMD bool mLifetimeCommandPending;#endif uint16_t mNumBlocked; //number of cycles we haven't been able to send over the radio for uint16_t mClockCount; bool mIsRunning ; //have some queries that are running bool mStopped ; //service scheduler called "stop" bool mRadioWaiting; bool mSendingResult; uint8_t mCurTupleIdx; uint8_t mQidToRemove; bool mForceRemove; uint8_t mNumAttrs; // number of attributes to start typedef enum { TS_NO = 0, TS_QUERY_MESSAGE = 1, TS_QUERY_RESULT_MESSAGE = 2 } TimeStampState; TimeStampState mMustTimestamp; TOS_MsgPtr mTimestampMsg; uint16_t mCurSchedTime; norace int16_t mLastDiff; #define kHEARD_DEC 2 #define kHEARD_THRESH 10 uint16_t mLastHeard; uint8_t mSendFailed; //number of consecutive sends that have failed #define MAX_FAILURES 10 //number of sends that must fail before we reset uint8_t mStoppedQid; uint8_t mDeliverWait; // uint16_t mOldInterval; bool mWaitIsDummy;#ifdef HSN_ROUTING uint16_t mHSNValue; uint16_t mNumMerges;#endif /* ----------------- Functions to modify pending mask --------------------- */ void SET_READING_QUERY() {(mPendingMask |= READING_BIT); (mQMsgMask = 0x0); } void UNSET_READING_QUERY() { (mPendingMask &= (READING_BIT ^ 0xFFFF)); (mQMsgMask = 0x0); } bool IS_READING_QUERY() { return (mPendingMask & READING_BIT) != 0; } void SET_PARSING_QUERY() { (mPendingMask |= PARSING_BIT); } void UNSET_PARSING_QUERY() { (mPendingMask &= (PARSING_BIT ^ 0xFFFF)); } bool IS_PARSING_QUERY() { return (mPendingMask & PARSING_BIT) != 0; } bool IS_SPACE_ALLOCED() { return (mPendingMask & ALLOCED_BIT) != 0; } void UNSET_SPACE_ALLOCED() { (mPendingMask &= (ALLOCED_BIT ^ 0xFFFF)); } void SET_SPACE_ALLOCED() { (mPendingMask |= ALLOCED_BIT); } bool IS_FETCHING_ATTRIBUTE() { return (mPendingMask & FETCHING_BIT) != 0; } void UNSET_FETCHING_ATTRIBUTE() { (mPendingMask &= (FETCHING_BIT ^ 0xFFFF)); } void SET_FETCHING_ATTRIBUTE() { (mPendingMask |= FETCHING_BIT); } bool IS_STARTING_ATTRIBUTE() { return (mPendingMask & ATTR_STARTING_BIT) != 0; } void UNSET_STARTING_ATTRIBUTE() { (mPendingMask &= (ATTR_STARTING_BIT ^ 0xFFFF)); } void SET_STARTING_ATTRIBUTE() { (mPendingMask |= ATTR_STARTING_BIT); } bool IS_ROUTING_TUPLES() { return (mPendingMask & ROUTING_BIT) != 0; } void UNSET_ROUTING_TUPLES() { (mPendingMask &= (ROUTING_BIT ^ 0xFFFF)); } void SET_ROUTING_TUPLES() { (mPendingMask |= ROUTING_BIT); } bool IS_DELIVERING_TUPLES() { return (mPendingMask & DELIVERING_BIT) != 0; } void UNSET_DELIVERING_TUPLES() { (mPendingMask &= (DELIVERING_BIT ^ 0xFFFF)); } void SET_DELIVERING_TUPLES() { (mPendingMask |= DELIVERING_BIT); } bool IS_SENDING_MESSAGE() { return (mPendingMask & SENDING_BIT) != 0; } void UNSET_SENDING_MESSAGE() { (mPendingMask &= (SENDING_BIT ^ 0xFFFF)); } void SET_SENDING_MESSAGE() { (mPendingMask |= SENDING_BIT); } bool IS_AGGREGATING_RESULT() { return (mPendingMask & AGGREGATING_BIT) != 0; } void UNSET_AGGREGATING_RESULT() { (mPendingMask &= ( AGGREGATING_BIT ^ 0xFFFF)); } void SET_AGGREGATING_RESULT() { (mPendingMask |= AGGREGATING_BIT); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -