📄 scribe.mac
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met://// * Redistributions of source code must retain the above copyright// notice, this list of conditions and the following disclaimer.// * Redistributions in binary form must reproduce the above copyright// notice, this list of conditions and the following disclaimer in// the documentation and/or other materials provided with the// distribution.// * Neither the names of Duke University nor The University of// California, San Diego, nor the names of its contributors// may be used to endorse or promote products derived from// this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. 
//IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/**
 * Implementation of the SCRIBE protocol
 *
 * Chip Killian, Adolfo Rodriguez
 */

#include "adolfo_filter.h" //includes the declaration of the adolfo filter.
#include "scribe-ext.h"    //scribe-additional declarations of types and integers.

//These macros are used to turn additional debugging/trace messages on and off.
//#define SCRIBE_TRACE
//#define ANYCAST_TRACE
//#define MEMBERSHIP_TRACE

//Temporary placeholder used to mark an empty macedon_key.
//Requires that no node has this value as its id.
#define UNDEFINED 0

protocol scribe uses pastry
//protocol scribe uses chord

addressing hash
trace_off

constants {
  int BITS = 32;           //Number of bits in a node identifier.
  int MAX_SESSIONS = 20;   //Number of scribe trees to maintain state for.
  int MAX_CHILDREN = 25;   //Number of children per tree to maintain state for.
  int MAX_ROOTPATH = 50;   //Maximum depth of a path-to-root for a scribe tree.
  int MAX_VNODES = 101;    //Maximum number of visited nodes to maintain during an anycast traversal.
  int DEBUG_INTERVAL = 4;  //The interval of the printer timer.
  int MAX_PROBING = MAX_CHILDREN*MAX_SESSIONS+MAX_SESSIONS; //The max number of nodes to track for link failure.
  int JOIN_TIMEOUT_INT = 2;
  int JOIN_TIMER_PERIOD = 2;
}

neighbor_types {
  //Note that definitions here are of _types_, not actual variables. This is similar to
  // defining classes.
  children MAX_CHILDREN; //A set of children not to exceed size MAX_CHILDREN. Defined for use in "session".
  uppath MAX_ROOTPATH;   //A set of the nodes to the root of the tree. Defined for use in "session".

  session MAX_SESSIONS { //The state type for sessions (scribe trees). Each tree is a neighbor with these contents.
    children kids;       //The children of this node in the tree.
    macedon_key parent;  //The parent of this node in the tree.
                         //BUG: Until a general macedon_key class is defined, this approach uses 0 as
                         // NULL or undefined. Thus a node with id 0 will be confused.
    uppath pathtoroot;   //The path to the root of this tree.
    int subscribed;      //If subscribed==1, then this node should deliver data to the upper layer/app.
    bool isRoot;         //Stores whether this node is the root of the tree (to prevent downcalls all the time).
  }

  visited MAX_VNODES {   //Stores a list of nodes visited during an anycast traversal.
    int color;           //0-white, 1-grey, 2-black
  }

  monitored MAX_PROBING { //The set of nodes to monitor for link-level failures (instead of heartbeats).
    int count;            //The number of things we are watching this node for.
  }

  joining MAX_SESSIONS;  //The state type for sessions which are joining. Used to rejoin joins which fail.
}

messages {
  PRIORITY_HIGH create_group {
    int group_id;
  }

  PRIORITY_HIGHEST join {
    int group_id;
  }

  PRIORITY_HIGHEST leave {
    int group_id;
  }

  PRIORITY_LOW data {
    int group_id;
    int comm_type;
    int priority;
  }

  PRIORITY_HIGH pushdown {
    //Sent to a child when it is being "kicked out" of this node's children list.
    // Contains a list of the pushed down node's siblings. It will try to join
    // under one of these.
    int group_id;
    children siblings;
  }

  PRIORITY_HIGH membership {
    //Heartbeats will be sent to maintain each node's path to root to prevent loops.
    int group_id;
    uppath pathtoroot;
  }

  PRIORITY_HIGH anycast {
    //Message type to be sent for anycast requests. This is performed via a DFS
    // of the tree. Each node in the tree will pass the message up to the
    // higher layer/app to determine if the anycast should be continued or
    // stopped using upcall_forward.
    int group_id;
    int from_addr;
    visited dfsState;
  }

  PRIORITY_MED unicast_data {
  }
}

state_variables {
  //These are not type definitions but actual variable declarations.
  int myhash;                   //used to store the local node's node identifier
  session my_sessions;          //The set of scribe trees we are members of.
  fail_detect monitored watched; //The set of nodes which are to be monitored for failure.
  adolfo_filter master;         //This filter collects stats on all data received. (for bookkeeping & evaluation)
  adolfo_filter master_useful;  //This filter collects stats on useful data received. (for bookkeeping & evaluation)
  timer printer DEBUG_INTERVAL; //This is the printer timer, used to debug and dump state periodically.
  timer joinTimer;              //This timer fires periodically to rejoin failed joins.
  joining outstanding_joins;    //This keeps track of the outstanding joins for the system.
}

transitions {

//The automatic transition which occurs at the beginning.
init API init {
  myhash = hashof(me);
  timer_resched(printer, DEBUG_INTERVAL);
}

//Message: create_group
//Action: recv
//
//If state already exists for this group, simply set isRoot to true, clear the
// upstream maintenance state (path to root), and clear out data about the parent.
//Otherwise, first add state for the group.
//
//If there is no room for a new session (or the upper layer removes the new
// session from within upcall_notify), there is no state to mark as root: the
// request is logged and ignored instead of dereferencing a NULL entry.
//
//NOTE(review): when an existing member becomes root, its old parent is not sent
// a leave and may still list this node as a child -- confirm this is intended.
recv create_group {
  int mygroup = field(group_id);
  neighbor_session* temp = NULL;

  if (!neighbor_query(my_sessions, mygroup)) {
    if (neighbor_space(my_sessions)) {
      neighbor_add(my_sessions, mygroup);
      upcall_notify(my_sessions, NBR_TYPE_TREE);
      //The upper layer may have changed the session set during the upcall; re-fetch.
      temp = neighbor_entry(my_sessions, mygroup);
      if (temp != NULL) {
        temp->subscribed = 0;
      }
    }
  } else {
    temp = neighbor_entry(my_sessions, mygroup);
    neighbor_clear(temp->pathtoroot);
    //UNDEFINED is a placeholder, not a real node, so there is nothing to unwatch
    // (same guard as used in forward leave).
    if (temp->parent != UNDEFINED) {
      unwatch(temp->parent);
      temp->parent = UNDEFINED;
    }
  }

  if (temp == NULL) {
    debug_macro("Scribe: WARNING: could not create state for group %.8x (session table full or session removed).\n", mygroup);
  } else {
    temp->isRoot = true;
  }
}

//Message: join
//Action: forward
//
//Comments inline.
//CASE 1: Forward join because this node sent it
//CASE 2: Reject joining node based on loop test.
//CASE 3: Accept new child
//CASE 4: Reject child because there are too many children.
//
//Known problems: if no session slot is free, or upcall_notify removes a freshly
// created session, this node cannot take the child and forwards the join instead.
forward join {
  int mygroup;
  neighbor_session *temp;

  //CASE 1: Forward join because this node sent it
  //upcall_forward will be called on all nodes, including origin node. Thus, if this node
  //is the origin node, automatically forward it.
  if (from == myhash) return 0;

  mygroup = field(group_id);
  if (neighbor_query(my_sessions, mygroup)) {
    //retrieve state for use in this transition
    temp = neighbor_entry(my_sessions, mygroup);
    if (source_ == me && !temp->isRoot) {
      return 0;
    }
    if (neighbor_query(temp->pathtoroot, from)) {
      //CASE 2: Reject joining node based on loop test.
      //If the joining node is in our path to the root, forward it on, because we cannot allow loops.
      debug_macro("WARNING: Forwarding a join (fr:%.8x to:%.8x) because it is in our path to root.\n", from, dest);
      return 0; //Continue forwarding the join, because we cannot take the node.
    }
  } else {
    //add state for the new session, joining the group
    if (!neighbor_space(my_sessions)) {
      //No room to track this tree: let the join continue toward the root.
      debug_macro("Scribe: WARNING: forwarding join for group %.8x (fr: %.8x): session table full.\n", mygroup, from);
      return 0;
    }
    neighbor_add(my_sessions, mygroup);
    upcall_notify(my_sessions, NBR_TYPE_TREE); //notify the upper layer of the state change.
    //The upper layer may remove the group during upcall_notify, so the entry must be re-checked.
    temp = neighbor_entry(my_sessions, mygroup);
    if (temp == NULL) {
      debug_macro("Scribe: WARNING: forwarding join for group %.8x (fr: %.8x): session removed by upper layer.\n", mygroup, from);
      return 0;
    }
    temp->subscribed = 0;
    downcall_ext(temp->isRoot, TEST_OWNER, (void*)&mygroup);
    if (temp->isRoot <= 0) {
      route_join(mygroup, mygroup, 0, 0, -1);
      dojoin(mygroup);
    }
  }

  if (!neighbor_query(temp->kids, from)) {
    if (neighbor_space(temp->kids)) {
      //CASE 3: Accept new child
      neighbor_add(temp->kids, from);
      watch(from);
      routeIP_membership(from, mygroup, temp->pathtoroot, 0, 0, -1);
      debug_macro("Scribe: added kid %.8x for group %.8x, now have %d\n", from, mygroup, neighbor_size(temp->kids));

      //FIXME: To be sent only if the upper layer registered for this type of message.
      //group_change is declared in scribe-ext.h. The record is only needed for the
      // duration of the upcall, so it lives on the stack (no new/delete to leak).
      group_change gchg;
      gchg.groupId = mygroup;
      gchg.childId = from;
      gchg.newJoin = true;
      //Notify upper layer that the child has been added to the given group.
      upcall_ext(GROUP_CHANGE, (void*)&gchg);
    } else {
      //CASE 4: Reject child because there are too many children.
      route_pushdown(from, temp->ipaddr, temp->kids, 0, 0, -1);
    }
  }
  return 1;
}

//Message: join
//Action: recv
//
//If a join message is received at a node, this means it has already been forwarded by all
// intermediate nodes, including this one. If this happens frequently, it may be an
// indicator of a more serious problem. Likely indicates stale state.
recv join {
  debug_macro("Scribe: WARNING: join failed! Join msg not accepted by any node. (fr: %.8x gr: %.8x)\n", from, field(group_id));
}

//Message: leave
//Action: forward
//
//Process:
//1 - Remove child if found (otherwise stale state somewhere)
//2 - Conditionally notify higher layer
//3 - If we have no remaining children and are not subscribed, leave the group. (notify upper layer)
forward leave { // returns 0 if it should be forwarded
  int mygroup;
  neighbor_session *temp;

  if (from == myhash) return 0; //forward message if we sent it.

  mygroup = field(group_id);
  if (neighbor_query(my_sessions, mygroup)) {
    temp = neighbor_entry(my_sessions, mygroup);

    //remove "from" from the list of neighbor kids
    if (neighbor_query(temp->kids, from)) {
      //FIXME: To be sent only if the upper layer is subscribed to this event type.
      //The record is only needed for the duration of the upcall, so it lives on the stack.
      group_change gchg;
      gchg.groupId = mygroup;
      gchg.childId = from;
      gchg.newJoin = false;
      upcall_ext(GROUP_CHANGE, (void*)&gchg);

      neighbor_remove(temp->kids, from);
      unwatch(from);
    }

    //now we check both for children and our own subscription.
    if (neighbor_size(temp->kids) <= 0 && temp->subscribed == 0) {
      if (temp->parent != UNDEFINED) {
        routeIP_leave(temp->parent, mygroup, 0, 0, -1);
        unwatch(temp->parent);
        temp->parent = UNDEFINED;
      }
      neighbor_remove(my_sessions, mygroup);
      upcall_notify(my_sessions, NBR_TYPE_TREE);
    }
  }
  //else no processing done here (probably stale data)
  return 1;
}

//Message: leave
//Action: recv
//
//ASSERTion error.
Should not happen. recv leave { ASSERT(false); } //Message: data //Action: recv // //Process: //1 - Update bookkeeping filter //2 - If we do not have state for this tree, assume stale data, and route a leave to the source. //3 - Ask upper layer if we should forward this message (else done) //4 - Reset counters //5 - Deliver message to upper layer //6 - forward message to children. recv data { neighbor_session *tempsess; if(field(comm_type) == COMM_TYPE_MULTICAST) { master.update(); if (dest != field(group_id) && dest != myhash) { debug_macro("Scribe: WARNING: Dropped packet cause dest %.8x, group %.8x, me %.8x\n", dest, field(group_id), myhash); } else { if (neighbor_query(my_sessions, field(group_id))) { if(upcall_forward(field(group_id), msg, size, COMM_TYPE_MULTICAST) == 0) { tempsess = neighbor_entry(my_sessions, field(group_id)); if(tempsess->subscribed) { master_useful.update(); upcall_deliver(msg, size, COMM_TYPE_MULTICAST); } //Update the parent neighbor type, if needed (need lock inside). if(field(group_id) != dest && (tempsess->parent==UNDEFINED || tempsess->parent != from) ) { //tempsess->parent = from; //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents. debug_macro("Scribe: WARNING: Received data from wrong parent. (group: %.8x, expected: %.8x, observed: %.8x)\n", field(group_id), tempsess->parent, from); } foreach_neighbor (neighbor_children*, afriend, tempsess->kids) { routeIP_data( afriend->ipaddr, field(group_id), COMM_TYPE_MULTICAST, field(priority), msg, size, field(priority) ); } } } else { //note: else message is dropped, and a leave is returned to the sender! //WARNING: What if a failure caused this, and I just haven't been notified? 
if(field(group_id) != dest) routeIP_leave(from, field(group_id), 0, 0, -1); } } } else if(field(comm_type) == COMM_TYPE_COLLECT) { tempsess = neighbor_entry(my_sessions, field(group_id)); if(tempsess != NULL && upcall_forward(tempsess->parent, msg, size, COMM_TYPE_COLLECT) == 0) { if(tempsess->isRoot) { upcall_deliver(msg, size, COMM_TYPE_COLLECT); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -