📄 splitstreamms.mac
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat
//All rights reserved.
//
//Redistribution and use in source and binary forms, with or without
//modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in
// the documentation and/or other materials provided with the
// distribution.
// * Neither the names of Duke University nor The University of
// California, San Diego, nor the names of its contributors
// may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/*
 * SplitStream.mac
 *
 * This file contains a rough implementation of the splitstream protocol,
 * as described in the SOSP paper.
 *
 */
#include "bitmap_digest.h"
#include "scribe-ext.h"

//These macros control the additional levels of debugging included in the mac file.
//#define SPARE_CAPACITY_TRACE
#define DATA_TRACE
//#define PUSHDOWN_TRACE

protocol splitstreamms uses scribems

addressing hash

trace_off

constants {
  int SS_BITS = 32;             //The number of bits in the node identifiers
  int MAX_SESSIONS = 10;        //The maximum number of splitstream multicast sessions to keep state for
  int STRIPE_SS_BITS = 4;       //# of bits in splitstream stripe prefix (part of total bits)
  int SS_PRINTER_INTERVAL = 1;  //The period (in seconds) of the splitstream printer timer
  int FORWARD_STRIPES = 18;     //The number of stripes to forward data for (for experiments with uneven bandwidth capacities -- this should be replaced by a command line option via the Params class)
  int NUM_STRIPES = 16;         //The number of stripes to subscribe to
  int ANYCAST_ID = 45;          //The ID to be used for the spare capacity group

  //These constants are used to mirror scribe data with -- to micromanage pushdowns
  int SCRIBE_MAX_CHILDREN = 25; //The max number of children to allow in any scribe tree
  int SCRIBE_MAX_SESSIONS = 20; //The max number of scribe data trees to allow (not including the spare capacity group)
}

states {
  joined ready;
}

neighbor_types {
  scrChildren SCRIBE_MAX_CHILDREN; //A data type to store scribe trees in a stripe
  scrTree SCRIBE_MAX_SESSIONS {    //A data type to mirror scribe state (trees)
    scrChildren kids;              //In each tree, store a set of children
  }
  splitsets MAX_SESSIONS {         //A data type to store splitstream multicast session state in.
    int current_stripe;            //This element stores the next stripe to send data to (round robin)
    int sequence_num;              //The current sequence number the stripe is sending
    stripes stripeIds;             //The set of stripe ids (scribe trees) for the session (NOTE: this data type defined in scribe-ext.h)
  }
  //The following data type is used to prevent duplicate deliveries
  //(and more importantly, duplicate forwarding along the trees)
  subsets MAX_SESSIONS {           //A data type to store the set of received sequence numbers for each splitstream multicast session
    bitmap_digest received_filter; //The bitmap_digest tracks received sequence numbers, and auto-ages.
  }
}

messages {
  PRIORITY_LOW data {    //This message is for application data (API multicast) and will be delivered to the application.
    int group_id;        //this is the splitstream group id
    int sequence_num;    //for reordering, counting, etc.
    int scribe_group;    //for debugging
  }
  PRIORITY_HIGH joinReq { //This message is anycast to request membership in the given stripe.
    int stripe_id;        //The stripe we are trying to join
    int from_addr;        //The origin node of the request (the one requesting membership)
  }
}

state_variables {
  int myhash;                        //Stores the local node identifier
  splitsets my_sessions;             //The set of multicast sessions this node is sending on
  subsets my_subscriptions;          //The set of multicast sessions this node is receiving on
  scrTree my_trees;                  //A mirror of the set of scribe trees (scribe data)
  timer printer SS_PRINTER_INTERVAL; //This timer is used to periodically print out splitstream state

  //These two state variables are used for debugging the data received. master is all data which is
  //received, where master_useful is only the "useful" data (generally only counts the first time a
  //sequence number is received).
  adolfo_filter master;
  adolfo_filter master_useful;
}

