⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 splitstreamms.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 2 页
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met:////   * Redistributions of source code must retain the above copyright//     notice, this list of conditions and the following disclaimer.//   * Redistributions in binary form must reproduce the above copyright//     notice, this list of conditions and the following disclaimer in//     the documentation and/or other materials provided with the//     distribution.//   * Neither the names of Duke University nor The University of//     California, San Diego, nor the names of its contributors//     may be used to endorse or promote products derived from//     this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./* * SplitStream.mac *  * This file contains a rough implementation of the splitstream protocol,  * as described in the SOSP paper. * */#include "bitmap_digest.h"#include "scribe-ext.h"//These macros control the additional levels of debugging included in the mac file.//#define SPARE_CAPACITY_TRACE#define DATA_TRACE//#define PUSHDOWN_TRACEprotocol splitstreamms uses scribemsaddressing hashtrace_offconstants {   int SS_BITS = 32;                   //The number of bits in the node identifiers  int MAX_SESSIONS = 10;              //The maximum number of splitstream multicast sessions to keep state for  int STRIPE_SS_BITS = 4;             //# of bits in splitstream stripe prefix (part of total bits)  int SS_PRINTER_INTERVAL = 1;        //The period (in seconds) of the splitstream printer timer  int FORWARD_STRIPES = 18;           //The number of stripes to forward data for (for experiments with uneven bandwidth capacities -- this should be replaced by a command line option via the Params class)  int NUM_STRIPES = 16;               //The number of stripes to subscribe to  int ANYCAST_ID = 45;                //The ID to be used for the spare capacity group  //These constants are used to mirror scribe data with -- to micromanage pushdowns  int SCRIBE_MAX_CHILDREN = 25;       //The max number of children to allow in any scribe tree  int SCRIBE_MAX_SESSIONS = 20;       //The max number of scribe data trees to allow (not including the spare capacity group)}states {  joined ready;}neighbor_types {  scrChildren SCRIBE_MAX_CHILDREN;    //A data type to store scribe trees in a stripe  scrTree SCRIBE_MAX_SESSIONS {       //A data type to mirror scribe state (trees)    scrChildren kids;                 //In each tree, store a set of children  }	  splitsets MAX_SESSIONS {            //A data type to store splitstream multicast session state in.    int current_stripe;               //This element stores the next stripe to send data to (round robbin)    int sequence_num;                 //The current sequence number the stripe is sending    stripes stripeIds;                //The set of stripe ids (scribe trees) for the session (NOTE: this data type defined in scribe-ext.h)  }  //The following data type is used to prevent duplicate deliveries   //(and more importantly, duplicate forwarding along the trees)  subsets MAX_SESSIONS {              //A data type to store the set of received sequence numbers for each splitstream multicast session    bitmap_digest received_filter;      //The bitmap_digest tracks received sequence numbers, and auto-ages.    }}messages {  PRIORITY_LOW data { //This message is for application data (API multicast) and will be delivered to the application.    int group_id; //this is the splitstream group id    int sequence_num; //for reordering, counting, etc.     int scribe_group; //for debugging  }  PRIORITY_HIGH joinReq { //This message is anycast to request membership in the given stripe.    int stripe_id;  //The stripe we are trying to join    int from_addr;  //The origin node of the request (the one requesting membership)  }}state_variables {  int myhash;                         //Stores the local node identifier  splitsets my_sessions;              //The set of multicast sessions this node is sending on  subsets my_subscriptions;           //The set of multicast sessions this node is receiving on  scrTree my_trees;                   //A mirror of the set of scribe trees (scribe data)  timer printer SS_PRINTER_INTERVAL;  //This timer is used to periodically print out splitstream state  //These two state variables are used for debugging the data received.  master is all data which is   //received, where master_useful is only the "useful" data (generally only counts the first time a   //sequence number is received.  adolfo_filter master;  adolfo_filter master_useful;}transitions {  //init API init (init state, API init call)  //  //initialize the local node identifier by taking the hash of my IP address.  //initialize the outdegree to 0  //schedule the spare capacity join timer, and the printer timer.  init API init {     myhash = hashof(me);    timer_resched(printer, SS_PRINTER_INTERVAL);  }  init API status_change {    if(status == STATUS_READY) {      int outdegree = compute_outdegree();      if(FORWARD_STRIPES > outdegree) {        #ifdef SPARE_CAPACITY_TRACE        debug_macro( "SplitStream initially joining spare capacity group %.8x!\n",ANYCAST_ID);        #endif        join(ANYCAST_ID);      }      state_change(joined);    }  }    //timer printer (any state, the printer timer expires)  //  //If the current time is past the time the streaming is to begin, print out the data received for   //graphing and debugging purposes.  Only print this out if I am the highest layer protocol.  timer printer {    extern MACEDON_Agent *globalmacedon;    if (globalmacedon != this)      return;     if ( ( parameters.getint("streaming_time") == -1.0 ||	   curtime > time_booted + parameters.getint("streaming_time")) )      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n", 	     get_hostname(), Scheduler::instance().clock(), pthread_self(),	     (int) master.get_value(),	     0,   // need to put in control bw	     (int) master_useful.get_value(),	     (int) master.get_value(),	     (int) master_useful.get_value(),	     0	     );  }    //API create_group (any state, the higher layer/application calls macedon_create_group() )  //  //Things to do:  //1 - increase the outdegree by NUM_STRIPES  //2 - create an array of the keys of the scribe trees for data delivery  //3 - call scribe's create group function for all the associated scribe trees  //4 - Initialize the stripe number to 0, and the sequence number  API create_group {    //PROCESS:    if( !neighbor_query(my_sessions, groupID) ) {      //1 - Generate the set of Scribe Keys based on splitstream group_id      ASSERT(NUM_STRIPES < _MAX_STRIPES);      int* keys = new int[NUM_STRIPES];      generate_keys(groupID, keys);      //2- Call Scribe create_group for each of the scribe keys      //3- Store set of scribe keys for splitstream group_id. (neighbor_add).      neighbor_add(my_sessions, groupID);      for(int i=0; i<NUM_STRIPES; i++) {        create_group(keys[i]);				        neighbor_info(my_sessions,groupID,stripeIds.stripeIds[i]) = keys[i];       }      delete[] keys;      //4- Initialize strip number to 0, and the sequence number.      neighbor_info(my_sessions,groupID,current_stripe) = 0;      neighbor_info(my_sessions,groupID,sequence_num) = 0;    }  }  //API join (any state, the higher layer/application calls macedon_join() )  //  //PROCESS:  //1- Generate the set of Scribe Keys based on splitstream group_id  //2- Call Scribe join for each of the scribe keys  //3- Create state for the joined tree (initializes the data tracking to 0 received).  API join {    //PROCESS:    if(!neighbor_query(my_subscriptions, groupID)) {      //1- Generate the set of Scribe Keys based on splitstream group_id      int* keys = new int[NUM_STRIPES];      generate_keys(groupID, keys);      //2- Call Scribe join for each of the scribe keys      for(int i=0; i<NUM_STRIPES; i++) {        join(keys[i]);      }      delete[] keys;      //3- Create state for the joined tree (initializes the data tracking to 0 received).      neighbor_add(my_subscriptions, groupID);    }  }  //API leave (any state, the higher layer/application calls macedon_leave() )  //  //PROCESS:  //1- Generate the set of Scribe Keys based on splitstream group_id  //2- Call Scribe join for each of the scribe keys  //3- Remove the state for the splitstream session  API leave {    //PROCESS:    if(neighbor_query(my_subscriptions, groupID)) {      //1- Generate the set of Scribe Keys based on splitstream group_id      int* keys = new int[NUM_STRIPES];      generate_keys(groupID, keys);      //2- Call Scribe join for each of the scribe keys      for(int i=0; i<NUM_STRIPES; i++) {        leave(keys[i]);      }      delete[] keys;      //3- Remove the state for the splitstream session      neighbor_remove(my_subscriptions, groupID);    }  }  //API multicast (any state, macedon_multicast() called by the upper layer/application)  //Only to be called by the source.  //  //PROCESS:  //1- Get session from neighbor set  //2- Call Scribe API multicast for scribe tree at stripe number X.  //3- Increment stripe number & sequence_num.  API multicast {     //PROCESS:    master.update();    master_useful.update();    if(neighbor_query(my_sessions, groupID)) {      //1- Get session from neighbor set      neighbor_splitsets *tempsess = neighbor_entry(my_sessions, groupID);      //2- Call Scribe API multicast for scribe tree at stripe number X.      multicast_data(tempsess->stripeIds.stripeIds[tempsess->current_stripe], groupID, tempsess->sequence_num, tempsess->stripeIds.stripeIds[tempsess->current_stripe], msg, size, transport);      //3- Increment stripe number & sequence_num.      tempsess->current_stripe = (tempsess->current_stripe + 1) % NUM_STRIPES;      tempsess->sequence_num++;    } // else printf( "must call create_group first\n" );  }  //API notify (any state, upcall_notify is called by a lower layer)  //  //Indicates the set of trees changed at the scribe layer.  //Actions: Update the set of scribe trees mirrored.  If there are too  //         many, ideally take corrective action, but for now just   //         display a warning  API notify {    if(type == NBR_TYPE_TREE) {      neighbor_scrTree* temptree;      bool anycast_subscribed = false; //We have to know whether we are subscribed to the anycast group,                                        //because it doesn't get added to the mirrored state, but is counted                                       //in "size."  Scribe doesn't distinguish between types of trees.      for(int i=0; i<size; i++) {        if(neighbors[i] != ANYCAST_ID && !neighbor_query(my_trees, neighbors[i])) {          neighbor_add(my_trees, neighbors[i]);        } else if(neighbors[i] == ANYCAST_ID) {          anycast_subscribed = true;        }      }      if((anycast_subscribed && neighbor_size(my_trees) != size+1) ||          (!anycast_subscribed && neighbor_size(my_trees) != size)    ) {        bool found;        foreach_neighbor(neighbor_scrTree*,temptree,my_trees) {          found = false;          for(int j=0; j<size; j++) {            if(temptree->ipaddr == neighbors[j]) {              found = true;              break;            }          }          if(!found) {            neighbor_remove(my_trees,temptree->ipaddr);          }        }      }      if(neighbor_size(my_trees) > NUM_STRIPES + (anycast_subscribed?1:0)) {        //FIXME: Take corrective action.        debug_macro( "WARNING: Too many trees subscribed!\n");      }    }  }  //forward data  //  //When data is first received, determine whether or not to forward it (and deliver it)  //by checking to see if we've already received it.  Note the even when data is dropped,   //we update the "master" data collection.  forward data {    if(neighbor_query(my_subscriptions, field(group_id))) {      master.update();       neighbor_subsets* subscription;      subscription = neighbor_entry(my_subscriptions, field(group_id));      if(subscription->received_filter.contains(field(sequence_num))) {        #ifdef DATA_TRACE        debug_macro("WARNING: SS: Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because this sequence number has already been received.\n",field(sequence_num),field(group_id),field(scribe_group));        #endif        return 1;      }    }    return 0;  }  //recv data  //  //When data is delivered to splitstream, ask upper layer if we should forward it,  //update the master_useful records, and deliver it locally.  recv data {    if(neighbor_query(my_subscriptions, field(group_id))) {      neighbor_subsets* subscription;      subscription = neighbor_entry(my_subscriptions, field(group_id));      #ifdef DATA_TRACE      debug_macro("SS: Rcvd msg(seq:%x,ss:%.8x,scr:%.8x).\n",field(sequence_num),field(group_id),field(scribe_group));      #endif      if(upcall_forward(field(group_id), msg, size, COMM_TYPE_MULTICAST)==0 ) {        master_useful.update();        #ifdef DATA_TRACE        debug_macro("SS: Marking msg(seq:%x,ss:%.8x,scr:%.8x) as received.\n",field(sequence_num),field(group_id),field(scribe_group));        #endif        subscription->received_filter.insert(field(sequence_num));        upcall_deliver(msg, size, COMM_TYPE_MULTICAST);      }      else {        #ifdef DATA_TRACE        debug_macro("Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because upper layer said not to foward it.\n",field(sequence_num),field(group_id),field(scribe_group));        #endif      }    }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -