⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tuplerouterm.nc

📁 nesC写的heed算法
💻 NC
📖 第 1 页 / 共 5 页
字号:
// $Id: TupleRouterM.nc,v 1.33.4.26 2003/09/18 06:21:21 whong Exp $/*									tab:4 * "Copyright (c) 2000-2003 The Regents of the University  of California.   * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. *  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * * Copyright (c) 2002-2003 Intel Corporation * All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE      * file. If you do not find these files, copies can be found by writing to * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, Berkeley, CA,  * 94704.  Attention:  Intel License Inquiry. */includes Attr;includes MemAlloc;includes TosTime;includes TosServiceSchedule;/**  The TupleRouter is the core of the TinyDB system -- it receives  queries from the network, creates local state for them (converts them  from Queries to ParsedQueries), and then collects results from local  sensors and neighboring nodes and feeds them through local queries.<p>  Queries consist of selections and aggregates.  Results from queries  without aggregates are simply forwarded to the root of the tree to be  handled by the query processor.<p>  Queries with aggregates are processed according to the TAG approach:  each node collects partial aggregates from its children, combines  those aggregates with its own sensor readings, and forwards a partial  aggregate on to its parents.<p>  There are three main execution paths within TUPLE_ROUTER; one for  accepting new queries, one for accepting results from neighboring  nodes, and one for generating local results and deliver data to parent  nodes.<p>  QUERY ARRIVAL<p>  ------------<p><p>  1) New queries arrive in a TUPLE_ROUTER_QUERY_MESSAGE.  Each query  is assumed to be identified by a globally unique ID.  Query messages  contain a part of a query: either a single field (attribute) to  retrieve, a single selection predicate to apply, or a single  aggregation predicate to apply.  All the QUERY_MESSAGEs describing a  single query must arrive before the router will begin routing tuples  for that query.<p>  2) Once all the QUERY_MESSAGESs have arrived, the router calls  parseQuery() to generate a compact representation of the query in  which field names have been replaced with field ids that can be used  as offsets into the sensors local catalog (SCHEMA).  <p>  3) Given a parsedQuery, the tuple router allocates space at the end  of the query to hold a single, "in-flight" tuple for that query --  this tuple will be filled in with the appropriate data fields as the  query executes.  <p>  4) TupleRouter then calls setSampleRate() to start (or restart) the  mote's 32khz clock to fire at the appropriate data-delivery rate for  all of the queries currently in the system.  If there is only one  query, it will fire once per "epoch" -- if there are multiple queries,  it will fire at the GCD of the delivery intervals of all the queries.<p>  TUPLE DELIVERY<p>  --------------<p><p>  1) Whenever a clock event occurs (TUPLE_ROUTER_TIMER_EVENT), the  router must perform four actions:<p>  a) Deliver tuples which were completed on the previous clock event  (deliverTuplesTask).  If the query contains an aggregate, deliver the  aggregate data from the aggregate operator;  if not, deliver the  tuple filled out during the last iteration. Reset the counters that  indicate when these queries should be fired again.  <p>  b) Decrement the counters for all queries.  Any queries who's  counters reach 0 need to have data delivered.  Reset the  expression specific state for these queries (this is specific  to the expressions in the queries -- MAX aggregates, for instances,  will want to reset the current maximum aggregate to some large  negative number.)<p>  c) Fetch data fields for each query firing this epoch.  Loop  through all fields of all queries, fetch them (using the SCHEMA  interface), and fill in the appropriate values in the tuples  on the appropriate queries.  <p>  d) Route filled in tuples to query operators.  First route to  selections, then the aggregate (if it exists).  If any selection  rejects a tuple, stop routing it.  <p>  NEIGHBOR RESULT ARRIVAL<p>  -----------------------<p>  <p>  When a result arrives from a neighbor (TUPLE_ROUTER_RESULT_MESSAGE),  it needs to be integrated into the aggregate values being computed  locally.  If the result corresponds to an aggregate query, that result  is forwarded into the AGG_OPERATOR component, otherwise it is  simply forwarded up the routing tree towards the root.  @author Sam Madden (madden@cs.berkeley.edu)*/includes Attr;includes MemAlloc;includes TinyDB;module TupleRouterM {  uses {    interface Network;    interface AttrUse;    interface TupleIntf;    interface QueryIntf;    interface ParsedQueryIntf;    interface Operator as AggOperator;    interface Operator as SelOperator;    interface QueryResultIntf;    interface MemAlloc;    interface Leds;    interface AbsoluteTimer;    interface TimeUtil;    interface Time;    interface TimeSet;    interface StdControl as ChildControl;    interface StdControl as NetControl;    interface StdControl as TimerControl;    interface Random;#ifdef kUART_DEBUGGER    interface StdControl as UartDebuggerControl;#endif    interface DBBuffer;    interface CommandUse;#ifdef kSUPPORTS_EVENTS    interface EventUse;#endif#ifdef kSUPPORTS_EVENTS    interface CommandRegister as EventFiredCommand;#endif#ifdef kLIFE_CMD    interface CommandRegister as SetLifetimeCommand;#endif    interface NetworkMonitor;    interface Table;#ifdef kUART_DEBUGGER    interface Debugger as UartDebugger;#endif    interface ServiceScheduler;    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);#if defined(PLATFORM_MICA2) || defined(PLATFORM_MICA2DOT)    interface RadioCoordinator as RadioSendCoordinator;    interface RadioCoordinator as RadioReceiveCoordinator;    command result_t PowerMgmtEnable();    command result_t PowerMgmtDisable();#endif#ifdef HAS_ROUTECONTROL    interface RouteControl;#endif    async command void setSimpleTimeInterval(uint16_t new_interval);    async command uint16_t getSimpleTimeInterval();  }  provides {    interface QueryProcessor;    interface StdControl;    interface RadioQueue;#ifdef HSN_ROUTING	interface HSNValue;#endif    command void signalError(TinyDBError err, int lineNo);    command void statusMessage(CharPtr m);  }}implementation {    #define SIG_ERR(errNo) call signalError((errNo), __LINE__)  /* ----------------------------- Type definitions ------------------------------ */  /** number of clocks events that dictate size of interval -- every 3 seconds */  enum {kMS_PER_CLOCK_EVENT = 64};  enum {UDF_WAIT_LOOP = 100}; //number of times we pass through main timer loop before giving up on a fetch...  enum {EPOCHS_TIL_DELETION = 5}; //number of epochs we wait before deleting a "ONCE" query  /* AllocState is used to track what we're currently allocing */  typedef enum {    STATE_ALLOC_PARSED_QUERY = 0,    STATE_ALLOC_IN_FLIGHT_QUERY,    STATE_RESIZE_QUERY,    STATE_NOT_ALLOCING,    STATE_ALLOC_QUERY_RESULT  } AllocState;    /** Linked list to track queries currently being processed */  typedef struct {    void **next;    ParsedQuery q;  } *QueryListPtr, **QueryListHandle, QueryListEl;    /** Completion routine for memory allocation complete */  typedef void (*MemoryCallback)(Handle *memory);  /** A data structure for tracking the next tuple field to fill      needed since some fields come from base sensors (attrs), and some      come from nested queries  */  typedef struct {      bool isAttr;      bool isNull;      union {	  AttrDescPtr attr;	  uint8_t tupleIdx;      } u;  } TupleFieldDesc;    /* ------------------- Bits used in mPendingMask to determine current state ----------------- */  enum { READING_BIT = 0x0001,  // reading fields for Query from network         PARSING_BIT = 0x0002, //parsing the query         ALLOCED_BIT = 0x0004, //reading fields, space is alloced         FETCHING_BIT = 0x0008, //fetching the value of an attribute via the schema api	 ROUTING_BIT = 0x0010, //routing tuples to queries	 DELIVERING_BIT = 0x0020, //deliver tuples to parents	 SENDING_BIT = 0x0040, //are sending a message buffer	 AGGREGATING_BIT = 0x0080, //are computing an aggregate result	 SENDING_QUERY_BIT = 0x0100, //are we sending a query	 IN_QUERY_MSG_BIT = 0x0200, //are we in the query message handler?	 SETTING_SAMPLE_RATE_BIT = 0x0400, //are we setting the sample rate	 SNOOZING_BIT = 0x0800, //are we snoozing 	 ATTR_STARTING_BIT = 0x1000, // are we starting attributes?	 OPENING_WRITE_BUF_BIT = 0x2000, //are we opening the write buffer?	 REMOVING_BIT = 0x4000 //are we removing a query?  };#define qSNOOZE //using snoozing for long epoch durations#undef qADAPTIVE_RATE //adapt sample rate based on contention  //can't snooze when running in simulator!#if defined(PLATFORM_PC) || defined(PLATFORM_MICA) # undef qSNOOZE#endif #ifdef qSNOOZE  enum {WAKING_CLOCKS = 4096/kMS_PER_CLOCK_EVENT}; //time we're awake between sleeps#endif  /* Minimum number of clock ticks per epoch */  enum {kBASE_EPOCH_RATE = 10};    enum {kMAX_WAIT_CLOCKS = 16}; // maximum number of clocks to wait before data is sent out...  enum {kMIN_SLEEP_CLOCKS_PER_SAMPLE = 1024/kMS_PER_CLOCK_EVENT};  //enum {kSIMPLE_TIME_SLEEP_INTERVAL = 512};  /* ----------------------------- Module Variables ------------------------------- */#ifdef kUART_DEBUGGER  char mDbgMsg[20];#endif  TOS_Msg mMsg;  uint16_t mPendingMask;  uint8_t mCycleToSend; //cycle number on which we send  uint32_t mQMsgMask; // bit mask for query msgs that have been received  QueryListHandle mQs;  QueryListHandle mTail;  Query **mCurQuery; //dynamically allocated query handle  Query *mCurQueryPtr;  Handle mTmpHandle;  MemoryCallback mAllocCallback; //function to call after allocation  uint8_t mFetchingFieldId; //the field we are currently fetching  char mCurExpr;  //the last operator in curRouteQuery we routed to  Tuple *mCurTuple; /* The tuple currently being routed (not the same as the tuple in the		     query, since operators may allocated new tuples!)		  */  QueryListHandle mCurRouteQuery; //the query we are currently routing tuples for  QueryResult mResult, mEnqResult; //result we are currently delivering or enqueueing  short mFetchTries;  AllocState mAllocState;  short mOldRate; //previous clock rate  QueryListHandle mCurSendingQuery;  char mCurSendingField;  char mCurSendingExpr;  uint32_t mCurQMsgMask;  TOS_Msg mQmsg;  bool mTriedAllocWaiting; //tried to create a new query, but allocation flag was true#ifdef kQUERY_SHARING  bool mTriedQueryRequest;  //received a request for query from a neighbor, but was buys#endif#ifdef qSNOOZE  bool mAllQueriesSameRate; // just one query running?#endif  bool mSendQueryNextClock;    char *mResultBuf;  ParsedQuery *mLastQuery; //last query we fetched an attribute for  ParsedQuery *mTempPQ;  uint8_t mCurField;  //constants for split-phase voltage reading  //before setting the sample rate based on a lifetime  //goal  uint16_t mLifetime;  uint8_t mCurSampleRateQuery;  uint16_t mVoltage;  #ifdef kLIFE_CMD  bool mLifetimeCommandPending;#endif  uint16_t mNumBlocked; //number of cycles we haven't been able to send over the radio for  uint16_t mClockCount;    bool mIsRunning ; //have some queries that are running  bool mStopped ; //service scheduler called "stop"  bool mRadioWaiting;  bool mSendingResult;  uint8_t mCurTupleIdx;  uint8_t mQidToRemove;  bool mForceRemove;  uint8_t mNumAttrs; // number of attributes to start    typedef enum {    TS_NO = 0,    TS_QUERY_MESSAGE = 1,    TS_QUERY_RESULT_MESSAGE = 2  } TimeStampState;  TimeStampState mMustTimestamp;  TOS_MsgPtr mTimestampMsg;  uint16_t mCurSchedTime;  norace int16_t mLastDiff;    #define kHEARD_DEC 2  #define kHEARD_THRESH 10  uint16_t mLastHeard;    uint8_t mSendFailed;  //number of consecutive sends that have failed  #define MAX_FAILURES 10 //number of sends that must fail before we reset  uint8_t mStoppedQid;  uint8_t mDeliverWait;  //  uint16_t mOldInterval;  bool mWaitIsDummy;#ifdef HSN_ROUTING  uint16_t mHSNValue;  uint16_t mNumMerges;#endif  /* ----------------- Functions to modify pending mask --------------------- */   void SET_READING_QUERY() {(mPendingMask |= READING_BIT); (mQMsgMask = 0x0); }  void UNSET_READING_QUERY() { (mPendingMask &= (READING_BIT ^ 0xFFFF)); (mQMsgMask = 0x0); }  bool IS_READING_QUERY() { return (mPendingMask & READING_BIT) != 0; }    void SET_PARSING_QUERY() { (mPendingMask |= PARSING_BIT); }  void UNSET_PARSING_QUERY() { (mPendingMask &= (PARSING_BIT ^ 0xFFFF)); }  bool IS_PARSING_QUERY() { return (mPendingMask & PARSING_BIT) != 0; }    bool IS_SPACE_ALLOCED() { return (mPendingMask & ALLOCED_BIT) != 0; }  void UNSET_SPACE_ALLOCED() { (mPendingMask &= (ALLOCED_BIT ^ 0xFFFF)); }  void SET_SPACE_ALLOCED() { (mPendingMask |= ALLOCED_BIT); }    bool IS_FETCHING_ATTRIBUTE() { return (mPendingMask & FETCHING_BIT) != 0; }  void UNSET_FETCHING_ATTRIBUTE() { (mPendingMask &= (FETCHING_BIT ^ 0xFFFF)); }  void SET_FETCHING_ATTRIBUTE() { (mPendingMask |= FETCHING_BIT); }   bool IS_STARTING_ATTRIBUTE() { return (mPendingMask & ATTR_STARTING_BIT) != 0; }  void UNSET_STARTING_ATTRIBUTE() { (mPendingMask &= (ATTR_STARTING_BIT ^ 0xFFFF)); }  void SET_STARTING_ATTRIBUTE() { (mPendingMask |= ATTR_STARTING_BIT); }   bool IS_ROUTING_TUPLES() { return (mPendingMask & ROUTING_BIT) != 0; }  void UNSET_ROUTING_TUPLES() { (mPendingMask &= (ROUTING_BIT ^ 0xFFFF)); }  void SET_ROUTING_TUPLES() { (mPendingMask |= ROUTING_BIT); }    bool IS_DELIVERING_TUPLES() { return (mPendingMask & DELIVERING_BIT) != 0; }  void UNSET_DELIVERING_TUPLES() { (mPendingMask &= (DELIVERING_BIT ^ 0xFFFF)); }  void SET_DELIVERING_TUPLES() { (mPendingMask |= DELIVERING_BIT); }    bool IS_SENDING_MESSAGE() { return (mPendingMask & SENDING_BIT) != 0; }  void UNSET_SENDING_MESSAGE() { (mPendingMask &= (SENDING_BIT ^ 0xFFFF)); }  void SET_SENDING_MESSAGE() { (mPendingMask |= SENDING_BIT); }    bool IS_AGGREGATING_RESULT() { return (mPendingMask & AGGREGATING_BIT) != 0; }  void UNSET_AGGREGATING_RESULT() { (mPendingMask &= ( AGGREGATING_BIT ^ 0xFFFF)); }  void SET_AGGREGATING_RESULT() { (mPendingMask |= AGGREGATING_BIT); }  

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -