scribe.mac

Source code of a well-known application-layer multicast middleware (page 1 of 3).
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat
//All rights reserved.
//
//Redistribution and use in source and binary forms, with or without
//modification, are permitted provided that the following conditions are met:
//
//   * Redistributions of source code must retain the above copyright
//     notice, this list of conditions and the following disclaimer.
//   * Redistributions in binary form must reproduce the above copyright
//     notice, this list of conditions and the following disclaimer in
//     the documentation and/or other materials provided with the
//     distribution.
//   * Neither the names of Duke University nor The University of
//     California, San Diego, nor the names of its contributors
//     may be used to endorse or promote products derived from
//     this software without specific prior written permission.
//
//THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/**
 *  Implementation of the SCRIBE protocol
 *
 *  Chip Killian, Adolfo Rodriguez
 */

#include "adolfo_filter.h" //includes the declaration of the adolfo filter.
#include "scribe-ext.h" //scribe-additional declarations of types and integers.

//These macros are used to turn additional debugging/trace messages on and off.
//#define SCRIBE_TRACE
//#define ANYCAST_TRACE
//#define MEMBERSHIP_TRACE

#define UNDEFINED 0 //This constant is a temporary way to mark an empty macedon_key.  Requires nodes not to have this value as their id.

protocol scribe uses pastry
//protocol scribe uses chord

addressing hash

trace_off

constants {
  int BITS = 32; //Number of bits in a node identifier.
  int MAX_SESSIONS = 20; //Number of scribe trees to maintain state for.
  int MAX_CHILDREN = 25; //Number of children per tree to maintain state for.
  int MAX_ROOTPATH = 50; //Maximum depth of a path-to-root for a scribe tree.
  int MAX_VNODES = 101;   //Maximum number of visited nodes to maintain during an anycast traversal
  int DEBUG_INTERVAL = 4; //The interval of the printer timer.
  int MAX_PROBING = MAX_CHILDREN*MAX_SESSIONS+MAX_SESSIONS; //The max number of nodes to track for link failure.
  int JOIN_TIMEOUT_INT = 2;
  int JOIN_TIMER_PERIOD = 2;
}

neighbor_types {
  //Note that definitions here are of _types_, not actual variables.  This is similar to
  //  defining classes.
  children MAX_CHILDREN;  //A set of children not to exceed size MAX_CHILDREN.  Defined for use in "session".
  uppath MAX_ROOTPATH;    //A set of the nodes to the root of the tree.  Defined for use in "session".
  session MAX_SESSIONS {  //The state type for sessions (scribe trees).  Each tree is a neighbor with these contents.
    children kids;        //The children of this node in the tree.
    macedon_key parent;   //The parent of this node in the tree. BUG: Until a general macedon_key class is defined, this approach uses 0 as NULL or undefined.  Thus a node with id 0 will be confused.
    uppath pathtoroot;    //The path to the root of this tree.
    int subscribed;       //If subscribed==1, then this node should deliver data to the upper layer/app.
    bool isRoot;          //Stores whether this node is the root of the tree (to prevent downcalls all the time).
  }
  visited MAX_VNODES {     //Stores a list of nodes visited during an anycast traversal.
    int color;              //0-white, 1-grey, 2-black
  }
  monitored MAX_PROBING { //The set of nodes to monitor for link-level failures (instead of heartbeats)
    int count;            //The number of things we are watching this node for.
  }
  joining MAX_SESSIONS;   //The state type for sessions which are joining.  This is used to rejoin things which fail.
}

messages {
  PRIORITY_HIGH create_group {
    int group_id;
  }
  PRIORITY_HIGHEST join {
    int group_id;
  }
  PRIORITY_HIGHEST leave {
    int group_id;
  }
  PRIORITY_LOW data {
    int group_id;
    int comm_type;
    int priority;
  }
  PRIORITY_HIGH pushdown {  //Sent to a child when it is being "kicked out" of this node's children list.
                            //  Contains a list of the pushed down node's siblings.  It will try to join
                            //  under one of these.
    int group_id;
    children siblings;
  }
  PRIORITY_HIGH membership { //Heartbeats will be sent to maintain each node's path to root to prevent loops.
    int group_id;
    uppath pathtoroot;
  }
  PRIORITY_HIGH anycast {   //Message type to be sent for anycast requests.  This is performed via a DFS
                            //  of the tree.  Each node in the tree will pass the message up to the
                            //  higher layer/app to determine if the anycast should be continued or
                            //  stopped using upcall_forward.
    int group_id;
    int from_addr;
    visited dfsState;
  }
  PRIORITY_MED unicast_data {
  }
}

state_variables {	          //These are not type definitions but actual variable declarations.
  int myhash;               //used to store the local node's node identifier
  session my_sessions;      //The set of scribe trees we are members of.
  fail_detect monitored watched; //The set of nodes which are to be monitored for failure.
  adolfo_filter master;     //This filter collects stats on all data received. (for bookkeeping & evaluation)
  adolfo_filter master_useful;  //This filter collects stats on useful data received. (""  ""    ""  ""   "")
  timer printer DEBUG_INTERVAL; //This is the printer timer, used to debug and dump state periodically.
  timer joinTimer;         //This timer fires periodically to rejoin failed joins.
  joining outstanding_joins;//This keeps track of the outstanding joins for the system
}

transitions {
  //The automatic transition which occurs at the beginning.
  init API init {
    myhash = hashof(me);
    timer_resched(printer, DEBUG_INTERVAL);
  }

  //Message: create_group
  //Action: recv
  //
  //If state already exists for this group, simply set isRoot to true, upstream
  //  maintenance state to 0, and clear out data about parents.
  //Otherwise, first add state for the group.
  //
  //Known Problems: Does not verify there is room for the new session.
  recv create_group {
    int mygroup = field(group_id);
    neighbor_session* temp;
    if(!neighbor_query(my_sessions, mygroup)) {
      neighbor_add( my_sessions, mygroup );
      upcall_notify(my_sessions, NBR_TYPE_TREE);
      temp = neighbor_entry(my_sessions, mygroup);
      temp->subscribed=0;
    } else {
      temp = neighbor_entry(my_sessions, mygroup);
      neighbor_clear(temp->pathtoroot);
      unwatch(temp->parent);
      temp->parent = UNDEFINED;
    }
    temp->isRoot=true;
  }

  //Message: join
  //Action: forward
  //
  //Comments inline.
  //CASE 1: Forward join because this node sent it
  //CASE 2: Reject joining node based on loop test.
  //CASE 3: Accept new child
  //CASE 4: Reject child because there are too many children.
  //
  //Known problems: See FIXME
  forward join {
    int mygroup;
    neighbor_session *temp;

    //CASE 1: Forward join because this node sent it
    //upcall_forward will be called on all nodes, including origin node.  Thus, if this node
    //is the origin node, automatically forward it.
    if (from == myhash)
      return 0;
    mygroup = field(group_id);
    if (neighbor_query( my_sessions, mygroup )) {
      //retrieve state for use in this transition
      temp = neighbor_entry( my_sessions, mygroup );
      if(source_ == me && !temp->isRoot) { return 0; }
      if(neighbor_query(temp->pathtoroot, from)) {
        //CASE 2: Reject joining node based on loop test.
        //If the joining node is in our path to the root, forward it on, because we cannot allow loops.
        debug_macro("WARNING: Forwarding a join (fr:%.8x to:%.8x) because it is in our path to root.\n", from, dest);
        return 0; //Continue forwarding the join, because we cannot take the node.
      }
    }
    else {
      //add state for the new session, joining the group
      neighbor_add( my_sessions, mygroup );
      upcall_notify(my_sessions, NBR_TYPE_TREE);  //notify the upper layer of the state change.
      //FIXME: If upcall_notify causes the group to be removed -- what happens here?
      temp = neighbor_entry( my_sessions, mygroup );
      temp->subscribed = 0;
      downcall_ext(temp->isRoot, TEST_OWNER, (void*)&mygroup);
      if(temp->isRoot <= 0) {
        route_join(mygroup,mygroup,0,0,-1);
        dojoin(mygroup);
      }
    }
    if (!neighbor_query( temp->kids, from )) {
      if(neighbor_space(temp->kids)) {
        //CASE 3: Accept new child
        neighbor_add( temp->kids, from );
        watch(from);
        routeIP_membership(from,mygroup,temp->pathtoroot, 0, 0, -1);
        debug_macro("Scribe: added kid %.8x for group %.8x, now have %d\n", from, mygroup, neighbor_size(temp->kids));
        if(true) { //FIXME: To be replaced by if(upper layer registered for this type of message)
          //group_change defined in scribe-ext.h
          group_change *gchg = new group_change;
          gchg->groupId = mygroup;
          gchg->childId = from;
          gchg->newJoin = true;
          //Notify upper layer that the child has been added to the given group.
          upcall_ext(GROUP_CHANGE, (void*)gchg);
          delete gchg;
        }
      } else {
        //CASE 4: Reject child because there are too many children.
        route_pushdown(from,temp->ipaddr,temp->kids, 0, 0, -1);
      }
    }
    return 1;
  }

  //Message: join
  //Action: recv
  //
  //If a join message is received at a node, this means it has already been forwarded by all
  //  intermediate nodes, including this one.  If this happens frequently, it may be an
  //  indicator of a more serious problem.  Likely indicates stale state.
  recv join {
    debug_macro("Scribe: WARNING: join failed! Join msg not accepted by any node. (fr: %.8x gr: %.8x)\n", from, field(group_id));
  }

  //Message: leave
  //Action: forward
  //
  //Process:
  //1 - Remove child if found (otherwise stale state somewhere)
  //2 - Conditionally notify higher layer
  //3 - If we have no remaining children and are not subscribed, leave the group. (notify upper layer)
  forward leave {
    // returns 0 if it should be forwarded
    int mygroup;
    neighbor_session *temp;
    if (from == myhash) return 0; //forward message if we sent it.
    mygroup = field(group_id);
    if (neighbor_query(my_sessions, mygroup)) {
      temp = neighbor_entry(my_sessions, mygroup);
      //remove "from" from the list of neighbor kids
      if (neighbor_query(temp->kids, from)) {
        if(true) { //will be replaced by if(upper layer is subscribed to this event type)
          group_change *gchg = new group_change;
          gchg->groupId = mygroup;
          gchg->childId = from;
          gchg->newJoin = false;
          upcall_ext(GROUP_CHANGE, (void*)gchg);
          delete gchg;
        }
        neighbor_remove(temp->kids, from);
        unwatch(from);
      }
      if (neighbor_size(temp->kids) <= 0 && temp->subscribed == 0)
      { // now we check both for children and our own subscription.
        if( temp->parent != UNDEFINED ) {
          routeIP_leave (temp->parent, mygroup, 0, 0, -1);
          unwatch(temp->parent);
          temp->parent = UNDEFINED;
        }
        neighbor_remove(my_sessions, mygroup);
        upcall_notify(my_sessions, NBR_TYPE_TREE);
      }
    } //else no processing done here (probably stale data)
    return 1;
  }

  //Message: leave
  //Action: recv
  //
  //ASSERTion error.  Should not happen.
  recv leave {
    ASSERT(false);
  }

  //Message: data
  //Action: recv
  //
  //Process:
  //1 - Update bookkeeping filter
  //2 - If we do not have state for this tree, assume stale data, and route a leave to the source.
  //3 - Ask upper layer if we should forward this message (else done)
  //4 - Reset counters
  //5 - Deliver message to upper layer
  //6 - forward message to children.
  recv data {
    neighbor_session *tempsess;
    if(field(comm_type) == COMM_TYPE_MULTICAST) {
      master.update();
      if (dest != field(group_id) && dest != myhash) {
        debug_macro("Scribe: WARNING: Dropped packet cause dest %.8x, group %.8x, me %.8x\n", dest, field(group_id), myhash);
      } else {
        if (neighbor_query(my_sessions, field(group_id))) {
          if(upcall_forward(field(group_id), msg, size, COMM_TYPE_MULTICAST) == 0) {
            tempsess = neighbor_entry(my_sessions, field(group_id));
            if(tempsess->subscribed) {
              master_useful.update();
              upcall_deliver(msg, size, COMM_TYPE_MULTICAST);
            }
            //Update the parent neighbor type, if needed (need lock inside).
            if(field(group_id) != dest && (tempsess->parent==UNDEFINED || tempsess->parent != from) ) {
              //tempsess->parent = from;
              //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents.
              debug_macro("Scribe: WARNING: Received data from wrong parent. (group: %.8x, expected: %.8x, observed: %.8x)\n", field(group_id), tempsess->parent, from);
            }
            foreach_neighbor (neighbor_children*, afriend, tempsess->kids) {
              routeIP_data( afriend->ipaddr, field(group_id), COMM_TYPE_MULTICAST, field(priority), msg, size, field(priority) );
            }
          }
        } else { //note: else message is dropped, and a leave is returned to the sender!
          //WARNING: What if a failure caused this, and I just haven't been notified?
          if(field(group_id) != dest) routeIP_leave(from, field(group_id), 0, 0, -1);
        }
      }
    } else if(field(comm_type) == COMM_TYPE_COLLECT) {
      tempsess = neighbor_entry(my_sessions, field(group_id));
      if(tempsess != NULL && upcall_forward(tempsess->parent, msg, size, COMM_TYPE_COLLECT) == 0) {
        if(tempsess->isRoot) { upcall_deliver(msg, size, COMM_TYPE_COLLECT); }
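
The four cases handled by the `forward join` transition above can be hard to follow inside the MACEDON syntax, so here is a minimal stand-alone C++ sketch of the same decision order. It is an illustration only, not part of scribe.mac: `Key`, `Session`, `JoinAction` and `classifyJoin` are hypothetical names, `std::set` stands in for the MACEDON neighbor lists, and the `source_ == me && !isRoot` shortcut and all message sending are left out.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>

// Hypothetical stand-in for macedon_key; not the real MACEDON type.
using Key = std::uint32_t;

// Outcome of inspecting a join that passes through this node.
enum class JoinAction {
    ForwardOwnJoin,  // CASE 1: this node sent the join, keep forwarding
    ForwardLoop,     // CASE 2: sender is on our path to root, keep forwarding
    AcceptChild,     // CASE 3: room available, take the sender as a child
    PushDown,        // CASE 4: children list is full, push the sender down
    AlreadyChild     // sender is already a child, nothing to change
};

// Simplified per-tree state (assumption: sets replace neighbor lists).
struct Session {
    std::set<Key> kids;
    std::set<Key> pathToRoot;
    std::size_t maxChildren = 25;  // mirrors MAX_CHILDREN
};

// Classify a join from `from` arriving at node `self`, in the same order
// the transition checks its cases.
JoinAction classifyJoin(const Session& s, Key self, Key from) {
    if (from == self) return JoinAction::ForwardOwnJoin;
    if (s.pathToRoot.count(from) > 0) return JoinAction::ForwardLoop;
    if (s.kids.count(from) > 0) return JoinAction::AlreadyChild;
    if (s.kids.size() < s.maxChildren) return JoinAction::AcceptChild;
    return JoinAction::PushDown;
}
```

In the transition itself, the first two outcomes correspond to `return 0` (keep forwarding) and the remaining ones to `return 1` (the join stops at this node).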

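
The `forward leave` transition prunes the tree bottom-up: once a node has no children left and is not itself subscribed, it leaves too. The following C++ sketch models just that rule under simplifying assumptions. `TreeState`, `LeaveOutcome` and `handleLeave` are hypothetical names, not MACEDON API, and the sketch reports what the transition would do instead of sending messages or notifying the upper layer.

```cpp
#include <cstdint>
#include <set>

// Hypothetical stand-in for macedon_key; not the real MACEDON type.
using Key = std::uint32_t;
constexpr Key kUndefined = 0;  // mirrors "#define UNDEFINED 0"

// Simplified per-tree state (assumption: a set replaces the kids list).
struct TreeState {
    std::set<Key> kids;
    bool subscribed = false;
    Key parent = kUndefined;
};

// What the node decides to do after seeing a leave from `from`.
struct LeaveOutcome {
    bool removedChild;  // `from` was a child and has been removed
    bool notifyParent;  // a leave should be routed to our own parent
    bool dropSession;   // state for this tree should be discarded
};

LeaveOutcome handleLeave(TreeState& t, Key from) {
    LeaveOutcome out{false, false, false};
    // Step 1: remove the child if present (otherwise state was stale).
    out.removedChild = t.kids.erase(from) > 0;
    // Step 3: with no children and no local subscription, leave as well.
    if (t.kids.empty() && !t.subscribed) {
        out.notifyParent = (t.parent != kUndefined);
        t.parent = kUndefined;
        out.dropSession = true;
    }
    return out;
}
```

Note that, as in the transition, the "no children and not subscribed" check runs even when the sender was not a known child.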