transitions {

  //init API init (init state, API init call)
  //
  //Initialize the local node identifier by taking the hash of my address (me),
  //and schedule the printer timer.
  init API init {
    myhash = hashof(me);
    timer_resched(printer, SS_PRINTER_INTERVAL);
  }

  //init API status_change (init state, the status of the lower layer changes)
  //
  //Once the status is STATUS_READY: if this node is configured to forward more
  //stripes (FORWARD_STRIPES) than compute_outdegree() reports, join the spare
  //capacity group (ANYCAST_ID). In either case move to the joined state.
  init API status_change {
    if(status == STATUS_READY) {
      int outdegree = compute_outdegree();
      if(FORWARD_STRIPES > outdegree) {
#ifdef SPARE_CAPACITY_TRACE
        debug_macro( "SplitStream initially joining spare capacity group %.8x!\n",ANYCAST_ID);
#endif
        join(ANYCAST_ID);
      }
      state_change(joined);
    }
  }

  //timer printer (any state, the printer timer expires)
  //
  //If the current time is past the time the streaming is to begin (or the
  //"streaming_time" parameter is -1), print out the data received for
  //graphing and debugging purposes. Only print this out if I am the highest layer protocol.
  timer printer {
    extern MACEDON_Agent *globalmacedon;
    if (globalmacedon != this) return;
    //NOTE(review): pthread_self() is passed to a %d conversion; confirm this is
    //safe on the target platforms (pthread_t is not guaranteed to be an int).
    if ( ( parameters.getint("streaming_time") == -1.0 || curtime > time_booted + parameters.getint("streaming_time")) )
      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n",
             get_hostname(),
             Scheduler::instance().clock(),
             pthread_self(),
             (int) master.get_value(),
             0, // need to put in control bw
             (int) master_useful.get_value(),
             (int) master.get_value(),
             (int) master_useful.get_value(),
             0 );
  }

  //API create_group (any state, the higher layer/application calls macedon_create_group() )
  //
  //Does nothing if a session for groupID already exists. Otherwise:
  //1 - generate the array of scribe keys (one per stripe) for the splitstream group id
  //2 - call scribe's create group function for each of the associated scribe trees
  //3 - store the set of scribe keys in the new session entry
  //4 - initialize the stripe number and the sequence number to 0
  API create_group {
    //PROCESS:
    if( !neighbor_query(my_sessions, groupID) ) {
      //1- Generate the set of Scribe Keys based on splitstream group_id
      ASSERT(NUM_STRIPES < _MAX_STRIPES);
      int* keys = new int[NUM_STRIPES];
      generate_keys(groupID, keys);

      //2- Call Scribe create_group for each of the scribe keys
      //3- Store set of scribe keys for splitstream group_id. (neighbor_add).
      neighbor_add(my_sessions, groupID);
      for(int i=0; i<NUM_STRIPES; i++) {
        create_group(keys[i]);
        neighbor_info(my_sessions,groupID,stripeIds.stripeIds[i]) = keys[i];
      }
      delete[] keys;

      //4- Initialize stripe number to 0, and the sequence number.
      neighbor_info(my_sessions,groupID,current_stripe) = 0;
      neighbor_info(my_sessions,groupID,sequence_num) = 0;
    }
  }

  //API join (any state, the higher layer/application calls macedon_join() )
  //
  //PROCESS (skipped if already subscribed to groupID):
  //1- Generate the set of Scribe Keys based on splitstream group_id
  //2- Call Scribe join for each of the scribe keys
  //3- Create state for the joined tree (initializes the data tracking to 0 received).
  API join {
    //PROCESS:
    if(!neighbor_query(my_subscriptions, groupID)) {
      //1- Generate the set of Scribe Keys based on splitstream group_id
      int* keys = new int[NUM_STRIPES];
      generate_keys(groupID, keys);

      //2- Call Scribe join for each of the scribe keys
      for(int i=0; i<NUM_STRIPES; i++) {
        join(keys[i]);
      }
      delete[] keys;

      //3- Create state for the joined tree (initializes the data tracking to 0 received).
      neighbor_add(my_subscriptions, groupID);
    }
  }

  //API leave (any state, the higher layer/application calls macedon_leave() )
  //
  //PROCESS (skipped if not subscribed to groupID):
  //1- Generate the set of Scribe Keys based on splitstream group_id
  //2- Call Scribe leave for each of the scribe keys
  //3- Remove the state for the splitstream session
  API leave {
    //PROCESS:
    if(neighbor_query(my_subscriptions, groupID)) {
      //1- Generate the set of Scribe Keys based on splitstream group_id
      int* keys = new int[NUM_STRIPES];
      generate_keys(groupID, keys);

      //2- Call Scribe leave for each of the scribe keys
      for(int i=0; i<NUM_STRIPES; i++) {
        leave(keys[i]);
      }
      delete[] keys;

      //3- Remove the state for the splitstream session
      neighbor_remove(my_subscriptions, groupID);
    }
  }

  //API multicast (any state, macedon_multicast() called by the upper layer/application)
  //Only to be called by the source.
  //
  //PROCESS:
  //1- Get session from neighbor set
  //2- Call Scribe API multicast for scribe tree at stripe number X.
  //3- Increment stripe number & sequence_num.
  //
  //If no session exists for groupID (create_group was not called), nothing is sent;
  //the master/master_useful counters are updated regardless.
  API multicast {
    //PROCESS:
    master.update();
    master_useful.update();
    if(neighbor_query(my_sessions, groupID)) {
      //1- Get session from neighbor set
      neighbor_splitsets *tempsess = neighbor_entry(my_sessions, groupID);

      //2- Call Scribe API multicast for scribe tree at stripe number X.
      //   The stripe's scribe key is used both as the destination and as the
      //   scribe_group field of the data message.
      multicast_data(tempsess->stripeIds.stripeIds[tempsess->current_stripe],
                     groupID,
                     tempsess->sequence_num,
                     tempsess->stripeIds.stripeIds[tempsess->current_stripe],
                     msg, size, transport);

      //3- Increment stripe number (round robin over NUM_STRIPES) & sequence_num.
      tempsess->current_stripe = (tempsess->current_stripe + 1) % NUM_STRIPES;
      tempsess->sequence_num++;
    }
    // else printf( "must call create_group first\n" );
  }

  //API notify (any state, upcall_notify is called by a lower layer)
  //
  //Indicates the set of trees changed at the scribe layer.
  //Actions: Update the set of scribe trees mirrored. If there are too
  //         many, ideally take corrective action, but for now just
  //         display a warning
  API notify {
    if(type == NBR_TYPE_TREE) {
      neighbor_scrTree* temptree;
      bool anycast_subscribed = false;
      //We have to know whether we are subscribed to the anycast group,
      //because it doesn't get added to the mirrored state, but is counted
      //in "size." Scribe doesn't distinguish between types of trees.
      for(int i=0; i<size; i++) {
        if(neighbors[i] != ANYCAST_ID && !neighbor_query(my_trees, neighbors[i])) {
          neighbor_add(my_trees, neighbors[i]);
        }
        else if(neighbors[i] == ANYCAST_ID) {
          anycast_subscribed = true;
        }
      }

      //The mirror never contains the anycast group, so when it appears in
      //neighbors[] the mirror should hold one entry FEWER than size (not one
      //more). Any other count means the mirror holds trees scribe no longer
      //reports, which must be removed.
      int expected_trees = anycast_subscribed ? size-1 : size;
      if(neighbor_size(my_trees) != expected_trees) {
        bool found;
        //NOTE(review): entries are removed while iterating with foreach_neighbor;
        //confirm the macro tolerates removal of the current entry.
        foreach_neighbor(neighbor_scrTree*,temptree,my_trees) {
          found = false;
          for(int j=0; j<size; j++) {
            if(temptree->ipaddr == neighbors[j]) {
              found = true;
              break;
            }
          }
          if(!found) {
            neighbor_remove(my_trees,temptree->ipaddr);
          }
        }
      }

      //The anycast group is not part of my_trees, so it gets no allowance here.
      if(neighbor_size(my_trees) > NUM_STRIPES) {
        //FIXME: Take corrective action.
        debug_macro( "WARNING: Too many trees subscribed!\n");
      }
    }
  }

  //forward data
  //
  //When data is first received, determine whether or not to forward it (and deliver it)
  //by checking to see if we've already received it. Note that even when data is dropped,
  //we update the "master" data collection.
  //Returns 1 to drop a duplicate sequence number, 0 otherwise (including when
  //this node has no subscription for the message's group).
  forward data {
    if(neighbor_query(my_subscriptions, field(group_id))) {
      master.update();
      neighbor_subsets* subscription;
      subscription = neighbor_entry(my_subscriptions, field(group_id));
      if(subscription->received_filter.contains(field(sequence_num))) {
#ifdef DATA_TRACE
        debug_macro("WARNING: SS: Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because this sequence number has already been received.\n",field(sequence_num),field(group_id),field(scribe_group));
#endif
        return 1;
      }
    }
    return 0;
  }

  //recv data
  //
  //When data is delivered to splitstream, ask upper layer if we should forward it,
  //update the master_useful records, and deliver it locally.
  //Messages for groups this node is not subscribed to are ignored.
  recv data {
    if(neighbor_query(my_subscriptions, field(group_id))) {
      neighbor_subsets* subscription;
      subscription = neighbor_entry(my_subscriptions, field(group_id));
#ifdef DATA_TRACE
      debug_macro("SS: Rcvd msg(seq:%x,ss:%.8x,scr:%.8x).\n",field(sequence_num),field(group_id),field(scribe_group));
#endif
      if(upcall_forward(field(group_id), msg, size, COMM_TYPE_MULTICAST)==0 ) {
        master_useful.update();
#ifdef DATA_TRACE
        debug_macro("SS: Marking msg(seq:%x,ss:%.8x,scr:%.8x) as received.\n",field(sequence_num),field(group_id),field(scribe_group));
#endif
        //Record the sequence number so later copies are dropped by "forward data".
        subscription->received_filter.insert(field(sequence_num));
        upcall_deliver(msg, size, COMM_TYPE_MULTICAST);
      }
      else {
#ifdef DATA_TRACE
        debug_macro("Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because upper layer said not to foward it.\n",field(sequence_num),field(group_id),field(scribe_group));
#endif
      }
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -