📄 tuplerouterm.nc

📁 TinyOS, a node operating system for sensor networks; its structural design is very interesting
💻 NC
📖 Page 1 of 5
/*									tab:4
 *
 *
 * "Copyright (c) 2000-2002 The Regents of the University  of California.
 * All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice, the following
 * two paragraphs and the author appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS."
 *
 */
includes Attr;
includes MemAlloc;

/*									tab:4
 *  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.  By
 *  downloading, copying, installing or using the software you agree to
 *  this license.  If you do not agree to this license, do not download,
 *  install, copy or use the software.
 *
 *  Intel Open Source License
 *
 *  Copyright (c) 2002 Intel Corporation
 *  All rights reserved.
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are
 *  met:
 *
 *	Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *	Redistributions in binary form must reproduce the above copyright
 *  notice, this list of conditions and the following disclaimer in the
 *  documentation and/or other materials provided with the distribution.
 *      Neither the name of the Intel Corporation nor the names of its
 *  contributors may be used to endorse or promote products derived from
 *  this software without specific prior written permission.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 *  PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE INTEL OR ITS
 *  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 *  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 *  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 *  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *
 */

/**
  The TupleRouter is the core of the TinyDB system -- it receives
  queries from the network, creates local state for them (converts them
  from Queries to ParsedQueries), and then collects results from local
  sensors and neighboring nodes and feeds them through local queries.
<p>
  Queries consist of selections and aggregates.  Results from queries
  without aggregates are simply forwarded to the root of the tree to be
  handled by the query processor.
<p>
  Queries with aggregates are processed according to the TAG approach:
  each node collects partial aggregates from its children, combines
  those aggregates with its own sensor readings, and forwards a partial
  aggregate on to its parents.
<p>
  There are three main execution paths within TUPLE_ROUTER: one for
  accepting new queries, one for accepting results from neighboring
  nodes, and one for generating local results and delivering data to
  parent nodes.
<p>
  QUERY ARRIVAL<p>
  ------------<p>
<p>
  1) New queries arrive in a TUPLE_ROUTER_QUERY_MESSAGE.  Each query
  is assumed to be identified by a globally unique ID.  Query messages
  contain a part of a query: either a single field (attribute) to
  retrieve, a single selection predicate to apply, or a single
  aggregation predicate to apply.  All the QUERY_MESSAGEs describing a
  single query must arrive before the router will begin routing tuples
  for that query.
<p>
  2) Once all the QUERY_MESSAGEs have arrived, the router calls
  parseQuery() to generate a compact representation of the query in
  which field names have been replaced with field ids that can be used
  as offsets into the sensor's local catalog (SCHEMA).
<p>
  3) Given a parsedQuery, the tuple router allocates space at the end
  of the query to hold a single, "in-flight" tuple for that query --
  this tuple will be filled in with the appropriate data fields as the
  query executes.
<p>
  4) TupleRouter then calls setSampleRate() to start (or restart) the
  mote's 32 kHz clock to fire at the appropriate data-delivery rate for
  all of the queries currently in the system.  If there is only one
  query, it will fire once per "epoch" -- if there are multiple queries,
  it will fire at the GCD of the delivery intervals of all the queries.
<p>
  TUPLE DELIVERY<p>
  --------------<p>
<p>
  1) Whenever a clock event occurs (TUPLE_ROUTER_TIMER_EVENT), the
  router must perform four actions:
<p>
  a) Deliver tuples which were completed on the previous clock event
  (deliverTuplesTask).  If the query contains an aggregate, deliver the
  aggregate data from the aggregate operator;  if not, deliver the
  tuple filled out during the last iteration.  Reset the counters that
  indicate when these queries should be fired again.
<p>
  b) Decrement the counters for all queries.  Any queries whose
  counters reach 0 need to have data delivered.  Reset the
  expression-specific state for these queries (this is specific
  to the expressions in the queries -- MAX aggregates, for instance,
  will want to reset the current maximum aggregate to some large
  negative number.)
<p>
  c) Fetch data fields for each query firing this epoch.  Loop
  through all fields of all queries, fetch them (using the SCHEMA
  interface), and fill in the appropriate values in the tuples
  on the appropriate queries.
<p>
  d) Route filled-in tuples to query operators.  First route to
  selections, then the aggregate (if it exists).  If any selection
  rejects a tuple, stop routing it.
<p>
  NEIGHBOR RESULT ARRIVAL<p>
  -----------------------<p>
<p>
  When a result arrives from a neighbor (TUPLE_ROUTER_RESULT_MESSAGE),
  it needs to be integrated into the aggregate values being computed
  locally.  If the result corresponds to an aggregate query, that result
  is forwarded into the AGG_OPERATOR component; otherwise it is
  simply forwarded up the routing tree towards the root.

  @author Sam Madden (madden@cs.berkeley.edu)
*/
module TupleRouterM {
  uses {
    interface Network;
    interface AttrUse;
    interface TupleIntf;
    interface QueryIntf;
    interface ParsedQueryIntf;
    interface Operator as AggOperator;
    interface Operator as SelOperator;
    interface QueryResultIntf;
    interface MemAlloc;
    interface Leds;
    interface Timer;
    interface Random;
    interface Interrupt;
    interface StdControl as ChildControl;
    interface StdControl as TimerControl;
    interface StdControl as RadioControl;
    interface DBBuffer;
    interface CommandUse;
    command TinyDBError addResults(QueryResult *qr, ParsedQuery *q, Expr *e);
  }
  provides {
    interface QueryProcessor;
    interface StdControl;
    interface RadioQueue;

    command result_t setFixedComm(bool fixed);
    command void signalError(TinyDBError err);
    command void statusMessage(CharPtr m);
  }
}

implementation {
  /* ----------------------------- Type definitions ------------------------------ */
  /** number of clock events that dictate size of interval -- every 3 seconds */
  enum {NUM_TICKS_PER_INTERVAL = 128};
  enum {UDF_WAIT_LOOP = 100}; //number of times we pass through main timer loop before giving up on a fetch...
  enum {EPOCHS_TIL_DELETION = 5}; //number of epochs we wait before deleting a "ONCE" query

  /* AllocState is used to track what we're currently allocing */
  typedef enum {
    STATE_ALLOC_PARSED_QUERY = 0,
    STATE_ALLOC_IN_FLIGHT_QUERY,
    STATE_RESIZE_QUERY,
    STATE_NOT_ALLOCING,
    STATE_ALLOC_QUERY_RESULT
  } AllocState;

  /** Linked list to track queries currently being processed */
  typedef struct {
    void **next;
    ParsedQuery q;
  } *QueryListPtr, **QueryListHandle, QueryListEl;

  /** Completion routine for memory allocation complete */
  typedef void (*MemoryCallback)(Handle *memory);

  //these messages sent from remote motes requesting a query
  typedef struct {
    DbMsgHdr hdr;
    char qid; //note that this byte must be query id
  } QueryRequestMessage;

  //we maintain a queue of messages waiting to be delivered
  enum {MSG_Q_LEN = 8};
  typedef struct {
    short start;
    short end;
    short size;
    TOS_Msg msgs[MSG_Q_LEN];
  } MsgQ;

  /** A data structure for tracking the next tuple field to fill
      needed since some fields come from base sensors (attrs), and some
      come from nested queries
  */
  typedef struct {
    bool isAttr;
    bool isNull;
    union {
      AttrDescPtr attr;
      uint8_t tupleIdx;
    } u;
  } TupleFieldDesc;

  /* ------------------- Bits used in mPendingMask to determine current state ----------------- */
  enum {
    READING_BIT = 0x0001,       // reading fields for Query from network
    PARSING_BIT = 0x0002,       // parsing the query
    ALLOCED_BIT = 0x0004,       // reading fields, space is alloced
    FETCHING_BIT = 0x0008,      // fetching the value of an attribute via the schema api
    ROUTING_BIT = 0x0010,       // routing tuples to queries
    DELIVERING_BIT = 0x0020,    // deliver tuples to parents
    SENDING_BIT = 0x0040,       // are sending a message buffer
    AGGREGATING_BIT = 0x0080,   // are computing an aggregate result
    SENDING_QUERY_BIT = 0x0100, // are we sending a query
    IN_QUERY_MSG_BIT = 0x0200   // are we in the query message handler?
  };

  uint16_t max(uint16_t a, uint16_t b) {
    return a<b?b:a;
  }

  /* ----------------------------- Module Variables ------------------------------- */
  char mDbgMsg[20];
  TOS_Msg mMsg;
  MsgQ mMsgq;
  uint16_t mPendingMask;
  uint8_t mCycleToSend; //cycle number on which we send
  QueryListHandle mQs;
  QueryListHandle mTail;
  Query **mCurQuery; //dynamically allocated query handle
  Handle mTmpHandle;
  MemoryCallback mAllocCallback; //function to call after allocation
  uint8_t mFetchingFieldId; //the field we are currently fetching
  char mCurExpr;  //the last operator in curRouteQuery we routed to
  Tuple *mCurTuple; /* The tuple currently being routed (not the same as the tuple in the
                       query, since operators may allocate new tuples!)
                    */
  QueryListHandle mCurRouteQuery; //the query we are currently routing tuples for
  QueryResult mResult, mEnqResult; //result we are currently delivering or enqueueing
  short mOutputCount;
  short mFetchTries;
  AllocState mAllocState;
  short mOldRate; //previous clock rate
  QueryListHandle mCurSendingQuery;
  char mCurSendingField;
  char mCurSendingExpr;
  TOS_Msg mQmsg;
  bool mTriedAllocWaiting; //tried to create a new query, but allocation flag was true
  bool mTriedQueryRequest;  //received a request for query from a neighbor, but was busy
  unsigned char mXmitSlots;
  unsigned char mNumSenders;
  short mTicksThisInterval;
  short mMsgsThisInterval;
  bool mFixedComm;
  bool mSendQueryNextClock;

  bool mSending;
  char *mResultBuf;
  ParsedQuery *mLastQuery; //last query we fetched an attribute for
  enum {kDEBUG = 0};

  /* ----------------- Functions to modify pending mask --------------------- */
  void SET_READING_QUERY() {(mPendingMask |= READING_BIT); }
  void UNSET_READING_QUERY() { (mPendingMask &= (READING_BIT ^ 0xFFFF)); }
  bool IS_READING_QUERY() { return (mPendingMask & READING_BIT) != 0; }

  void SET_PARSING_QUERY() { (mPendingMask |= PARSING_BIT); }
  void UNSET_PARSING_QUERY() { (mPendingMask &= (PARSING_BIT ^ 0xFFFF)); }
  bool IS_PARSING_QUERY() { return (mPendingMask & PARSING_BIT) != 0; }

  bool IS_SPACE_ALLOCED() { return (mPendingMask & ALLOCED_BIT) != 0; }
  void UNSET_SPACE_ALLOCED() { (mPendingMask &= (ALLOCED_BIT ^ 0xFFFF)); }
  void SET_SPACE_ALLOCED() { (mPendingMask |= ALLOCED_BIT); }

  bool IS_FETCHING_ATTRIBUTE() { return (mPendingMask & FETCHING_BIT) != 0; }
  void UNSET_FETCHING_ATTRIBUTE() { (mPendingMask &= (FETCHING_BIT ^ 0xFFFF)); }
  void SET_FETCHING_ATTRIBUTE() { (mPendingMask |= FETCHING_BIT); }

  bool IS_ROUTING_TUPLES() { return (mPendingMask & ROUTING_BIT) != 0; }
  void UNSET_ROUTING_TUPLES() { (mPendingMask &= (ROUTING_BIT ^ 0xFFFF)); }
  void SET_ROUTING_TUPLES() { (mPendingMask |= ROUTING_BIT); }

  bool IS_DELIVERING_TUPLES() { return (mPendingMask & DELIVERING_BIT) != 0; }
  void UNSET_DELIVERING_TUPLES() { (mPendingMask &= (DELIVERING_BIT ^ 0xFFFF)); }
  void SET_DELIVERING_TUPLES() { (mPendingMask |= DELIVERING_BIT); }

  bool IS_SENDING_MESSAGE() { return (mPendingMask & SENDING_BIT) != 0; }
  void UNSET_SENDING_MESSAGE() { (mPendingMask &= (SENDING_BIT ^ 0xFFFF)); }
  void SET_SENDING_MESSAGE() { (mPendingMask |= SENDING_BIT); }

  bool IS_AGGREGATING_RESULT() { return (mPendingMask & AGGREGATING_BIT) != 0; }
  void UNSET_AGGREGATING_RESULT() { (mPendingMask &= ( AGGREGATING_BIT ^ 0xFFFF)); }
  void SET_AGGREGATING_RESULT() { (mPendingMask |= AGGREGATING_BIT); }

  bool IS_SENDING_QUERY() { return (mPendingMask & SENDING_QUERY_BIT) != 0; }
  void UNSET_SENDING_QUERY() { (mPendingMask &= ( SENDING_QUERY_BIT ^ 0xFFFF)); }
  void SET_SENDING_QUERY() { (mPendingMask |= SENDING_QUERY_BIT); }

  bool IS_IN_QUERY_MSG() { return (mPendingMask & IN_QUERY_MSG_BIT) != 0; }
  void UNSET_IS_IN_QUERY_MSG() { (mPendingMask &= ( IN_QUERY_MSG_BIT ^ 0xFFFF)); }
  void SET_IS_IN_QUERY_MSG() { (mPendingMask |= IN_QUERY_MSG_BIT); }

  /* ----------------------------- Prototypes for Internal Routines ------------------------------ */
  void continueQuery(Handle *memory);
  bool addQueryField(TOS_MsgPtr msg);
  bool allocPendingQuery(MemoryCallback callback, Query *q);
  bool allocQuery(MemoryCallback callback, Query *q);
  void parsedCallback(Handle *memory);
  bool parseQuery(Query *q, ParsedQuery *pq);
  bool queryComplete(Query q);
  bool reallocQueryForTuple(MemoryCallback callback, QueryListHandle qlh);
  void resizedCallback(Handle *memory);
  void setSampleRate();
  short gcd(short a, short b);
  bool fetchNextAttr();
  TupleFieldDesc getNextQueryField(ParsedQuery **q);
  QueryListHandle nextQueryToRoute(QueryListHandle curQuery);
  bool routeToQuery(ParsedQuery *q, Tuple *t);
  Expr *nextExpr(ParsedQuery *q);
  bool getQuery(uint8_t qid, ParsedQuery **q);
  void startFetchingTuples();
  void resetTupleState(ParsedQuery *q);
  void fillInAttrVal(char *resultBuf, SchemaErrorNo errorNo);
  void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId);
  void computeOutputRate();
  TinyDBError dequeueMessage(TOS_Msg *msg);
  void sendWaitingMessages();
  TinyDBError removeQuery(uint8_t qid, BoolPtr success);
  TinyDBError forwardQuery(TOS_MsgPtr msg);
  void finishedBufferSetup();
  void keepRouting();
  //  void statusMessage(char *m);

  task void deliverTuplesTask();
  task void routeTask();
  task void sendQuery();
  task void fillInTask();
  task void mainTask();

/* -----------------------------------------------------------------------------*/
/* --------------------------------- Functions ---------------------------------*/
/* -----------------------------------------------------------------------------*/

  /** Initialize the tuple router */
  command result_t StdControl.init() {
    call RadioControl.init();
    mPendingMask = 0;
    mCycleToSend = 0;
    mQs = NULL;
    mTail = NULL;
    mCurQuery = NULL;

    mMsgq.start = 0;
    mMsgq.end = 0;
    mMsgq.size = 0;
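
The header comment in this file says that with several queries the clock fires at the GCD of their delivery intervals, and that each firing decrements a per-query counter until it reaches 0. The bodies of setSampleRate() and gcd() are not on this page of the listing, so the following is only a minimal sketch of that scheme in plain C (not nesC), under stated assumptions. `QueryTimer`, `gcd_u16`, `timer_period`, `reset_counters` and `on_timer_fired` are hypothetical names, not TinyDB identifiers. The sketch also reloads a counter as soon as it reaches 0, whereas the comment describes the reset happening when tuples are delivered on the following clock event.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-query timing state; NOT the TinyDB ParsedQuery layout. */
typedef struct {
    uint16_t epoch_ticks; /* delivery interval of the query, in base clock ticks */
    uint16_t ticks_left;  /* timer firings remaining until the query fires */
} QueryTimer;

/* Euclid's algorithm; gcd_u16(a, 0) returns a. */
static uint16_t gcd_u16(uint16_t a, uint16_t b) {
    while (b != 0) {
        uint16_t t = (uint16_t)(a % b);
        a = b;
        b = t;
    }
    return a;
}

/* Shared timer period: the GCD of all query intervals (0 if there are no queries). */
static uint16_t timer_period(const QueryTimer *qs, size_t n) {
    uint16_t g = 0;
    size_t i;
    for (i = 0; i < n; i++) {
        g = gcd_u16(g, qs[i].epoch_ticks);
    }
    return g;
}

/* Load each counter with the number of timer firings that make up one epoch. */
static void reset_counters(QueryTimer *qs, size_t n, uint16_t period) {
    size_t i;
    if (period == 0) {
        return; /* nothing to schedule */
    }
    for (i = 0; i < n; i++) {
        qs[i].ticks_left = (uint16_t)(qs[i].epoch_ticks / period);
    }
}

/* One timer firing: decrement every counter and return a bitmask with bit i
 * set for each query i (i < 32) whose counter reached 0 on this firing.
 * Assumes reset_counters() was called first with the same period. */
static uint32_t on_timer_fired(QueryTimer *qs, size_t n, uint16_t period) {
    uint32_t fired = 0;
    size_t i;
    if (period == 0) {
        return 0;
    }
    for (i = 0; i < n && i < 32; i++) {
        if (qs[i].ticks_left > 0 && --qs[i].ticks_left == 0) {
            fired |= (uint32_t)1 << i;
            qs[i].ticks_left = (uint16_t)(qs[i].epoch_ticks / period);
        }
    }
    return fired;
}
```

With two queries whose intervals are 2048 and 3072 ticks, the shared period is 1024 ticks. The first query then fires on every second timer event and the second on every third, so the timer runs no faster than the queries require.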
