📄 splitstream.mac
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat
//All rights reserved.
//
//Redistribution and use in source and binary forms, with or without
//modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in
// the documentation and/or other materials provided with the
// distribution.
// * Neither the names of Duke University nor The University of
// California, San Diego, nor the names of its contributors
// may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/*
 * SplitStream.mac
 *
 * Chip Killian
 *
 * This file contains a rough implementation of the splitstream protocol,
 * as described in the SOSP paper.
 *
 * 20031214: Updating the file to include changes described to me at the
 * SOSP conference by Ansley Post.
* */
#include "bitmap_digest.h"
#include "scribe-ext.h"

//These macros control the additional levels of debugging included in the mac file.
//Uncomment one to compile in the corresponding #ifdef'd trace output.
//#define SPARE_CAPACITY_TRACE
//#define DATA_TRACE
//#define PUSHDOWN_TRACE

//SplitStream is layered on top of scribe: every stripe of a splitstream
//session is carried by its own scribe tree.
protocol splitstream uses scribe
addressing hash
trace_off

constants {
  int SS_BITS = 32;             //The number of bits in the node identifiers
  int MAX_SESSIONS = 10;        //The maximum number of splitstream multicast sessions to keep state for
  int STRIPE_SS_BITS = 4;       //# of bits in splitstream stripe prefix (part of total bits)
                                //NOTE(review): 2^STRIPE_SS_BITS == NUM_STRIPES here; presumably they must agree -- confirm.
  int SS_PRINTER_INTERVAL = 1;  //The period (in seconds) of the splitstream printer timer
  int FORWARD_STRIPES = 18;     //The number of stripes to forward data for (for experiments with uneven bandwidth capacities -- this should be replaced by a command line option via the Params class)
  int NUM_STRIPES = 16;         //The number of stripes (scribe trees) each session is split into / subscribed to
  //These constants are used to mirror scribe data with -- to micromanage pushdowns
  int SCRIBE_MAX_CHILDREN = 25; //The max number of children to allow in any scribe tree
  int SCRIBE_MAX_SESSIONS = 20; //The max number of scribe data trees to allow (capacity of my_trees)
}

neighbor_types {
  scrChildren SCRIBE_MAX_CHILDREN;  //A set of up to SCRIBE_MAX_CHILDREN children of one scribe tree
  scrTree SCRIBE_MAX_SESSIONS {     //A data type to mirror scribe state (trees), keyed by tree id
    scrChildren kids;               //In each tree, store a set of children
  }
  splitsets MAX_SESSIONS {          //A data type to store splitstream multicast session state in (keyed by splitstream group id).
    int current_stripe;             //The index of the next stripe to send data on (round robin over NUM_STRIPES)
    int sequence_num;               //The sequence number stamped on the next data message sent by this session
    stripes stripeIds;              //The set of stripe ids (scribe tree keys) for the session (NOTE: this data type defined in scribe-ext.h)
  }
  //The following data type is used to prevent duplicate deliveries
  //(and more importantly, duplicate forwarding along the trees)
  subsets MAX_SESSIONS {            //A data type to store the set of received sequence numbers for each splitstream multicast session
    bitmap_digest received_filter;  //The bitmap_digest tracks received sequence numbers, and auto-ages.
  }
}

messages {
  PRIORITY_LOW data {               //This message is for application data (API multicast) and will be delivered to the application.
    int group_id;                   //this is the splitstream group id
    int sequence_num;               //for reordering, counting, etc. (checked against received_filter to drop duplicates)
    int scribe_group;               //for debugging (printed only in DATA_TRACE output)
  }
  PRIORITY_HIGH joinReq {           //This message is anycast to request membership in the destination stripe.
  }                                 //NOTE(review): no transition visible in this chunk sends joinReq -- confirm it is still used.
  PRIORITY_MED unicast_data {       //Carries API route/routeIP payloads; delivered with COMM_TYPE_UNICAST.
  }
}

state_variables {
  int myhash;                       //Stores the local node identifier (hashof(me), set in init)
  splitsets my_sessions;            //The set of multicast sessions this node is sending on (filled by create_group)
  subsets my_subscriptions;         //The set of multicast sessions this node is receiving on (filled by join, emptied by leave)
  scrTree my_trees;                 //A mirror of the set of scribe trees (scribe data), refreshed by API notify
  timer printer SS_PRINTER_INTERVAL; //This timer is used to periodically print out splitstream state
  //These two state variables are used for debugging the data received. master is all data which is
  //received, where master_useful is only the "useful" data (generally only counts the first time a
  //sequence number is received).
  adolfo_filter master;
  adolfo_filter master_useful;
}

transitions {
//init API init (init state, API init call)
//
//Initialize the local node identifier by taking the hash of my address,
//then schedule the printer timer.
init API init {
  myhash = hashof(me);
  timer_resched(printer, SS_PRINTER_INTERVAL);
}

//timer printer (any state, the printer timer expires)
//
//If the current time is past the time the streaming is to begin (or no start
//time is configured, i.e. streaming_time == -1), print out the data received for
//graphing and debugging purposes.  Only print this out if I am the highest layer protocol.
//Columns after REPLAY_BANDWIDTH: all data, control bw (not tracked, always 0),
//useful data, all data, useful data, 0.
timer printer {
  extern MACEDON_Agent *globalmacedon;
  if (globalmacedon != this)
    return;
  int streaming_time = parameters.getint("streaming_time");
  if (streaming_time == -1 || curtime > time_booted + streaming_time)
    printf("%s %f %lu REPLAY_BANDWIDTH %d %d %d %d %d %d\n",
           get_hostname(),
           Scheduler::instance().clock(),
           // pthread_t is not an int; passing it to %d was a format mismatch (UB).
           (unsigned long) pthread_self(),
           (int) master.get_value(),
           0, // need to put in control bw
           (int) master_useful.get_value(),
           (int) master.get_value(),
           (int) master_useful.get_value(),
           0);
}

//API create_group (any state, the higher layer/application calls macedon_create_group() )
//
//Things to do (only when groupID is not already a session; otherwise a no-op):
//1 - create an array of the keys of the scribe trees for data delivery
//2 - call scribe's create group function for all the associated scribe trees
//3 - remember those keys as the session's stripe ids
//4 - Initialize the stripe number to 0, and the sequence number
API create_group {
  //PROCESS:
  if( !neighbor_query(my_sessions, groupID) ) {
    //1 - Generate the set of Scribe Keys based on splitstream group_id
    //    (the ASSERT guards the stripeIds array written below; _MAX_STRIPES
    //    presumably sizes it in scribe-ext.h -- confirm)
    ASSERT(NUM_STRIPES < _MAX_STRIPES);
    int* keys = new int[NUM_STRIPES];
    generate_keys(groupID, keys);
    //2- Call Scribe create_group for each of the scribe keys
    //3- Store set of scribe keys for splitstream group_id. (neighbor_add).
    neighbor_add(my_sessions, groupID);
    for(int i=0; i<NUM_STRIPES; i++) {
      create_group(keys[i]);
      neighbor_info(my_sessions,groupID,stripeIds.stripeIds[i]) = keys[i];
    }
    delete[] keys;
    //4- Initialize stripe number to 0, and the sequence number.
    neighbor_info(my_sessions,groupID,current_stripe) = 0;
    neighbor_info(my_sessions,groupID,sequence_num) = 0;
  }
}

//API join (any state, the higher layer/application calls macedon_join() )
//
//PROCESS (only when not already subscribed to groupID):
//1- Generate the set of Scribe Keys based on splitstream group_id
//2- Call Scribe join for each of the scribe keys
//3- Create state for the joined session (initializes the data tracking to 0 received).
API join {
  //PROCESS:
  if(!neighbor_query(my_subscriptions, groupID)) {
    //1- Generate the set of Scribe Keys based on splitstream group_id
    int* keys = new int[NUM_STRIPES];
    generate_keys(groupID, keys);
    //2- Call Scribe join for each of the scribe keys
    for(int i=0; i<NUM_STRIPES; i++) {
      join(keys[i]);
    }
    delete[] keys;
    //3- Create state for the joined session (initializes the data tracking to 0 received).
    neighbor_add(my_subscriptions, groupID);
  }
}

//API leave (any state, the higher layer/application calls macedon_leave() )
//
//PROCESS (only when currently subscribed to groupID):
//1- Generate the set of Scribe Keys based on splitstream group_id
//2- Call Scribe leave for each of the scribe keys
//3- Remove the state for the splitstream session
API leave {
  //PROCESS:
  if(neighbor_query(my_subscriptions, groupID)) {
    //1- Generate the set of Scribe Keys based on splitstream group_id
    int* keys = new int[NUM_STRIPES];
    generate_keys(groupID, keys);
    //2- Call Scribe leave for each of the scribe keys
    for(int i=0; i<NUM_STRIPES; i++) {
      leave(keys[i]);
    }
    delete[] keys;
    //3- Remove the state for the splitstream session
    neighbor_remove(my_subscriptions, groupID);
  }
}

//API multicast (any state, macedon_multicast() called by the upper layer/application)
//Only to be called by the source.
//
//PROCESS:
//1- Get session from neighbor set
//2- Call Scribe API multicast for the scribe tree at stripe number current_stripe.
//3- Increment stripe number & sequence_num.
API multicast {
  return_code = 0;
  //PROCESS:
  //Both counters are bumped for every locally originated message, even when
  //groupID is not a session created on this node.
  master.update();
  master_useful.update();
  if(neighbor_query(my_sessions, groupID)) {
    //1- Get session from neighbor set
    neighbor_splitsets *tempsess = neighbor_entry(my_sessions, groupID);
    //2- Call Scribe API multicast for scribe tree at stripe number current_stripe.
    multicast_data(tempsess->stripeIds.stripeIds[tempsess->current_stripe], groupID, tempsess->sequence_num, tempsess->stripeIds.stripeIds[tempsess->current_stripe], msg, size, transport);
    return_code = macedon_sendret;
    //3- Increment stripe number (round robin over NUM_STRIPES) & sequence_num.
    tempsess->current_stripe = (tempsess->current_stripe + 1) % NUM_STRIPES;
    tempsess->sequence_num++;
  }
  // else printf( "must call create_group first\n" );
}

//API route: send msg toward dest as a unicast_data message; return_code is the send result.
API route {
  route_unicast_data(dest, msg, size, transport);
  return_code = macedon_sendret;
}

//API routeIP: like API route, but via routeIP_unicast_data.
API routeIP {
  routeIP_unicast_data(dest, msg, size, transport);
  return_code = macedon_sendret;
}

//forward unicast_data: let the upper layer decide whether to keep forwarding.
forward unicast_data {
  return upcall_forward(dest, msg, size, COMM_TYPE_UNICAST);
}

//recv unicast_data: hand the payload to the upper layer.
recv unicast_data {
  upcall_deliver(msg, size, COMM_TYPE_UNICAST);
}

//API notify (any state, upcall_notify is called by a lower layer)
//
//Indicates the set of trees changed at the scribe layer.
//Actions: Update the set of scribe trees mirrored in my_trees so it equals
//         neighbors[0..size).  If there are too many, ideally take corrective
//         action, but for now just display a warning.
API notify {
  if(type == NBR_TYPE_TREE) {
    neighbor_scrTree* temptree;
    //Add every tree scribe reported that we are not yet mirroring.
    for(int i=0; i<size; i++) {
      if(!neighbor_query(my_trees, neighbors[i])) {
        neighbor_add(my_trees, neighbors[i]);
      }
    }
    //my_trees holds at most SCRIBE_MAX_SESSIONS entries; the documented
    //behaviour for "too many" trees is a warning.
    if( size > SCRIBE_MAX_SESSIONS ) {
      printf("WARNING: SS: scribe reported %d trees, more than SCRIBE_MAX_SESSIONS (%d).\n",
             (int) size, (int) SCRIBE_MAX_SESSIONS);
    }
    if( neighbor_size(my_trees) != size ) {
      //Collect the stale trees first and remove them only after the scan:
      //calling neighbor_remove on my_trees inside foreach_neighbor over
      //my_trees mutates the set being iterated (entries can be skipped).
      int nstale = 0;
      int* stale = new int[neighbor_size(my_trees)];
      bool found;
      foreach_neighbor(neighbor_scrTree*,temptree,my_trees) {
        found = false;
        for(int j=0; j<size; j++) {
          if(temptree->ipaddr == neighbors[j]) {
            found = true;
            break;
          }
        }
        if(!found) {
          stale[nstale++] = temptree->ipaddr;
        }
      }
      for(int k=0; k<nstale; k++) {
        neighbor_remove(my_trees, stale[k]);
      }
      delete[] stale;
    }
  }
}

//forward data
//
//When data is first received, determine whether or not to forward it (and deliver it)
//by checking to see if we've already received it.
//Note that even when data is dropped,
//we update the "master" data collection (for sessions we are subscribed to).
//Returns 1 ("Dropping msg") when this subscription's received_filter already
//contains the message's sequence number; returns 0 otherwise, including for
//sessions this node is not subscribed to.
forward data {
  if(neighbor_query(my_subscriptions, field(group_id))) {
    master.update();
    neighbor_subsets* subscription;
    subscription = neighbor_entry(my_subscriptions, field(group_id));
    if(subscription->received_filter.contains(field(sequence_num))) {
#ifdef DATA_TRACE
      sprintf(trace_buf_,"WARNING: SS: Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because this sequence number has already been received.\n",field(sequence_num),field(group_id),field(scribe_group));
      trace_print();
#endif
      return 1;
    }
  }
  return 0;
}

//recv data
//
//When data is delivered to splitstream, ask upper layer if we should forward it,
//update the master_useful records, and deliver it locally.
//The sequence number is recorded in received_filter (so later copies are dropped
//by forward data) only when upcall_forward returns 0.
recv data {
  if(neighbor_query(my_subscriptions, field(group_id))) {
    neighbor_subsets* subscription;
    subscription = neighbor_entry(my_subscriptions, field(group_id));
#ifdef DATA_TRACE
    sprintf(trace_buf_,"SS: Rcvd msg(seq:%x,ss:%.8x,scr:%.8x).\n",field(sequence_num),field(group_id),field(scribe_group));
    trace_print();
#endif
    if(upcall_forward(field(group_id), msg, size, COMM_TYPE_MULTICAST)==0 ) {
      master_useful.update();
#ifdef DATA_TRACE
      sprintf(trace_buf_,"SS: Marking msg(seq:%x,ss:%.8x,scr:%.8x) as received.\n",field(sequence_num),field(group_id),field(scribe_group));
      trace_print();
#endif
      subscription->received_filter.insert(field(sequence_num));
      upcall_deliver(msg, size, COMM_TYPE_MULTICAST);
    } else {
#ifdef DATA_TRACE
      sprintf(trace_buf_,"Dropping msg(seq:%x,ss:%.8x,scr:%.8x) because upper layer said not to foward it.\n",field(sequence_num),field(group_id),field(scribe_group));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -